Workflow Tracking
The Absurd client provides workflow tracking, letting you monitor and manage complex workflow executions. This feature was introduced in Phase 10 of development.
Overview
Workflow tracking enables you to:
Create and monitor workflow runs
Track workflow execution status across multiple tasks
Store workflow inputs, outputs, and metadata
Link multiple tasks to a single workflow run
Monitor workflow progress and completion
Support complex workflow patterns with error handling
Creating Workflow Runs
Use the create_workflow_run method to create a new workflow tracking record:
from absurd_client import AbsurdClient
import psycopg
client = AbsurdClient(queue_name="workflow_queue")
conn = psycopg.connect("your_connection_string")
# Create a new workflow run
workflow_run_id = client.create_workflow_run(
conn=conn,
workflow_name="data_pipeline", # Must match ^[a-z][a-z0-9_]*$ and not contain '__'
workflow_version="1.0.0", # Must match ^[a-zA-Z0-9._-]+$ and not contain '__'
inputs={"source": "s3://bucket/data", "target": "warehouse"},
created_by="pipeline_system",
tags={"environment": "production", "priority": "high"},
workflow_hash="sha256_hash_of_definition"
)
print(f"Created workflow run: {workflow_run_id}")
Parameters for Workflow Creation
conn: Database connection (transaction-aware)
workflow_name: Logical workflow name (must match ^[a-z][a-z0-9_]*$, no '__')
workflow_version: Workflow version (must match ^[a-zA-Z0-9._-]+$, no '__')
inputs: Optional workflow input parameters
absurd_run_id: Optional root Absurd run_id
created_by: Optional user/system identifier
tags: Optional key-value tags for filtering
workflow_hash: Optional SHA-256 hash of the workflow definition
Note
Both workflow_name and workflow_version must not contain double underscores ('__'),
because '__' is reserved as a separator for display purposes: {workflow_name}__{version}__{step_name}
Workflow names must match ^[a-z][a-z0-9_]*$ (lowercase letters, digits, and underscores, starting with a letter)
Workflow versions must match ^[a-zA-Z0-9._-]+$ (semver compatible)
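These constraints can be checked client-side before calling create_workflow_run. A minimal sketch; the validate_workflow_identifiers and display_name helpers below are illustrative, not part of the client API:

```python
import re

# Patterns taken from the documented constraints above
NAME_RE = re.compile(r"^[a-z][a-z0-9_]*$")
VERSION_RE = re.compile(r"^[a-zA-Z0-9._-]+$")

def validate_workflow_identifiers(workflow_name: str, workflow_version: str) -> None:
    """Raise ValueError if the name or version violate the documented rules."""
    if not NAME_RE.match(workflow_name) or "__" in workflow_name:
        raise ValueError(f"invalid workflow_name: {workflow_name!r}")
    if not VERSION_RE.match(workflow_version) or "__" in workflow_version:
        raise ValueError(f"invalid workflow_version: {workflow_version!r}")

def display_name(workflow_name: str, version: str, step_name: str) -> str:
    """Build the '__'-separated display form described in the note."""
    return f"{workflow_name}__{version}__{step_name}"
```

For example, validate_workflow_identifiers("data_pipeline", "1.0.0") passes, while a name containing "__" raises ValueError before the server ever sees it.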
Updating Workflow Status
Use the update_workflow_run_status method to update the status of a workflow run:
from datetime import datetime
# Update workflow status to running
client.update_workflow_run_status(
conn=conn,
workflow_run_id=workflow_run_id,
status="running",
started_at=datetime.now()
)
# Update workflow status to completed
client.update_workflow_run_status(
conn=conn,
workflow_run_id=workflow_run_id,
status="completed",
result={"output": "final_result", "metrics": {"processed_records": 1000, "duration": "5min"}},
completed_at=datetime.now(),
task_count=10 # Number of tasks in the workflow
)
# Update workflow status to failed with detailed error information
client.update_workflow_run_status(
conn=conn,
workflow_run_id=workflow_run_id,
status="failed",
error={
"type": "ProcessingError",
"message": "Data source unavailable",
"details": {
"source_url": "s3://bucket/data",
"attempted_at": datetime.now().isoformat(),
"retries_attempted": 3
},
"timestamp": datetime.now().isoformat()
}
)
Parameters for Status Updates
conn: Database connection
workflow_run_id: UUID of the workflow_run to update
status: New status (pending, running, completed, failed, cancelled)
result: Optional final result
error: Optional error details
started_at: Optional start timestamp
completed_at: Optional completion timestamp
task_count: Optional task count
Status Values
pending: Workflow has been created but not yet started
running: Workflow is currently executing
completed: Workflow completed successfully
failed: Workflow failed
cancelled: Workflow was cancelled
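The documentation does not state which status transitions are enforced server-side. If you want a client-side guard, one conservative reading of the lifecycle is sketched below; this transition table is an assumption, not documented library behavior:

```python
# ASSUMED lifecycle: pending -> running -> completed/failed,
# with cancellation allowed from any non-terminal state.
ALLOWED_TRANSITIONS = {
    "pending": {"running", "cancelled"},
    "running": {"completed", "failed", "cancelled"},
    "completed": set(),   # terminal
    "failed": set(),      # terminal
    "cancelled": set(),   # terminal
}

def can_transition(current: str, new: str) -> bool:
    """Return True if moving from `current` to `new` fits the assumed lifecycle."""
    return new in ALLOWED_TRANSITIONS.get(current, set())
```

Such a guard catches obvious mistakes (e.g. re-running a completed workflow) before issuing an update.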
Integration with Tasks
Link tasks to workflow runs during task creation:
# Spawn a task that's part of a workflow
task_id, run_id, actual_workflow_run_id = client.spawn_task(
conn=conn,
task_name="extract_data",
params={"workflow_run_id": workflow_run_id}, # Link to workflow in parameters
headers={"workflow_run_id": str(workflow_run_id)} # Also store in headers
)
This creates a connection between the Absurd task and the workflow run, allowing you to track the relationship between individual tasks and the overall workflow.
Full Workflow Execution Example
Here’s a complete example of a workflow using tracking:
from absurd_client import AbsurdClient
import psycopg
from datetime import datetime
def execute_data_pipeline():
client = AbsurdClient(queue_name="pipeline_queue")
with psycopg.connect("your_connection_string") as conn:
# Create workflow run
workflow_run_id = client.create_workflow_run(
conn=conn,
workflow_name="data_pipeline",
workflow_version="1.0.0",
inputs={"source": "s3://bucket/data", "target": "warehouse"},
created_by="etl_system",
tags={"environment": "production", "team": "data-eng"}
)
# Update status to running
client.update_workflow_run_status(
conn=conn,
workflow_run_id=workflow_run_id,
status="running",
started_at=datetime.now()
)
try:
# Spawn extraction task
extract_task_id, extract_run_id, _ = client.spawn_task(
conn=conn,
task_name="extract_data",
params={"workflow_run_id": workflow_run_id, "source": "s3://bucket/data"},
headers={"workflow_run_id": str(workflow_run_id)}
)
# Process the extraction task
# ... task processing logic ...
# Spawn transformation task after extraction completes
transform_task_id, transform_run_id, _ = client.spawn_task(
conn=conn,
task_name="transform_data",
params={"workflow_run_id": workflow_run_id, "input_format": "raw"},
headers={"workflow_run_id": str(workflow_run_id)}
)
# Process the transformation task
# ... task processing logic ...
# Update workflow status to completed
client.update_workflow_run_status(
conn=conn,
workflow_run_id=workflow_run_id,
status="completed",
result={"output": "warehouse", "records_processed": 10000},
completed_at=datetime.now(),
task_count=2
)
except Exception as e:
# Update workflow status to failed
client.update_workflow_run_status(
conn=conn,
workflow_run_id=workflow_run_id,
status="failed",
error={
"type": type(e).__name__,
"message": str(e),
"timestamp": datetime.now().isoformat()
}
)
raise
Using with Highway DSL
The workflow tracking features integrate well with the highway-dsl package, which provides a Python-based domain-specific language for defining complex workflows:
from highway_dsl import WorkflowBuilder, RetryPolicy
from datetime import datetime, timedelta
# Create a complex ETL workflow with Highway DSL
workflow = (
WorkflowBuilder("data_pipeline")
.set_schedule("0 2 * * *") # Daily at 2 AM
.set_start_date(datetime.now())
.add_tags("etl", "production")
.set_max_active_runs(1)
.set_default_retry_policy(RetryPolicy(
max_retries=3,
delay=timedelta(seconds=30),
backoff_factor=2.0
))
.task("extract", "etl.extract_data", result_key="raw_data")
.task("transform", "etl.transform_data", args=["{{raw_data}}"], result_key="transformed_data")
.parallel("process_branches", {
"branch_a": lambda w: w.task("process_a", "etl.process_part_a", args=["{{transformed_data}}"]),
"branch_b": lambda w: w.task("process_b", "etl.process_part_b", args=["{{transformed_data}}"])
})
.task("load", "etl.load_data", args=["{{transformed_data}}"])
.build()
)
# Export to YAML for use with Absurd
print(workflow.to_yaml())
# This workflow can be executed using the Absurd client
# with proper workflow run tracking