Error Handling
The Absurd client includes comprehensive error handling for various scenarios. Understanding these error patterns is crucial for building robust applications.
AbsurdSleepError
The AbsurdSleepError is a special exception that signals when a run enters SLEEPING state to wait for an event. This is used by the orchestrator to free worker threads while tasks wait for events.
# Example: a task step waiting for an external event. AbsurdSleepError is
# raised to hand control back to the orchestrator instead of blocking.
from absurd_client import AbsurdClient, AbsurdSleepError
client = AbsurdClient(queue_name="my_queue")
try:
    payload = client.wait_for_event(
        conn=conn,
        run_id=run_id,
        event_name="data_ready",
        timeout_seconds=3600,  # required; prevents hung workflows
        task_id=task_id,
        step_name="waiting_step"
    )
except AbsurdSleepError as e:
    # The orchestrator should catch this and mark the run as sleeping
    print(f"Run {e.run_id} sleeping, waiting for event '{e.event_name}'")
    # In a real orchestrator, this would mark the run as sleeping
    # and free the worker thread to process other tasks
    # In the orchestrator, you would typically handle it like this:
def handle_task_processing():
    """Skeleton orchestrator handler: run task logic, park the run on sleep."""
    try:
        # Process task here
        pass
    except AbsurdSleepError as sleep_signal:
        # Persist the sleeping state so the run resumes when its event fires.
        client.set_run_sleeping(conn, sleep_signal.run_id, sleep_signal.event_name)
        # Returning without completing the task frees this worker thread.
        return
Database Errors
The client handles database errors appropriately:
# Example: guarding a completion call against database failures.
from absurd_client import AbsurdClient
client = AbsurdClient(queue_name="my_queue")
try:
    # Attempt to complete a task
    client.complete_task(conn, run_id, result)
except Exception as e:
    # Broad catch is deliberate in this illustrative snippet; production
    # code should narrow to the driver's error types where possible.
    # Handle database errors or other issues
    print(f"Failed to complete task: {e}")
    # Handle appropriately based on your application needs
Validation Errors
Queue names are validated to prevent SQL injection:
# Example: queue-name validation (guards against SQL injection, per above).
try:
    # This will raise ValueError if queue_name is invalid
    client = AbsurdClient(queue_name="invalid-queue-name!")  # Invalid character
except ValueError as e:
    print(f"Invalid queue name: {e}")
# Valid queue names contain only letters, numbers, and underscores
# and must start with a letter or underscore
client = AbsurdClient(queue_name="valid_queue_name")  # Valid
Event Wait Errors
When using wait_for_event, you must specify a timeout to prevent hung workflows:
from absurd_client import AbsurdClient
client = AbsurdClient(queue_name="my_queue")
# First attempt: demonstrates that omitting the timeout is an error.
try:
    # This will raise ValueError because no timeout is specified
    payload = client.wait_for_event(
        conn=conn,
        run_id=run_id,
        event_name="data_ready",
        # Missing timeout_seconds parameter
        task_id=task_id,
        step_name="waiting_step"
    )
except ValueError as e:
    print(f"Timeout required: {e}")
# Correct usage with timeout
try:
    payload = client.wait_for_event(
        conn=conn,
        run_id=run_id,
        event_name="data_ready",
        timeout_seconds=3600,  # 1 hour timeout
        task_id=task_id,
        step_name="waiting_step"
    )
except TimeoutError:
    print("Event not received within timeout period")
except AbsurdSleepError:
    # Expected when waiting for events - orchestrator handles this
    pass
Retry and Failure Handling
When tasks fail, you can specify detailed error information:
from absurd_client import AbsurdClient, spawn_retry_task
# NOTE(review): spawn_retry_task is imported but never used in this
# example -- confirm whether it belongs here or should be dropped.
from datetime import datetime
client = AbsurdClient(queue_name="my_queue")
# Spawn a task with retry strategy
retry_strategy = {
    "kind": "exponential",
    "base_seconds": 30,
    "factor": 2.0,
    "max_seconds": 3600
}
task_id, run_id, workflow_run_id = client.spawn_task(
    conn=conn,
    task_name="reliable_task",
    params={"data": "value"},
    max_attempts=3,  # Will attempt up to 3 times
    retry_strategy=retry_strategy
)
# Fail a task with detailed error information
try:
    # Process task and it fails
    raise Exception("Processing failed")
except Exception as e:
    client.fail_task(
        conn=conn,
        run_id=run_id,
        reason={
            "name": "ProcessingError",
            "message": str(e),
            "timestamp": datetime.now().isoformat(),
            "details": {"input": "data_value", "step": "processing", "attempt": 1}
        }
    )
Connection Errors
Always ensure proper connection management:
import psycopg
from absurd_client import AbsurdClient
client = AbsurdClient(queue_name="my_queue")
# Use connection context managers to ensure proper cleanup
try:
    with psycopg.connect("your_connection_string") as conn:
        task_id, run_id, workflow_run_id = client.spawn_task(
            conn=conn,
            task_name="example_task",
            params={"data": "value"}
        )
except psycopg.OperationalError as e:
    # Connection-level failures (server unreachable, auth, dropped link).
    print(f"Database connection error: {e}")
except psycopg.Error as e:
    # Broader psycopg base class; handler order matters -- it must come
    # after the more specific OperationalError above.
    print(f"PostgreSQL error: {e}")
Exception Handling in Task Processing
Properly handle exceptions when processing tasks:
from absurd_client import AbsurdClient, AbsurdSleepError
import logging
# Shared client and module-level logger used by the handler below.
client = AbsurdClient(queue_name="error_handling_queue")
logger = logging.getLogger(__name__)
def process_task_with_error_handling(conn, run_id, task_id, task_name, params):
    """Run one task attempt and route the outcome to the client.

    Completes the run on success, marks it sleeping when the task waits
    on an event (AbsurdSleepError), and fails it with structured error
    details on any other exception.

    Args:
        conn: open database connection.
        run_id: identifier of the run being processed.
        task_id: identifier of the task (used in log messages and reasons).
        task_name: name dispatched to execute_task_logic.
        params: task parameters forwarded to execute_task_logic.
    """
    # Local imports keep this example snippet self-contained.
    import traceback
    from datetime import datetime

    try:
        # Process the task
        result = execute_task_logic(task_name, params)
        client.complete_task(conn, run_id, result)
        logger.info(f"Task {task_id} completed successfully")
    except AbsurdSleepError as e:
        # Expected for event-driven tasks: park the run until its event fires.
        client.set_run_sleeping(conn, e.run_id, e.event_name)
        logger.info(f"Task {task_id} sleeping, waiting for event {e.event_name}")
    except Exception as e:
        # Any other error fails the run with enough context to debug it.
        error_info = {
            "name": type(e).__name__,
            "message": str(e),
            "timestamp": datetime.now().isoformat(),
            # Bug fix: str(e.__traceback__) only produced
            # "<traceback object at 0x...>"; format_exc() captures the
            # actual formatted stack trace of the active exception.
            "traceback": traceback.format_exc(),
            "task_id": task_id,
            "run_id": run_id
        }
        client.fail_task(conn, run_id, error_info)
        logger.error(f"Task {task_id} failed: {error_info}")
Advanced Error Handling Patterns
For complex workflows, consider implementing sophisticated error handling:
def _failure_reason(name, message, retryable):
    """Build the structured reason payload passed to client.fail_task."""
    from datetime import datetime  # local import keeps the snippet self-contained
    return {
        "name": name,
        "message": message,
        "is_retryable": retryable,
        "timestamp": datetime.now().isoformat()
    }

def process_with_retry_logic(conn, run_id, task_id, task_name, params):
    """Process a task, distinguishing retryable from fatal failures.

    Outcomes:
        TemporaryError -> failed with is_retryable=True (may be re-attempted).
        FatalError     -> failed with is_retryable=False (no retry).
        other errors   -> logged with traceback, failed as retryable since
                          the cause is unknown.
    """
    try:
        # Process the task with custom business logic
        result = custom_task_processor(task_name, params)
        client.complete_task(conn, run_id, result)
    except AbsurdSleepError as e:
        # Expected when the task waits on an event. Bug fix: use the
        # run_id/event_name carried by the exception, consistent with the
        # other handlers in this guide, instead of the outer run_id and a
        # hard-coded "expected_event" literal.
        client.set_run_sleeping(conn, e.run_id, e.event_name)
    except TemporaryError as e:
        # Transient failure: mark retryable so the scheduler can re-attempt.
        client.fail_task(conn=conn, run_id=run_id,
                         reason=_failure_reason("TemporaryError", str(e), True))
    except FatalError as e:
        # Permanent failure: never retry.
        client.fail_task(conn=conn, run_id=run_id,
                         reason=_failure_reason("FatalError", str(e), False))
    except Exception as e:
        # Unknown failure: log with traceback and retry optimistically.
        logger.error(f"Unexpected error processing task {task_id}: {e}", exc_info=True)
        client.fail_task(conn=conn, run_id=run_id,
                         reason=_failure_reason("UnexpectedError", str(e), True))
Best Practices
Always specify timeouts when using wait_for_event
Use try/except blocks around all client operations
Handle AbsurdSleepError appropriately in your orchestrator
Validate inputs before passing to client methods
Use connection pooling for production applications
Implement proper logging for error diagnosis
Test error handling paths in your application
Use detailed error messages that help with debugging
Distinguish between retryable and non-retryable errors
Implement circuit breaker patterns for external dependencies