Working with Tasks
Complete guide to defining, configuring, and executing tasks in Flowrra.
Defining Tasks
Tasks are defined using the @app.task() decorator:
from flowrra import Flowrra
app = Flowrra.from_urls()
@app.task()
async def my_task(arg1: str, arg2: int):
"""Your task description."""
# Task logic here
return {"result": "success"}
Async vs Sync Tasks
Async Tasks (I/O-bound)
Use async def for I/O-bound operations:
@app.task()
async def fetch_api_data(url: str):
async with httpx.AsyncClient() as client:
response = await client.get(url)
return response.json()
Sync Tasks (CPU-bound)
Use regular def with cpu_bound=True for CPU-intensive work:
@app.task(cpu_bound=True)
def process_large_file(file_path: str):
# CPU-intensive operation
with open(file_path) as f:
data = f.read()
# Process data
return len(data)
Task Configuration
The @app.task() decorator accepts several configuration options:
@app.task(
name="custom_task_name", # Custom task identifier
max_retries=3, # Number of retry attempts
retry_delay=5.0, # Initial retry delay in seconds
priority=10, # Higher = executed first
cpu_bound=False, # Set to True for CPU-intensive tasks
timeout=30.0 # Task timeout in seconds
)
async def configured_task():
pass
Retry Configuration
Tasks can retry automatically on failure with exponential backoff:
@app.task(max_retries=5, retry_delay=2.0)
async def may_fail():
# Will retry with delays: 2s, 4s, 8s, 16s, 32s
raise Exception("Temporary failure")
Submitting Tasks
Submit tasks for execution using app.submit():
async def main():
async with app:
# Submit task with arguments
task_id = await app.submit(my_task, "arg1_value", 42)
# Submit with keyword arguments
task_id = await app.submit(
my_task,
arg1="value",
arg2=42
)
Retrieving Results
Wait for Task Completion
async with app:
task_id = await app.submit(my_task, "test", 123)
# Wait for result with timeout
result = await app.wait_for_result(task_id, timeout=10.0)
if result.success:
print(f"Result: {result.result}")
else:
print(f"Error: {result.error}")
Check Task Status
# Get task status without waiting
result = await app.get_result(task_id)
if result:
print(f"Status: {result.status}")
print(f"Result: {result.result}")
Error Handling
Handle task failures gracefully:
@app.task(max_retries=3)
async def risky_operation():
try:
# Risky code
pass
except SpecificError as e:
# Handle specific errors
raise # Re-raise to trigger retry
except Exception as e:
# Log and return error state
return {"error": str(e)}
Task Priority
Control execution order with priorities:
# High priority task (executed first)
await app.submit(urgent_task, priority=100)
# Normal priority task
await app.submit(normal_task, priority=0)
# Low priority task (executed last)
await app.submit(background_task, priority=-10)
Best Practices
Keep Tasks Idempotent - Tasks should be safe to retry
Use Type Hints - Helps with IDE support and documentation
Add Docstrings - Document what your task does
Handle Errors - Don’t let exceptions crash the worker
Set Timeouts - Prevent tasks from running forever
Log Appropriately - Use logging for debugging
Examples
Email Sending Task
@app.task(max_retries=3, retry_delay=10.0)
async def send_email(to: str, subject: str, body: str):
"""Send email with retry logic."""
async with aiosmtplib.SMTP() as smtp:
await smtp.connect()
await smtp.send_message(...)
return {"sent": True, "to": to}
Data Processing Task
@app.task(cpu_bound=True, timeout=300.0)
def process_csv(file_path: str):
"""Process large CSV file."""
import pandas as pd
df = pd.read_csv(file_path)
# Process data
return {"rows_processed": len(df)}