Task Scheduling

Flowrra includes a task scheduling system similar to Celery Beat. It lets you run tasks at a specific time, at fixed intervals, or on a cron schedule.

Features

  • Cron-based scheduling: Use standard cron syntax for flexible scheduling

  • Interval-based scheduling: Run tasks at fixed intervals

  • One-time scheduling: Schedule tasks to run once at a specific time

  • Persistent storage: Schedules survive application restarts

  • Multiple database backends: SQLite (default), PostgreSQL, or MySQL

  • Priority support: High-priority tasks execute first

  • Enable/disable tasks: Turn scheduled tasks on or off without deleting them

Quick Start

Basic Setup

The recommended way to use the scheduler is through the Flowrra app’s create_scheduler() method:

from flowrra import Flowrra
import asyncio

# Create Flowrra app
app = Flowrra.from_urls()

# Define a task
@app.task()
async def send_daily_report():
    print("Generating daily report...")
    return "Report sent"

# Create scheduler (automatically integrated with app's executors)
scheduler = app.create_scheduler()

# Schedule and start
async def main():
    # Schedule the task to run daily at 9 AM
    await scheduler.schedule_cron(
        task_name="send_daily_report",
        cron="0 9 * * *",
        description="Daily report at 9 AM"
    )

    # Start app (automatically starts scheduler too)
    await app.start()

    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())

Note

When you create a scheduler using app.create_scheduler(), it automatically integrates with your app’s executors and starts/stops with the app lifecycle.

Advanced Setup

For more control over the scheduler backend:

from flowrra import Flowrra

app = Flowrra.from_urls()

# Create scheduler with custom backend
scheduler = app.create_scheduler(
    backend="postgresql://localhost/flowrra"  # Use PostgreSQL
)

Scheduling Methods

1. Cron-based Scheduling

Schedule tasks using standard cron expressions:

# Every day at 9:00 AM
await scheduler.schedule_cron(
    task_name="daily_task",
    cron="0 9 * * *",
    args=(1, 2),
    kwargs={"key": "value"}
)

# Every 5 minutes
await scheduler.schedule_cron(
    task_name="frequent_task",
    cron="*/5 * * * *"
)

# Every Monday at 8:30 AM
await scheduler.schedule_cron(
    task_name="weekly_task",
    cron="30 8 * * 1"
)

Cron Format: minute hour day month weekday

  • minute: 0-59

  • hour: 0-23

  • day: 1-31

  • month: 1-12

  • weekday: 0-7 (0 and 7 are Sunday)

Special characters:

  • *: Any value

  • ,: Value list separator (e.g., 1,3,5)

  • -: Range of values (e.g., 1-5)

  • /: Step values (e.g., */15 = every 15 minutes)
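The field ranges and special characters above can be checked before a schedule is registered. The following is a minimal standard-library sketch, not Flowrra's own parser: expand_field, parse_cron, and next_runs are hypothetical helpers. The matcher requires both the day and weekday fields to match, which may differ from how a given cron implementation combines them when both are restricted.

```python
from datetime import datetime, timedelta

# (low, high) bounds for: minute hour day month weekday
BOUNDS = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 7)]


def expand_field(field, low, high):
    """Expand one cron field (e.g. "*/15", "1-5", "1,3,5") into a set of ints."""
    values = set()
    for part in field.split(","):
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            end = high if step_text else start
        if step < 1 or not (low <= start <= end <= high):
            raise ValueError(f"invalid cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return values


def parse_cron(expr):
    """Split a five-field expression into five sets of allowed values."""
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day month weekday")
    sets = [expand_field(f, lo, hi) for f, (lo, hi) in zip(fields, BOUNDS)]
    sets[4] = {value % 7 for value in sets[4]}  # fold 7 into 0 (Sunday)
    return sets


def next_runs(expr, start, count=3, max_minutes=366 * 24 * 60):
    """Scan minute by minute after `start` and return the first matches."""
    minutes, hours, days, months, weekdays = parse_cron(expr)
    moment = start.replace(second=0, microsecond=0)
    runs = []
    for _ in range(max_minutes):
        moment += timedelta(minutes=1)
        cron_weekday = (moment.weekday() + 1) % 7  # Python Monday=0 -> cron Monday=1
        if (moment.minute in minutes and moment.hour in hours
                and moment.day in days and moment.month in months
                and cron_weekday in weekdays):
            runs.append(moment)
            if len(runs) == count:
                break
    return runs


# 2024-01-01 is a Monday, so "30 8 * * 1" first fires at 08:30 that day.
print(next_runs("30 8 * * 1", datetime(2024, 1, 1), count=2))
```

Scanning one minute at a time is slow for long horizons but keeps the logic easy to audit, which is usually what matters when previewing a schedule by hand.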

2. Interval-based Scheduling

Run tasks at fixed intervals:

# Every 5 minutes (300 seconds)
await scheduler.schedule_interval(
    task_name="periodic_task",
    interval=300,
    args=(arg1, arg2),  # placeholders for the task's positional arguments
    description="Runs every 5 minutes"
)

# Every hour
await scheduler.schedule_interval(
    task_name="hourly_task",
    interval=3600
)
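Raw second counts such as 300 or 3600 are easy to mistype. Assuming interval is expressed in seconds, as the comments in the examples above indicate, a small helper can derive the value from a readable duration. interval_seconds is a hypothetical name, not part of Flowrra.

```python
from datetime import timedelta


def interval_seconds(**duration):
    """Convert timedelta keyword arguments into a positive whole number of seconds."""
    seconds = timedelta(**duration).total_seconds()
    if seconds <= 0 or seconds != int(seconds):
        raise ValueError("interval must be a positive whole number of seconds")
    return int(seconds)


print(interval_seconds(minutes=5))
print(interval_seconds(hours=1))
```

The result could then be passed as interval=interval_seconds(minutes=5) in a schedule_interval call.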

3. One-time Scheduling

Schedule a task to run once at a specific time:

from datetime import datetime, timedelta

# Run in 1 hour
run_time = datetime.now() + timedelta(hours=1)
await scheduler.schedule_once(
    task_name="one_time_task",
    run_at=run_time,
    args=(data,),  # placeholder for the task's positional argument
    description="Runs once in 1 hour"
)
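A one-time run is often wanted at the next occurrence of a wall-clock time rather than at a fixed offset from now. The sketch below computes such a value with the standard library only. next_occurrence is a hypothetical helper, and whether run_at accepts timezone-aware datetimes is an assumption to verify against your Flowrra version.

```python
from datetime import datetime, time, timedelta


def next_occurrence(at, now):
    """Return the next datetime at wall-clock time `at`, strictly after `now`."""
    # Reuse now's tzinfo so naive and aware datetimes are never mixed.
    candidate = datetime.combine(now.date(), at, tzinfo=now.tzinfo)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


run_time = next_occurrence(time(9, 0), datetime.now())
print(run_time.isoformat())
```

The helper adds a calendar day without accounting for daylight saving transitions, which is adequate for UTC or naive local times but not for every timezone.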

Task Management

List Scheduled Tasks

# Get all scheduled tasks
all_tasks = await scheduler.list_scheduled_tasks()

for task in all_tasks:
    print(f"Task: {task.task_name}")
    print(f"Schedule: {task.schedule}")
    print(f"Next run: {task.next_run_at}")
    print(f"Enabled: {task.enabled}")
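The listing can also be filtered and sorted, for example to show only the runs that are actually coming up. This sketch uses stand-in objects that carry the task_name, enabled, and next_run_at attributes shown above. upcoming is a hypothetical helper, and it assumes next_run_at is either a datetime or None.

```python
from datetime import datetime
from types import SimpleNamespace


def upcoming(tasks, limit=5):
    """Return enabled tasks with a known next run, soonest first."""
    active = [t for t in tasks if t.enabled and t.next_run_at is not None]
    return sorted(active, key=lambda t: t.next_run_at)[:limit]


# Stand-in objects; real ones would come from list_scheduled_tasks().
tasks = [
    SimpleNamespace(task_name="weekly_task", enabled=True,
                    next_run_at=datetime(2024, 1, 8, 8, 30)),
    SimpleNamespace(task_name="daily_task", enabled=True,
                    next_run_at=datetime(2024, 1, 2, 9, 0)),
    SimpleNamespace(task_name="paused_task", enabled=False,
                    next_run_at=datetime(2024, 1, 1, 12, 0)),
]

for task in upcoming(tasks):
    print(task.task_name, task.next_run_at.isoformat())
```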

Enable/Disable Tasks

# Disable a task (keeps it in database but won't execute)
await scheduler.disable_task(task_id)

# Re-enable a task
await scheduler.enable_task(task_id)

Delete Scheduled Tasks

# Remove a scheduled task permanently
deleted = await scheduler.unschedule(task_id)
if deleted:
    print("Task removed successfully")

Database Backends

SQLite (Default)

SQLite is the default backend and is well suited to single-instance deployments:

# Default SQLite (creates .flowrra_schedule.db)
scheduler = app.create_scheduler()

# Custom SQLite path
scheduler = app.create_scheduler(backend="sqlite:///path/to/schedule.db")

PostgreSQL

For distributed deployments with multiple scheduler instances:

# Requires: pip install flowrra[postgresql]

scheduler = app.create_scheduler(
    backend="postgresql://user:password@localhost:5432/flowrra"
)

MySQL

# Requires: pip install flowrra[mysql]

scheduler = app.create_scheduler(
    backend="mysql://user:password@localhost:3306/flowrra"
)

Persistent Storage Examples

For production deployments, using PostgreSQL or MySQL provides better concurrency, distributed worker support, and high availability.

PostgreSQL Production Example

Complete example with PostgreSQL backend:

from flowrra import Flowrra
import asyncio
import os

# Create Flowrra app
app = Flowrra.from_urls(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Define tasks
@app.task()
async def process_batch(batch_id: int):
    """Process data batch."""
    print(f"Processing batch {batch_id}...")
    return {"status": "completed", "batch_id": batch_id}

@app.task()
async def cleanup_old_records():
    """Clean up old database records."""
    print("Cleaning up old records...")
    return {"cleaned": 1000}

# Create scheduler with PostgreSQL backend
scheduler = app.create_scheduler(
    backend=os.getenv(
        "SCHEDULER_DB",
        "postgresql://flowrra:password@localhost:5432/flowrra_prod"
    )
)

async def main():
    # Schedule hourly batch processing
    await scheduler.schedule_cron(
        task_name="process_batch",
        cron="0 * * * *",
        args=(1,),
        description="Process data every hour"
    )

    # Schedule daily cleanup at 2 AM
    await scheduler.schedule_cron(
        task_name="cleanup_old_records",
        cron="0 2 * * *",
        description="Daily cleanup at 2 AM"
    )

    # Start app (automatically starts scheduler)
    await app.start()

    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())

Benefits:

  • Distributed workers: Run multiple scheduler instances safely

  • High availability: No single scheduler instance is a point of failure

  • Better concurrency: PostgreSQL handles concurrent schedule updates

  • ACID transactions: Guaranteed consistency

  • Connection pooling: Efficient resource usage via asyncpg

MySQL/MariaDB Production Example

Complete example with MySQL backend (also compatible with MariaDB):

from flowrra import Flowrra
import asyncio
import os

# Create Flowrra app
app = Flowrra.from_urls(
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

# Define tasks
@app.task()
async def send_notifications(user_count: int):
    """Send notifications to users."""
    print(f"Sending notifications to {user_count} users...")
    return {"sent": user_count}

@app.task()
async def generate_analytics():
    """Generate analytics reports."""
    print("Generating analytics...")
    return {"status": "completed"}

# Create scheduler with MySQL backend
scheduler = app.create_scheduler(
    backend=os.getenv(
        "SCHEDULER_DB",
        "mysql://flowrra:password@localhost:3306/flowrra_prod"
    )
)

async def main():
    # Schedule notifications every 15 minutes
    await scheduler.schedule_cron(
        task_name="send_notifications",
        cron="*/15 * * * *",
        args=(100,),
        description="Send notifications every 15 minutes"
    )

    # Schedule weekly analytics on Mondays at 9 AM
    await scheduler.schedule_cron(
        task_name="generate_analytics",
        cron="0 9 * * 1",
        description="Weekly analytics on Monday"
    )

    # Start app (automatically starts scheduler)
    await app.start()

    # Keep running
    await asyncio.Event().wait()

if __name__ == "__main__":
    asyncio.run(main())

MariaDB Note:

The MySQL backend is fully compatible with MariaDB. Use the same connection string format:

backend="mysql://user:password@localhost:3306/database"

Connection String Formats

# SQLite (default)
backend=None  # Creates .flowrra_schedule.db
backend="sqlite:///path/to/schedule.db"

# PostgreSQL
backend="postgresql://user:password@host:5432/database"
backend="postgresql://user:password@host:5432/database?sslmode=require"

# PostgreSQL with connection options
backend="postgresql://user:password@host:5432/database?sslmode=require&pool_min_size=5&pool_max_size=20"

# MySQL/MariaDB
backend="mysql://user:password@host:3306/database"
backend="mysql://user:password@host:3306/database?charset=utf8mb4"
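Because these connection strings embed a password, it helps to mask it before a URL reaches a log line or an error message. The sketch below does this with urllib.parse from the standard library. redact_backend_url is a hypothetical helper and not part of Flowrra.

```python
from urllib.parse import urlsplit, urlunsplit


def redact_backend_url(url):
    """Return the URL with its password replaced by ***; other parts are kept."""
    parts = urlsplit(url)
    if parts.password is None:
        return url  # nothing to hide, e.g. a sqlite:/// path
    host = parts.hostname or ""
    if parts.port is not None:
        host = f"{host}:{parts.port}"
    netloc = f"{parts.username or ''}:***@{host}"
    return urlunsplit(parts._replace(netloc=netloc))


print(redact_backend_url(
    "postgresql://user:password@host:5432/database?sslmode=require"
))
print(redact_backend_url("sqlite:///path/to/schedule.db"))
```

Query options such as sslmode are preserved, so the redacted form still documents how the connection was configured. IPv6 host literals are not handled by this sketch.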

Environment Variables for Production

Use environment variables to manage database credentials securely:

import os
from flowrra import Flowrra

app = Flowrra.from_urls(
    broker=os.getenv("REDIS_BROKER_URL"),
    backend=os.getenv("REDIS_BACKEND_URL")
)

scheduler = app.create_scheduler(
    backend=os.getenv("SCHEDULER_DATABASE_URL")
)

# .env file for production
REDIS_BROKER_URL=redis://localhost:6379/0
REDIS_BACKEND_URL=redis://localhost:6379/1
SCHEDULER_DATABASE_URL=postgresql://flowrra:secret@db.example.com:5432/flowrra_prod

# .env file for staging
REDIS_BROKER_URL=redis://staging-redis:6379/0
REDIS_BACKEND_URL=redis://staging-redis:6379/1
SCHEDULER_DATABASE_URL=postgresql://flowrra:secret@staging-db.example.com:5432/flowrra_staging
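One caveat with os.getenv is that it returns None when a variable is unset, and the connection string listing shows backend=None selecting the default SQLite file. A misconfigured production host could therefore fall back to SQLite without any error. A fail-fast lookup avoids that. require_env is a hypothetical helper, sketched here with the standard library only.

```python
import os


def require_env(name):
    """Return the value of an environment variable, or raise if it is unset or blank."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"missing required environment variable: {name}")
    return value


# Intended use (commented out so the snippet runs without the variable set):
# scheduler = app.create_scheduler(backend=require_env("SCHEDULER_DATABASE_URL"))
```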

When to Use Each Backend

SQLite (Default):

  • Single-instance deployments

  • Development and testing

  • Low to moderate schedule volume

  • No external database infrastructure

  • Simple file-based storage

PostgreSQL (Recommended for Production):

  • Multiple worker instances

  • High availability requirements

  • High concurrency needs

  • Enterprise deployments

  • Advanced features (JSONB, full-text search)

  • Complex queries and reporting

  • Cloud deployments (AWS RDS, Google Cloud SQL, etc.)

MySQL/MariaDB:

  • Existing MySQL/MariaDB infrastructure

  • Multi-instance deployments

  • Good balance of features and simplicity

  • MariaDB compatibility required

  • Shared hosting environments

Integration with Executors

Manual Integration (Advanced)

For advanced use cases, you can create the scheduler manually:

from flowrra.scheduler import Scheduler
from flowrra.scheduler.backends import get_scheduler_backend

# Manual scheduler setup (not recommended for most cases)
scheduler = Scheduler(
    backend=get_scheduler_backend(),
    registry=app.registry,
    io_executor=app._io_executor,
    cpu_executor=app._cpu_executor
)

Warning

Manual setup requires you to start and stop the scheduler yourself. The recommended approach is to use app.create_scheduler(), which handles integration and lifecycle automatically.

Common Patterns

Daily Reports

# Generate report every day at 9 AM
await scheduler.schedule_cron(
    task_name="generate_daily_report",
    cron="0 9 * * *",
    description="Daily report generation"
)

Hourly Cleanup

# Clean up old data every hour
await scheduler.schedule_cron(
    task_name="cleanup_old_data",
    cron="0 * * * *",
    description="Hourly cleanup"
)

Weekday Business Hours

# Send reminders during business hours on weekdays
await scheduler.schedule_cron(
    task_name="send_reminders",
    cron="0 9-17 * * 1-5",  # On the hour from 9 AM through 5 PM, Mon-Fri
    description="Business hours reminders"
)

Best Practices

  1. Use meaningful task names: Make task names descriptive and unique

  2. Add descriptions: Include descriptions for scheduled tasks to document their purpose

  3. Set appropriate priorities: Reserve high priorities for critical tasks

  4. Monitor task execution: Log task executions for debugging and monitoring

  5. Handle errors gracefully: Implement proper error handling in scheduled tasks

  6. Use environment variables: Store database URLs in environment variables for different environments

  7. Test schedules: Verify cron expressions produce expected run times before deploying

  8. Use appropriate databases: SQLite for single-instance deployments, PostgreSQL or MySQL for distributed ones
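Practices 4 and 5 can be combined in a small wrapper that logs every run and records the traceback of any failure. This is a sketch using only asyncio, functools, and logging. logged is a hypothetical decorator, and stacking it beneath @app.task() assumes the Flowrra decorator accepts any coroutine function.

```python
import asyncio
import functools
import logging

logger = logging.getLogger("scheduled_tasks")


def logged(func):
    """Wrap a coroutine function so each run is logged, including failures."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        logger.info("starting %s", func.__name__)
        try:
            result = await func(*args, **kwargs)
        except Exception:
            logger.exception("%s failed", func.__name__)
            raise  # re-raise so the caller still sees the failure
        logger.info("finished %s", func.__name__)
        return result
    return wrapper


@logged
async def cleanup_old_data():
    return {"cleaned": 0}


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(cleanup_old_data()))
```

Re-raising after logging keeps the failure visible to whatever runs the task, rather than turning an error into a silent success.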

See Also