Quick Start
This guide helps you get started with the Python Absurd client. It covers the basic usage patterns and common operations.
Prerequisites
Before installing and using the Python Absurd client, you’ll need:
Python 3.8 or higher
PostgreSQL with the Absurd extension installed
The psycopg (version 3) library for PostgreSQL connectivity
Installation
First, install the client:
pip install absurd
Basic Usage
Here’s a comprehensive example that demonstrates the core functionality:
import psycopg

from absurd_client import AbsurdClient

# Create a client instance
client = AbsurdClient(queue_name="my_queue")

# Connect to your PostgreSQL database
with psycopg.connect("your_connection_string") as conn:
    # Create the queue (optional - done automatically when spawning tasks)
    client.create_queue(conn)

    # Spawn a new task with comprehensive options
    task_id, run_id, workflow_run_id = client.spawn_task(
        conn=conn,
        task_name="process_data",
        params={"input": "data", "options": {"verbose": True}},
        headers={"priority": "high", "region": "us-east-1"},
        retry_strategy={
            "kind": "exponential",
            "base_seconds": 30,
            "factor": 2.0,
            "max_seconds": 3600,
        },
        max_attempts=3,
        cancellation={
            "max_delay": 3600,     # Max 1 hour delay before starting
            "max_duration": 7200,  # Max 2 hours total execution time
        },
    )
    print(f"Spawned task: {task_id}, run: {run_id}, workflow run: {workflow_run_id}")

    # Claim and process tasks
    claimed_tasks = client.claim_task(conn, qty=1, claim_timeout=60)
    for task_data in claimed_tasks:
        run_id, task_id, attempt, task_name, params, retry_strategy, max_attempts, headers, *_ = task_data
        try:
            # Process the task
            result = process_task(task_name, params)
            # Mark as completed
            client.complete_task(conn, run_id, result)
            print(f"Task {task_id} completed successfully")
        except Exception as e:
            # Mark as failed
            client.fail_task(conn, run_id, str(e))
            print(f"Task {task_id} failed: {e}")
Connection Management
For production applications, use connection pooling for better resource management and performance:
from psycopg_pool import ConnectionPool

from absurd_client import AbsurdClient

# Create a connection pool
pool = ConnectionPool("your_connection_string", min_size=2, max_size=10)
client = AbsurdClient(queue_name="my_queue")

# Use a connection from the pool
with pool.connection() as conn:
    task_id, run_id, workflow_run_id = client.spawn_task(
        conn=conn,
        task_name="example_task",
        params={"data": "value"},
    )
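In a long-running worker you would typically claim tasks in a loop, taking a fresh connection from the pool for each batch. A rough sketch (the batch size, back-off interval, and process_task handler are assumptions, not part of the client):
import time

def worker_loop(pool, client):
    while True:
        with pool.connection() as conn:
            claimed = client.claim_task(conn, qty=10, claim_timeout=60)
            for task_data in claimed:
                run_id, task_id, attempt, task_name, params, *_ = task_data
                try:
                    result = process_task(task_name, params)
                    client.complete_task(conn, run_id, result)
                except Exception as e:
                    client.fail_task(conn, run_id, str(e))
        if not claimed:
            time.sleep(1)  # Back off briefly when the queue is empty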
Advanced Task Spawning
Spawn tasks with retry strategies using convenience functions:
from absurd_client import spawn_retry_task

# Using the convenience function for retryable tasks
task_id, run_id, workflow_run_id = spawn_retry_task(
    client=client,
    conn=conn,
    task_name="reliable_task",
    params={"data": "value"},
    max_attempts=5,
    retry_kind="exponential",
    base_seconds=10,
    factor=2.0,
    max_seconds=3600,  # Max 1 hour between retries
)
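Assuming the usual exponential-backoff formula (delay before attempt n = base_seconds * factor ** (n - 1), capped at max_seconds; check your deployment if the exact schedule matters), these settings produce roughly the following delays:
# Assumed schedule: delay = min(base_seconds * factor ** (n - 1), max_seconds)
base_seconds, factor, max_seconds = 10, 2.0, 3600
for attempt in range(1, 6):
    delay = min(base_seconds * factor ** (attempt - 1), max_seconds)
    print(f"attempt {attempt}: wait {delay:.0f}s")
# attempt 1: wait 10s, then 20s, 40s, 80s, 160s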
Or spawn with cancellation rules:
from absurd_client import spawn_cancellable_task

task_id, run_id, workflow_run_id = spawn_cancellable_task(
    client=client,
    conn=conn,
    task_name="long_running_task",
    params={"data": "value"},
    max_delay_seconds=3600,     # Max 1 hour of delay before starting
    max_duration_seconds=7200,  # Max 2 hours total execution time
)
Event-Driven Workflows
Use events for cross-workflow coordination:
from absurd_client import AbsurdClient, AbsurdSleepError

client = AbsurdClient(queue_name="event_queue")

# Emit an event to signal completion
client.emit_event(
    conn=conn,
    event_name="data_ready",
    payload={"processed_data": "result", "timestamp": "2025-01-01T00:00:00Z"},
)

# Wait for an event (in a task) - this requires error handling
try:
    payload = client.wait_for_event(
        conn=conn,
        run_id=run_id,
        event_name="data_ready",
        timeout_seconds=3600,  # 1 hour
        task_id=task_id,
        step_name="waiting_step",
    )
    print(f"Received event payload: {payload}")
except TimeoutError:
    print("Event not received within timeout")
except AbsurdSleepError:
    # Expected when the event has not arrived yet: re-raise so the
    # orchestrator can put the task to sleep and resume it later.
    raise
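The simplest pattern inside a task handler is not to catch AbsurdSleepError at all and let it propagate to the worker. A sketch of that shape (the handler signature here is illustrative, not an absurd_client convention):
def handle_wait_task(conn, task_id, run_id, params):
    # No try/except around the wait: AbsurdSleepError propagating out of
    # this function is what lets the worker suspend the run until the
    # event arrives or the timeout elapses.
    payload = client.wait_for_event(
        conn=conn,
        run_id=run_id,
        event_name="data_ready",
        timeout_seconds=3600,
        task_id=task_id,
        step_name="waiting_step",
    )
    return {"received": payload}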
Checkpoint Management
For long-running tasks, use checkpoints to save state and resume:
# Set a checkpoint for state persistence
client.set_checkpoint(
    conn=conn,
    task_id=task_id,
    step_name="progress",
    state={"current_step": 5, "total_steps": 10, "data": "intermediate_result"},
    owner_run=run_id,
    extend_claim_by=60,  # Extend the claim by 60 seconds
)

# Retrieve a checkpoint
checkpoint = client.get_checkpoint(conn, task_id, "progress")
if checkpoint:
    print(f"Retrieved checkpoint: {checkpoint}")
Workflow Tracking
Track complex workflows across multiple tasks:
from datetime import datetime

from absurd_client import AbsurdClient

client = AbsurdClient(queue_name="workflow_queue")

# Create a workflow run to track execution
workflow_run_id = client.create_workflow_run(
    conn=conn,
    workflow_name="data_pipeline",
    workflow_version="1.0.0",
    inputs={"source": "s3://bucket/data", "target": "warehouse"},
    created_by="pipeline_system",
    tags={"environment": "production", "priority": "high"},
)

# Spawn the first task in the workflow
task_id, run_id, _ = client.spawn_task(
    conn=conn,
    task_name="extract_data",
    params={"workflow_run_id": workflow_run_id},  # Link to the workflow
)

# Update the workflow status
client.update_workflow_run_status(
    conn=conn,
    workflow_run_id=workflow_run_id,
    status="running",
    started_at=datetime.now(),
)

# Mark the workflow as completed when finished
client.update_workflow_run_status(
    conn=conn,
    workflow_run_id=workflow_run_id,
    status="completed",
    result={"output": "final_data"},
    completed_at=datetime.now(),
)
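If the pipeline fails, record that too so the workflow run does not stay "running" forever. A sketch (the "failed" status value and the run_pipeline driver are assumptions, not part of absurd_client):
try:
    run_pipeline(workflow_run_id)  # Hypothetical pipeline driver
except Exception as e:
    client.update_workflow_run_status(
        conn=conn,
        workflow_run_id=workflow_run_id,
        status="failed",
        result={"error": str(e)},
        completed_at=datetime.now(),
    )
    raise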
Singleton Pattern
Use the singleton client instance for consistent configuration across your application:
import psycopg

from absurd_client import get_absurd_client

# Get the shared client instance
client = get_absurd_client(queue_name="my_queue")

# Reuse the same client throughout your application for consistent configuration
with psycopg.connect("your_connection_string") as conn:
    task_id, run_id, workflow_run_id = client.spawn_task(
        conn=conn,
        task_name="example_task",
        params={"data": "value"},
    )
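Repeated calls with the same queue name should hand back the same instance, which is the point of the singleton (assuming get_absurd_client caches clients by queue name):
a = get_absurd_client(queue_name="my_queue")
b = get_absurd_client(queue_name="my_queue")
assert a is b  # One shared, consistently configured client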