flowrra package

Subpackages

Submodules

flowrra.app module

class flowrra.app.Flowrra(config=None)[source]

Bases: object

Unified Flowrra application for task execution.

Automatically routes tasks to appropriate executors based on task type.

Usage:

# Using Config config = Config(

broker=BrokerConfig(url=’redis://localhost:6379/0’), backend=BackendConfig(url=’redis://localhost:6379/1’)

) app = Flowrra(config=config)

# Using from_urls() app = Flowrra.from_urls(

broker=’redis://localhost:6379/0’, backend=’redis://localhost:6379/1’

)

# I/O-bound task (default) @app.task() async def fetch_data(url: str):

return await fetch(url)

# CPU-bound task @app.task(cpu_bound=True) def heavy_compute(n: int):

return sum(i**2 for i in range(n))

__init__(config=None)[source]
classmethod from_urls(broker=None, backend=None)[source]

Convenience method to create Flowrra from URLs.

Parameters:
  • broker (str | None) – Broker connection URL (optional, None = asyncio.PriorityQueue)

  • backend (str | None) – Backend connection URL (optional, None = InMemoryBackend for IO, required for CPU)

Return type:

Flowrra

Returns:

Flowrra instance

task(name=None, cpu_bound=False, **kwargs)[source]

Register a task with automatic executor routing.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • cpu_bound (bool) – If True, uses CPUExecutor; if False, uses IOExecutor (default)

Returns:

Decorator function

async submit(task_func, *args, **kwargs)[source]

Submit a task for execution.

Parameters:
  • task_func (Callable) – Registered task function

  • *args – Positional arguments for the task

  • **kwargs – Keyword arguments for the task

Returns:

Unique identifier for tracking

Return type:

task_id

async wait_for_result(task_id, timeout=None)[source]

Wait for a task to complete and return its result.

Parameters:
  • task_id (str) – Task identifier

  • timeout (float | None) – Max seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult with status, result, and error information

async get_result(task_id)[source]
Return type:

TaskResult | None

async start()[source]
create_scheduler(backend=None, check_interval=60.0)[source]

Create a scheduler integrated with this app’s executors.

The scheduler is automatically registered with the app and will start/stop with the app lifecycle. Access it via app.scheduler property or use FlowrraManager for comprehensive schedule management.

Parameters:
  • backend (BaseSchedulerBackend | str | None) – Scheduler backend (URL string or instance) - None: Uses default SQLite backend (.flowrra_schedule.db) - String: Database URL (e.g., “postgresql://localhost/db”) - Instance: Custom BaseSchedulerBackend instance

  • check_interval (float) – How often to check for due tasks (seconds)

Return type:

Scheduler

Returns:

Scheduler instance configured with this app’s executors

Example

app = Flowrra.from_urls()

@app.task() async def send_report():

return “Report sent”

# Create integrated scheduler (auto-registered) scheduler = app.create_scheduler()

# Schedule tasks await scheduler.schedule_cron(

task_name=”send_report”, cron=”0 9 * * *

)

# Start app (automatically starts scheduler too) await app.start()

# Or use FlowrraManager for comprehensive management from flowrra.management import FlowrraManager manager = FlowrraManager(app) schedules = await manager.list_schedules()

async stop(wait=True, timeout=30.0)[source]

Stop the Flowrra application and all executors.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks

property is_running: bool
property registry

Get the shared task registry.

Returns:

TaskRegistry instance

Note

Both IOExecutor and CPUExecutor share the same registry instance

property scheduler: Scheduler | None

Get the registered scheduler instance.

Returns:

Scheduler instance if created via create_scheduler(), None otherwise

Note

The scheduler is optional. It only exists if create_scheduler() was called.

flowrra.config module

class flowrra.config.BrokerConfig(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)[source]

Bases: object

url: str
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
queue_key: str | None = None
__post_init__()[source]

Validate broker configuration.

create_broker(queue_suffix='')[source]

Create broker instance from this configuration.

Parameters:

queue_suffix (str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)

Return type:

BaseBroker

Returns:

Broker instance

Raises:

BrokerError – If URL scheme is unsupported or creation fails

__init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)
class flowrra.config.BackendConfig(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)[source]

Bases: object

url: str
ttl: int | None = None
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
__post_init__()[source]

Validate backend configuration.

create_backend()[source]

Create backend instance from this configuration.

Return type:

BaseResultBackend

Returns:

Backend instance

Raises:

BackendError – If URL scheme is unsupported or creation fails

__init__(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)
class flowrra.config.ExecutorConfig(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)[source]

Bases: object

io_workers: int = 4
cpu_workers: int = 2
max_queue_size: int = 1000
max_retries: int = 3
retry_delay: float = 1.0
__post_init__()[source]

Validate executor configuration.

__init__(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)
class flowrra.config.SchedulerConfig(database_url=None, check_interval=60.0, enabled=False)[source]

Bases: object

Scheduler configuration for persistent task scheduling.

Parameters:
  • database_url (str | None) – Database URL for scheduler storage - SQLite: “sqlite:///path/to/schedule.db” (default: None for .flowrra_schedule.db) - PostgreSQL: “postgresql://user:pass@host/db” - MySQL: “mysql://user:pass@host/db”

  • check_interval (float) – How often to check for due tasks (seconds)

  • enabled (bool) – Whether scheduler is enabled

database_url: str | None = None
check_interval: float = 60.0
enabled: bool = False
__post_init__()[source]

Validate scheduler configuration.

create_backend()[source]

Create scheduler backend from configuration.

Return type:

BaseSchedulerBackend

Returns:

Scheduler backend instance

__init__(database_url=None, check_interval=60.0, enabled=False)
class flowrra.config.Config(broker=None, backend=None, executor=<factory>, scheduler=None)[source]

Bases: object

Main Flowrra configuration aggregating component configs.

This class brings together broker, backend, executor, and scheduler configurations into a single, structured configuration object. All components are optional with sensible defaults.

Parameters:
  • broker (BrokerConfig | None) – Broker configuration for task queueing (optional, uses asyncio.PriorityQueue if None)

  • backend (BackendConfig | None) – Backend configuration for result storage (optional)

  • executor (ExecutorConfig) – Executor configuration (optional, defaults to ExecutorConfig())

  • scheduler (SchedulerConfig | None) – Scheduler configuration (optional, disabled by default)

broker: BrokerConfig | None = None
backend: BackendConfig | None = None
executor: ExecutorConfig
scheduler: SchedulerConfig | None = None
__post_init__()[source]

Ensure executor config exists.

create_broker(queue_suffix='')[source]

Create broker instance from configuration.

Parameters:

queue_suffix (str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)

Return type:

BaseBroker | None

Returns:

Broker instance if configured, None otherwise

create_backend()[source]

Create backend instance from configuration.

Return type:

BaseResultBackend

Returns:

Backend instance if configured, InMemory otherwise

create_scheduler_backend()[source]

Create scheduler backend from configuration.

Returns:

Scheduler backend instance if configured, None otherwise

classmethod from_env(prefix='FLOWRRA_')[source]

Load configuration from environment variables using mappings.

Return type:

Config

__init__(broker=None, backend=None, executor=<factory>, scheduler=None)

flowrra.constants module

Constants for Flowrra UI.

flowrra.events module

class flowrra.events.EventBus[source]

Bases: object

Thread-safe event bus for broadcasting task events.

This implementation uses asyncio.Lock for thread-safe subscriber management and emits events concurrently to all subscribers with error isolation.

Features:
  • Thread-safe subscribe/unsubscribe operations

  • Concurrent event emission to all subscribers

  • Error isolation (one subscriber’s error doesn’t affect others)

  • Async-safe list iteration

Example

async def my_subscriber(event: dict):

print(f”Received: {event}”)

event_bus.subscribe(my_subscriber) await event_bus.emit({‘type’: ‘task.update’, ‘task’: {…}}) await event_bus.unsubscribe(my_subscriber)

__init__()[source]
subscribe(fn)[source]

Subscribe to events (sync method for backward compatibility).

Note: This is synchronous but modifies shared state. For async contexts, consider using async_subscribe instead.

Parameters:

fn (Callable[[dict], Awaitable[None]]) – Async function that receives event dict

async async_subscribe(fn)[source]

Subscribe to events (async-safe).

Parameters:

fn (Callable[[dict], Awaitable[None]]) – Async function that receives event dict

async unsubscribe(fn)[source]

Unsubscribe from events.

Parameters:

fn (Callable[[dict], Awaitable[None]]) – The subscriber function to remove

async emit(event)[source]

Emit event to all subscribers concurrently.

Events are delivered to all subscribers in parallel using asyncio.gather. If a subscriber raises an exception, it’s logged but doesn’t affect others.

Parameters:

event (dict) – Event dict to broadcast

flowrra.exceptions module

Custom exceptions for flowrra.

exception flowrra.exceptions.FlowrraError[source]

Bases: Exception

exception flowrra.exceptions.TaskNotFoundError(task_name)[source]

Bases: FlowrraError

Raised when a task is not found in the registry.

__init__(task_name)[source]
exception flowrra.exceptions.TaskTimeoutError(task_id, timeout)[source]

Bases: FlowrraError

Raised when waiting for a task result times out.

__init__(task_id, timeout)[source]
exception flowrra.exceptions.ExecutorNotRunningError[source]

Bases: FlowrraError

Raised when submitting to a stopped executor.

__init__()[source]
exception flowrra.exceptions.BackendError[source]

Bases: FlowrraError

Raised when a backend operation fails.

flowrra.registry module

class flowrra.registry.TaskRegistry(strict_cpu_checks=True)[source]

Bases: object

__init__(strict_cpu_checks=True)[source]

Initialize the task registry.

Parameters:

strict_cpu_checks (bool) – If True, enforce module-level requirement for CPU tasks. Set to False for testing purposes only.

task(name=None, cpu_bound=False, max_retries=3, retry_delay=1.0)[source]

Decorator to register an async function as a task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • cpu_bound (bool) – Whether task is CPU-bound (runs in ProcessPoolExecutor)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

get(name)[source]
get_or_raise(name)[source]
list_tasks()[source]
is_registered(name)[source]
unregister(name)[source]

flowrra.task module

class flowrra.task.TaskStatus(value)[source]

Bases: Enum

PENDING = 'pending'
RUNNING = 'running'
SUCCESS = 'success'
FAILED = 'failed'
RETRYING = 'retrying'
class flowrra.task.TaskResult(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)[source]

Bases: object

task_id: str
status: TaskStatus
task_name: str | None = None
result: Any = None
error: str | None = None
submitted_at: datetime | None = None
started_at: datetime | None = None
finished_at: datetime | None = None
retries: int = 0
args: tuple = ()
kwargs: dict = None
property is_complete: bool
property is_success: bool
property to_dict: dict
classmethod from_dict(data)[source]
Return type:

TaskResult

__init__(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)
class flowrra.task.Task(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)[source]

Bases: object

id: str
name: str
args: tuple
kwargs: dict
max_retries: int = 3
retry_delay: float = 1.0
current_retry: int = 0
priority: int = 0
cpu_bound: bool = False
__init__(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)

Module contents

Flowrra - Async task executor built on asyncio.

A lightweight, Celery-inspired task queue using pure Python asyncio.

Basic usage with Flowrra (unified API):

from flowrra import Flowrra

app = Flowrra.from_urls(

broker=’redis://localhost:6379/0’, backend=’redis://localhost:6379/1’

)

# I/O-bound task (async) @app.task() async def fetch_data(url: str):

return await fetch(url)

# CPU-bound task (sync) @app.task(cpu_bound=True) def heavy_compute(n: int):

return sum(i ** 2 for i in range(n))

async def main():
async with app:

task_id = await app.submit(fetch_data, “https://api.example.com”) result = await app.wait_for_result(task_id) print(result.result)

Advanced usage with IOExecutor and CPUExecutor:

from flowrra import IOExecutor, CPUExecutor, Config, ExecutorConfig, BackendConfig

# For I/O-bound tasks only config = Config(executor=ExecutorConfig(io_workers=10)) executor = IOExecutor(config=config)

# For CPU-bound tasks (requires Redis backend) # cpu_workers defaults to os.cpu_count() if not specified config = Config(

backend=BackendConfig(url=’redis://localhost:6379/0’), executor=ExecutorConfig(cpu_workers=4) # or omit to use CPU core count

) executor = CPUExecutor(config=config)

class flowrra.Flowrra(config=None)[source]

Bases: object

Unified Flowrra application for task execution.

Automatically routes tasks to appropriate executors based on task type.

Usage:

# Using Config config = Config(

broker=BrokerConfig(url=’redis://localhost:6379/0’), backend=BackendConfig(url=’redis://localhost:6379/1’)

) app = Flowrra(config=config)

# Using from_urls() app = Flowrra.from_urls(

broker=’redis://localhost:6379/0’, backend=’redis://localhost:6379/1’

)

# I/O-bound task (default) @app.task() async def fetch_data(url: str):

return await fetch(url)

# CPU-bound task @app.task(cpu_bound=True) def heavy_compute(n: int):

return sum(i**2 for i in range(n))

__init__(config=None)[source]
classmethod from_urls(broker=None, backend=None)[source]

Convenience method to create Flowrra from URLs.

Parameters:
  • broker (str | None) – Broker connection URL (optional, None = asyncio.PriorityQueue)

  • backend (str | None) – Backend connection URL (optional, None = InMemoryBackend for IO, required for CPU)

Return type:

Flowrra

Returns:

Flowrra instance

task(name=None, cpu_bound=False, **kwargs)[source]

Register a task with automatic executor routing.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • cpu_bound (bool) – If True, uses CPUExecutor; if False, uses IOExecutor (default)

Returns:

Decorator function

async submit(task_func, *args, **kwargs)[source]

Submit a task for execution.

Parameters:
  • task_func (Callable) – Registered task function

  • *args – Positional arguments for the task

  • **kwargs – Keyword arguments for the task

Returns:

Unique identifier for tracking

Return type:

task_id

async wait_for_result(task_id, timeout=None)[source]

Wait for a task to complete and return its result.

Parameters:
  • task_id (str) – Task identifier

  • timeout (float | None) – Max seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult with status, result, and error information

async get_result(task_id)[source]
Return type:

TaskResult | None

async start()[source]
create_scheduler(backend=None, check_interval=60.0)[source]

Create a scheduler integrated with this app’s executors.

The scheduler is automatically registered with the app and will start/stop with the app lifecycle. Access it via app.scheduler property or use FlowrraManager for comprehensive schedule management.

Parameters:
  • backend (BaseSchedulerBackend | str | None) – Scheduler backend (URL string or instance) - None: Uses default SQLite backend (.flowrra_schedule.db) - String: Database URL (e.g., “postgresql://localhost/db”) - Instance: Custom BaseSchedulerBackend instance

  • check_interval (float) – How often to check for due tasks (seconds)

Return type:

Scheduler

Returns:

Scheduler instance configured with this app’s executors

Example

app = Flowrra.from_urls()

@app.task() async def send_report():

return “Report sent”

# Create integrated scheduler (auto-registered) scheduler = app.create_scheduler()

# Schedule tasks await scheduler.schedule_cron(

task_name=”send_report”, cron=”0 9 * * *

)

# Start app (automatically starts scheduler too) await app.start()

# Or use FlowrraManager for comprehensive management from flowrra.management import FlowrraManager manager = FlowrraManager(app) schedules = await manager.list_schedules()

async stop(wait=True, timeout=30.0)[source]

Stop the Flowrra application and all executors.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks

property is_running: bool
property registry

Get the shared task registry.

Returns:

TaskRegistry instance

Note

Both IOExecutor and CPUExecutor share the same registry instance

property scheduler: Scheduler | None

Get the registered scheduler instance.

Returns:

Scheduler instance if created via create_scheduler(), None otherwise

Note

The scheduler is optional. It only exists if create_scheduler() was called.

class flowrra.IOExecutor(config, registry=None)[source]

Bases: BaseTaskExecutor

Executor for I/O-bound async tasks.

Best for: - Network requests (HTTP, database queries) - File I/O operations - API calls - Any async/await operations

__init__(config, registry=None)[source]

Initialize I/O executor.

Parameters:
  • config (Config) – Configuration object (optional, defaults to Config())

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

Example

# With full config from flowrra import Config, BrokerConfig, BackendConfig, ExecutorConfig

config = Config(

broker=BrokerConfig(url=’redis://localhost:6379/0’), backend=BackendConfig(url=’redis://localhost:6379/1’), executor=ExecutorConfig(io_workers=8)

) executor = IOExecutor(config=config)

# With default config (uses InMemoryBackend and asyncio.PriorityQueue) executor = IOExecutor()

is_broker()[source]

Check if executor uses a broker for task queueing.

Return type:

bool

task(name=None, max_retries=3, retry_delay=1.0)[source]

Register an async I/O-bound task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

Returns:

Decorator function

Raises:

TypeError – If decorated function is not async or is CPU-bound

async start()[source]

Start the I/O executor and worker pool.

async stop(wait=True, timeout=30.0)[source]

Stop the I/O executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks (default: 30.0)

class flowrra.CPUExecutor(config, registry=None)[source]

Bases: BaseTaskExecutor

Executor for CPU-bound sync tasks using ProcessPoolExecutor.

Best for: - Heavy computations - Data processing - Image/video processing - Scientific calculations - Cryptographic operations

Important: - Tasks must be regular (sync) functions, NOT async - Requires explicit backend for cross-process result sharing - Tasks must be picklable (no local functions)

Usage:

from flowrra import Config, BackendConfig, ExecutorConfig

config = Config(

backend=BackendConfig(url=”redis://localhost:6379/0”), executor=ExecutorConfig(cpu_workers=4)

) executor = CPUExecutor(config=config)

@executor.task() def heavy_computation(n: int):

# CPU-intensive work return sum(i ** 2 for i in range(n))

async with executor:

task_id = await executor.submit(heavy_computation, 1000000) result = await executor.wait_for_result(task_id)

__init__(config, registry=None)[source]

Initialize CPU executor.

Parameters:
  • config (Config) – Config object (REQUIRED) with backend configuration

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

Raises:

ValueError – If config is None or config lacks backend

Example

from flowrra import Config, BackendConfig, ExecutorConfig config = Config(

backend=BackendConfig(url=”redis://localhost:6379/0”), executor=ExecutorConfig(cpu_workers=4, max_queue_size=1000)

) executor = CPUExecutor(config=config)

task(name=None, max_retries=3, retry_delay=1.0)[source]

Register a sync CPU-bound task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

Returns:

Decorator function

Raises:

TypeError – If decorated function is async

async start()[source]

Start the CPU executor and worker pool.

async stop(wait=True, timeout=30.0)[source]

Stop the CPU executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks (default: 30.0)

class flowrra.TaskRegistry(strict_cpu_checks=True)[source]

Bases: object

__init__(strict_cpu_checks=True)[source]

Initialize the task registry.

Parameters:

strict_cpu_checks (bool) – If True, enforce module-level requirement for CPU tasks. Set to False for testing purposes only.

task(name=None, cpu_bound=False, max_retries=3, retry_delay=1.0)[source]

Decorator to register an async function as a task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • cpu_bound (bool) – Whether task is CPU-bound (runs in ProcessPoolExecutor)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

get(name)[source]
get_or_raise(name)[source]
list_tasks()[source]
is_registered(name)[source]
unregister(name)[source]
class flowrra.Config(broker=None, backend=None, executor=<factory>, scheduler=None)[source]

Bases: object

Main Flowrra configuration aggregating component configs.

This class brings together broker, backend, executor, and scheduler configurations into a single, structured configuration object. All components are optional with sensible defaults.

Parameters:
  • broker (BrokerConfig | None) – Broker configuration for task queueing (optional, uses asyncio.PriorityQueue if None)

  • backend (BackendConfig | None) – Backend configuration for result storage (optional)

  • executor (ExecutorConfig) – Executor configuration (optional, defaults to ExecutorConfig())

  • scheduler (SchedulerConfig | None) – Scheduler configuration (optional, disabled by default)

broker: BrokerConfig | None = None
backend: BackendConfig | None = None
executor: ExecutorConfig
scheduler: SchedulerConfig | None = None
__post_init__()[source]

Ensure executor config exists.

create_broker(queue_suffix='')[source]

Create broker instance from configuration.

Parameters:

queue_suffix (str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)

Return type:

BaseBroker | None

Returns:

Broker instance if configured, None otherwise

create_backend()[source]

Create backend instance from configuration.

Return type:

BaseResultBackend

Returns:

Backend instance if configured, InMemory otherwise

create_scheduler_backend()[source]

Create scheduler backend from configuration.

Returns:

Scheduler backend instance if configured, None otherwise

classmethod from_env(prefix='FLOWRRA_')[source]

Load configuration from environment variables using mappings.

Return type:

Config

__init__(broker=None, backend=None, executor=<factory>, scheduler=None)
class flowrra.BrokerConfig(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)[source]

Bases: object

url: str
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
queue_key: str | None = None
__post_init__()[source]

Validate broker configuration.

create_broker(queue_suffix='')[source]

Create broker instance from this configuration.

Parameters:

queue_suffix (str) – Optional suffix for queue key (e.g., “:io” or “:cpu”)

Return type:

BaseBroker

Returns:

Broker instance

Raises:

BrokerError – If URL scheme is unsupported or creation fails

__init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key=None)
class flowrra.BackendConfig(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)[source]

Bases: object

url: str
ttl: int | None = None
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
__post_init__()[source]

Validate backend configuration.

create_backend()[source]

Create backend instance from this configuration.

Return type:

BaseResultBackend

Returns:

Backend instance

Raises:

BackendError – If URL scheme is unsupported or creation fails

__init__(url, ttl=None, max_connections=50, socket_timeout=5.0, retry_on_timeout=True)
class flowrra.ExecutorConfig(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)[source]

Bases: object

io_workers: int = 4
cpu_workers: int = 2
max_queue_size: int = 1000
max_retries: int = 3
retry_delay: float = 1.0
__post_init__()[source]

Validate executor configuration.

__init__(io_workers=4, cpu_workers=2, max_queue_size=1000, max_retries=3, retry_delay=1.0)
class flowrra.Task(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)[source]

Bases: object

id: str
name: str
args: tuple
kwargs: dict
max_retries: int = 3
retry_delay: float = 1.0
current_retry: int = 0
priority: int = 0
cpu_bound: bool = False
__init__(id, name, args, kwargs, max_retries=3, retry_delay=1.0, current_retry=0, priority=0, cpu_bound=False)
class flowrra.TaskResult(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)[source]

Bases: object

task_id: str
status: TaskStatus
task_name: str | None = None
result: Any = None
error: str | None = None
submitted_at: datetime | None = None
started_at: datetime | None = None
finished_at: datetime | None = None
retries: int = 0
args: tuple = ()
kwargs: dict = None
property is_complete: bool
property is_success: bool
property to_dict: dict
classmethod from_dict(data)[source]
Return type:

TaskResult

__init__(task_id, status, task_name=None, result=None, error=None, submitted_at=None, started_at=None, finished_at=None, retries=0, args=(), kwargs=None)
class flowrra.TaskStatus(value)[source]

Bases: Enum

PENDING = 'pending'
RUNNING = 'running'
SUCCESS = 'success'
FAILED = 'failed'
RETRYING = 'retrying'
exception flowrra.FlowrraError[source]

Bases: Exception

exception flowrra.TaskNotFoundError(task_name)[source]

Bases: FlowrraError

Raised when a task is not found in the registry.

__init__(task_name)[source]
exception flowrra.TaskTimeoutError(task_id, timeout)[source]

Bases: FlowrraError

Raised when waiting for a task result times out.

__init__(task_id, timeout)[source]
exception flowrra.ExecutorNotRunningError[source]

Bases: FlowrraError

Raised when submitting to a stopped executor.

__init__()[source]
exception flowrra.BackendError[source]

Bases: FlowrraError

Raised when a backend operation fails.