Task Scheduling
Flowrra includes a task scheduling system similar to Celery Beat. It lets you run tasks once at a specific time, at fixed intervals, or on a cron schedule.
Features
Cron-based scheduling: Use standard cron syntax for flexible scheduling
Interval-based scheduling: Run tasks at fixed intervals
One-time scheduling: Schedule tasks to run once at a specific time
Persistent storage: Schedules survive application restarts
Multiple database backends: SQLite (default), PostgreSQL, or MySQL
Priority support: High-priority tasks execute first
Enable/disable tasks: Turn scheduled tasks on or off without deleting them
Quick Start
Basic Setup
The recommended way to use the scheduler is through the Flowrra app’s create_scheduler() method:
from flowrra import Flowrra
import asyncio
# Create Flowrra app
app = Flowrra.from_urls()
# Define a task
@app.task()
async def send_daily_report():
    print("Generating daily report...")
    return "Report sent"

# Create scheduler (automatically integrated with app's executors)
scheduler = app.create_scheduler()

# Schedule and start
async def main():
    # Schedule the task to run daily at 9 AM
    await scheduler.schedule_cron(
        task_name="send_daily_report",
        cron="0 9 * * *",
        description="Daily report at 9 AM"
    )
    # Start app (automatically starts scheduler too)
    await app.start()
    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
Note
When you create a scheduler using app.create_scheduler(), it automatically integrates with your app’s executors and starts/stops with the app lifecycle.
Advanced Setup
For more control over the scheduler backend:
from flowrra import Flowrra
app = Flowrra.from_urls()
# Create scheduler with custom backend
scheduler = app.create_scheduler(
    backend="postgresql://localhost/flowrra"  # Use PostgreSQL
)
Scheduling Methods
1. Cron-based Scheduling
Schedule tasks using standard cron expressions:
# Every day at 9:00 AM
await scheduler.schedule_cron(
    task_name="daily_task",
    cron="0 9 * * *",
    args=(1, 2),
    kwargs={"key": "value"}
)

# Every 5 minutes
await scheduler.schedule_cron(
    task_name="frequent_task",
    cron="*/5 * * * *"
)

# Every Monday at 8:30 AM
await scheduler.schedule_cron(
    task_name="weekly_task",
    cron="30 8 * * 1"
)
Cron Format: minute hour day month weekday
minute: 0-59
hour: 0-23
day: 1-31
month: 1-12
weekday: 0-7 (0 and 7 are Sunday)
Special characters:
*: Any value
,: Value list separator (e.g., 1,3,5)
-: Range of values (e.g., 1-5)
/: Step values (e.g., */15 = every 15 minutes)
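To make the special characters concrete, the following is a minimal sketch of how a single cron field could be expanded into the values it matches. It is an illustration in plain Python, not Flowrra's parser; the function name expand_field is hypothetical.

```python
def expand_field(field: str, low: int, high: int) -> list[int]:
    """Expand one cron field into the sorted list of values it matches."""
    values = set()
    for part in field.split(","):
        # Split off an optional step, e.g. "*/15" or "0-30/10".
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step < 1:
            raise ValueError(f"step must be positive: {part!r}")
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = end = int(base)
        if not (low <= start <= end <= high):
            raise ValueError(f"value out of range {low}-{high}: {part!r}")
        values.update(range(start, end + 1, step))
    return sorted(values)


print(expand_field("*/15", 0, 59))  # [0, 15, 30, 45]
print(expand_field("1,3,5", 0, 7))  # [1, 3, 5]
print(expand_field("9-17", 0, 23))  # [9, 10, 11, 12, 13, 14, 15, 16, 17]
```

The low and high arguments are the field bounds listed above, for example 0 and 59 for the minute field.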
2. Interval-based Scheduling
Run tasks at fixed intervals:
# Every 5 minutes (300 seconds)
await scheduler.schedule_interval(
    task_name="periodic_task",
    interval=300,
    args=(arg1, arg2),
    description="Runs every 5 minutes"
)

# Every hour
await scheduler.schedule_interval(
    task_name="hourly_task",
    interval=3600
)
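The examples above pass interval as a number of seconds. Whether schedule_interval also accepts a timedelta is not stated here, so one cautious option is to derive the seconds yourself. The helper below is a hypothetical convenience, not part of Flowrra.

```python
from datetime import timedelta


def seconds(**parts: float) -> int:
    """Convert timedelta keyword arguments (minutes=5, hours=1, ...) to whole seconds."""
    total = timedelta(**parts).total_seconds()
    if total <= 0:
        raise ValueError("interval must be positive")
    return int(total)


EVERY_5_MINUTES = seconds(minutes=5)
HOURLY = seconds(hours=1)
print(EVERY_5_MINUTES, HOURLY)  # 300 3600
```

The resulting integers can then be passed as the interval argument shown above.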
3. One-time Scheduling
Schedule a task to run once at a specific time:
from datetime import datetime, timedelta
# Run in 1 hour
run_time = datetime.now() + timedelta(hours=1)
await scheduler.schedule_once(
    task_name="one_time_task",
    run_at=run_time,
    args=(data,),
    description="Runs once in 1 hour"
)
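A one-time run is often wanted at a particular clock time rather than after a fixed delay. The sketch below computes the next occurrence of a given time of day using only the standard library; next_occurrence is a hypothetical helper, and how Flowrra treats naive versus timezone-aware datetimes in run_at is not covered here.

```python
from datetime import datetime, time, timedelta


def next_occurrence(at: time, now: datetime) -> datetime:
    """Return the next datetime strictly after `now` whose clock time is `at`."""
    # Reuse now's tzinfo so naive and aware inputs both compare cleanly.
    candidate = datetime.combine(now.date(), at, tzinfo=now.tzinfo)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


now = datetime(2024, 1, 15, 10, 30)
print(next_occurrence(time(9, 0), now))   # 2024-01-16 09:00:00
print(next_occurrence(time(18, 0), now))  # 2024-01-15 18:00:00
```

The returned value could be passed as run_at in the call above.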
Task Management
List Scheduled Tasks
# Get all scheduled tasks
all_tasks = await scheduler.list_scheduled_tasks()
for task in all_tasks:
    print(f"Task: {task.task_name}")
    print(f"Schedule: {task.schedule}")
    print(f"Next run: {task.next_run_at}")
    print(f"Enabled: {task.enabled}")
Enable/Disable Tasks
# Disable a task (keeps it in database but won't execute)
await scheduler.disable_task(task_id)
# Re-enable a task
await scheduler.enable_task(task_id)
Delete Scheduled Tasks
# Remove a scheduled task permanently
deleted = await scheduler.unschedule(task_id)
if deleted:
    print("Task removed successfully")
Database Backends
SQLite (Default)
SQLite is the default backend and is well suited to single-instance deployments:
# Default SQLite (creates .flowrra_schedule.db)
scheduler = app.create_scheduler()
# Custom SQLite path
scheduler = app.create_scheduler(backend="sqlite:///path/to/schedule.db")
PostgreSQL
For distributed deployments with multiple scheduler instances:
# Requires: pip install flowrra[postgresql]
scheduler = app.create_scheduler(
    backend="postgresql://user:password@localhost:5432/flowrra"
)
MySQL
# Requires: pip install flowrra[mysql]
scheduler = app.create_scheduler(
    backend="mysql://user:password@localhost:3306/flowrra"
)
Persistent Storage Examples
For production deployments, using PostgreSQL or MySQL provides better concurrency, distributed worker support, and high availability.
PostgreSQL Production Example
Complete example with PostgreSQL backend:
from flowrra import Flowrra
import asyncio
import os
# Create Flowrra app
app = Flowrra.from_urls(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Define tasks
@app.task()
async def process_batch(batch_id: int):
    """Process data batch."""
    print(f"Processing batch {batch_id}...")
    return {"status": "completed", "batch_id": batch_id}

@app.task()
async def cleanup_old_records():
    """Clean up old database records."""
    print("Cleaning up old records...")
    return {"cleaned": 1000}

# Create scheduler with PostgreSQL backend
scheduler = app.create_scheduler(
    backend=os.getenv(
        "SCHEDULER_DB",
        "postgresql://flowrra:password@localhost:5432/flowrra_prod"
    )
)

async def main():
    # Schedule hourly batch processing
    await scheduler.schedule_cron(
        task_name="process_batch",
        cron="0 * * * *",
        args=(1,),
        description="Process data every hour"
    )
    # Schedule daily cleanup at 2 AM
    await scheduler.schedule_cron(
        task_name="cleanup_old_records",
        cron="0 2 * * *",
        description="Daily cleanup at 2 AM"
    )
    # Start app (automatically starts scheduler)
    await app.start()
    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
Benefits:
Distributed workers: Run multiple scheduler instances safely
High availability: No single point of failure
Better concurrency: PostgreSQL handles concurrent schedule updates
ACID transactions: Guaranteed consistency
Connection pooling: Efficient resource usage via asyncpg
MySQL/MariaDB Production Example
Complete example with MySQL backend (also compatible with MariaDB):
from flowrra import Flowrra
import asyncio
import os
# Create Flowrra app
app = Flowrra.from_urls(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Define tasks
@app.task()
async def send_notifications(user_count: int):
    """Send notifications to users."""
    print(f"Sending notifications to {user_count} users...")
    return {"sent": user_count}

@app.task()
async def generate_analytics():
    """Generate analytics reports."""
    print("Generating analytics...")
    return {"status": "completed"}

# Create scheduler with MySQL backend
scheduler = app.create_scheduler(
    backend=os.getenv(
        "SCHEDULER_DB",
        "mysql://flowrra:password@localhost:3306/flowrra_prod"
    )
)

async def main():
    # Schedule notifications every 15 minutes
    await scheduler.schedule_cron(
        task_name="send_notifications",
        cron="*/15 * * * *",
        args=(100,),
        description="Send notifications every 15 minutes"
    )
    # Schedule weekly analytics on Mondays at 9 AM
    await scheduler.schedule_cron(
        task_name="generate_analytics",
        cron="0 9 * * 1",
        description="Weekly analytics on Monday"
    )
    # Start app (automatically starts scheduler)
    await app.start()
    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
MariaDB Note:
The MySQL backend is fully compatible with MariaDB. Use the same connection string format:
backend="mysql://user:password@localhost:3306/database"
Connection String Formats
# SQLite (default)
backend=None # Creates .flowrra_schedule.db
backend="sqlite:///path/to/schedule.db"
# PostgreSQL
backend="postgresql://user:password@host:5432/database"
backend="postgresql://user:password@host:5432/database?sslmode=require"
# PostgreSQL with connection options
backend="postgresql://user:password@host:5432/database?sslmode=require&pool_min_size=5&pool_max_size=20"
# MySQL/MariaDB
backend="mysql://user:password@host:3306/database"
backend="mysql://user:password@host:3306/database?charset=utf8mb4"
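When debugging a connection string, it can help to see how it splits into parts. The sketch below uses only the standard library's urllib.parse and does not reflect how Flowrra itself parses these URLs; describe_backend is a hypothetical helper, and it deliberately leaves the password out so the result is safe to log.

```python
from urllib.parse import parse_qsl, urlsplit


def describe_backend(url: str) -> dict:
    """Split a backend URL into its parts, omitting the password."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        # Raw URL path; how a backend interprets it is up to that backend.
        "path": parts.path,
        "options": dict(parse_qsl(parts.query)),
    }


for url in (
    "sqlite:///path/to/schedule.db",
    "postgresql://user:password@host:5432/database?sslmode=require",
    "mysql://user:password@host:3306/database?charset=utf8mb4",
):
    print(describe_backend(url))
```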
Environment Variables for Production
Use environment variables to manage database credentials securely:
import os
from flowrra import Flowrra
app = Flowrra.from_urls(
    broker=os.getenv("REDIS_BROKER_URL"),
    backend=os.getenv("REDIS_BACKEND_URL")
)

scheduler = app.create_scheduler(
    backend=os.getenv("SCHEDULER_DATABASE_URL")
)
# .env file for production
REDIS_BROKER_URL=redis://localhost:6379/0
REDIS_BACKEND_URL=redis://localhost:6379/1
SCHEDULER_DATABASE_URL=postgresql://flowrra:secret@db.example.com:5432/flowrra_prod
# .env file for staging
REDIS_BROKER_URL=redis://staging-redis:6379/0
REDIS_BACKEND_URL=redis://staging-redis:6379/1
SCHEDULER_DATABASE_URL=postgresql://flowrra:secret@staging-db.example.com:5432/flowrra_staging
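Note that os.getenv returns None when a variable is unset, and backend=None is listed above as selecting the default SQLite file. A missing SCHEDULER_DATABASE_URL would therefore presumably fall back to SQLite without any error. A small fail-fast helper is one way to guard against that; require_env below is a hypothetical name, not a Flowrra function.

```python
import os


def require_env(name: str, env=None) -> str:
    """Return a non-empty environment variable or raise a clear error."""
    env = os.environ if env is None else env
    value = (env.get(name) or "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Demonstration with an explicit mapping instead of the real environment.
demo_env = {"SCHEDULER_DATABASE_URL": "postgresql://flowrra:secret@db.example.com:5432/flowrra_prod"}
print(require_env("SCHEDULER_DATABASE_URL", demo_env))
```

In an application, require_env("SCHEDULER_DATABASE_URL") would read from the real environment and stop startup with a readable message if the variable is missing.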
When to Use Each Backend
SQLite (Default):
Single-instance deployments
Development and testing
Low to moderate schedule volume
No external database infrastructure
Simple file-based storage
PostgreSQL (Recommended for Production):
Multiple worker instances
High availability requirements
High concurrency needs
Enterprise deployments
Advanced features (JSONB, full-text search)
Complex queries and reporting
Cloud deployments (AWS RDS, Google Cloud SQL, etc.)
MySQL/MariaDB:
Existing MySQL/MariaDB infrastructure
Multi-instance deployments
Good balance of features and simplicity
MariaDB compatibility required
Shared hosting environments
Integration with Executors
Automatic Integration (Recommended)
The scheduler automatically integrates with your app’s executors when created via app.create_scheduler():
import asyncio
from flowrra import Flowrra

app = Flowrra.from_urls()

# Define tasks (async for IOExecutor, sync for CPUExecutor)
@app.task()
async def io_task(message: str):
    print(f"IO task executed: {message}")
    return "done"

@app.task(cpu_bound=True)
def cpu_task(n: int):
    return sum(i ** 2 for i in range(n))

# Create scheduler - automatically routes tasks to correct executor
scheduler = app.create_scheduler()

async def main():
    # Schedule IO-bound task
    await scheduler.schedule_cron(
        task_name="io_task",
        cron="*/5 * * * *",
        args=("Hello from scheduler!",)
    )
    # Schedule CPU-bound task
    await scheduler.schedule_interval(
        task_name="cpu_task",
        interval=300,
        args=(1000000,)
    )
    # Start everything (automatically starts scheduler)
    await app.start()
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())
How it works:
The scheduler checks each task’s cpu_bound attribute
IO-bound tasks (async) are automatically routed to IOExecutor
CPU-bound tasks (sync) are automatically routed to CPUExecutor
No manual callback or routing logic needed!
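The routing rule above can be pictured with a small standalone sketch. This is not Flowrra's implementation: pick_executor and run_task are hypothetical names, the fallback to "sync means CPU-bound" is an assumption, and the event loop's default thread pool stands in for a real CPU executor to keep the example short.

```python
import asyncio
import inspect


def pick_executor(func) -> str:
    """Return "cpu" or "io" for a task function (illustrative rule only)."""
    # Assumed convention: an explicit cpu_bound attribute wins; otherwise
    # plain (non-async) functions are treated as CPU-bound.
    cpu_bound = getattr(func, "cpu_bound", not inspect.iscoroutinefunction(func))
    return "cpu" if cpu_bound else "io"


async def run_task(func, *args):
    """Await async tasks directly; push sync tasks off the event loop."""
    if pick_executor(func) == "io":
        return await func(*args)
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, func, *args)


async def io_task(message: str) -> str:
    return f"io:{message}"


def cpu_task(n: int) -> int:
    return sum(i ** 2 for i in range(n))


async def main():
    return await run_task(io_task, "hi"), await run_task(cpu_task, 4)


print(asyncio.run(main()))  # ('io:hi', 14)
```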
Manual Integration (Advanced)
For advanced use cases, you can create the scheduler manually:
from flowrra.scheduler import Scheduler
from flowrra.scheduler.backends import get_scheduler_backend
# Manual scheduler setup (not recommended for most cases)
scheduler = Scheduler(
    backend=get_scheduler_backend(),
    registry=app.registry,
    io_executor=app._io_executor,
    cpu_executor=app._cpu_executor
)
Warning
Manual setup requires manual start/stop management. The recommended approach is to use app.create_scheduler() which automatically handles integration and lifecycle.
Common Patterns
Daily Reports
# Generate report every day at 9 AM
await scheduler.schedule_cron(
    task_name="generate_daily_report",
    cron="0 9 * * *",
    description="Daily report generation"
)
Hourly Cleanup
# Clean up old data every hour
await scheduler.schedule_cron(
    task_name="cleanup_old_data",
    cron="0 * * * *",
    description="Hourly cleanup"
)
Weekday Business Hours
# Send reminders during business hours on weekdays
await scheduler.schedule_cron(
    task_name="send_reminders",
    cron="0 9-17 * * 1-5",  # 9 AM - 5 PM, Mon-Fri
    description="Business hours reminders"
)
Best Practices
Use meaningful task names: Make task names descriptive and unique
Add descriptions: Include descriptions for scheduled tasks to document their purpose
Set appropriate priorities: Reserve high priorities for critical tasks
Monitor task execution: Log task executions for debugging and monitoring
Handle errors gracefully: Implement proper error handling in scheduled tasks
Use environment variables: Store database URLs in environment variables for different environments
Test schedules: Verify cron expressions produce expected run times before deploying
Use appropriate databases: SQLite for single-instance, PostgreSQL/MySQL for distributed
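For the "Test schedules" practice, one lightweight approach is to check a cron expression against a few known datetimes before deploying. The sketch below is a simplified standalone matcher, not Flowrra's cron engine; cron_matches is a hypothetical name. It requires every field to match, whereas classic cron treats day-of-month and weekday as alternatives when both are restricted, so it is only reliable when at least one of those two fields is *.

```python
from datetime import datetime


def _expand(field: str, low: int, high: int) -> set:
    """Expand one cron field (supports *, lists, ranges, steps) into a set."""
    values = set()
    for part in field.split(","):
        base, _, step = part.partition("/")
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start, end = map(int, base.split("-", 1))
        else:
            start = end = int(base)
        values.update(range(start, end + 1, int(step or 1)))
    return values


def cron_matches(expr: str, moment: datetime) -> bool:
    """Return True if `moment` falls on a minute selected by `expr`."""
    minute, hour, day, month, weekday = expr.split()
    # Cron counts Sunday as 0 (or 7); isoweekday() gives Mon=1..Sun=7.
    cron_weekday = moment.isoweekday() % 7
    weekdays = {d % 7 for d in _expand(weekday, 0, 7)}
    return (
        moment.minute in _expand(minute, 0, 59)
        and moment.hour in _expand(hour, 0, 23)
        and moment.day in _expand(day, 1, 31)
        and moment.month in _expand(month, 1, 12)
        and cron_weekday in weekdays
    )


business_hours = "0 9-17 * * 1-5"
print(cron_matches(business_hours, datetime(2024, 1, 15, 9, 0)))   # Monday 09:00 -> True
print(cron_matches(business_hours, datetime(2024, 1, 20, 9, 0)))   # Saturday 09:00 -> False
print(cron_matches(business_hours, datetime(2024, 1, 15, 18, 0)))  # Monday 18:00 -> False
```

Checks like these fit naturally into a unit test, with one assertion per datetime that should or should not trigger the schedule.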
See Also
flowrra.scheduler package - Complete API reference
Working with Tasks - Task definition and execution
Executors - Executor configuration