flowrra.scheduler.backends package
Submodules
flowrra.scheduler.backends.base module
Base interface for scheduler storage backends.
- class flowrra.scheduler.backends.base.BaseSchedulerBackend
Bases: ABC
Abstract base class for scheduler storage backends.
Backends are responsible for persisting scheduled task definitions and providing query capabilities for the scheduler service.
- abstractmethod async create(task)
Create a new scheduled task.
- Parameters:
task (ScheduledTask) – ScheduledTask to store
- Raises:
ValueError – If task with same ID already exists
- Return type:
None
- abstractmethod async update(task)
Update an existing scheduled task.
- Parameters:
task (ScheduledTask) – ScheduledTask with updated fields
- Raises:
ValueError – If task doesn’t exist
- Return type:
None
- abstractmethod async delete(task_id)
Delete a scheduled task.
- Parameters:
task_id (str) – ID of task to delete
- Return type:
bool
- Returns:
True if deleted, False if not found
- abstractmethod async get(task_id)
Get a scheduled task by ID.
- Parameters:
task_id (str) – Task ID to retrieve
- Return type:
ScheduledTask|None
- Returns:
ScheduledTask if found, None otherwise
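The create, update, delete and get contracts described above can be illustrated with a small in-memory store. This is only a sketch: `InMemoryBackend` and the stand-in `ScheduledTask` dataclass (with assumed fields `id` and `task_name`) are hypothetical and are not part of flowrra.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ScheduledTask:
    """Hypothetical stand-in; the real ScheduledTask defines its own fields."""
    id: str
    task_name: str


class InMemoryBackend:
    """Illustrative store that mirrors the documented CRUD contract."""

    def __init__(self) -> None:
        self._tasks: Dict[str, ScheduledTask] = {}

    async def create(self, task: ScheduledTask) -> None:
        # Documented behaviour: ValueError if the ID is already stored.
        if task.id in self._tasks:
            raise ValueError(f"task {task.id!r} already exists")
        self._tasks[task.id] = task

    async def update(self, task: ScheduledTask) -> None:
        # Documented behaviour: ValueError if the task is unknown.
        if task.id not in self._tasks:
            raise ValueError(f"task {task.id!r} does not exist")
        self._tasks[task.id] = task

    async def delete(self, task_id: str) -> bool:
        # True if something was removed, False if the ID was not found.
        return self._tasks.pop(task_id, None) is not None

    async def get(self, task_id: str) -> Optional[ScheduledTask]:
        return self._tasks.get(task_id)


async def demo() -> list:
    backend = InMemoryBackend()
    await backend.create(ScheduledTask(id="t1", task_name="cleanup"))
    results = []
    try:
        await backend.create(ScheduledTask(id="t1", task_name="cleanup"))
    except ValueError:
        results.append("duplicate rejected")
    results.append(await backend.delete("t1"))  # first delete succeeds
    results.append(await backend.delete("t1"))  # second delete finds nothing
    results.append(await backend.get("t1"))     # task is gone
    return results


crud_results = asyncio.run(demo())
```

Note the asymmetry in the contract: create and update signal failure with ValueError, while delete and get report a missing task through their return value.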
- abstractmethod async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- Returns:
List of all ScheduledTask objects
- abstractmethod async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- Returns:
List of enabled ScheduledTask objects
- abstractmethod async list_due(now=None)
List tasks that are due for execution.
- Parameters:
now (datetime|None) – Current time (defaults to datetime.now())
- Return type:
List[ScheduledTask]
- Returns:
List of ScheduledTask objects due for execution
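One plausible reading of "due for execution" is sketched below. The rule used here (the task is enabled and its next run time is at or before `now`) is an assumption, as are the `enabled` and `next_run` field names on the stand-in `ScheduledTask`; the real backends may apply a different filter.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class ScheduledTask:
    """Hypothetical stand-in; `enabled` and `next_run` are assumed names."""
    id: str
    enabled: bool
    next_run: Optional[datetime]


def select_due(tasks: List[ScheduledTask],
               now: Optional[datetime] = None) -> List[ScheduledTask]:
    """Return enabled tasks whose next_run is at or before `now`."""
    if now is None:
        now = datetime.now()  # mirrors the documented default
    return [
        t for t in tasks
        if t.enabled and t.next_run is not None and t.next_run <= now
    ]


now = datetime(2024, 1, 1, 12, 0)
tasks = [
    ScheduledTask("past", True, now - timedelta(minutes=5)),
    ScheduledTask("future", True, now + timedelta(minutes=5)),
    ScheduledTask("disabled", False, now - timedelta(minutes=5)),
    ScheduledTask("unscheduled", True, None),
]
due_ids = [t.id for t in select_due(tasks, now)]
```

Passing `now` explicitly, as the signature allows, keeps the selection deterministic in tests.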
- abstractmethod async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Parameters:
task_id (str) – ID of the task
last_run (datetime) – Timestamp of last execution
next_run (datetime) – Timestamp of next scheduled execution
- Raises:
ValueError – If task doesn’t exist
- Return type:
None
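A scheduler service would presumably pair list_due with update_run_times on each polling pass. The sketch below shows that pairing under stated assumptions: `StubBackend`, `tick`, and the `interval`, `last_run` and `next_run` fields are hypothetical names, and the fixed-interval computation of the next run is a simplification.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Optional


@dataclass
class ScheduledTask:
    """Hypothetical stand-in with assumed field names."""
    id: str
    interval: timedelta
    next_run: datetime
    last_run: Optional[datetime] = None


class StubBackend:
    """Implements only the two methods this sketch needs."""

    def __init__(self, tasks: List[ScheduledTask]) -> None:
        self._tasks: Dict[str, ScheduledTask] = {t.id: t for t in tasks}

    async def list_due(self, now: Optional[datetime] = None) -> List[ScheduledTask]:
        now = now or datetime.now()
        return [t for t in self._tasks.values() if t.next_run <= now]

    async def update_run_times(self, task_id: str, last_run: datetime,
                               next_run: datetime) -> None:
        if task_id not in self._tasks:
            raise ValueError(f"task {task_id!r} does not exist")
        task = self._tasks[task_id]
        task.last_run, task.next_run = last_run, next_run


async def tick(backend: StubBackend, now: datetime) -> List[str]:
    """One polling pass: find due tasks, 'run' them, record run times."""
    executed = []
    for task in await backend.list_due(now):
        executed.append(task.id)  # a real scheduler would dispatch the task here
        await backend.update_run_times(
            task.id, last_run=now, next_run=now + task.interval
        )
    return executed


start = datetime(2024, 1, 1, 12, 0)
backend = StubBackend([ScheduledTask("heartbeat", timedelta(minutes=1), start)])
first = asyncio.run(tick(backend, start))
second = asyncio.run(tick(backend, start + timedelta(seconds=30)))
third = asyncio.run(tick(backend, start + timedelta(minutes=1)))
```

Because the next run time is moved forward immediately, a pass that happens before the interval elapses finds nothing due.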
- abstractmethod async clear()
Clear all scheduled tasks.
- Return type:
int
- Returns:
Number of tasks deleted
- abstractmethod async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
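The idempotency use case mentioned for find_by_definition can be sketched as a "find or create" helper. Everything below is illustrative: `ensure_schedule` and `StubBackend` are hypothetical, the stand-in `ScheduledTask` uses assumed field names, and plain strings stand in for the ScheduleType values.

```python
import asyncio
from dataclasses import dataclass, field
from typing import List, Optional
from uuid import uuid4


@dataclass
class ScheduledTask:
    """Hypothetical stand-in with assumed field names."""
    task_name: str
    schedule_type: str  # plain string standing in for ScheduleType
    schedule: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)
    id: str = field(default_factory=lambda: uuid4().hex)


class StubBackend:
    """Implements only create and find_by_definition."""

    def __init__(self) -> None:
        self.tasks: List[ScheduledTask] = []

    async def create(self, task: ScheduledTask) -> None:
        self.tasks.append(task)

    async def find_by_definition(self, task_name, schedule_type, schedule,
                                 args=(), kwargs=None) -> Optional[ScheduledTask]:
        wanted = (task_name, schedule_type, schedule, tuple(args), kwargs or {})
        for t in self.tasks:
            if (t.task_name, t.schedule_type, t.schedule, t.args, t.kwargs) == wanted:
                return t
        return None


async def ensure_schedule(backend, task_name, schedule_type, schedule,
                          args=(), kwargs=None) -> ScheduledTask:
    """Return the matching schedule, creating it only if none exists."""
    existing = await backend.find_by_definition(
        task_name, schedule_type, schedule, args, kwargs
    )
    if existing is not None:
        return existing
    task = ScheduledTask(task_name, schedule_type, schedule,
                         tuple(args), dict(kwargs or {}))
    await backend.create(task)
    return task


async def demo():
    backend = StubBackend()
    a = await ensure_schedule(backend, "send_report", "CRON", "0 9 * * *", args=("daily",))
    b = await ensure_schedule(backend, "send_report", "CRON", "0 9 * * *", args=("daily",))
    c = await ensure_schedule(backend, "send_report", "CRON", "0 9 * * *", args=("weekly",))
    return a.id == b.id, a.id == c.id, len(backend.tasks)


same_id, different_args_match, stored = asyncio.run(demo())
```

Registering the same definition twice returns the first task, while changing any part of the definition (here, the arguments) produces a separate schedule.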
flowrra.scheduler.backends.mysql module
MySQL backend for scheduler storage.
This backend uses aiomysql for async MySQL access, ideal for distributed/production deployments.
- class flowrra.scheduler.backends.mysql.MySQLSchedulerBackend(url)
Bases: BaseSchedulerBackend
MySQL-based scheduler backend.
- Features:
High-performance async operations via aiomysql
Connection pooling
Transaction support
JSON column support
Good for distributed/production deployments
Compatible with MariaDB
- Parameters:
url (str) – MySQL connection URL. Format: mysql://user:pass@host:port/dbname
- __init__(url)
Initialize MySQL backend.
- Parameters:
url (str) – MySQL connection URL
- Raises:
ImportError – If aiomysql is not installed
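Raising ImportError from the constructor suggests the database driver is treated as an optional dependency that is imported only when the backend is used. A common way to do that is sketched below; `require_driver` is a hypothetical helper, not flowrra's actual code, and the module names are placeholders.

```python
import importlib


def require_driver(module_name: str, install_hint: str):
    """Import an optional driver, re-raising ImportError with guidance."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"{module_name} is required for this backend; try: {install_hint}"
        ) from exc


# A standard-library module stands in for a driver that is installed.
json_module = require_driver("json", "n/a")

# A deliberately nonexistent module stands in for a missing driver.
try:
    require_driver(
        "hypothetical_missing_driver",
        "pip install hypothetical-missing-driver",
    )
    missing_message = None
except ImportError as exc:
    missing_message = str(exc)
```

Deferring the import this way lets the package be installed without every database driver present.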
- async get(task_id)
Get a scheduled task by ID.
- Return type:
ScheduledTask|None
- async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_due(now=None)
List tasks that are due for execution.
- Return type:
List[ScheduledTask]
- async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Return type:
None
- async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
flowrra.scheduler.backends.postgresql module
PostgreSQL backend for scheduler storage.
This backend uses asyncpg for high-performance async PostgreSQL access, ideal for distributed/production deployments.
- class flowrra.scheduler.backends.postgresql.PostgreSQLSchedulerBackend(url)
Bases: BaseSchedulerBackend
PostgreSQL-based scheduler backend.
- Features:
High-performance async operations via asyncpg
Connection pooling
Transaction support
JSONB for efficient JSON storage and queries
Good for distributed/production deployments
- Parameters:
url (str) – PostgreSQL connection URL. Format: postgresql://user:pass@host:port/dbname
- __init__(url)
Initialize PostgreSQL backend.
- Parameters:
url (str) – PostgreSQL connection URL
- Raises:
ImportError – If asyncpg is not installed
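The connection URL format given above follows the usual scheme://user:pass@host:port/dbname layout, so its parts can be inspected with the standard library. How the backend itself parses the URL is not stated in this reference; the host name and credentials below are placeholders.

```python
from urllib.parse import urlsplit

# Placeholder URL in the documented format.
parts = urlsplit("postgresql://user:pass@db.example.com:5432/flowrra")

connection = {
    "scheme": parts.scheme,
    "user": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,                      # parsed as an int
    "database": parts.path.lstrip("/"),      # path carries the leading slash
}
```

The same decomposition applies to the mysql:// URLs accepted by the MySQL backend.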
- async get(task_id)
Get a scheduled task by ID.
- Return type:
ScheduledTask|None
- async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_due(now=None)
List tasks that are due for execution.
- Return type:
List[ScheduledTask]
- async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Return type:
None
- async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
flowrra.scheduler.backends.sqlite module
SQLite backend for scheduler storage.
- class flowrra.scheduler.backends.sqlite.SQLiteSchedulerBackend(database_path='.flowrra_schedule.db')
Bases: BaseSchedulerBackend
SQLite-based scheduler backend for local persistent storage.
- Features:
Local file-based storage
No external dependencies
Automatic schema creation
Transaction support
Good for single-instance deployments
- Parameters:
database_path (str) – Path to SQLite database file (defaults to ".flowrra_schedule.db")
- __init__(database_path='.flowrra_schedule.db')
Initialize SQLite backend.
- Parameters:
database_path (str) – Path to database file
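"Automatic schema creation" and "Transaction support" can be illustrated with the standard sqlite3 module. The table layout below is hypothetical, since the real schema is not shown in this reference, and the synchronous API is used only for brevity; the backend's methods are async.

```python
import sqlite3

# Hypothetical table layout, for illustration only.
SCHEMA = """
CREATE TABLE IF NOT EXISTS scheduled_tasks (
    id TEXT PRIMARY KEY,
    task_name TEXT NOT NULL,
    enabled INTEGER NOT NULL DEFAULT 1,
    next_run TEXT
)
"""


def open_store(database_path: str) -> sqlite3.Connection:
    """Open the database and create the table if it is missing."""
    conn = sqlite3.connect(database_path)
    conn.execute(SCHEMA)
    conn.commit()
    return conn


conn = open_store(":memory:")
conn.execute(SCHEMA)  # running it again is a no-op thanks to IF NOT EXISTS

with conn:  # commits on success, rolls back if the block raises
    conn.execute(
        "INSERT INTO scheduled_tasks (id, task_name) VALUES (?, ?)",
        ("t1", "cleanup"),
    )

row = conn.execute(
    "SELECT id, task_name, enabled, next_run FROM scheduled_tasks"
).fetchone()
conn.close()
```

Creating the table with IF NOT EXISTS is what makes it safe to run the setup every time the backend starts.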
- async get(task_id)
Get a scheduled task by ID.
- Return type:
ScheduledTask|None
- async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_due(now=None)
List tasks that are due for execution.
- Return type:
List[ScheduledTask]
- async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Return type:
None
- async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
Module contents
Scheduler storage backends.
- class flowrra.scheduler.backends.BaseSchedulerBackend
Bases: ABC
Abstract base class for scheduler storage backends.
Backends are responsible for persisting scheduled task definitions and providing query capabilities for the scheduler service.
- abstractmethod async create(task)
Create a new scheduled task.
- Parameters:
task (ScheduledTask) – ScheduledTask to store
- Raises:
ValueError – If task with same ID already exists
- Return type:
None
- abstractmethod async update(task)
Update an existing scheduled task.
- Parameters:
task (ScheduledTask) – ScheduledTask with updated fields
- Raises:
ValueError – If task doesn’t exist
- Return type:
None
- abstractmethod async delete(task_id)
Delete a scheduled task.
- Parameters:
task_id (str) – ID of task to delete
- Return type:
bool
- Returns:
True if deleted, False if not found
- abstractmethod async get(task_id)
Get a scheduled task by ID.
- Parameters:
task_id (str) – Task ID to retrieve
- Return type:
ScheduledTask|None
- Returns:
ScheduledTask if found, None otherwise
- abstractmethod async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- Returns:
List of all ScheduledTask objects
- abstractmethod async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- Returns:
List of enabled ScheduledTask objects
- abstractmethod async list_due(now=None)
List tasks that are due for execution.
- Parameters:
now (datetime|None) – Current time (defaults to datetime.now())
- Return type:
List[ScheduledTask]
- Returns:
List of ScheduledTask objects due for execution
- abstractmethod async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Parameters:
task_id (str) – ID of the task
last_run (datetime) – Timestamp of last execution
next_run (datetime) – Timestamp of next scheduled execution
- Raises:
ValueError – If task doesn’t exist
- Return type:
None
- abstractmethod async clear()
Clear all scheduled tasks.
- Return type:
int
- Returns:
Number of tasks deleted
- abstractmethod async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
- class flowrra.scheduler.backends.SQLiteSchedulerBackend(database_path='.flowrra_schedule.db')
Bases: BaseSchedulerBackend
SQLite-based scheduler backend for local persistent storage.
- Features:
Local file-based storage
No external dependencies
Automatic schema creation
Transaction support
Good for single-instance deployments
- Parameters:
database_path (str) – Path to SQLite database file (defaults to ".flowrra_schedule.db")
- __init__(database_path='.flowrra_schedule.db')
Initialize SQLite backend.
- Parameters:
database_path (str) – Path to database file
- async get(task_id)
Get a scheduled task by ID.
- Return type:
ScheduledTask|None
- async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_due(now=None)
List tasks that are due for execution.
- Return type:
List[ScheduledTask]
- async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Return type:
None
- async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
- class flowrra.scheduler.backends.PostgreSQLSchedulerBackend(url)
Bases: BaseSchedulerBackend
PostgreSQL-based scheduler backend.
- Features:
High-performance async operations via asyncpg
Connection pooling
Transaction support
JSONB for efficient JSON storage and queries
Good for distributed/production deployments
- Parameters:
url (str) – PostgreSQL connection URL. Format: postgresql://user:pass@host:port/dbname
- __init__(url)
Initialize PostgreSQL backend.
- Parameters:
url (str) – PostgreSQL connection URL
- Raises:
ImportError – If asyncpg is not installed
- async get(task_id)
Get a scheduled task by ID.
- Return type:
ScheduledTask|None
- async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_due(now=None)
List tasks that are due for execution.
- Return type:
List[ScheduledTask]
- async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Return type:
None
- async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
- class flowrra.scheduler.backends.MySQLSchedulerBackend(url)
Bases: BaseSchedulerBackend
MySQL-based scheduler backend.
- Features:
High-performance async operations via aiomysql
Connection pooling
Transaction support
JSON column support
Good for distributed/production deployments
Compatible with MariaDB
- Parameters:
url (str) – MySQL connection URL. Format: mysql://user:pass@host:port/dbname
- __init__(url)
Initialize MySQL backend.
- Parameters:
url (str) – MySQL connection URL
- Raises:
ImportError – If aiomysql is not installed
- async get(task_id)
Get a scheduled task by ID.
- Return type:
ScheduledTask|None
- async list_all()
List all scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_enabled()
List only enabled scheduled tasks.
- Return type:
List[ScheduledTask]
- async list_due(now=None)
List tasks that are due for execution.
- Return type:
List[ScheduledTask]
- async update_run_times(task_id, last_run, next_run)
Update task run times after execution.
- Return type:
None
- async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)
Find exact schedule match by complete definition.
Used for idempotency: finds schedules with identical parameters.
- Parameters:
task_name (str) – Task name to match
schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule (str) – Schedule expression/value
args (tuple) – Task arguments tuple
kwargs (dict|None) – Task keyword arguments dict
- Return type:
ScheduledTask|None
- Returns:
Matching ScheduledTask if found, None otherwise
- flowrra.scheduler.backends.get_scheduler_backend(url=None)
Create a scheduler backend from a database URL.
- Parameters:
url (str|None) – Database connection URL or None for default SQLite
- Returns:
Scheduler backend instance
- Raises:
ValueError – If URL scheme is unsupported
- Supported URLs:
None or "sqlite://…" - SQLite backend (default: .flowrra_schedule.db)
"postgresql://…" - PostgreSQL backend
"mysql://…" - MySQL backend
Examples
from flowrra.scheduler.backends import get_scheduler_backend
# Default SQLite
backend = get_scheduler_backend()
# Custom SQLite path
backend = get_scheduler_backend("sqlite:///path/to/schedule.db")
# PostgreSQL
backend = get_scheduler_backend("postgresql://user:pass@localhost/flowrra")
# MySQL
backend = get_scheduler_backend("mysql://user:pass@localhost/flowrra")
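The scheme-based selection described under "Supported URLs" can be sketched as a lookup on the URL scheme. This is not flowrra's implementation: `backend_name_for` is a hypothetical helper that returns the class name a URL would select rather than constructing a backend, and the error message wording is invented.

```python
from typing import Optional
from urllib.parse import urlsplit

# Scheme-to-backend mapping taken from the "Supported URLs" list.
BACKEND_BY_SCHEME = {
    "sqlite": "SQLiteSchedulerBackend",
    "postgresql": "PostgreSQLSchedulerBackend",
    "mysql": "MySQLSchedulerBackend",
}


def backend_name_for(url: Optional[str] = None) -> str:
    """Return the backend class name a URL would select (illustrative)."""
    if url is None:
        return BACKEND_BY_SCHEME["sqlite"]  # documented default
    scheme = urlsplit(url).scheme
    try:
        return BACKEND_BY_SCHEME[scheme]
    except KeyError:
        raise ValueError(f"Unsupported URL scheme: {scheme!r}") from None


default_name = backend_name_for()
sqlite_name = backend_name_for("sqlite:///path/to/schedule.db")
postgres_name = backend_name_for("postgresql://user:pass@localhost/flowrra")
mysql_name = backend_name_for("mysql://user:pass@localhost/flowrra")

try:
    backend_name_for("redis://localhost")
    unsupported_error = None
except ValueError as exc:
    unsupported_error = str(exc)
```

Keeping the mapping in one table means an unsupported scheme fails early with ValueError, matching the documented behaviour.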