Quick Start

This guide will help you get started with Flowrra in minutes.

Basic Usage

Here’s a simple example of creating and running tasks:

import asyncio
from flowrra import Flowrra

# Create Flowrra application
app = Flowrra.from_urls()

# Register tasks
@app.task()
async def send_email(to: str, subject: str):
    """Send an email asynchronously."""
    await asyncio.sleep(0.1)  # Simulate email sending
    print(f"Email sent to {to}: {subject}")
    return {"status": "sent", "to": to}

# Execute tasks
async def main():
    async with app:
        # Submit a task
        task_id = await app.submit(send_email, "user@example.com", "Hello")

        # Wait for result
        result = await app.wait_for_result(task_id, timeout=10.0)
        print(f"Result: {result.result}")

asyncio.run(main())

Default Configuration

When you create a Flowrra application using Flowrra.from_urls() without any parameters, it uses the following defaults:

Task Queue (Broker)

  • In-memory queue (asyncio.PriorityQueue) by default

  • Tasks are stored in memory and will be lost if the application restarts

  • Best for development, testing, and single-process applications

  • For production, Redis broker is strongly recommended for persistence and distributed workers

Result Storage (Backend)

  • InMemoryBackend for I/O-bound tasks

  • Results are stored in memory

  • For CPU-bound tasks, a persistent backend (Redis, PostgreSQL, or MySQL) is required for cross-process result storage

  • Note: CPU tasks can still use the in-memory broker/queue

Executor Configuration

  • I/O Workers: 4 concurrent workers

  • CPU Workers: Defaults to the number of CPU cores (e.g., 8 on an 8-core system)

  • Max Queue Size: 1000 tasks

  • Max Retries: 3 attempts per task

  • Retry Delay: 1.0 second between retries

Example with explicit configuration:

from flowrra import Flowrra, Config, ExecutorConfig

import os

# Explicitly specify the same defaults
config = Config(
    executor=ExecutorConfig(
        io_workers=4,                      # 4 concurrent I/O tasks
        cpu_workers=os.cpu_count(),        # Number of CPU cores
        max_queue_size=1000,               # Queue up to 1000 tasks
        max_retries=3,                     # Retry failed tasks 3 times
        retry_delay=1.0                    # Wait 1 second between retries
    )
)
app = Flowrra(config=config)

For production deployments or distributed workers, use Redis for both broker and backend:

# Both broker and backend using Redis (recommended for production)
app = Flowrra.from_urls(
    broker='redis://localhost:6379/0',   # Persistent queue, supports distributed workers
    backend='redis://localhost:6379/1'   # Result storage
)

# Or just backend for CPU tasks in single-instance deployment
app = Flowrra.from_urls(
    backend='redis://localhost:6379/1'   # Backend only (broker defaults to in-memory)
)

Task Configuration

Configure retry behavior and other options:

@app.task(max_retries=5, retry_delay=2.0, priority=10)
async def fetch_data(url: str):
    """Fetch data with retries and custom priority."""
    # Your HTTP request logic here
    return {"data": "..."}

CPU-Bound Tasks

For CPU-intensive operations, use the cpu_bound parameter.

Important

CPU-bound tasks must be synchronous functions (not async). They run in separate processes using ProcessPoolExecutor and require a persistent backend (Redis, PostgreSQL, or MySQL) for cross-process result storage. The broker/queue can remain in-memory.

@app.task(cpu_bound=True)
def compute_heavy_operation(n: int):  # Note: NOT async
    """CPU-intensive task using ProcessPoolExecutor."""
    result = sum(i * i for i in range(n))
    return result

async def main():
    # CPU tasks require a backend
    app = Flowrra.from_urls(backend='redis://localhost:6379/1')

    async with app:
        task_id = await app.submit(compute_heavy_operation, 1000000)
        result = await app.wait_for_result(task_id)
        print(f"Result: {result.result}")

Task Scheduling

Schedule tasks to run at specific times or intervals:

from flowrra import Flowrra
import asyncio

app = Flowrra.from_urls()

# Create scheduler (automatically integrated with app)
scheduler = app.create_scheduler()

@app.task()
async def cleanup_old_data():
    """Clean up old data."""
    print("Cleaning up...")
    return {"cleaned": 100}

async def main():
    async with app:  # Automatically starts scheduler
        # Schedule with interval (every 30 minutes)
        await scheduler.schedule_interval(
            task_name="cleanup_old_data",
            interval=1800,  # seconds
            description="Every 30 minutes"
        )

        # Hourly at minute 0 (top of every hour)
        await scheduler.schedule_cron(
            task_name="cleanup_old_data",
            cron="0 * * * *",
            description="Hourly at :00"
        )

        # Daily at 2:30 AM
        await scheduler.schedule_cron(
            task_name="cleanup_old_data",
            cron="30 2 * * *",
            description="Daily at 2:30 AM"
        )

        # Weekly on Monday at 9:00 AM
        await scheduler.schedule_cron(
            task_name="cleanup_old_data",
            cron="0 9 * * 1",
            description="Weekly on Monday"
        )

        # Monthly on the 1st at midnight
        await scheduler.schedule_cron(
            task_name="cleanup_old_data",
            cron="0 0 1 * *",
            description="Monthly on 1st"
        )

        # Every weekday (Mon-Fri) at 6:00 PM
        await scheduler.schedule_cron(
            task_name="cleanup_old_data",
            cron="0 18 * * 1-5",
            description="Weekdays at 6 PM"
        )

        # Keep running to execute scheduled tasks
        await asyncio.Event().wait()

asyncio.run(main())

Next Steps