flowrra.scheduler.backends package

Submodules

flowrra.scheduler.backends.base module

Base interface for scheduler storage backends.

class flowrra.scheduler.backends.base.BaseSchedulerBackend[source]

Bases: ABC

Abstract base class for scheduler storage backends.

Backends are responsible for persisting scheduled task definitions and providing query capabilities for the scheduler service.

abstractmethod async create(task)[source]

Create a new scheduled task.

Parameters:

task (ScheduledTask) – ScheduledTask to store

Raises:

ValueError – If task with same ID already exists

Return type:

None

abstractmethod async update(task)[source]

Update an existing scheduled task.

Parameters:

task (ScheduledTask) – ScheduledTask with updated fields

Raises:

ValueError – If task doesn’t exist

Return type:

None

abstractmethod async delete(task_id)[source]

Delete a scheduled task.

Parameters:

task_id (str) – ID of task to delete

Return type:

bool

Returns:

True if deleted, False if not found

abstractmethod async get(task_id)[source]

Get a scheduled task by ID.

Parameters:

task_id (str) – Task ID to retrieve

Return type:

ScheduledTask | None

Returns:

ScheduledTask if found, None otherwise

abstractmethod async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

Returns:

List of all ScheduledTask objects

abstractmethod async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

Returns:

List of enabled ScheduledTask objects

abstractmethod async list_due(now=None)[source]

List tasks that are due for execution.

Parameters:

now (datetime | None) – Current time (defaults to datetime.now())

Return type:

List[ScheduledTask]

Returns:

List of ScheduledTask objects due for execution

abstractmethod async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Parameters:
  • task_id (str) – ID of the task

  • last_run (datetime) – Timestamp of last execution

  • next_run (datetime) – Timestamp of next scheduled execution

Raises:

ValueError – If task doesn’t exist

Return type:

None

abstractmethod async clear()[source]

Clear all scheduled tasks.

Return type:

int

Returns:

Number of tasks deleted

abstractmethod async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

abstractmethod async close()[source]

Close backend connections and cleanup resources.

Return type:

None

flowrra.scheduler.backends.mysql module

MySQL backend for scheduler storage.

This backend uses aiomysql for async MySQL access, ideal for distributed/production deployments.

class flowrra.scheduler.backends.mysql.MySQLSchedulerBackend(url)[source]

Bases: BaseSchedulerBackend

MySQL-based scheduler backend.

Features:
  • High-performance async operations via aiomysql

  • Connection pooling

  • Transaction support

  • JSON column support

  • Good for distributed/production deployments

  • Compatible with MariaDB

Parameters:

url (str) – MySQL connection URL Format: mysql://user:pass@host:port/dbname

__init__(url)[source]

Initialize MySQL backend.

Parameters:

url (str) – MySQL connection URL

Raises:

ImportError – If aiomysql is not installed

async create(task)[source]

Create a new scheduled task.

Return type:

None

async update(task)[source]

Update an existing scheduled task.

Return type:

None

async delete(task_id)[source]

Delete a scheduled task.

Return type:

bool

async get(task_id)[source]

Get a scheduled task by ID.

Return type:

ScheduledTask | None

async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

async list_due(now=None)[source]

List tasks that are due for execution.

Return type:

List[ScheduledTask]

async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Return type:

None

async clear()[source]

Clear all scheduled tasks.

Return type:

int

async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

async close()[source]

Close database connection pool.

Return type:

None

flowrra.scheduler.backends.postgresql module

PostgreSQL backend for scheduler storage.

This backend uses asyncpg for high-performance async PostgreSQL access, ideal for distributed/production deployments.

class flowrra.scheduler.backends.postgresql.PostgreSQLSchedulerBackend(url)[source]

Bases: BaseSchedulerBackend

PostgreSQL-based scheduler backend.

Features:
  • High-performance async operations via asyncpg

  • Connection pooling

  • Transaction support

  • JSONB for efficient JSON storage and queries

  • Good for distributed/production deployments

Parameters:

url (str) – PostgreSQL connection URL Format: postgresql://user:pass@host:port/dbname

__init__(url)[source]

Initialize PostgreSQL backend.

Parameters:

url (str) – PostgreSQL connection URL

Raises:

ImportError – If asyncpg is not installed

async create(task)[source]

Create a new scheduled task.

Return type:

None

async update(task)[source]

Update an existing scheduled task.

Return type:

None

async delete(task_id)[source]

Delete a scheduled task.

Return type:

bool

async get(task_id)[source]

Get a scheduled task by ID.

Return type:

ScheduledTask | None

async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

async list_due(now=None)[source]

List tasks that are due for execution.

Return type:

List[ScheduledTask]

async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Return type:

None

async clear()[source]

Clear all scheduled tasks.

Return type:

int

async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

async close()[source]

Close database connection pool.

Return type:

None

flowrra.scheduler.backends.sqlite module

SQLite backend for scheduler storage.

class flowrra.scheduler.backends.sqlite.SQLiteSchedulerBackend(database_path='.flowrra_schedule.db')[source]

Bases: BaseSchedulerBackend

SQLite-based scheduler backend for local persistent storage.

Features:
  • Local file-based storage

  • No external dependencies

  • Automatic schema creation

  • Transaction support

  • Good for single-instance deployments

Parameters:

database_path (str) – Path to SQLite database file (defaults to “.flowrra_schedule.db”)

__init__(database_path='.flowrra_schedule.db')[source]

Initialize SQLite backend.

Parameters:

database_path (str) – Path to database file

async create(task)[source]

Create a new scheduled task.

Return type:

None

async update(task)[source]

Update an existing scheduled task.

Return type:

None

async delete(task_id)[source]

Delete a scheduled task.

Return type:

bool

async get(task_id)[source]

Get a scheduled task by ID.

Return type:

ScheduledTask | None

async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

async list_due(now=None)[source]

List tasks that are due for execution.

Return type:

List[ScheduledTask]

async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Return type:

None

async clear()[source]

Clear all scheduled tasks.

Return type:

int

async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

async close()[source]

Close database connection.

Return type:

None

Module contents

Scheduler storage backends.

class flowrra.scheduler.backends.BaseSchedulerBackend[source]

Bases: ABC

Abstract base class for scheduler storage backends.

Backends are responsible for persisting scheduled task definitions and providing query capabilities for the scheduler service.

abstractmethod async create(task)[source]

Create a new scheduled task.

Parameters:

task (ScheduledTask) – ScheduledTask to store

Raises:

ValueError – If task with same ID already exists

Return type:

None

abstractmethod async update(task)[source]

Update an existing scheduled task.

Parameters:

task (ScheduledTask) – ScheduledTask with updated fields

Raises:

ValueError – If task doesn’t exist

Return type:

None

abstractmethod async delete(task_id)[source]

Delete a scheduled task.

Parameters:

task_id (str) – ID of task to delete

Return type:

bool

Returns:

True if deleted, False if not found

abstractmethod async get(task_id)[source]

Get a scheduled task by ID.

Parameters:

task_id (str) – Task ID to retrieve

Return type:

ScheduledTask | None

Returns:

ScheduledTask if found, None otherwise

abstractmethod async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

Returns:

List of all ScheduledTask objects

abstractmethod async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

Returns:

List of enabled ScheduledTask objects

abstractmethod async list_due(now=None)[source]

List tasks that are due for execution.

Parameters:

now (datetime | None) – Current time (defaults to datetime.now())

Return type:

List[ScheduledTask]

Returns:

List of ScheduledTask objects due for execution

abstractmethod async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Parameters:
  • task_id (str) – ID of the task

  • last_run (datetime) – Timestamp of last execution

  • next_run (datetime) – Timestamp of next scheduled execution

Raises:

ValueError – If task doesn’t exist

Return type:

None

abstractmethod async clear()[source]

Clear all scheduled tasks.

Return type:

int

Returns:

Number of tasks deleted

abstractmethod async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

abstractmethod async close()[source]

Close backend connections and cleanup resources.

Return type:

None

class flowrra.scheduler.backends.SQLiteSchedulerBackend(database_path='.flowrra_schedule.db')[source]

Bases: BaseSchedulerBackend

SQLite-based scheduler backend for local persistent storage.

Features:
  • Local file-based storage

  • No external dependencies

  • Automatic schema creation

  • Transaction support

  • Good for single-instance deployments

Parameters:

database_path (str) – Path to SQLite database file (defaults to “.flowrra_schedule.db”)

__init__(database_path='.flowrra_schedule.db')[source]

Initialize SQLite backend.

Parameters:

database_path (str) – Path to database file

async create(task)[source]

Create a new scheduled task.

Return type:

None

async update(task)[source]

Update an existing scheduled task.

Return type:

None

async delete(task_id)[source]

Delete a scheduled task.

Return type:

bool

async get(task_id)[source]

Get a scheduled task by ID.

Return type:

ScheduledTask | None

async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

async list_due(now=None)[source]

List tasks that are due for execution.

Return type:

List[ScheduledTask]

async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Return type:

None

async clear()[source]

Clear all scheduled tasks.

Return type:

int

async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

async close()[source]

Close database connection.

Return type:

None

class flowrra.scheduler.backends.PostgreSQLSchedulerBackend(url)[source]

Bases: BaseSchedulerBackend

PostgreSQL-based scheduler backend.

Features:
  • High-performance async operations via asyncpg

  • Connection pooling

  • Transaction support

  • JSONB for efficient JSON storage and queries

  • Good for distributed/production deployments

Parameters:

url (str) – PostgreSQL connection URL Format: postgresql://user:pass@host:port/dbname

__init__(url)[source]

Initialize PostgreSQL backend.

Parameters:

url (str) – PostgreSQL connection URL

Raises:

ImportError – If asyncpg is not installed

async create(task)[source]

Create a new scheduled task.

Return type:

None

async update(task)[source]

Update an existing scheduled task.

Return type:

None

async delete(task_id)[source]

Delete a scheduled task.

Return type:

bool

async get(task_id)[source]

Get a scheduled task by ID.

Return type:

ScheduledTask | None

async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

async list_due(now=None)[source]

List tasks that are due for execution.

Return type:

List[ScheduledTask]

async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Return type:

None

async clear()[source]

Clear all scheduled tasks.

Return type:

int

async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

async close()[source]

Close database connection pool.

Return type:

None

class flowrra.scheduler.backends.MySQLSchedulerBackend(url)[source]

Bases: BaseSchedulerBackend

MySQL-based scheduler backend.

Features:
  • High-performance async operations via aiomysql

  • Connection pooling

  • Transaction support

  • JSON column support

  • Good for distributed/production deployments

  • Compatible with MariaDB

Parameters:

url (str) – MySQL connection URL Format: mysql://user:pass@host:port/dbname

__init__(url)[source]

Initialize MySQL backend.

Parameters:

url (str) – MySQL connection URL

Raises:

ImportError – If aiomysql is not installed

async create(task)[source]

Create a new scheduled task.

Return type:

None

async update(task)[source]

Update an existing scheduled task.

Return type:

None

async delete(task_id)[source]

Delete a scheduled task.

Return type:

bool

async get(task_id)[source]

Get a scheduled task by ID.

Return type:

ScheduledTask | None

async list_all()[source]

List all scheduled tasks.

Return type:

List[ScheduledTask]

async list_enabled()[source]

List only enabled scheduled tasks.

Return type:

List[ScheduledTask]

async list_due(now=None)[source]

List tasks that are due for execution.

Return type:

List[ScheduledTask]

async update_run_times(task_id, last_run, next_run)[source]

Update task run times after execution.

Return type:

None

async clear()[source]

Clear all scheduled tasks.

Return type:

int

async find_by_definition(task_name, schedule_type, schedule, args=(), kwargs=None)[source]

Find exact schedule match by complete definition.

Used for idempotency - finds schedules with identical parameters.

Parameters:
  • task_name (str) – Task name to match

  • schedule_type (ScheduleType) – Type of schedule (CRON, INTERVAL, ONE_TIME)

  • schedule (str) – Schedule expression/value

  • args (tuple) – Task arguments tuple

  • kwargs (dict | None) – Task keyword arguments dict

Return type:

ScheduledTask | None

Returns:

Matching ScheduledTask if found, None otherwise

async close()[source]

Close database connection pool.

Return type:

None

flowrra.scheduler.backends.get_scheduler_backend(url=None)[source]

Create a scheduler backend from a database URL.

Parameters:

url (str | None) – Database connection URL or None for default SQLite

Return type:

BaseSchedulerBackend

Returns:

Scheduler backend instance

Raises:

ValueError – If URL scheme is unsupported

Supported URLs:
  • None or “sqlite://…” - SQLite backend (default: .flowrra_schedule.db)

  • “postgresql://…” - PostgreSQL backend

  • “mysql://…” - MySQL backend

Examples

# Default SQLite backend = get_scheduler_backend()

# Custom SQLite path backend = get_scheduler_backend(“sqlite:///path/to/schedule.db”)

# PostgreSQL backend = get_scheduler_backend(“postgresql://user:pass@localhost/flowrra”)

# MySQL backend = get_scheduler_backend(“mysql://user:pass@localhost/flowrra”)