Flowrra Documentation
Flowrra is a lightweight async task queue built on pure Python asyncio: a Celery-inspired background job executor with zero dependencies, featuring retries, priority queues, and pluggable backends.
Features
Zero Dependencies - Core built on pure Python 3.11+ asyncio (only the scheduler requires aiosqlite)
Simple API - Decorator-based task registration, just like Celery
Async First - Native async/await support for I/O-bound tasks
CPU-Bound Support - ProcessPoolExecutor integration for compute-heavy tasks
Task Scheduling - Celery Beat-like persistent scheduling with cron, intervals, and one-time tasks
Web UI - Built-in monitoring dashboard for FastAPI, Flask/Quart, and Django
Automatic Retries - Configurable retry logic with exponential backoff
Priority Queues - Control task execution order
Pluggable Backends - Extensible result storage (in-memory included)
Multiple Databases - SQLite, PostgreSQL, and MySQL support for the scheduler
Type Safe - Full type hints for better IDE support
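To make the "Priority Queues" feature concrete, here is a minimal standard-library sketch of priority-ordered execution. It does not use Flowrra's API or reflect its internals. The helper `run_by_priority` is hypothetical, and the convention that a lower number means higher priority is an assumption taken from `asyncio.PriorityQueue`, not from Flowrra.

```python
import asyncio
import itertools


async def run_by_priority(jobs):
    """Drain (priority, name) jobs one at a time, lowest priority number first.

    Hypothetical helper for illustration only; not part of Flowrra.
    """
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    # A monotonically increasing counter breaks ties, so jobs with the
    # same priority come out in the order they were submitted (FIFO).
    counter = itertools.count()
    for priority, name in jobs:
        queue.put_nowait((priority, next(counter), name))

    order = []
    while not queue.empty():
        _priority, _seq, name = await queue.get()
        order.append(name)  # a real executor would run the task here
        queue.task_done()
    return order


if __name__ == "__main__":
    jobs = [(5, "report"), (1, "alert"), (5, "cleanup"), (1, "page")]
    print(asyncio.run(run_by_priority(jobs)))
    # prints ['alert', 'page', 'report', 'cleanup']
```

The tie-breaking counter matters because tuples are compared element by element: without it, two jobs with equal priority would be ordered by name rather than by submission order.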
Quick Example
import asyncio

from flowrra import Flowrra

# Create Flowrra application
app = Flowrra.from_urls()


# Register I/O-bound tasks (async functions)
@app.task()
async def send_email(to: str, subject: str):
    """Send an email asynchronously."""
    await asyncio.sleep(0.1)  # Simulate email sending
    print(f"Email sent to {to}: {subject}")
    return {"status": "sent", "to": to}


@app.task(max_retries=5, retry_delay=2.0)
async def fetch_data(url: str):
    """Fetch data with automatic retries."""
    # Your HTTP request logic here
    return {"data": "..."}


# Execute tasks
async def main():
    async with app:
        # Submit tasks
        task_id = await app.submit(send_email, "user@example.com", "Hello")
        # Wait for result
        result = await app.wait_for_result(task_id, timeout=10.0)
        print(f"Result: {result.result}")


asyncio.run(main())
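The `fetch_data` task above is registered with `max_retries=5, retry_delay=2.0`. How Flowrra spaces those retries is not shown here. As a rough sketch, assuming the delay doubles after every failed attempt (one common form of exponential backoff), the retry schedule could look like the following. The names `backoff_delays` and `call_with_retries`, and the `factor` and `sleep` parameters, are hypothetical and use only the standard library.

```python
import asyncio


def backoff_delays(max_retries: int, retry_delay: float, factor: float = 2.0) -> list[float]:
    """Delay before each retry, assuming the delay is multiplied by `factor` each time."""
    return [retry_delay * factor**attempt for attempt in range(max_retries)]


async def call_with_retries(func, *args, max_retries=5, retry_delay=2.0, sleep=asyncio.sleep):
    """Await func(*args), retrying on any exception up to max_retries times.

    Hypothetical helper for illustration only; not Flowrra's implementation.
    `sleep` is injectable so the schedule can be checked without waiting.
    """
    delays = backoff_delays(max_retries, retry_delay)
    for attempt in range(max_retries + 1):  # one initial try plus max_retries retries
        try:
            return await func(*args)
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: propagate the last error
            await sleep(delays[attempt])


if __name__ == "__main__":
    print(backoff_delays(5, 2.0))
    # prints [2.0, 4.0, 8.0, 16.0, 32.0]
```

Under this assumption a task that fails every time would wait 62 seconds in total before giving up, so `max_retries` and `retry_delay` are worth choosing together.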