Usage Examples
This section provides examples of using the Absurd client in common scenarios, from basic task processing to workflow tracking.
Basic Task Processing
A simple example of spawning and processing tasks:
import psycopg
from absurd_client import AbsurdClient

def process_task(task_name, params):
    """Simulate task processing"""
    if task_name == "echo":
        return {"output": f"Processed: {params.get('message', '')}"}
    return {"result": "unknown"}

# Create client
client = AbsurdClient(queue_name="default_queue")

# Connect to database
with psycopg.connect("your_connection_string") as conn:
    # Spawn a task
    task_id, run_id, workflow_run_id = client.spawn_task(
        conn=conn,
        task_name="echo",
        params={"message": "Hello, World!"}
    )
    print(f"Spawned task: {task_id}")

    # Claim and process tasks
    claimed_tasks = client.claim_task(conn, qty=1)
    for task_data in claimed_tasks:
        run_id, task_id, attempt, task_name, params, *_ = task_data
        try:
            result = process_task(task_name, params)
            client.complete_task(conn, run_id, result)
            print(f"Completed task {task_id}")
        except Exception as e:
            client.fail_task(conn, run_id, str(e))
            print(f"Failed task {task_id}: {e}")
Long-Running Tasks with Checkpoints
For tasks that run for extended periods, use checkpoints so a retried attempt can resume where the previous one left off:
import time

import psycopg
from absurd_client import AbsurdClient

client = AbsurdClient(queue_name="long_running_queue")

def long_running_task(conn, run_id, task_id):
    """Simulate a long-running task with checkpoints"""
    # Check if we have a checkpoint from a previous attempt
    checkpoint = client.get_checkpoint(conn, task_id, "progress")
    start_step = 1
    if checkpoint and checkpoint.get("state"):
        start_step = checkpoint["state"].get("current_step", 1)

    for step in range(start_step, 6):  # 5 steps total
        # Simulate work
        time.sleep(2)  # Simulate processing

        # Save checkpoint
        client.set_checkpoint(
            conn=conn,
            task_id=task_id,
            step_name="progress",
            state={"current_step": step, "completed": f"step_{step}"},
            owner_run=run_id
        )

        # Extend claim if needed for long operations
        client.extend_claim(conn, run_id, extend_by_seconds=60)

    return {"status": "completed", "steps": 5}

with psycopg.connect("your_connection_string") as conn:
    # Spawn the long-running task
    task_id, run_id, workflow_run_id = client.spawn_task(
        conn=conn,
        task_name="long_running_task",
        params={"work": "complex_job"}
    )

    # Claim and process
    claimed_tasks = client.claim_task(conn)
    for task_data in claimed_tasks:
        run_id, task_id, *_ = task_data
        try:
            result = long_running_task(conn, run_id, task_id)
            client.complete_task(conn, run_id, result)
        except Exception as e:
            client.fail_task(conn, run_id, str(e))
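If the worker crashes after a few steps, the run is failed and later retried; because long_running_task reads the checkpoint first, the next attempt resumes from the saved step rather than starting over. Here is a hedged sketch of that second attempt, assuming the retried run has become claimable again:

# Second-attempt sketch: assumes the earlier attempt failed after saving a
# "progress" checkpoint and that the retried run is claimable again by now.
with psycopg.connect("your_connection_string") as conn:
    for task_data in client.claim_task(conn):
        run_id, task_id, *_ = task_data
        # get_checkpoint() inside long_running_task() picks up the saved step.
        result = long_running_task(conn, run_id, task_id)
        client.complete_task(conn, run_id, result)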
Event-Driven Workflows
Use events to coordinate between tasks:
from absurd_client import AbsurdClient, AbsurdSleepError

client = AbsurdClient(queue_name="event_driven_queue")

def task_that_waits_for_event(conn, run_id, task_id):
    """A task that waits for an event"""
    try:
        payload = client.wait_for_event(
            conn=conn,
            run_id=run_id,
            event_name="data_ready",
            timeout_seconds=3600,  # 1 hour
            task_id=task_id,
            step_name="waiting_for_data"
        )
        return {"status": "received", "payload": payload}
    except AbsurdSleepError:
        # This is expected: the orchestrator should handle it
        # by marking the run as sleeping
        raise

def task_that_emits_event(conn):
    """A task that processes data and emits an event"""
    # Simulate data processing
    result = {"processed_data": "some_value"}

    # Emit the event that other tasks are waiting for
    client.emit_event(
        conn=conn,
        event_name="data_ready",
        payload=result
    )
    return {"status": "event_emitted", "data": result}

# In a real orchestrator, you would handle AbsurdSleepError like this:
def process_task_with_event_handling(conn, run_id, task_id):
    try:
        return task_that_waits_for_event(conn, run_id, task_id)
    except AbsurdSleepError as e:
        # Mark the run as sleeping until the event occurs
        client.set_run_sleeping(conn, e.run_id, e.event_name)
        # Return without completing the task - it will resume when the event occurs
        return None
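Putting the two sides together, one connection can spawn the waiting task, claim and run it until it sleeps, and then run the emitter to wake it. The following hedged sketch uses only the calls shown above; spawning the waiter by its function name is an illustrative convention:

import psycopg

# End-to-end sketch: spawn the waiter, let it go to sleep on the
# "data_ready" event, then emit the event so a later claim can resume it.
with psycopg.connect("your_connection_string") as conn:
    task_id, run_id, _ = client.spawn_task(
        conn=conn,
        task_name="task_that_waits_for_event",
        params={}
    )

    for task_data in client.claim_task(conn):
        run_id, task_id, *_ = task_data
        # Returns None if the task went to sleep waiting for the event.
        outcome = process_task_with_event_handling(conn, run_id, task_id)
        if outcome is not None:
            client.complete_task(conn, run_id, outcome)

    # Emitting the event wakes any runs sleeping on "data_ready".
    task_that_emits_event(conn)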
Retry Strategies
Handle failures with retry strategies:
import random

import psycopg
from absurd_client import AbsurdClient, spawn_retry_task

client = AbsurdClient(queue_name="retry_queue")

def unreliable_task():
    """A task that sometimes fails"""
    if random.random() < 0.7:  # 70% chance of failure
        raise Exception("Random failure for demonstration")
    return {"status": "success"}

with psycopg.connect("your_connection_string") as conn:
    # Spawn with retry strategy
    task_id, run_id, workflow_run_id = spawn_retry_task(
        client=client,
        conn=conn,
        task_name="unreliable_task",
        params={},
        max_attempts=5,
        retry_kind="exponential",
        base_seconds=10,
        factor=2.0
    )

    # Process the task
    claimed_tasks = client.claim_task(conn)
    for task_data in claimed_tasks:
        run_id, task_id, *_ = task_data
        try:
            result = unreliable_task()
            client.complete_task(conn, run_id, result)
        except Exception as e:
            # The retry logic is handled by Absurd
            client.fail_task(conn, run_id, str(e))
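The tuple returned by claim_task also carries the attempt number (unpacked as attempt in the basic example above), which is handy for logging retried runs. A minimal sketch reusing the setup above:

# Log which attempt is being processed; the attempt counter comes from the
# claimed task data, exactly as in the basic example.
with psycopg.connect("your_connection_string") as conn:
    for task_data in client.claim_task(conn):
        run_id, task_id, attempt, task_name, params, *_ = task_data
        print(f"Task {task_id} ({task_name}), attempt {attempt}")
        try:
            client.complete_task(conn, run_id, unreliable_task())
        except Exception as e:
            # Failing the run lets Absurd schedule the next attempt
            # according to the exponential strategy configured at spawn time.
            client.fail_task(conn, run_id, str(e))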
Scheduling and Delayed Execution
Schedule tasks for future execution:
import psycopg
from absurd_client import AbsurdClient
from datetime import datetime, timedelta

client = AbsurdClient(queue_name="scheduled_queue")

with psycopg.connect("your_connection_string") as conn:
    # Spawn the task to be delayed
    task_id, run_id, workflow_run_id = client.spawn_task(
        conn=conn,
        task_name="delayed_task",
        params={"message": "This will run later"}
    )

    # Schedule it to run 10 minutes from now
    future_time = datetime.now() + timedelta(minutes=10)
    client.schedule_task(conn, run_id, future_time)
    print(f"Task {task_id} scheduled to run at {future_time}")
Connection Pooling Example
For production applications, use connection pooling:
from psycopg_pool import ConnectionPool
from absurd_client import AbsurdClient

# Create a connection pool
pool = ConnectionPool("your_connection_string", min_size=2, max_size=10)
client = AbsurdClient(queue_name="pooled_queue")

def process_task_with_pooling():
    with pool.connection() as conn:
        # Spawn a task
        task_id, run_id, workflow_run_id = client.spawn_task(
            conn=conn,
            task_name="pooled_task",
            params={"data": "value"}
        )

        # Process any claimed tasks
        claimed_tasks = client.claim_task(conn, qty=5)
        for task_data in claimed_tasks:
            run_id, task_id, attempt, task_name, params, *_ = task_data
            try:
                result = {"processed": params}
                client.complete_task(conn, run_id, result)
            except Exception as e:
                client.fail_task(conn, run_id, str(e))
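With a pool, several worker threads can process tasks concurrently, each borrowing its own connection. The sketch below assumes thread-based workers suit the workload; run_pooled_worker, the worker count, and the poll interval are illustrative choices:

import threading
import time

def run_pooled_worker(poll_interval=1.0):
    """Claim and complete tasks on a connection borrowed from the pool."""
    while True:
        with pool.connection() as conn:
            claimed_tasks = client.claim_task(conn, qty=5)
            for task_data in claimed_tasks:
                run_id, task_id, attempt, task_name, params, *_ = task_data
                try:
                    client.complete_task(conn, run_id, {"processed": params})
                except Exception as e:
                    client.fail_task(conn, run_id, str(e))
        if not claimed_tasks:
            time.sleep(poll_interval)  # queue was empty; back off briefly

# Start a few workers; in a real service, keep the main thread alive
# (for example by joining the threads or sleeping in a loop).
threads = [threading.Thread(target=run_pooled_worker, daemon=True) for _ in range(3)]
for t in threads:
    t.start()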
Cancellable Tasks
Create tasks with cancellation rules:
import psycopg
from absurd_client import AbsurdClient, spawn_cancellable_task

client = AbsurdClient(queue_name="cancellable_queue")

with psycopg.connect("your_connection_string") as conn:
    # Spawn a task that can be cancelled if it takes too long
    task_id, run_id, workflow_run_id = spawn_cancellable_task(
        client=client,
        conn=conn,
        task_name="potentially_long_task",
        params={"data": "value"},
        max_delay_seconds=3600,    # Max 1 hour of delay
        max_duration_seconds=7200  # Max 2 hours total duration
    )

    # Later, you can cancel the task if needed:
    # client.cancel_task(conn, run_id)
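Manual cancellation is the cancel_task call from the comment above, triggered by whatever condition your application cares about. A small sketch; cancellation_requested is a hypothetical placeholder for that condition:

def cancellation_requested():
    # Placeholder for an application-specific signal (user action, deadline, ...).
    return False

with psycopg.connect("your_connection_string") as conn:
    if cancellation_requested():
        client.cancel_task(conn, run_id)
        print(f"Cancelled task {task_id}")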
Workflow Tracking Example
Track complex workflows across multiple tasks:
import psycopg
from absurd_client import AbsurdClient
from datetime import datetime

client = AbsurdClient(queue_name="workflow_tracking_queue")

def run_data_pipeline():
    with psycopg.connect("your_connection_string") as conn:
        # Create a workflow run to track this pipeline
        workflow_run_id = client.create_workflow_run(
            conn=conn,
            workflow_name="data_pipeline",
            workflow_version="1.0.0",
            inputs={"source": "s3://bucket/data", "target": "warehouse"},
            created_by="pipeline_system"
        )

        # Update workflow status
        client.update_workflow_run_status(
            conn=conn,
            workflow_run_id=workflow_run_id,
            status="running",
            started_at=datetime.now()
        )

        # Spawn the extraction task
        extract_task_id, extract_run_id, _ = client.spawn_task(
            conn=conn,
            task_name="extract_data",
            params={"workflow_run_id": workflow_run_id, "source": "s3://bucket/data"},
            headers={"workflow_run_id": str(workflow_run_id)}
        )

        # In a real scenario, subsequent tasks would be spawned after previous ones complete.
        # For this example, we just mark the workflow as completed.
        client.update_workflow_run_status(
            conn=conn,
            workflow_run_id=workflow_run_id,
            status="completed",
            result={"output": "processed_data"},
            completed_at=datetime.now(),
            task_count=3  # Assuming 3 total tasks in the workflow
        )

run_data_pipeline()
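If a pipeline step raises, the workflow run should be marked accordingly rather than being left in "running". The sketch below assumes update_workflow_run_status also accepts status="failed" and that result can be omitted; verify both against the library before relying on it:

def run_data_pipeline_safely():
    with psycopg.connect("your_connection_string") as conn:
        workflow_run_id = client.create_workflow_run(
            conn=conn,
            workflow_name="data_pipeline",
            workflow_version="1.0.0",
            inputs={"source": "s3://bucket/data", "target": "warehouse"},
            created_by="pipeline_system"
        )
        try:
            client.spawn_task(
                conn=conn,
                task_name="extract_data",
                params={"workflow_run_id": workflow_run_id},
                headers={"workflow_run_id": str(workflow_run_id)}
            )
        except Exception:
            # Record the failure on the workflow run, then re-raise.
            client.update_workflow_run_status(
                conn=conn,
                workflow_run_id=workflow_run_id,
                status="failed",
                completed_at=datetime.now()
            )
            raise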