flowrra package
Subpackages
- flowrra.backends package
- flowrra.brokers package
- flowrra.executors package
- flowrra.management package
- Submodules
- flowrra.management.manager module
FlowrraManagerFlowrraManager.__init__()FlowrraManager.get_stats()FlowrraManager.list_registered_tasks()FlowrraManager.get_task_info()FlowrraManager.list_pending_tasks()FlowrraManager.list_running_tasks()FlowrraManager.list_completed_tasks()FlowrraManager.list_failed_tasks()FlowrraManager.get_task_result()FlowrraManager.list_schedules()FlowrraManager.get_schedule()FlowrraManager.get_scheduler_stats()FlowrraManager.create_schedule_cron()FlowrraManager.create_schedule_interval()FlowrraManager.enable_schedule()FlowrraManager.disable_schedule()FlowrraManager.delete_schedule()FlowrraManager.health_check()
- Module contents
FlowrraManagerFlowrraManager.__init__()FlowrraManager.get_stats()FlowrraManager.list_registered_tasks()FlowrraManager.get_task_info()FlowrraManager.list_pending_tasks()FlowrraManager.list_running_tasks()FlowrraManager.list_completed_tasks()FlowrraManager.list_failed_tasks()FlowrraManager.get_task_result()FlowrraManager.list_schedules()FlowrraManager.get_schedule()FlowrraManager.get_scheduler_stats()FlowrraManager.create_schedule_cron()FlowrraManager.create_schedule_interval()FlowrraManager.enable_schedule()FlowrraManager.disable_schedule()FlowrraManager.delete_schedule()FlowrraManager.health_check()
- flowrra.scheduler package
- Subpackages
- Submodules
- flowrra.scheduler.cron module
- flowrra.scheduler.models module
ScheduleTypeScheduledTaskScheduledTask.idScheduledTask.task_nameScheduledTask.schedule_typeScheduledTask.scheduleScheduledTask.argsScheduledTask.kwargsScheduledTask.enabledScheduledTask.last_run_atScheduledTask.next_run_atScheduledTask.created_atScheduledTask.updated_atScheduledTask.descriptionScheduledTask.max_retriesScheduledTask.retry_delayScheduledTask.priorityScheduledTask.idScheduledTask.task_nameScheduledTask.schedule_typeScheduledTask.scheduleScheduledTask.argsScheduledTask.kwargsScheduledTask.enabledScheduledTask.last_run_atScheduledTask.next_run_atScheduledTask.created_atScheduledTask.updated_atScheduledTask.descriptionScheduledTask.max_retriesScheduledTask.retry_delayScheduledTask.priorityScheduledTask.to_dict()ScheduledTask.from_dict()ScheduledTask.__init__()
- flowrra.scheduler.scheduler module
SchedulerScheduler.__init__()Scheduler.set_submit_callback()Scheduler.start()Scheduler.stop()Scheduler.schedule_cron()Scheduler.schedule_interval()Scheduler.schedule_once()Scheduler.unschedule()Scheduler.enable_task()Scheduler.disable_task()Scheduler.get_scheduled_task()Scheduler.list_scheduled_tasks()Scheduler.is_running
- Module contents
SchedulerScheduler.__init__()Scheduler.set_submit_callback()Scheduler.start()Scheduler.stop()Scheduler.schedule_cron()Scheduler.schedule_interval()Scheduler.schedule_once()Scheduler.unschedule()Scheduler.enable_task()Scheduler.disable_task()Scheduler.get_scheduled_task()Scheduler.list_scheduled_tasks()Scheduler.is_running
ScheduledTaskScheduledTask.idScheduledTask.task_nameScheduledTask.schedule_typeScheduledTask.scheduleScheduledTask.argsScheduledTask.kwargsScheduledTask.enabledScheduledTask.last_run_atScheduledTask.next_run_atScheduledTask.created_atScheduledTask.updated_atScheduledTask.descriptionScheduledTask.max_retriesScheduledTask.retry_delayScheduledTask.priorityScheduledTask.idScheduledTask.task_nameScheduledTask.schedule_typeScheduledTask.scheduleScheduledTask.argsScheduledTask.kwargsScheduledTask.enabledScheduledTask.last_run_atScheduledTask.next_run_atScheduledTask.created_atScheduledTask.updated_atScheduledTask.descriptionScheduledTask.max_retriesScheduledTask.retry_delayScheduledTask.priorityScheduledTask.to_dict()ScheduledTask.from_dict()ScheduledTask.__init__()
ScheduleTypeCronExpression
- flowrra.ui package
- Subpackages
- Submodules
- flowrra.ui.django module
- flowrra.ui.django_websocket module
- flowrra.ui.fastapi module
- flowrra.ui.flask module
- flowrra.ui.quart_websocket module
- Module contents
BaseUIAdapterBaseUIAdapter.__init__()BaseUIAdapter.get_routes()BaseUIAdapter.templates_dirBaseUIAdapter.static_dirBaseUIAdapter.get_dashboard_data()BaseUIAdapter.get_tasks_page_data()BaseUIAdapter.get_schedules_page_data()BaseUIAdapter.format_datetime()BaseUIAdapter.format_duration()BaseUIAdapter.get_status_color()BaseUIAdapter.get_stats()BaseUIAdapter.health_check()BaseUIAdapter.list_tasks()BaseUIAdapter.get_task()BaseUIAdapter.list_schedules()BaseUIAdapter.get_schedule()BaseUIAdapter.create_schedule_cron()BaseUIAdapter.enable_schedule()BaseUIAdapter.disable_schedule()BaseUIAdapter.delete_schedule()
FormatterUIServiceScheduleService
Submodules
flowrra.app module
- class flowrra.app.Flowrra(config=None)[source]
Bases:
objectUnified Flowrra application for task execution.
Automatically routes tasks to appropriate executors based on task type.
- Usage:
# Using Config config = Config(
broker=BrokerConfig(url=’redis://localhost:6379/0’), backend=BackendConfig(url=’redis://localhost:6379/1’)
) app = Flowrra(config=config)
# Using from_urls() app = Flowrra.from_urls(
broker=’redis://localhost:6379/0’, backend=’redis://localhost:6379/1’
)
# I/O-bound task (default) @app.task() async def fetch_data(url: str):
return await fetch(url)
# CPU-bound task @app.task(cpu_bound=True) def heavy_compute(n: int):
return sum(i**2 for i in range(n))
- classmethod from_urls(broker=None, backend=None)[source]
Convenience method to create Flowrra from URLs.
- Parameters:
broker (
str|None) – Broker connection URL (optional, None = asyncio.PriorityQueue)backend (
str|None) – Backend connection URL (optional, None = InMemoryBackend for IO, required for CPU)
- Return type:
- Returns:
Flowrra instance
- task(name=None, cpu_bound=False, **kwargs)[source]
Register a task with automatic executor routing.
- Parameters:
name (
str|None) – Custom task name (defaults to function name)cpu_bound (
bool) – If True, uses CPUExecutor; if False, uses IOExecutor (default)
- Returns:
Decorator function
- async submit(task_func, *args, **kwargs)[source]
Submit a task for execution.
- Parameters:
task_func (
Callable) – Registered task function*args – Positional arguments for the task
**kwargs – Keyword arguments for the task
- Returns:
Unique identifier for tracking
- Return type:
- async wait_for_result(task_id, timeout=None)[source]
Wait for a task to complete and return its result.
- Parameters:
task_id (
str) – Task identifiertimeout (
float|None) – Max seconds to wait (None = wait forever)
- Return type:
- Returns:
TaskResult with status, result, and error information
- async get_result(task_id)[source]
- Return type:
TaskResult|None
- create_scheduler(backend=None, check_interval=60.0)[source]
Create a scheduler integrated with this app’s executors.
The scheduler is automatically registered with the app and will start/stop with the app lifecycle. Access it via app.scheduler property or use FlowrraManager for comprehensive schedule management.
- Parameters:
backend (BaseSchedulerBackend | str | None) – Scheduler backend (URL string or instance) - None: Uses default SQLite backend (.flowrra_schedule.db) - String: Database URL (e.g., “postgresql://localhost/db”) - Instance: Custom BaseSchedulerBackend instance
check_interval (
float) – How often to check for due tasks (seconds)
- Return type:
Scheduler
- Returns:
Scheduler instance configured with this app’s executors
Example
app = Flowrra.from_urls()
@app.task() async def send_report():
return “Report sent”
# Create integrated scheduler (auto-registered) scheduler = app.create_scheduler()
# Schedule tasks await scheduler.schedule_cron(
task_name=”send_report”, cron=”0 9 * * *”
)
# Start app (automatically starts scheduler too) await app.start()
# Or use FlowrraManager for comprehensive management from flowrra.management import FlowrraManager manager = FlowrraManager(app) schedules = await manager.list_schedules()
- async stop(wait=True, timeout=30.0)[source]
Stop the Flowrra application and all executors.
- Parameters:
wait (
bool) – If True, wait for pending tasks to completetimeout (
float|None) – Max seconds to wait for pending tasks
- property is_running: bool
- property registry
Get the shared task registry.
- Returns:
TaskRegistry instance
Note
Both IOExecutor and CPUExecutor share the same registry instance
flowrra.config module
- class flowrra.config.BrokerConfig(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)[source]
Bases:
object- url: str
- max_connections: int = 50
- socket_timeout: float = 5.0
- retry_on_timeout: bool = True
- queue_key: str | None = None
- create_broker(queue_suffix='')[source]
Create broker instance from this configuration.
- Parameters:
queue_suffix (
str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)- Return type:
BaseBroker
- Returns:
Broker instance
- Raises:
BrokerError – If URL scheme is unsupported or creation fails
- __init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)
- class flowrra.config.BackendConfig(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)[source]
Bases:
object- url: str
- ttl: int | None = None
- max_connections: int = 50
- socket_timeout: float = 5.0
- retry_on_timeout: bool = True
- create_backend()[source]
Create backend instance from this configuration.
- Return type:
BaseResultBackend
- Returns:
Backend instance
- Raises:
BackendError – If URL scheme is unsupported or creation fails
- __init__(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)
- class flowrra.config.ExecutorConfig(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)[source]
Bases:
object- io_workers: int = 4
- cpu_workers: int = 2
- max_queue_size: int = 1000
- max_retries: int = 3
- retry_delay: float = 1.0
- __init__(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)
- class flowrra.config.SchedulerConfig(database_url=None, check_interval=60.0, enabled=False)[source]
Bases:
objectScheduler configuration for persistent task scheduling.
- Parameters:
database_url (
str|None) – Database URL for scheduler storage - SQLite: “sqlite:///path/to/schedule.db” (default: None for .flowrra_schedule.db) - PostgreSQL: “postgresql://user:pass@host/db” - MySQL: “mysql://user:pass@host/db”check_interval (
float) – How often to check for due tasks (seconds)enabled (
bool) – Whether scheduler is enabled
- database_url: str | None = None
- check_interval: float = 60.0
- enabled: bool = False
- create_backend()[source]
Create scheduler backend from configuration.
- Return type:
BaseSchedulerBackend
- Returns:
Scheduler backend instance
- __init__(database_url=None, check_interval=60.0, enabled=False)
- class flowrra.config.Config(broker=None, backend=None, executor=<factory>, scheduler=None)[source]
Bases:
objectMain Flowrra configuration aggregating component configs.
This class brings together broker, backend, executor, and scheduler configurations into a single, structured configuration object. All components are optional with sensible defaults.
- Parameters:
broker (
BrokerConfig|None) – Broker configuration for task queueing (optional, uses asyncio.PriorityQueue if None)backend (
BackendConfig|None) – Backend configuration for result storage (optional)executor (
ExecutorConfig) – Executor configuration (optional, defaults to ExecutorConfig())scheduler (
SchedulerConfig|None) – Scheduler configuration (optional, disabled by default)
- broker: BrokerConfig | None = None
- backend: BackendConfig | None = None
- executor: ExecutorConfig
- scheduler: SchedulerConfig | None = None
- create_broker(queue_suffix='')[source]
Create broker instance from configuration.
- Parameters:
queue_suffix (
str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)- Return type:
BaseBroker | None
- Returns:
Broker instance if configured, None otherwise
- create_backend()[source]
Create backend instance from configuration.
- Return type:
BaseResultBackend
- Returns:
Backend instance if configured, InMemory otherwise
- create_scheduler_backend()[source]
Create scheduler backend from configuration.
- Returns:
Scheduler backend instance if configured, None otherwise
- classmethod from_env(prefix='FLOWRRA_')[source]
Load configuration from environment variables using mappings.
- Return type:
- __init__(broker=None, backend=None, executor=<factory>, scheduler=None)
flowrra.constants module
Constants for Flowrra UI.
flowrra.events module
- class flowrra.events.EventBus[source]
Bases:
objectThread-safe event bus for broadcasting task events.
This implementation uses asyncio.Lock for thread-safe subscriber management and emits events concurrently to all subscribers with error isolation.
- Features:
Thread-safe subscribe/unsubscribe operations
Concurrent event emission to all subscribers
Error isolation (one subscriber’s error doesn’t affect others)
Async-safe list iteration
Example
- async def my_subscriber(event: dict):
print(f”Received: {event}”)
event_bus.subscribe(my_subscriber) await event_bus.emit({‘type’: ‘task.update’, ‘task’: {…}}) await event_bus.unsubscribe(my_subscriber)
- subscribe(fn)[source]
Subscribe to events (sync method for backward compatibility).
Note: This is synchronous but modifies shared state. For async contexts, consider using async_subscribe instead.
- Parameters:
fn (
Callable[[dict],Awaitable[None]]) – Async function that receives event dict
- async async_subscribe(fn)[source]
Subscribe to events (async-safe).
- Parameters:
fn (
Callable[[dict],Awaitable[None]]) – Async function that receives event dict
flowrra.exceptions module
Custom exceptions for flowrra.
- exception flowrra.exceptions.TaskNotFoundError(task_name)[source]
Bases:
FlowrraErrorRaised when a task is not found in the registry.
- exception flowrra.exceptions.TaskTimeoutError(task_id, timeout)[source]
Bases:
FlowrraErrorRaised when waiting for a task result times out.
- exception flowrra.exceptions.ExecutorNotRunningError[source]
Bases:
FlowrraErrorRaised when submitting to a stopped executor.
- exception flowrra.exceptions.BackendError[source]
Bases:
FlowrraErrorRaised when a backend operation fails.
flowrra.registry module
- class flowrra.registry.TaskRegistry(strict_cpu_checks=True)[source]
Bases:
object- __init__(strict_cpu_checks=True)[source]
Initialize the task registry.
- Parameters:
strict_cpu_checks (
bool) – If True, enforce module-level requirement for CPU tasks. Set to False for testing purposes only.
- task(name=None, cpu_bound=False, max_retries=3, retry_delay=1.0)[source]
Decorator to register an async function as a task.
- Parameters:
name (
str|None) – Custom task name (defaults to function name)cpu_bound (
bool) – Whether task is CPU-bound (runs in ProcessPoolExecutor)max_retries (
int) – Max retry attempts on failureretry_delay (
float) – Seconds between retries
flowrra.task module
- class flowrra.task.TaskStatus(value)[source]
Bases:
Enum- PENDING = 'pending'
- RUNNING = 'running'
- SUCCESS = 'success'
- FAILED = 'failed'
- RETRYING = 'retrying'
- class flowrra.task.TaskResult(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)[source]
Bases:
object- task_id: str
- status: TaskStatus
- task_name: str | None = None
- result: Any = None
- error: str | None = None
- submitted_at: datetime | None = None
- started_at: datetime | None = None
- finished_at: datetime | None = None
- retries: int = 0
- args: tuple = ()
- kwargs: dict = None
- property is_complete: bool
- property is_success: bool
- property to_dict: dict
- __init__(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)
- class flowrra.task.Task(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)[source]
Bases:
object- id: str
- name: str
- args: tuple
- kwargs: dict
- max_retries: int = 3
- retry_delay: float = 1.0
- current_retry: int = 0
- priority: int = 0
- cpu_bound: bool = False
- __init__(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)
Module contents
Flowrra - Async task executor built on asyncio.
A lightweight, Celery-inspired task queue using pure Python asyncio.
- Basic usage with Flowrra (unified API):
from flowrra import Flowrra
- app = Flowrra.from_urls(
broker=’redis://localhost:6379/0’, backend=’redis://localhost:6379/1’
)
# I/O-bound task (async) @app.task() async def fetch_data(url: str):
return await fetch(url)
# CPU-bound task (sync) @app.task(cpu_bound=True) def heavy_compute(n: int):
return sum(i ** 2 for i in range(n))
- async def main():
- async with app:
task_id = await app.submit(fetch_data, “https://api.example.com”) result = await app.wait_for_result(task_id) print(result.result)
- Advanced usage with IOExecutor and CPUExecutor:
from flowrra import IOExecutor, CPUExecutor, Config, ExecutorConfig, BackendConfig
# For I/O-bound tasks only config = Config(executor=ExecutorConfig(io_workers=10)) executor = IOExecutor(config=config)
# For CPU-bound tasks (requires Redis backend) # cpu_workers defaults to os.cpu_count() if not specified config = Config(
backend=BackendConfig(url=’redis://localhost:6379/0’), executor=ExecutorConfig(cpu_workers=4) # or omit to use CPU core count
) executor = CPUExecutor(config=config)
- class flowrra.Flowrra(config=None)[source]
Bases:
objectUnified Flowrra application for task execution.
Automatically routes tasks to appropriate executors based on task type.
- Usage:
# Using Config config = Config(
broker=BrokerConfig(url=’redis://localhost:6379/0’), backend=BackendConfig(url=’redis://localhost:6379/1’)
) app = Flowrra(config=config)
# Using from_urls() app = Flowrra.from_urls(
broker=’redis://localhost:6379/0’, backend=’redis://localhost:6379/1’
)
# I/O-bound task (default) @app.task() async def fetch_data(url: str):
return await fetch(url)
# CPU-bound task @app.task(cpu_bound=True) def heavy_compute(n: int):
return sum(i**2 for i in range(n))
- classmethod from_urls(broker=None, backend=None)[source]
Convenience method to create Flowrra from URLs.
- Parameters:
broker (
str|None) – Broker connection URL (optional, None = asyncio.PriorityQueue)backend (
str|None) – Backend connection URL (optional, None = InMemoryBackend for IO, required for CPU)
- Return type:
- Returns:
Flowrra instance
- task(name=None, cpu_bound=False, **kwargs)[source]
Register a task with automatic executor routing.
- Parameters:
name (
str|None) – Custom task name (defaults to function name)cpu_bound (
bool) – If True, uses CPUExecutor; if False, uses IOExecutor (default)
- Returns:
Decorator function
- async submit(task_func, *args, **kwargs)[source]
Submit a task for execution.
- Parameters:
task_func (
Callable) – Registered task function*args – Positional arguments for the task
**kwargs – Keyword arguments for the task
- Returns:
Unique identifier for tracking
- Return type:
- async wait_for_result(task_id, timeout=None)[source]
Wait for a task to complete and return its result.
- Parameters:
task_id (
str) – Task identifiertimeout (
float|None) – Max seconds to wait (None = wait forever)
- Return type:
- Returns:
TaskResult with status, result, and error information
- async get_result(task_id)[source]
- Return type:
TaskResult|None
- create_scheduler(backend=None, check_interval=60.0)[source]
Create a scheduler integrated with this app’s executors.
The scheduler is automatically registered with the app and will start/stop with the app lifecycle. Access it via app.scheduler property or use FlowrraManager for comprehensive schedule management.
- Parameters:
backend (BaseSchedulerBackend | str | None) – Scheduler backend (URL string or instance) - None: Uses default SQLite backend (.flowrra_schedule.db) - String: Database URL (e.g., “postgresql://localhost/db”) - Instance: Custom BaseSchedulerBackend instance
check_interval (
float) – How often to check for due tasks (seconds)
- Return type:
Scheduler
- Returns:
Scheduler instance configured with this app’s executors
Example
app = Flowrra.from_urls()
@app.task() async def send_report():
return “Report sent”
# Create integrated scheduler (auto-registered) scheduler = app.create_scheduler()
# Schedule tasks await scheduler.schedule_cron(
task_name=”send_report”, cron=”0 9 * * *”
)
# Start app (automatically starts scheduler too) await app.start()
# Or use FlowrraManager for comprehensive management from flowrra.management import FlowrraManager manager = FlowrraManager(app) schedules = await manager.list_schedules()
- async stop(wait=True, timeout=30.0)[source]
Stop the Flowrra application and all executors.
- Parameters:
wait (
bool) – If True, wait for pending tasks to completetimeout (
float|None) – Max seconds to wait for pending tasks
- property is_running: bool
- property registry
Get the shared task registry.
- Returns:
TaskRegistry instance
Note
Both IOExecutor and CPUExecutor share the same registry instance
- class flowrra.IOExecutor(config, registry=None)[source]
Bases:
BaseTaskExecutorExecutor for I/O-bound async tasks.
Best for: - Network requests (HTTP, database queries) - File I/O operations - API calls - Any async/await operations
- __init__(config, registry=None)[source]
Initialize I/O executor.
- Parameters:
config (
Config) – Configuration object (optional, defaults to Config())registry (
TaskRegistry|None) – Shared TaskRegistry instance (optional, creates new if None)
Example
# With full config from flowrra import Config, BrokerConfig, BackendConfig, ExecutorConfig
- config = Config(
broker=BrokerConfig(url=’redis://localhost:6379/0’), backend=BackendConfig(url=’redis://localhost:6379/1’), executor=ExecutorConfig(io_workers=8)
) executor = IOExecutor(config=config)
# With default config (uses InMemoryBackend and asyncio.PriorityQueue) executor = IOExecutor()
- task(name=None, max_retries=3, retry_delay=1.0)[source]
Register an async I/O-bound task.
- Parameters:
name (
str|None) – Custom task name (defaults to function name)max_retries (
int) – Max retry attempts on failureretry_delay (
float) – Seconds between retries
- Returns:
Decorator function
- Raises:
TypeError – If decorated function is not async or is CPU-bound
- class flowrra.CPUExecutor(config, registry=None)[source]
Bases:
BaseTaskExecutorExecutor for CPU-bound sync tasks using ProcessPoolExecutor.
Best for: - Heavy computations - Data processing - Image/video processing - Scientific calculations - Cryptographic operations
Important: - Tasks must be regular (sync) functions, NOT async - Requires explicit backend for cross-process result sharing - Tasks must be picklable (no local functions)
- Usage:
from flowrra import Config, BackendConfig, ExecutorConfig
- config = Config(
backend=BackendConfig(url=”redis://localhost:6379/0”), executor=ExecutorConfig(cpu_workers=4)
) executor = CPUExecutor(config=config)
@executor.task() def heavy_computation(n: int):
# CPU-intensive work return sum(i ** 2 for i in range(n))
- async with executor:
task_id = await executor.submit(heavy_computation, 1000000) result = await executor.wait_for_result(task_id)
- __init__(config, registry=None)[source]
Initialize CPU executor.
- Parameters:
config (
Config) – Config object (REQUIRED) with backend configurationregistry (
TaskRegistry|None) – Shared TaskRegistry instance (optional, creates new if None)
- Raises:
ValueError – If config is None or config lacks backend
Example
from flowrra import Config, BackendConfig, ExecutorConfig config = Config(
backend=BackendConfig(url=”redis://localhost:6379/0”), executor=ExecutorConfig(cpu_workers=4, max_queue_size=1000)
) executor = CPUExecutor(config=config)
- task(name=None, max_retries=3, retry_delay=1.0)[source]
Register a sync CPU-bound task.
- Parameters:
name (
str|None) – Custom task name (defaults to function name)max_retries (
int) – Max retry attempts on failureretry_delay (
float) – Seconds between retries
- Returns:
Decorator function
- Raises:
TypeError – If decorated function is async
- class flowrra.TaskRegistry(strict_cpu_checks=True)[source]
Bases:
object- __init__(strict_cpu_checks=True)[source]
Initialize the task registry.
- Parameters:
strict_cpu_checks (
bool) – If True, enforce module-level requirement for CPU tasks. Set to False for testing purposes only.
- task(name=None, cpu_bound=False, max_retries=3, retry_delay=1.0)[source]
Decorator to register an async function as a task.
- Parameters:
name (
str|None) – Custom task name (defaults to function name)cpu_bound (
bool) – Whether task is CPU-bound (runs in ProcessPoolExecutor)max_retries (
int) – Max retry attempts on failureretry_delay (
float) – Seconds between retries
- class flowrra.Config(broker=None, backend=None, executor=<factory>, scheduler=None)[source]
Bases:
objectMain Flowrra configuration aggregating component configs.
This class brings together broker, backend, executor, and scheduler configurations into a single, structured configuration object. All components are optional with sensible defaults.
- Parameters:
broker (
BrokerConfig|None) – Broker configuration for task queueing (optional, uses asyncio.PriorityQueue if None)backend (
BackendConfig|None) – Backend configuration for result storage (optional)executor (
ExecutorConfig) – Executor configuration (optional, defaults to ExecutorConfig())scheduler (
SchedulerConfig|None) – Scheduler configuration (optional, disabled by default)
- broker: BrokerConfig | None = None
- backend: BackendConfig | None = None
- executor: ExecutorConfig
- scheduler: SchedulerConfig | None = None
- create_broker(queue_suffix='')[source]
Create broker instance from configuration.
- Parameters:
queue_suffix (
str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)- Return type:
BaseBroker | None
- Returns:
Broker instance if configured, None otherwise
- create_backend()[source]
Create backend instance from configuration.
- Return type:
BaseResultBackend
- Returns:
Backend instance if configured, InMemory otherwise
- create_scheduler_backend()[source]
Create scheduler backend from configuration.
- Returns:
Scheduler backend instance if configured, None otherwise
- classmethod from_env(prefix='FLOWRRA_')[source]
Load configuration from environment variables using mappings.
- Return type:
- __init__(broker=None, backend=None, executor=<factory>, scheduler=None)
- class flowrra.BrokerConfig(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)[source]
Bases:
object- url: str
- max_connections: int = 50
- socket_timeout: float = 5.0
- retry_on_timeout: bool = True
- queue_key: str | None = None
- create_broker(queue_suffix='')[source]
Create broker instance from this configuration.
- Parameters:
queue_suffix (
str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)- Return type:
BaseBroker
- Returns:
Broker instance
- Raises:
BrokerError – If URL scheme is unsupported or creation fails
- __init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)
- class flowrra.BackendConfig(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)[source]
Bases:
object- url: str
- ttl: int | None = None
- max_connections: int = 50
- socket_timeout: float = 5.0
- retry_on_timeout: bool = True
- create_backend()[source]
Create backend instance from this configuration.
- Return type:
BaseResultBackend
- Returns:
Backend instance
- Raises:
BackendError – If URL scheme is unsupported or creation fails
- __init__(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)
- class flowrra.ExecutorConfig(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)[source]
Bases:
object- io_workers: int = 4
- cpu_workers: int = 2
- max_queue_size: int = 1000
- max_retries: int = 3
- retry_delay: float = 1.0
- __init__(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)
- class flowrra.Task(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)[source]
Bases:
object- id: str
- name: str
- args: tuple
- kwargs: dict
- max_retries: int = 3
- retry_delay: float = 1.0
- current_retry: int = 0
- priority: int = 0
- cpu_bound: bool = False
- __init__(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)
- class flowrra.TaskResult(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)[source]
Bases:
object- task_id: str
- status: TaskStatus
- task_name: str | None = None
- result: Any = None
- error: str | None = None
- submitted_at: datetime | None = None
- started_at: datetime | None = None
- finished_at: datetime | None = None
- retries: int = 0
- args: tuple = ()
- kwargs: dict = None
- property is_complete: bool
- property is_success: bool
- property to_dict: dict
- __init__(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)
- class flowrra.TaskStatus(value)[source]
Bases:
Enum- PENDING = 'pending'
- RUNNING = 'running'
- SUCCESS = 'success'
- FAILED = 'failed'
- RETRYING = 'retrying'
- exception flowrra.TaskNotFoundError(task_name)[source]
Bases:
FlowrraErrorRaised when a task is not found in the registry.
- exception flowrra.TaskTimeoutError(task_id, timeout)[source]
Bases:
FlowrraErrorRaised when waiting for a task result times out.
- exception flowrra.ExecutorNotRunningError[source]
Bases:
FlowrraErrorRaised when submitting to a stopped executor.
- exception flowrra.BackendError[source]
Bases:
FlowrraErrorRaised when a backend operation fails.