flowrra.scheduler package
Subpackages
- flowrra.scheduler.backends package
- Submodules
- flowrra.scheduler.backends.base module
BaseSchedulerBackend: create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
- flowrra.scheduler.backends.mysql module
MySQLSchedulerBackend: __init__(), create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
- flowrra.scheduler.backends.postgresql module
PostgreSQLSchedulerBackend: __init__(), create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
- flowrra.scheduler.backends.sqlite module
SQLiteSchedulerBackend: __init__(), create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
- Module contents
BaseSchedulerBackend: create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
SQLiteSchedulerBackend: __init__(), create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
PostgreSQLSchedulerBackend: __init__(), create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
MySQLSchedulerBackend: __init__(), create(), update(), delete(), get(), list_all(), list_enabled(), list_due(), update_run_times(), clear(), find_by_definition(), close()
get_scheduler_backend()
Submodules
flowrra.scheduler.cron module
Cron expression parser and evaluator.
- Supports standard cron syntax:
Minute (0-59)
Hour (0-23)
Day of month (1-31)
Month (1-12)
Day of week (0-7, where 0 and 7 are Sunday)
- Special characters:
* (any value)
, (value list separator)
- (range of values)
/ (step values)
Examples
"*/5 * * * *" - Every 5 minutes
"0 */2 * * *" - Every 2 hours
"0 9 * * 1-5" - 9 AM on weekdays
"0 0 1 * *" - First day of every month at midnight
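The field rules above can be illustrated with a small standalone matcher. This is a minimal sketch written for this page, not the flowrra implementation: `expand_field` and `sketch_matches` are hypothetical names, and the sketch assumes all five fields must match (some cron implementations instead treat day of month and day of week as alternatives when both are restricted).

```python
from datetime import datetime


def expand_field(field: str, lo: int, hi: int) -> set:
    """Expand one cron field into the set of integers it allows."""
    values = set()
    for part in field.split(","):          # ',' separates alternatives
        rng, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if rng == "*":                     # '*' means the whole range
            start, end = lo, hi
        elif "-" in rng:                   # 'a-b' is an inclusive range
            a, b = rng.split("-")
            start, end = int(a), int(b)
        else:                              # a single value
            start = end = int(rng)
        if step < 1 or start < lo or end > hi or start > end:
            raise ValueError(f"invalid cron field: {field!r}")
        values.update(range(start, end + 1, step))
    return values


def sketch_matches(expression: str, dt: datetime) -> bool:
    """Return True if dt satisfies all five fields of the expression."""
    fields = expression.split()
    if len(fields) != 5:
        raise ValueError("expected 5 fields: minute hour day month weekday")
    minute, hour, day, month, weekday = fields
    # Both 0 and 7 mean Sunday, so fold 7 onto 0.
    weekdays = {d % 7 for d in expand_field(weekday, 0, 7)}
    return (
        dt.minute in expand_field(minute, 0, 59)
        and dt.hour in expand_field(hour, 0, 23)
        and dt.day in expand_field(day, 1, 31)
        and dt.month in expand_field(month, 1, 12)
        and dt.isoweekday() % 7 in weekdays   # isoweekday: Mon=1 .. Sun=7
    )
```

For example, `sketch_matches("0 9 * * 1-5", datetime(2024, 1, 1, 9, 0))` is true because 1 January 2024 is a Monday, while the same expression does not match the following Saturday.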
- class flowrra.scheduler.cron.CronExpression(expression)
Bases: object
Parse and evaluate cron expressions.
- __init__(expression)
Initialize cron expression.
- Parameters:
expression (str) – Cron expression string (5 fields: minute hour day month weekday)
- Raises:
ValueError – If expression format is invalid
- matches(dt)
Check if datetime matches the cron expression.
- Parameters:
dt (datetime) – Datetime to check
- Return type:
bool
- Returns:
True if datetime matches the cron schedule
flowrra.scheduler.models module
Scheduler models for persistent task scheduling.
- class flowrra.scheduler.models.ScheduleType(value)
Bases: Enum
Type of schedule for a task.
- CRON = 'cron'
- INTERVAL = 'interval'
- ONE_TIME = 'one_time'
- class flowrra.scheduler.models.ScheduledTask(id, task_name, schedule_type, schedule, args=<factory>, kwargs=<factory>, enabled=True, last_run_at=None, next_run_at=None, created_at=<factory>, updated_at=<factory>, description=None, max_retries=3, retry_delay=1.0, priority=0)
Bases: object
A task scheduled for execution.
- id
Unique identifier for scheduled task
- task_name
Name of the registered task to execute
- schedule_type
Type of schedule (cron, interval, one_time)
- schedule
Schedule definition (cron expression or interval in seconds)
- args
Positional arguments for task execution
- kwargs
Keyword arguments for task execution
- enabled
Whether the scheduled task is active
- last_run_at
Timestamp of last execution
- next_run_at
Timestamp of next scheduled execution
- created_at
When the scheduled task was created
- updated_at
When the scheduled task was last updated
- description
Optional description of the task
- max_retries
Maximum retry attempts on failure
- retry_delay
Delay between retries in seconds
- priority
Task priority (higher = more important)
- id: str
- task_name: str
- schedule_type: ScheduleType
- schedule: str
- args: tuple
- kwargs: dict
- enabled: bool = True
- last_run_at: datetime | None = None
- next_run_at: datetime | None = None
- created_at: datetime
- updated_at: datetime
- description: str | None = None
- max_retries: int = 3
- retry_delay: float = 1.0
- priority: int = 0
- __init__(id, task_name, schedule_type, schedule, args=<factory>, kwargs=<factory>, enabled=True, last_run_at=None, next_run_at=None, created_at=<factory>, updated_at=<factory>, description=None, max_retries=3, retry_delay=1.0, priority=0)
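The `<factory>` defaults in the signature suggest a dataclass with per-instance default factories. The following is a reconstruction from the documented fields and defaults only, not the library source; the class name `ScheduledTaskSketch`, the `_now` helper, and the choice of timezone-aware UTC timestamps are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class ScheduleType(Enum):
    CRON = "cron"
    INTERVAL = "interval"
    ONE_TIME = "one_time"


def _now() -> datetime:
    # Assumption: UTC-aware timestamps; the reference does not say.
    return datetime.now(timezone.utc)


@dataclass
class ScheduledTaskSketch:
    # Required fields, in the documented order.
    id: str
    task_name: str
    schedule_type: ScheduleType
    schedule: str
    # "<factory>" defaults: each instance gets its own fresh object.
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    enabled: bool = True
    last_run_at: Optional[datetime] = None
    next_run_at: Optional[datetime] = None
    created_at: datetime = field(default_factory=_now)
    updated_at: datetime = field(default_factory=_now)
    description: Optional[str] = None
    max_retries: int = 3
    retry_delay: float = 1.0
    priority: int = 0


task = ScheduledTaskSketch(
    id="t1",
    task_name="send_report",
    schedule_type=ScheduleType.INTERVAL,
    schedule="300",  # the schedule field is a str, so the interval is stored as text
)
```

Using `default_factory` rather than a literal `{}` avoids sharing one mutable `kwargs` dict between instances.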
flowrra.scheduler.scheduler module
Scheduler service for managing and executing scheduled tasks.
- class flowrra.scheduler.scheduler.Scheduler(backend, registry, check_interval=60.0, io_executor=None, cpu_executor=None)
Bases: object
Scheduler service for persistent task scheduling.
- Features:
Cron-based scheduling
Interval-based scheduling
One-time task scheduling
Persistent storage across restarts
Automatic next-run calculation
Task priority support
Automatic routing to IOExecutor or CPUExecutor
- Parameters:
backend (BaseSchedulerBackend) – Scheduler storage backend
registry (TaskRegistry) – Task registry for resolving task names
check_interval (float) – How often to check for due tasks (seconds)
io_executor (IOExecutor | None) – IOExecutor for async tasks (optional)
cpu_executor (CPUExecutor | None) – CPUExecutor for CPU-bound tasks (optional)
Example
# Automatic integration with Flowrra app
app = Flowrra.from_urls()
scheduler = app.create_scheduler()

# Manual setup
scheduler = Scheduler(
    backend=backend,
    registry=registry,
    io_executor=io_executor,
    cpu_executor=cpu_executor,
)
- __init__(backend, registry, check_interval=60.0, io_executor=None, cpu_executor=None)
Initialize scheduler.
- Parameters:
backend (BaseSchedulerBackend) – Storage backend for scheduled tasks
registry (TaskRegistry) – Task registry for looking up task functions
check_interval (float) – Fallback interval in seconds (used on errors). The scheduler uses dynamic sleep calculation by default, waking up precisely when tasks are due. This parameter is only used as a fallback when sleep calculation fails.
io_executor (IOExecutor | None) – IOExecutor instance for async tasks
cpu_executor (CPUExecutor | None) – CPUExecutor instance for CPU-bound tasks
Note
The scheduler automatically calculates optimal sleep duration based on when the next task is scheduled to run, with a maximum sleep of 1 hour to periodically refresh the schedule list.
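The sleep behaviour described in this note can be sketched as a pure function. This is an illustration under stated assumptions, not the scheduler's actual code: `sleep_seconds` and `MAX_SLEEP_SECONDS` are hypothetical names, sleeping for the full hour when no task has a next run time is an assumption, and a `TypeError` from mixing naive and aware datetimes stands in for "sleep calculation fails".

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional

MAX_SLEEP_SECONDS = 3600.0  # the documented one-hour cap


def sleep_seconds(
    next_runs: Iterable[Optional[datetime]],
    now: datetime,
    check_interval: float = 60.0,
) -> float:
    """Seconds to sleep until the earliest due task, capped at one hour."""
    try:
        upcoming = [t for t in next_runs if t is not None]
        if not upcoming:
            # Nothing scheduled: wake up at the cap to refresh the list.
            return MAX_SLEEP_SECONDS
        delay = (min(upcoming) - now).total_seconds()
        # Overdue tasks give a negative delay; clamp to "run now".
        return min(max(delay, 0.0), MAX_SLEEP_SECONDS)
    except TypeError:
        # Calculation failed (e.g. naive vs aware datetimes):
        # fall back to the fixed check interval.
        return check_interval


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
soon = sleep_seconds([now + timedelta(seconds=30), now + timedelta(hours=5)], now)
```

Here `soon` is 30 seconds, because the earliest next run decides the wake-up time, and a task five hours away on its own would be capped at 3600 seconds.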
- set_submit_callback(callback)
Set callback for submitting tasks to executor.
LEGACY API: This method exists for backward compatibility and testing purposes. For production use, prefer passing executor instances to the constructor:
scheduler = Scheduler(
    backend=backend,
    registry=registry,
    io_executor=io_executor,
    cpu_executor=cpu_executor,
)
The callback approach bypasses executor-based task routing and requires manual implementation of submission logic. Use only when:
- Writing unit tests that need to intercept submissions
- Implementing custom submission logic outside the executor pattern
- Parameters:
callback (Callable) – Async function that accepts (task_func, *args, **kwargs)
- Return type:
None
- Example (Testing):
scheduler.set_submit_callback(track_submission)
- Example (Modern Approach - Recommended):
# Don't use a callback; pass executors instead
scheduler = Scheduler(backend, registry, io_executor=io_executor)
# Tasks are automatically routed to the correct executor
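The testing example refers to a `track_submission` callback whose definition does not appear in this reference. One possible shape, assuming only the documented contract (an async function accepting `(task_func, *args, **kwargs)`), is sketched below; the `submissions` list and the `send_report` task are hypothetical, and the final call simulates what a scheduler would do rather than using a real `Scheduler`.

```python
import asyncio
from typing import Any, Callable

# Records every submission so a test can assert on it afterwards.
submissions = []


async def track_submission(task_func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
    """Record the submission instead of running the task."""
    submissions.append((task_func.__name__, args, kwargs))


def send_report(user_id: int, fmt: str = "pdf") -> None:
    """Hypothetical task; never executed in this sketch."""


# Stand-in for the scheduler invoking the callback for a due task.
asyncio.run(track_submission(send_report, 42, fmt="csv"))
```

After this runs, `submissions` holds one entry naming `send_report` with its positional and keyword arguments, which is enough for a unit test to verify what would have been submitted.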
- async schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)
Schedule a task using cron expression.
Automatically idempotent: if a schedule with the same task_name, cron, args, and kwargs already exists, returns the existing schedule ID.
- Parameters:
task_name (str) – Name of registered task to execute
cron (str) – Cron expression (e.g., "0 9 * * *" for daily at 9 AM)
args (tuple) – Positional arguments for task
kwargs (dict | None) – Keyword arguments for task
enabled (bool) – Whether task is enabled
description (str | None) – Optional task description
max_retries (int) – Maximum retry attempts
retry_delay (float) – Delay between retries in seconds
priority (int) – Task priority (higher = more important)
task_id (str | None) – Optional custom task ID (generated if not provided)
- Return type:
str
- Returns:
Task ID (existing or newly created)
- Raises:
ValueError – If cron expression is invalid or task not registered
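The idempotency rule (same task_name, schedule, args, and kwargs returns the existing ID) can be illustrated with an in-memory stand-in. This is only a sketch of the idea: `InMemorySchedules` is hypothetical, it assumes the arguments are JSON-serializable, and how the real backends compare definitions (for instance through `find_by_definition()`) is not described in this reference.

```python
import json
import uuid
from typing import Optional


class InMemorySchedules:
    """Hypothetical stand-in that deduplicates schedules by definition."""

    def __init__(self) -> None:
        self._ids_by_definition = {}

    @staticmethod
    def _definition_key(task_name: str, schedule: str, args: tuple, kwargs: Optional[dict]) -> str:
        # sort_keys makes the key independent of kwargs insertion order.
        return json.dumps([task_name, schedule, list(args), kwargs or {}], sort_keys=True)

    def schedule(
        self,
        task_name: str,
        schedule: str,
        args: tuple = (),
        kwargs: Optional[dict] = None,
        task_id: Optional[str] = None,
    ) -> str:
        key = self._definition_key(task_name, schedule, args, kwargs)
        existing = self._ids_by_definition.get(key)
        if existing is not None:
            return existing  # same definition: reuse the stored ID
        new_id = task_id or uuid.uuid4().hex
        self._ids_by_definition[key] = new_id
        return new_id


store = InMemorySchedules()
first = store.schedule("my_task", "0 9 * * *", args=(1, 2), kwargs={"a": 1, "b": 2})
again = store.schedule("my_task", "0 9 * * *", args=(1, 2), kwargs={"b": 2, "a": 1})
other = store.schedule("my_task", "0 9 * * *", args=(3,))
```

In this sketch `first` and `again` are the same ID, while `other` differs because its args differ. Repeating a scheduling call at application startup is therefore safe.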
- async schedule_interval(task_name, interval, args=(), kwargs=None, enabled=True, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)
Schedule a task to run at fixed intervals.
Automatically idempotent: if a schedule with the same task_name, interval, args, and kwargs already exists, returns the existing schedule ID.
- Parameters:
task_name (str) – Name of registered task to execute
interval (float) – Interval in seconds between executions
args (tuple) – Positional arguments for task
kwargs (dict | None) – Keyword arguments for task
enabled (bool) – Whether task is enabled
description (str | None) – Optional task description
max_retries (int) – Maximum retry attempts
retry_delay (float) – Delay between retries in seconds
priority (int) – Task priority (higher = more important)
task_id (str | None) – Optional custom task ID (generated if not provided)
- Return type:
str
- Returns:
Task ID (existing or newly created)
- Raises:
ValueError – If interval is invalid or task not registered
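One way to compute the next run time for a fixed interval is sketched below. It is an assumption-laden illustration, not the scheduler's algorithm: `next_interval_run` is a hypothetical helper, "invalid" is taken to mean a non-positive interval, and missed runs are skipped rather than replayed.

```python
import math
from datetime import datetime, timedelta, timezone


def next_interval_run(anchor: datetime, interval: float, now: datetime) -> datetime:
    """Return the first anchor + k * interval that is strictly after now."""
    if interval <= 0:
        raise ValueError("interval must be a positive number of seconds")
    if now < anchor:
        return anchor  # the first run has not happened yet
    elapsed = (now - anchor).total_seconds()
    # Whole periods already elapsed, plus one to land in the future.
    periods = math.floor(elapsed / interval) + 1
    return anchor + timedelta(seconds=periods * interval)


anchor = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
now = anchor + timedelta(minutes=7, seconds=30)
upcoming = next_interval_run(anchor, 300, now)  # five-minute interval
```

With a 300-second interval anchored at midnight and a current time of 00:07:30, `upcoming` is 00:10:00. Anchoring to a fixed start rather than to the last finish time keeps runs from drifting when a task takes long to execute.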
- async schedule_once(task_name, run_at, args=(), kwargs=None, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)
Schedule a task to run once at a specific time.
Automatically idempotent: if a schedule with the same task_name, run_at, args, and kwargs already exists, returns the existing schedule ID.
- Parameters:
task_name (str) – Name of registered task to execute
run_at (datetime) – When to run the task
args (tuple) – Positional arguments for task
kwargs (dict | None) – Keyword arguments for task
description (str | None) – Optional task description
max_retries (int) – Maximum retry attempts
retry_delay (float) – Delay between retries in seconds
priority (int) – Task priority (higher = more important)
task_id (str | None) – Optional custom task ID (generated if not provided)
- Return type:
str
- Returns:
Task ID (existing or newly created)
- Raises:
ValueError – If task not registered
- async unschedule(task_id)
Remove a scheduled task.
- Parameters:
task_id (str) – ID of task to remove
- Return type:
bool
- Returns:
True if deleted, False if not found
- async enable_task(task_id)
Enable a scheduled task.
- Parameters:
task_id (str) – ID of task to enable
- Raises:
ValueError – If task not found
- Return type:
None
- async disable_task(task_id)
Disable a scheduled task.
- Parameters:
task_id (str) – ID of task to disable
- Raises:
ValueError – If task not found
- Return type:
None
- async get_scheduled_task(task_id)
Get a scheduled task by ID.
- Parameters:
task_id (str) – Task ID
- Return type:
ScheduledTask | None
- Returns:
ScheduledTask if found, None otherwise
- async list_scheduled_tasks()
List all scheduled tasks.
- Return type:
list[ScheduledTask]
- Returns:
List of all ScheduledTask objects
- property is_running: bool
Check if scheduler is running.
Module contents
Persistent task scheduling for Flowrra.
This module provides Celery Beat-like scheduling capabilities with support for cron expressions, intervals, and one-time task execution.
Example
from flowrra import Flowrra
from flowrra.scheduler import Scheduler
from flowrra.scheduler.backends import get_scheduler_backend

# Create Flowrra app
app = Flowrra.from_urls()

# Create scheduler with SQLite backend (default)
backend = get_scheduler_backend()
scheduler = Scheduler(backend=backend, registry=app.registry)

# Schedule a task with cron expression
await scheduler.schedule_cron(
    task_name="my_task",
    cron="0 9 * * *",  # Daily at 9 AM
    args=(1, 2),
    kwargs={"foo": "bar"},
)

# Start scheduler
await scheduler.start()
- class flowrra.scheduler.Scheduler(backend, registry, check_interval=60.0, io_executor=None, cpu_executor=None)
- class flowrra.scheduler.ScheduledTask(id, task_name, schedule_type, schedule, args=<factory>, kwargs=<factory>, enabled=True, last_run_at=None, next_run_at=None, created_at=<factory>, updated_at=<factory>, description=None, max_retries=3, retry_delay=1.0, priority=0)
- class flowrra.scheduler.ScheduleType(value)
- class flowrra.scheduler.CronExpression(expression)