Quick Start
This guide will help you get started with Flowrra in minutes.
Basic Usage
Here’s a simple example of creating and running tasks:
import asyncio
from flowrra import Flowrra
# Create Flowrra application
app = Flowrra.from_urls()
# Register tasks
@app.task()
async def send_email(to: str, subject: str):
"""Send an email asynchronously."""
await asyncio.sleep(0.1) # Simulate email sending
print(f"Email sent to {to}: {subject}")
return {"status": "sent", "to": to}
# Execute tasks
async def main():
async with app:
# Submit a task
task_id = await app.submit(send_email, "user@example.com", "Hello")
# Wait for result
result = await app.wait_for_result(task_id, timeout=10.0)
print(f"Result: {result.result}")
asyncio.run(main())
Default Configuration
When you create a Flowrra application using Flowrra.from_urls() without any parameters,
it uses the following defaults:
Task Queue (Broker)
In-memory queue (
asyncio.PriorityQueue) by defaultTasks are stored in memory and will be lost if the application restarts
Best for development, testing, and single-process applications
For production, Redis broker is strongly recommended for persistence and distributed workers
Result Storage (Backend)
InMemoryBackend for I/O-bound tasks
Results are stored in memory
For CPU-bound tasks, a persistent backend (Redis, PostgreSQL, or MySQL) is required for cross-process result storage
Note: CPU tasks can still use the in-memory broker/queue
Executor Configuration
I/O Workers: 4 concurrent workers
CPU Workers: Defaults to the number of CPU cores (e.g., 8 on an 8-core system)
Max Queue Size: 1000 tasks
Max Retries: 3 attempts per task
Retry Delay: 1.0 second between retries
Example with explicit configuration:
from flowrra import Flowrra, Config, ExecutorConfig
import os
# Explicitly specify the same defaults
config = Config(
executor=ExecutorConfig(
io_workers=4, # 4 concurrent I/O tasks
cpu_workers=os.cpu_count(), # Number of CPU cores
max_queue_size=1000, # Queue up to 1000 tasks
max_retries=3, # Retry failed tasks 3 times
retry_delay=1.0 # Wait 1 second between retries
)
)
app = Flowrra(config=config)
For production deployments or distributed workers, use Redis for both broker and backend:
# Both broker and backend using Redis (recommended for production)
app = Flowrra.from_urls(
broker='redis://localhost:6379/0', # Persistent queue, supports distributed workers
backend='redis://localhost:6379/1' # Result storage
)
# Or just backend for CPU tasks in single-instance deployment
app = Flowrra.from_urls(
backend='redis://localhost:6379/1' # Backend only (broker defaults to in-memory)
)
Task Configuration
Configure retry behavior and other options:
@app.task(max_retries=5, retry_delay=2.0, priority=10)
async def fetch_data(url: str):
"""Fetch data with retries and custom priority."""
# Your HTTP request logic here
return {"data": "..."}
CPU-Bound Tasks
For CPU-intensive operations, use the cpu_bound parameter.
Important
CPU-bound tasks must be synchronous functions (not async). They run in separate processes
using ProcessPoolExecutor and require a persistent backend (Redis, PostgreSQL, or MySQL)
for cross-process result storage. The broker/queue can remain in-memory.
@app.task(cpu_bound=True)
def compute_heavy_operation(n: int): # Note: NOT async
"""CPU-intensive task using ProcessPoolExecutor."""
result = sum(i * i for i in range(n))
return result
async def main():
# CPU tasks require a backend
app = Flowrra.from_urls(backend='redis://localhost:6379/1')
async with app:
task_id = await app.submit(compute_heavy_operation, 1000000)
result = await app.wait_for_result(task_id)
print(f"Result: {result.result}")
Task Scheduling
Schedule tasks to run at specific times or intervals:
from flowrra import Flowrra
import asyncio
app = Flowrra.from_urls()
# Create scheduler (automatically integrated with app)
scheduler = app.create_scheduler()
@app.task()
async def cleanup_old_data():
"""Clean up old data."""
print("Cleaning up...")
return {"cleaned": 100}
async def main():
async with app: # Automatically starts scheduler
# Schedule with interval (every 30 minutes)
await scheduler.schedule_interval(
task_name="cleanup_old_data",
interval=1800, # seconds
description="Every 30 minutes"
)
# Hourly at minute 0 (top of every hour)
await scheduler.schedule_cron(
task_name="cleanup_old_data",
cron="0 * * * *",
description="Hourly at :00"
)
# Daily at 2:30 AM
await scheduler.schedule_cron(
task_name="cleanup_old_data",
cron="30 2 * * *",
description="Daily at 2:30 AM"
)
# Weekly on Monday at 9:00 AM
await scheduler.schedule_cron(
task_name="cleanup_old_data",
cron="0 9 * * 1",
description="Weekly on Monday"
)
# Monthly on the 1st at midnight
await scheduler.schedule_cron(
task_name="cleanup_old_data",
cron="0 0 1 * *",
description="Monthly on 1st"
)
# Every weekday (Mon-Fri) at 6:00 PM
await scheduler.schedule_cron(
task_name="cleanup_old_data",
cron="0 18 * * 1-5",
description="Weekdays at 6 PM"
)
# Keep running to execute scheduled tasks
await asyncio.Event().wait()
asyncio.run(main())
Next Steps
Learn more about Working with Tasks
Set up Task Scheduling
Explore Web UI
Check out Result Backends for production setups