Working with Tasks

Complete guide to defining, configuring, and executing tasks in Flowrra.

Defining Tasks

Tasks are defined using the @app.task() decorator:

from flowrra import Flowrra

app = Flowrra.from_urls()

@app.task()
async def my_task(arg1: str, arg2: int):
    """Your task description."""
    # Task logic here
    return {"result": "success"}

Async vs Sync Tasks

Async Tasks (I/O-bound)

Use async def for I/O-bound operations:

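import httpx

@app.task()
async def fetch_api_data(url: str):
    """Fetch a URL and return its decoded JSON body."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()  # Raise on 4xx/5xx instead of parsing an error page
        return response.json()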

Sync Tasks (CPU-bound)

Use regular def with cpu_bound=True for CPU-intensive work:

@app.task(cpu_bound=True)
def process_large_file(file_path: str):
    # CPU-intensive operation
    with open(file_path) as f:
        data = f.read()
        # Process data
    return len(data)

Task Configuration

The @app.task() decorator accepts several configuration options:

@app.task(
    name="custom_task_name",      # Custom task identifier
    max_retries=3,                 # Number of retry attempts
    retry_delay=5.0,               # Initial retry delay in seconds
    priority=10,                   # Higher = executed first
    cpu_bound=False,               # Set to True for CPU-intensive tasks
    timeout=30.0                   # Task timeout in seconds
)
async def configured_task():
    pass
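To make the timeout option more concrete, the sketch below shows one common way a time limit is enforced on a coroutine, using the standard library's asyncio.wait_for. It is an illustration only: run_with_timeout and slow_task are hypothetical names, and Flowrra may enforce timeouts differently internally.

```python
import asyncio


async def slow_task():
    """Stand-in for a task that takes longer than its allowed time."""
    await asyncio.sleep(1.0)
    return "done"


async def run_with_timeout(coro, timeout):
    """Hypothetical helper: await `coro`, giving up after `timeout` seconds.

    Returns (True, value) on success or (False, "timed out") on expiry.
    """
    try:
        return True, await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for cancels the coroutine before raising, so no work is left running
        return False, "timed out"


outcome = asyncio.run(run_with_timeout(slow_task(), timeout=0.05))
print(outcome)
```

A timeout that is generous for the normal case but well below "forever" keeps one stuck task from holding a worker slot indefinitely.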

Retry Configuration

Failed tasks are retried automatically with exponential backoff: the delay starts at retry_delay and doubles after each attempt, for up to max_retries retries:

@app.task(max_retries=5, retry_delay=2.0)
async def may_fail():
    # Will retry with delays: 2s, 4s, 8s, 16s, 32s
    raise Exception("Temporary failure")
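The delay schedule in the comment above can be reproduced with a few lines of arithmetic. This is a minimal sketch that assumes plain doubling with no jitter and no upper cap; backoff_delays is a hypothetical helper, not part of Flowrra, and the library's actual formula may differ.

```python
def backoff_delays(retry_delay: float, max_retries: int) -> list[float]:
    """Return the wait before each retry, assuming the delay doubles every time."""
    return [retry_delay * 2 ** attempt for attempt in range(max_retries)]


delays = backoff_delays(retry_delay=2.0, max_retries=5)
total_wait = sum(delays)
print(delays, total_wait)
```

Under these assumptions the total wait grows quickly (62 seconds for the settings above), which is worth keeping in mind when choosing max_retries alongside a task timeout.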

Submitting Tasks

Submit tasks for execution using app.submit():

import asyncio

async def main():
    async with app:
        # Submit task with positional arguments
        task_id = await app.submit(my_task, "arg1_value", 42)

        # Submit with keyword arguments
        task_id = await app.submit(
            my_task,
            arg1="value",
            arg2=42
        )

asyncio.run(main())

Retrieving Results

Wait for Task Completion

async with app:
    task_id = await app.submit(my_task, "test", 123)

    # Wait for result with timeout
    result = await app.wait_for_result(task_id, timeout=10.0)

    if result.success:
        print(f"Result: {result.result}")
    else:
        print(f"Error: {result.error}")

Check Task Status

# Get task status without waiting
result = await app.get_result(task_id)

if result:
    print(f"Status: {result.status}")
    print(f"Result: {result.result}")

Error Handling

Handle task failures gracefully:

@app.task(max_retries=3)
async def risky_operation():
    try:
        # Risky code
        pass
    except SpecificError:  # Placeholder: substitute the exception type you expect
        # Re-raise so the task fails and a retry is triggered
        raise
    except Exception as e:
        # Returning instead of raising means no retry; this dict becomes the task's result
        return {"error": str(e)}
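The difference between re-raising and returning can be seen in a small simulation. The sketch below uses a toy retry loop in place of a real worker; run_with_retries, make_flaky, and TransientError are hypothetical names for illustration, and the loop is not Flowrra's implementation.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical error type worth retrying, e.g. a dropped connection."""


async def run_with_retries(task, max_retries):
    """Toy retry loop: call `task` again whenever it raises, up to max_retries retries."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return await task(), attempts
        except Exception:
            if attempts > max_retries:
                raise  # retries exhausted: surface the last error


def make_flaky(failures):
    """Build a task that raises TransientError `failures` times, then succeeds."""
    state = {"left": failures}

    async def flaky():
        try:
            if state["left"] > 0:
                state["left"] -= 1
                raise TransientError("connection dropped")
            return {"result": "success"}
        except TransientError:
            raise  # re-raised: the loop sees a failure and retries
        except Exception as e:
            return {"error": str(e)}  # swallowed: the loop sees a normal return

    return flaky


result, attempts = asyncio.run(run_with_retries(make_flaky(2), max_retries=3))
print(result, attempts)
```

Only exceptions that escape the task count as failures here, so anything caught and returned as a dictionary is never retried.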

Task Priority

Control execution order with priorities:

# High priority task (executed first)
await app.submit(urgent_task, priority=100)

# Normal priority task
await app.submit(normal_task, priority=0)

# Low priority task (executed last)
await app.submit(background_task, priority=-10)
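The "higher priority runs first" rule can be pictured with a heap-based queue. This is a sketch under assumptions: PriorityQueueSketch is a hypothetical class, it assumes ties are broken in submission order, and it is not how Flowrra necessarily stores pending tasks.

```python
import heapq
import itertools


class PriorityQueueSketch:
    """Pop the highest-priority item first; equal priorities pop in submission order."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker that preserves submission order

    def push(self, name, priority=0):
        # heapq is a min-heap, so the priority is negated to pop the largest first
        heapq.heappush(self._heap, (-priority, next(self._counter), name))

    def pop(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


queue = PriorityQueueSketch()
queue.push("background_task", priority=-10)
queue.push("normal_task", priority=0)
queue.push("urgent_task", priority=100)

order = [queue.pop() for _ in range(len(queue))]
print(order)
```

Priorities only reorder tasks that are still waiting; a task that has already started is not interrupted by a later high-priority submission in this model.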

Best Practices

  1. Keep Tasks Idempotent - A retried task may run more than once, so repeated runs should leave the same end state

  2. Use Type Hints - Helps with IDE support and documentation

  3. Add Docstrings - Document what your task does

  4. Handle Errors - Don’t let exceptions crash the worker

  5. Set Timeouts - Prevent tasks from running forever

  6. Log Appropriately - Use logging for debugging
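As an illustration of the first practice, one common way to make a task safe to retry is to record an idempotency key and skip work that has already been applied. The sketch below uses in-memory structures and hypothetical names (apply_credit, processed, balances); a real task would keep this state in a durable store.

```python
# In-memory stand-ins for durable storage (assumption for illustration only)
processed: set[str] = set()
balances = {"alice": 0}


def apply_credit(idempotency_key: str, account: str, amount: int) -> dict:
    """Credit an account once per key; repeated calls with the same key are no-ops."""
    if idempotency_key in processed:
        return {"applied": False, "balance": balances[account]}
    balances[account] += amount
    processed.add(idempotency_key)
    return {"applied": True, "balance": balances[account]}


first = apply_credit("order-1001", "alice", 50)
retry = apply_credit("order-1001", "alice", 50)  # simulates a retried task
print(first, retry)
```

The key should identify the logical operation (an order ID, a message ID) rather than the attempt, so every retry of the same work maps to the same key.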

Examples

Email Sending Task

import aiosmtplib

@app.task(max_retries=3, retry_delay=10.0)
async def send_email(to: str, subject: str, body: str):
    """Send email with retry logic."""
    # The async context manager connects on entry and closes the connection on exit,
    # so no explicit smtp.connect() call is needed
    async with aiosmtplib.SMTP() as smtp:
        await smtp.send_message(...)
        return {"sent": True, "to": to}

Data Processing Task

@app.task(cpu_bound=True, timeout=300.0)
def process_csv(file_path: str):
    """Process large CSV file."""
    import pandas as pd
    df = pd.read_csv(file_path)
    # Process data
    return {"rows_processed": len(df)}