flowrra.scheduler package

Subpackages

Submodules

flowrra.scheduler.cron module

Cron expression parser and evaluator.

Supports standard cron syntax:
  • Minute (0-59)

  • Hour (0-23)

  • Day of month (1-31)

  • Month (1-12)

  • Day of week (0-7, where 0 and 7 are Sunday)

Special characters:
  • * (any value)

  • , (value list separator)

  • - (range of values)

  • / (step values)

Examples

  • "*/5 * * * *" - Every 5 minutes

  • "0 */2 * * *" - Every 2 hours

  • "0 9 * * 1-5" - 9 AM on weekdays

  • "0 0 1 * *" - First day of every month at midnight
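To make the syntax above concrete, here is a minimal sketch of how a single cron field could be expanded into the set of values it allows. It is not flowrra's parser: the function name expand_field and its error handling are assumptions made for illustration only.

```python
def expand_field(field: str, low: int, high: int) -> set[int]:
    """Expand one cron field (e.g. "*/5" or "1-5,10") into its allowed values."""
    values: set[int] = set()
    for part in field.split(","):
        # Split off an optional "/step" suffix.
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if step <= 0:
            raise ValueError(f"step must be positive: {part!r}")
        if base == "*":
            start, end = low, high
        elif "-" in base:
            start_text, end_text = base.split("-", 1)
            start, end = int(start_text), int(end_text)
        else:
            start = int(base)
            # A lone value with a step ("5/10") runs up to the field maximum.
            end = high if step_text else start
        if not (low <= start <= end <= high):
            raise ValueError(f"value out of range {low}-{high}: {part!r}")
        values.update(range(start, end + 1, step))
    return values


# The minute field of "*/5 * * * *" and the weekday field of "0 9 * * 1-5".
every_five_minutes = expand_field("*/5", 0, 59)
weekdays = expand_field("1-5", 0, 7)
```

A full expression would call this once per field with the ranges listed above (0-59, 0-23, 1-31, 1-12, 0-7).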

class flowrra.scheduler.cron.CronExpression(expression)[source]

Bases: object

Parse and evaluate cron expressions.

__init__(expression)[source]

Initialize cron expression.

Parameters:

expression (str) – Cron expression string (5 fields: minute hour day month weekday)

Raises:

ValueError – If expression format is invalid

matches(dt)[source]

Check if datetime matches the cron expression.

Parameters:

dt (datetime) – Datetime to check

Return type:

bool

Returns:

True if datetime matches the cron schedule

next_run(after=None)[source]

Calculate the next run time after the given datetime.

Parameters:

after (datetime | None) – Starting datetime (defaults to now)

Return type:

datetime

Returns:

Next datetime matching the cron expression
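The behaviour of matches() and next_run() can be sketched with a simple minute-by-minute search. This is an illustration under stated assumptions, not the library's algorithm: the fields dictionary, the max_minutes search limit, and the rule that all five fields must match are hypothetical. Classic cron implementations treat day-of-month and day-of-week specially when both are restricted; this reference does not say which rule CronExpression follows.

```python
from datetime import datetime, timedelta


def matches(fields: dict[str, set[int]], dt: datetime) -> bool:
    """Return True if dt satisfies every field (assumed AND semantics)."""
    # Python's weekday() uses Monday=0; cron uses Sunday=0, so shift by one.
    cron_weekday = (dt.weekday() + 1) % 7
    # Cron allows 7 as an alias for Sunday, so fold it onto 0.
    allowed_weekdays = {day % 7 for day in fields["weekday"]}
    return (
        dt.minute in fields["minute"]
        and dt.hour in fields["hour"]
        and dt.day in fields["day"]
        and dt.month in fields["month"]
        and cron_weekday in allowed_weekdays
    )


def next_run(fields: dict[str, set[int]], after: datetime,
             max_minutes: int = 366 * 24 * 60) -> datetime:
    """Find the first whole minute strictly after `after` that matches."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(max_minutes):
        if matches(fields, candidate):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError("no matching time within the search window")


# "0 9 * * 1-5": 9 AM on weekdays, written out as pre-expanded value sets.
weekday_nine_am = {
    "minute": {0},
    "hour": {9},
    "day": set(range(1, 32)),
    "month": set(range(1, 13)),
    "weekday": {1, 2, 3, 4, 5},
}
# Friday 2024-01-05 at 10:00 has already passed 9 AM, so the next run is Monday.
following = next_run(weekday_nine_am, datetime(2024, 1, 5, 10, 0))
```

Stepping one minute at a time is easy to verify but slow for sparse schedules; a production implementation would more likely jump field by field.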

__str__()[source]

String representation.

Return type:

str

__repr__()[source]

Developer representation.

Return type:

str

flowrra.scheduler.models module

Scheduler models for persistent task scheduling.

class flowrra.scheduler.models.ScheduleType(value)[source]

Bases: Enum

Type of schedule for a task.

CRON = 'cron'
INTERVAL = 'interval'
ONE_TIME = 'one_time'
class flowrra.scheduler.models.ScheduledTask(id, task_name, schedule_type, schedule, args=<factory>, kwargs=<factory>, enabled=True, last_run_at=None, next_run_at=None, created_at=<factory>, updated_at=<factory>, description=None, max_retries=3, retry_delay=1.0, priority=0)[source]

Bases: object

A task scheduled for execution.

id

Unique identifier for scheduled task

task_name

Name of the registered task to execute

schedule_type

Type of schedule (cron, interval, one_time)

schedule

Schedule definition (cron expression or interval in seconds)

args

Positional arguments for task execution

kwargs

Keyword arguments for task execution

enabled

Whether the scheduled task is active

last_run_at

Timestamp of last execution

next_run_at

Timestamp of next scheduled execution

created_at

When the scheduled task was created

updated_at

When the scheduled task was last updated

description

Optional description of the task

max_retries

Maximum retry attempts on failure

retry_delay

Delay between retries in seconds

priority

Task priority (higher = more important)

id: str
task_name: str
schedule_type: ScheduleType
schedule: str
args: tuple
kwargs: dict
enabled: bool = True
last_run_at: datetime | None = None
next_run_at: datetime | None = None
created_at: datetime
updated_at: datetime
description: str | None = None
max_retries: int = 3
retry_delay: float = 1.0
priority: int = 0
to_dict()[source]

Convert to dictionary for storage.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Create from dictionary.

Return type:

ScheduledTask
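The storage format produced by to_dict() is not documented here. As a rough sketch of the round trip, the example below uses a hypothetical, trimmed-down Record class and Kind enum (stand-ins, not the real ScheduledTask and ScheduleType) and assumes enums are stored by value and datetimes as ISO 8601 strings.

```python
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Optional


class Kind(Enum):  # hypothetical stand-in for ScheduleType
    CRON = "cron"
    INTERVAL = "interval"


@dataclass
class Record:  # hypothetical stand-in for ScheduledTask
    id: str
    schedule_type: Kind
    schedule: str
    args: tuple = field(default_factory=tuple)
    next_run_at: Optional[datetime] = None

    def to_dict(self) -> dict[str, Any]:
        # Store only plain values so the result can be written as JSON.
        return {
            "id": self.id,
            "schedule_type": self.schedule_type.value,
            "schedule": self.schedule,
            "args": list(self.args),
            "next_run_at": self.next_run_at.isoformat() if self.next_run_at else None,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Record":
        raw_next = data.get("next_run_at")
        return cls(
            id=data["id"],
            schedule_type=Kind(data["schedule_type"]),
            schedule=data["schedule"],
            args=tuple(data.get("args", ())),
            next_run_at=datetime.fromisoformat(raw_next) if raw_next else None,
        )


record = Record("daily-report", Kind.CRON, "0 9 * * *", (1, 2), datetime(2024, 1, 1, 9, 0))
restored = Record.from_dict(record.to_dict())
```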

__init__(id, task_name, schedule_type, schedule, args=<factory>, kwargs=<factory>, enabled=True, last_run_at=None, next_run_at=None, created_at=<factory>, updated_at=<factory>, description=None, max_retries=3, retry_delay=1.0, priority=0)

flowrra.scheduler.scheduler module

Scheduler service for managing and executing scheduled tasks.

class flowrra.scheduler.scheduler.Scheduler(backend, registry, check_interval=60.0, io_executor=None, cpu_executor=None)[source]

Bases: object

Scheduler service for persistent task scheduling.

Features:
  • Cron-based scheduling

  • Interval-based scheduling

  • One-time task scheduling

  • Persistent storage across restarts

  • Automatic next-run calculation

  • Task priority support

  • Automatic routing to IOExecutor or CPUExecutor

Parameters:
  • backend (BaseSchedulerBackend) – Scheduler storage backend

  • registry (TaskRegistry) – Task registry for resolving task names

  • check_interval (float) – Fallback polling interval in seconds, used only when the dynamic sleep calculation fails (see __init__)

  • io_executor (IOExecutor | None) – IOExecutor for async tasks (optional)

  • cpu_executor (CPUExecutor | None) – CPUExecutor for CPU-bound tasks (optional)

Example

    # Automatic integration with Flowrra app
    app = Flowrra.from_urls()
    scheduler = app.create_scheduler()

    # Manual setup
    scheduler = Scheduler(
        backend=backend,
        registry=registry,
        io_executor=io_executor,
        cpu_executor=cpu_executor,
    )
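How the scheduler chooses between the two executors is not spelled out in this reference. One plausible rule, sketched below purely as an assumption, is to send coroutine functions to the IOExecutor and plain functions to the CPUExecutor. The helper name pick_executor is hypothetical.

```python
import inspect


def pick_executor(task_func, io_executor=None, cpu_executor=None):
    """Choose an executor for task_func (assumed rule: async -> IO, sync -> CPU)."""
    wants_io = inspect.iscoroutinefunction(task_func)
    executor = io_executor if wants_io else cpu_executor
    if executor is None:
        kind = "IOExecutor" if wants_io else "CPUExecutor"
        raise RuntimeError(f"no {kind} configured for {task_func.__name__}")
    return executor


async def fetch_page():  # example async task
    return "page"


def crunch_numbers():  # example CPU-bound task
    return 42


# Strings stand in for real executor instances in this illustration.
io_choice = pick_executor(fetch_page, io_executor="io", cpu_executor="cpu")
cpu_choice = pick_executor(crunch_numbers, io_executor="io", cpu_executor="cpu")
```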

__init__(backend, registry, check_interval=60.0, io_executor=None, cpu_executor=None)[source]

Initialize scheduler.

Parameters:
  • backend (BaseSchedulerBackend) – Storage backend for scheduled tasks

  • registry (TaskRegistry) – Task registry for looking up task functions

  • check_interval (float) – Fallback interval in seconds (used on errors). The scheduler uses dynamic sleep calculation by default, waking up precisely when tasks are due. This parameter is only used as a fallback when sleep calculation fails.

  • io_executor (IOExecutor | None) – IOExecutor instance for async tasks

  • cpu_executor (CPUExecutor | None) – CPUExecutor instance for CPU-bound tasks

Note

The scheduler automatically calculates optimal sleep duration based on when the next task is scheduled to run, with a maximum sleep of 1 hour to periodically refresh the schedule list.
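The sleep calculation described in this note might look roughly like the sketch below. The function name sleep_seconds, the behaviour when no task is pending, and the choice of TypeError as the failure case are assumptions; only the one-hour cap and the check_interval fallback come from the text above.

```python
from datetime import datetime, timedelta
from typing import Iterable, Optional

MAX_SLEEP_SECONDS = 3600.0  # the one-hour cap mentioned in the note


def sleep_seconds(next_runs: Iterable[Optional[datetime]], now: datetime,
                  check_interval: float = 60.0) -> float:
    """Seconds to sleep until the earliest due task, capped at one hour."""
    try:
        due_times = [t for t in next_runs if t is not None]
        if not due_times:
            # Nothing scheduled: sleep for the cap, then refresh the list.
            return MAX_SLEEP_SECONDS
        delay = (min(due_times) - now).total_seconds()
        # Overdue tasks give a negative delay; clamp it to zero.
        return min(max(delay, 0.0), MAX_SLEEP_SECONDS)
    except TypeError:
        # For example, naive and timezone-aware datetimes were mixed.
        return check_interval


now = datetime(2024, 1, 1, 12, 0, 0)
soon = sleep_seconds([now + timedelta(seconds=30), now + timedelta(hours=5)], now)
capped = sleep_seconds([now + timedelta(hours=5)], now)
overdue = sleep_seconds([now - timedelta(seconds=10)], now)
```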

set_submit_callback(callback)[source]

Set callback for submitting tasks to executor.

LEGACY API: This method exists for backward compatibility and testing. For production use, prefer passing executor instances to the constructor:

    scheduler = Scheduler(
        backend=backend,
        registry=registry,
        io_executor=io_executor,
        cpu_executor=cpu_executor,
    )

The callback approach bypasses executor-based task routing and requires you to implement the submission logic yourself. Use it only when:
  • Writing unit tests that need to intercept submissions

  • Implementing custom submission logic outside the executor pattern

Parameters:

callback (Callable) – Async function that accepts (task_func, *args, **kwargs)

Return type:

None

Example (Testing):

    submitted_tasks = []

    async def track_submission(task_func, *args, **kwargs):
        submitted_tasks.append(task_func.__name__)

    scheduler.set_submit_callback(track_submission)

Example (Modern Approach - Recommended):

    # Don't use a callback; pass executors instead
    scheduler = Scheduler(backend, registry, io_executor=io_executor)
    # Tasks are automatically routed to the correct executor

async start()[source]

Start the scheduler service.

Return type:

None

async stop()[source]

Stop the scheduler service.

Return type:

None

async schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)[source]

Schedule a task using cron expression.

Automatically idempotent: if a schedule with the same task_name, cron, args, and kwargs already exists, returns the existing schedule ID.

Parameters:
  • task_name (str) – Name of registered task to execute

  • cron (str) – Cron expression (e.g., “0 9 * * *” for daily at 9 AM)

  • args (tuple) – Positional arguments for task

  • kwargs (dict | None) – Keyword arguments for task

  • enabled (bool) – Whether task is enabled

  • description (str | None) – Optional task description

  • max_retries (int) – Maximum retry attempts

  • retry_delay (float) – Delay between retries in seconds

  • priority (int) – Task priority (higher = more important)

  • task_id (str | None) – Optional custom task ID (generated if not provided)

Return type:

str

Returns:

Task ID (existing or newly created)

Raises:

ValueError – If cron expression is invalid or task not registered
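The idempotency check compares task_name, cron, args, and kwargs, but this reference does not say how the comparison is done. One way to get the same effect, shown here only as an assumption, is to derive a deterministic key from those four values. The function name schedule_key is hypothetical.

```python
import hashlib
import json
from typing import Optional


def schedule_key(task_name: str, schedule: str, args: tuple = (),
                 kwargs: Optional[dict] = None) -> str:
    """Build a stable key so that identical schedule requests collide."""
    payload = json.dumps(
        {
            "task": task_name,
            "schedule": schedule,
            "args": list(args),
            "kwargs": kwargs or {},
        },
        sort_keys=True,  # make the key independent of kwargs ordering
        default=str,     # fall back to str() for values JSON cannot encode
    )
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = schedule_key("my_task", "0 9 * * *", (1, 2), {"foo": "bar", "n": 1})
same = schedule_key("my_task", "0 9 * * *", (1, 2), {"n": 1, "foo": "bar"})
other = schedule_key("my_task", "0 10 * * *", (1, 2), {"foo": "bar", "n": 1})
```

With a key like this, a second call with identical arguments can look up the existing schedule and return its ID instead of creating a duplicate.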

async schedule_interval(task_name, interval, args=(), kwargs=None, enabled=True, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)[source]

Schedule a task to run at fixed intervals.

Automatically idempotent: if a schedule with the same task_name, interval, args, and kwargs already exists, returns the existing schedule ID.

Parameters:
  • task_name (str) – Name of registered task to execute

  • interval (float) – Interval in seconds between executions

  • args (tuple) – Positional arguments for task

  • kwargs (dict | None) – Keyword arguments for task

  • enabled (bool) – Whether task is enabled

  • description (str | None) – Optional task description

  • max_retries (int) – Maximum retry attempts

  • retry_delay (float) – Delay between retries in seconds

  • priority (int) – Task priority (higher = more important)

  • task_id (str | None) – Optional custom task ID (generated if not provided)

Return type:

str

Returns:

Task ID (existing or newly created)

Raises:

ValueError – If interval is invalid or task not registered
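For interval schedules, the next run time can be derived from the last run and the interval. The sketch below assumes that a non-positive interval counts as invalid and that missed runs are skipped rather than replayed. Neither point is confirmed by this reference, and the function name next_interval_run is hypothetical.

```python
import math
from datetime import datetime, timedelta
from typing import Optional


def next_interval_run(last_run_at: Optional[datetime], interval: float,
                      now: datetime) -> datetime:
    """Return the next run time strictly after `now` for a fixed interval."""
    if interval <= 0:
        raise ValueError("interval must be positive")
    if last_run_at is None:
        # Never run before: the first run is one interval from now.
        return now + timedelta(seconds=interval)
    elapsed = (now - last_run_at).total_seconds()
    # Whole intervals needed to land strictly after `now`, at least one.
    periods = max(1, math.floor(elapsed / interval) + 1)
    return last_run_at + timedelta(seconds=interval * periods)


last = datetime(2024, 1, 1, 0, 0, 0)
# 95 seconds after the last run with a 30-second interval.
upcoming = next_interval_run(last, 30.0, last + timedelta(seconds=95))
```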

async schedule_once(task_name, run_at, args=(), kwargs=None, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)[source]

Schedule a task to run once at a specific time.

Automatically idempotent: if a schedule with the same task_name, run_at, args, and kwargs already exists, returns the existing schedule ID.

Parameters:
  • task_name (str) – Name of registered task to execute

  • run_at (datetime) – When to run the task

  • args (tuple) – Positional arguments for task

  • kwargs (dict | None) – Keyword arguments for task

  • description (str | None) – Optional task description

  • max_retries (int) – Maximum retry attempts

  • retry_delay (float) – Delay between retries in seconds

  • priority (int) – Task priority (higher = more important)

  • task_id (str | None) – Optional custom task ID (generated if not provided)

Return type:

str

Returns:

Task ID (existing or newly created)

Raises:

ValueError – If task not registered

async unschedule(task_id)[source]

Remove a scheduled task.

Parameters:

task_id (str) – ID of task to remove

Return type:

bool

Returns:

True if deleted, False if not found

async enable_task(task_id)[source]

Enable a scheduled task.

Parameters:

task_id (str) – ID of task to enable

Raises:

ValueError – If task not found

Return type:

None

async disable_task(task_id)[source]

Disable a scheduled task.

Parameters:

task_id (str) – ID of task to disable

Raises:

ValueError – If task not found

Return type:

None

async get_scheduled_task(task_id)[source]

Get a scheduled task by ID.

Parameters:

task_id (str) – Task ID

Return type:

ScheduledTask | None

Returns:

ScheduledTask if found, None otherwise

async list_scheduled_tasks()[source]

List all scheduled tasks.

Return type:

list[ScheduledTask]

Returns:

List of all ScheduledTask objects

property is_running: bool

Check if scheduler is running.

Module contents

Persistent task scheduling for Flowrra.

This module provides Celery Beat-like scheduling capabilities with support for cron expressions, intervals, and one-time task execution.

Example

    from flowrra import Flowrra
    from flowrra.scheduler import Scheduler
    from flowrra.scheduler.backends import get_scheduler_backend

    # Create Flowrra app
    app = Flowrra.from_urls()

    # Create scheduler with SQLite backend (default)
    backend = get_scheduler_backend()
    scheduler = Scheduler(backend=backend, registry=app.registry)

    # Schedule a task with cron expression
    await scheduler.schedule_cron(
        task_name="my_task",
        cron="0 9 * * *",  # Daily at 9 AM
        args=(1, 2),
        kwargs={"foo": "bar"},
    )

    # Start scheduler
    await scheduler.start()

class flowrra.scheduler.Scheduler(backend, registry, check_interval=60.0, io_executor=None, cpu_executor=None)[source]

Bases: object

Scheduler service for persistent task scheduling.

Features:
  • Cron-based scheduling

  • Interval-based scheduling

  • One-time task scheduling

  • Persistent storage across restarts

  • Automatic next-run calculation

  • Task priority support

  • Automatic routing to IOExecutor or CPUExecutor

Parameters:
  • backend (BaseSchedulerBackend) – Scheduler storage backend

  • registry (TaskRegistry) – Task registry for resolving task names

  • check_interval (float) – Fallback polling interval in seconds, used only when the dynamic sleep calculation fails (see __init__)

  • io_executor (IOExecutor | None) – IOExecutor for async tasks (optional)

  • cpu_executor (CPUExecutor | None) – CPUExecutor for CPU-bound tasks (optional)

Example

    # Automatic integration with Flowrra app
    app = Flowrra.from_urls()
    scheduler = app.create_scheduler()

    # Manual setup
    scheduler = Scheduler(
        backend=backend,
        registry=registry,
        io_executor=io_executor,
        cpu_executor=cpu_executor,
    )

__init__(backend, registry, check_interval=60.0, io_executor=None, cpu_executor=None)[source]

Initialize scheduler.

Parameters:
  • backend (BaseSchedulerBackend) – Storage backend for scheduled tasks

  • registry (TaskRegistry) – Task registry for looking up task functions

  • check_interval (float) – Fallback interval in seconds (used on errors). The scheduler uses dynamic sleep calculation by default, waking up precisely when tasks are due. This parameter is only used as a fallback when sleep calculation fails.

  • io_executor (IOExecutor | None) – IOExecutor instance for async tasks

  • cpu_executor (CPUExecutor | None) – CPUExecutor instance for CPU-bound tasks

Note

The scheduler automatically calculates optimal sleep duration based on when the next task is scheduled to run, with a maximum sleep of 1 hour to periodically refresh the schedule list.

set_submit_callback(callback)[source]

Set callback for submitting tasks to executor.

LEGACY API: This method exists for backward compatibility and testing. For production use, prefer passing executor instances to the constructor:

    scheduler = Scheduler(
        backend=backend,
        registry=registry,
        io_executor=io_executor,
        cpu_executor=cpu_executor,
    )

The callback approach bypasses executor-based task routing and requires you to implement the submission logic yourself. Use it only when:
  • Writing unit tests that need to intercept submissions

  • Implementing custom submission logic outside the executor pattern

Parameters:

callback (Callable) – Async function that accepts (task_func, *args, **kwargs)

Return type:

None

Example (Testing):

    submitted_tasks = []

    async def track_submission(task_func, *args, **kwargs):
        submitted_tasks.append(task_func.__name__)

    scheduler.set_submit_callback(track_submission)

Example (Modern Approach - Recommended):

    # Don't use a callback; pass executors instead
    scheduler = Scheduler(backend, registry, io_executor=io_executor)
    # Tasks are automatically routed to the correct executor

async start()[source]

Start the scheduler service.

Return type:

None

async stop()[source]

Stop the scheduler service.

Return type:

None

async schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)[source]

Schedule a task using cron expression.

Automatically idempotent: if a schedule with the same task_name, cron, args, and kwargs already exists, returns the existing schedule ID.

Parameters:
  • task_name (str) – Name of registered task to execute

  • cron (str) – Cron expression (e.g., “0 9 * * *” for daily at 9 AM)

  • args (tuple) – Positional arguments for task

  • kwargs (dict | None) – Keyword arguments for task

  • enabled (bool) – Whether task is enabled

  • description (str | None) – Optional task description

  • max_retries (int) – Maximum retry attempts

  • retry_delay (float) – Delay between retries in seconds

  • priority (int) – Task priority (higher = more important)

  • task_id (str | None) – Optional custom task ID (generated if not provided)

Return type:

str

Returns:

Task ID (existing or newly created)

Raises:

ValueError – If cron expression is invalid or task not registered

async schedule_interval(task_name, interval, args=(), kwargs=None, enabled=True, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)[source]

Schedule a task to run at fixed intervals.

Automatically idempotent: if a schedule with the same task_name, interval, args, and kwargs already exists, returns the existing schedule ID.

Parameters:
  • task_name (str) – Name of registered task to execute

  • interval (float) – Interval in seconds between executions

  • args (tuple) – Positional arguments for task

  • kwargs (dict | None) – Keyword arguments for task

  • enabled (bool) – Whether task is enabled

  • description (str | None) – Optional task description

  • max_retries (int) – Maximum retry attempts

  • retry_delay (float) – Delay between retries in seconds

  • priority (int) – Task priority (higher = more important)

  • task_id (str | None) – Optional custom task ID (generated if not provided)

Return type:

str

Returns:

Task ID (existing or newly created)

Raises:

ValueError – If interval is invalid or task not registered

async schedule_once(task_name, run_at, args=(), kwargs=None, description=None, max_retries=3, retry_delay=1.0, priority=0, task_id=None)[source]

Schedule a task to run once at a specific time.

Automatically idempotent: if a schedule with the same task_name, run_at, args, and kwargs already exists, returns the existing schedule ID.

Parameters:
  • task_name (str) – Name of registered task to execute

  • run_at (datetime) – When to run the task

  • args (tuple) – Positional arguments for task

  • kwargs (dict | None) – Keyword arguments for task

  • description (str | None) – Optional task description

  • max_retries (int) – Maximum retry attempts

  • retry_delay (float) – Delay between retries in seconds

  • priority (int) – Task priority (higher = more important)

  • task_id (str | None) – Optional custom task ID (generated if not provided)

Return type:

str

Returns:

Task ID (existing or newly created)

Raises:

ValueError – If task not registered

async unschedule(task_id)[source]

Remove a scheduled task.

Parameters:

task_id (str) – ID of task to remove

Return type:

bool

Returns:

True if deleted, False if not found

async enable_task(task_id)[source]

Enable a scheduled task.

Parameters:

task_id (str) – ID of task to enable

Raises:

ValueError – If task not found

Return type:

None

async disable_task(task_id)[source]

Disable a scheduled task.

Parameters:

task_id (str) – ID of task to disable

Raises:

ValueError – If task not found

Return type:

None

async get_scheduled_task(task_id)[source]

Get a scheduled task by ID.

Parameters:

task_id (str) – Task ID

Return type:

ScheduledTask | None

Returns:

ScheduledTask if found, None otherwise

async list_scheduled_tasks()[source]

List all scheduled tasks.

Return type:

list[ScheduledTask]

Returns:

List of all ScheduledTask objects

property is_running: bool

Check if scheduler is running.

class flowrra.scheduler.ScheduledTask(id, task_name, schedule_type, schedule, args=<factory>, kwargs=<factory>, enabled=True, last_run_at=None, next_run_at=None, created_at=<factory>, updated_at=<factory>, description=None, max_retries=3, retry_delay=1.0, priority=0)[source]

Bases: object

A task scheduled for execution.

id

Unique identifier for scheduled task

task_name

Name of the registered task to execute

schedule_type

Type of schedule (cron, interval, one_time)

schedule

Schedule definition (cron expression or interval in seconds)

args

Positional arguments for task execution

kwargs

Keyword arguments for task execution

enabled

Whether the scheduled task is active

last_run_at

Timestamp of last execution

next_run_at

Timestamp of next scheduled execution

created_at

When the scheduled task was created

updated_at

When the scheduled task was last updated

description

Optional description of the task

max_retries

Maximum retry attempts on failure

retry_delay

Delay between retries in seconds

priority

Task priority (higher = more important)

id: str
task_name: str
schedule_type: ScheduleType
schedule: str
args: tuple
kwargs: dict
enabled: bool = True
last_run_at: datetime | None = None
next_run_at: datetime | None = None
created_at: datetime
updated_at: datetime
description: str | None = None
max_retries: int = 3
retry_delay: float = 1.0
priority: int = 0
to_dict()[source]

Convert to dictionary for storage.

Return type:

dict[str, Any]

classmethod from_dict(data)[source]

Create from dictionary.

Return type:

ScheduledTask

__init__(id, task_name, schedule_type, schedule, args=<factory>, kwargs=<factory>, enabled=True, last_run_at=None, next_run_at=None, created_at=<factory>, updated_at=<factory>, description=None, max_retries=3, retry_delay=1.0, priority=0)
class flowrra.scheduler.ScheduleType(value)[source]

Bases: Enum

Type of schedule for a task.

CRON = 'cron'
INTERVAL = 'interval'
ONE_TIME = 'one_time'
class flowrra.scheduler.CronExpression(expression)[source]

Bases: object

Parse and evaluate cron expressions.

__init__(expression)[source]

Initialize cron expression.

Parameters:

expression (str) – Cron expression string (5 fields: minute hour day month weekday)

Raises:

ValueError – If expression format is invalid

matches(dt)[source]

Check if datetime matches the cron expression.

Parameters:

dt (datetime) – Datetime to check

Return type:

bool

Returns:

True if datetime matches the cron schedule

next_run(after=None)[source]

Calculate the next run time after the given datetime.

Parameters:

after (datetime | None) – Starting datetime (defaults to now)

Return type:

datetime

Returns:

Next datetime matching the cron expression

__str__()[source]

String representation.

Return type:

str

__repr__()[source]

Developer representation.

Return type:

str