flowrra.executors package
Submodules
flowrra.executors.base module
Base executor class with common functionality.
- class flowrra.executors.base.BaseTaskExecutor(config, registry=None, queue_suffix='')
Bases: ABC
Abstract base class for task executors.
Provides common functionality for task submission, queue management, worker lifecycle, and result storage. Subclasses must implement task execution logic specific to I/O-bound or CPU-bound tasks.
- __init__(config, registry=None, queue_suffix='')
Initialize base executor.
- Parameters:
  config (Config) – Configuration object (optional, defaults to Config())
  registry (TaskRegistry|None) – Shared TaskRegistry instance (optional, creates new if None)
  queue_suffix (str) – Queue suffix for broker (e.g., ":io" or ":cpu")
- property is_running: bool
Check if executor is currently running.
- async submit(task_func, *args, priority=0, **kwargs)
Submit a task for execution.
- Parameters:
  task_func (Callable) – Registered task function
  *args – Positional arguments for the task
  priority (int) – Lower number = higher priority (keyword-only)
  **kwargs – Keyword arguments for the task
- Returns:
  Unique task identifier for tracking
- Return type:
  str
- Raises:
  TaskNotFoundError – If the task is not registered
  ExecutorNotRunningError – If the executor has not been started
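The "lower number = higher priority" rule is the same ordering that Python's asyncio.PriorityQueue applies to its entries. The block below is a minimal sketch of that ordering only, not Flowrra's implementation; the enqueue helper, the sequence counter used as a tie-breaker, and the task names are all hypothetical.

```python
import asyncio
import itertools

# Hypothetical tie-breaker so entries with equal priority keep submission order.
_sequence = itertools.count()


async def enqueue(queue: asyncio.PriorityQueue, name: str, priority: int = 0) -> None:
    """Put (priority, sequence, name) on the queue; the smallest tuple is served first."""
    await queue.put((priority, next(_sequence), name))


async def drain_in_priority_order() -> list:
    queue: asyncio.PriorityQueue = asyncio.PriorityQueue()
    await enqueue(queue, "cleanup", priority=10)
    await enqueue(queue, "send_email", priority=0)
    await enqueue(queue, "resize_image", priority=5)
    await enqueue(queue, "send_sms", priority=0)

    served = []
    while not queue.empty():
        _priority, _seq, name = await queue.get()
        served.append(name)
    return served


order = asyncio.run(drain_in_priority_order())
print(order)  # ['send_email', 'send_sms', 'resize_image', 'cleanup']
```

Both priority-0 entries are served before the priority-5 and priority-10 entries, regardless of the order in which they were enqueued.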
- abstractmethod async start()
Start the executor and worker pool.
Subclasses must implement this to initialize any resources (e.g., worker tasks, process pools) needed for execution.
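As an illustration of the abstract start()/stop() contract, here is a minimal sketch using Python's abc module. SketchExecutor and NoopExecutor are hypothetical stand-ins written for this example; they do not reproduce BaseTaskExecutor's internals.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class SketchExecutor(ABC):
    """Hypothetical stand-in for an abstract executor base class."""

    def __init__(self) -> None:
        self._running = False

    @property
    def is_running(self) -> bool:
        return self._running

    @abstractmethod
    async def start(self) -> None:
        """Acquire resources such as worker tasks or a process pool."""

    @abstractmethod
    async def stop(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
        """Release whatever start() acquired."""


class NoopExecutor(SketchExecutor):
    """Smallest possible concrete subclass: it only flips the running flag."""

    async def start(self) -> None:
        self._running = True

    async def stop(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
        self._running = False


async def lifecycle() -> tuple:
    executor = NoopExecutor()
    await executor.start()
    running_after_start = executor.is_running
    await executor.stop()
    return running_after_start, executor.is_running


try:
    SketchExecutor()  # abstract methods are not implemented, so this raises
    abstract_instantiable = True
except TypeError:
    abstract_instantiable = False

states = asyncio.run(lifecycle())
```

Instantiating the abstract class raises TypeError, while the concrete subclass reports is_running as True after start() and False after stop().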
- abstractmethod async stop(wait=True, timeout=30.0)
Stop the executor.
- Parameters:
  wait (bool) – If True, wait for pending tasks to complete
  timeout (float|None) – Max seconds to wait for pending tasks
Subclasses must implement this to clean up resources (e.g., cancel workers, shut down process pools).
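One way a subclass might honour wait and timeout is to wait for the queue to drain for a bounded time and then cancel the workers. The sketch below assumes asyncio worker tasks consuming an asyncio.Queue; the worker and stop functions here are hypothetical and stand apart from any Flowrra class.

```python
import asyncio
from typing import Optional


async def worker(queue: asyncio.Queue, done: list) -> None:
    """Consume (delay, label) items forever; cancellation ends the loop."""
    while True:
        delay, label = await queue.get()
        try:
            await asyncio.sleep(delay)  # stands in for real work
            done.append(label)
        finally:
            queue.task_done()


async def stop(queue: asyncio.Queue, workers: list, wait: bool = True,
               timeout: Optional[float] = 30.0) -> bool:
    """Return True if every queued item finished before the workers were cancelled."""
    drained = False
    if wait:
        try:
            # timeout=None would wait for the queue to drain indefinitely.
            await asyncio.wait_for(queue.join(), timeout)
            drained = True
        except asyncio.TimeoutError:
            drained = False
    for task in workers:
        task.cancel()
    # Collect the CancelledError results so nothing is left pending.
    await asyncio.gather(*workers, return_exceptions=True)
    return drained


async def demo() -> tuple:
    done: list = []
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue, done)) for _ in range(2)]
    queue.put_nowait((0.01, "fast"))
    queue.put_nowait((5.0, "slow"))
    drained = await stop(queue, workers, wait=True, timeout=0.2)
    return drained, done


result = asyncio.run(demo())
print(result)  # (False, ['fast'])
```

The fast item completes within the timeout, while the slow one is cancelled when the timeout expires, so stop() reports that the queue did not fully drain.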
- async wait_for_result(task_id, timeout=None)
Wait for a task to complete and return its result.
- Parameters:
  task_id (str) – Task identifier
  timeout (float|None) – Max seconds to wait (None = wait forever)
- Return type:
  TaskResult
- Returns:
  TaskResult with status, result, and error information
- async get_result(task_id)
Get task result without waiting.
- Parameters:
  task_id (str) – Task identifier
- Return type:
  TaskResult|None
- Returns:
  TaskResult if available, None otherwise
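The difference between wait_for_result (blocks until the task finishes) and get_result (returns immediately, possibly None) can be sketched with an in-memory store built on asyncio.Event. SketchResult and SketchResultStore are hypothetical; the real TaskResult fields and status values may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for TaskResult."""
    task_id: str
    status: str  # "success" below is a placeholder value
    result: Any = None
    error: Optional[str] = None


class SketchResultStore:
    def __init__(self) -> None:
        self._results: Dict[str, SketchResult] = {}
        self._events: Dict[str, asyncio.Event] = {}

    def _event(self, task_id: str) -> asyncio.Event:
        return self._events.setdefault(task_id, asyncio.Event())

    async def store(self, result: SketchResult) -> None:
        self._results[result.task_id] = result
        self._event(result.task_id).set()  # wake up any waiters

    async def get_result(self, task_id: str) -> Optional[SketchResult]:
        # Non-blocking lookup: None means no result has been stored yet.
        return self._results.get(task_id)

    async def wait_for_result(self, task_id: str,
                              timeout: Optional[float] = None) -> SketchResult:
        # Blocks until store() is called; timeout=None waits indefinitely.
        await asyncio.wait_for(self._event(task_id).wait(), timeout)
        return self._results[task_id]


async def demo() -> tuple:
    store = SketchResultStore()
    early = await store.get_result("task-1")  # nothing stored yet

    async def finish_later() -> None:
        await asyncio.sleep(0.01)
        await store.store(SketchResult("task-1", "success", result=42))

    producer = asyncio.create_task(finish_later())
    final = await store.wait_for_result("task-1", timeout=1.0)
    await producer
    return early, final


early, final = asyncio.run(demo())
```

Here early is None because the lookup happens before the result exists, while final holds the stored result once the producer has finished.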
flowrra.executors.cpu_executor module
CPU-bound task executor using ProcessPoolExecutor.
- class flowrra.executors.cpu_executor.CPUExecutor(config, registry=None)
Bases: BaseTaskExecutor
Executor for CPU-bound sync tasks using ProcessPoolExecutor.
Best for:
  - Heavy computations
  - Data processing
  - Image/video processing
  - Scientific calculations
  - Cryptographic operations
Important:
  - Tasks must be regular (sync) functions, NOT async
  - Requires explicit backend for cross-process result sharing
  - Tasks must be picklable (no local functions)
- Usage:
    import asyncio

    from flowrra import Config, BackendConfig, ExecutorConfig
    from flowrra.executors import CPUExecutor

    config = Config(
        backend=BackendConfig(url="redis://localhost:6379/0"),
        executor=ExecutorConfig(cpu_workers=4),
    )
    executor = CPUExecutor(config=config)

    @executor.task()
    def heavy_computation(n: int):
        # CPU-intensive work
        return sum(i ** 2 for i in range(n))

    async def main():
        # await is only valid inside a coroutine, so the calls live in main()
        async with executor:
            task_id = await executor.submit(heavy_computation, 1000000)
            result = await executor.wait_for_result(task_id)

    if __name__ == "__main__":
        asyncio.run(main())
- __init__(config, registry=None)
Initialize CPU executor.
- Parameters:
  config (Config) – Config object (REQUIRED) with backend configuration
  registry (TaskRegistry|None) – Shared TaskRegistry instance (optional, creates new if None)
- Raises:
  ValueError – If config is None or config lacks backend
Example
    from flowrra import Config, BackendConfig, ExecutorConfig
    from flowrra.executors import CPUExecutor

    config = Config(
        backend=BackendConfig(url="redis://localhost:6379/0"),
        executor=ExecutorConfig(cpu_workers=4, max_queue_size=1000),
    )
    executor = CPUExecutor(config=config)
- task(name=None, max_retries=3, retry_delay=1.0)
Register a sync CPU-bound task.
- Parameters:
  name (str|None) – Custom task name (defaults to function name)
  max_retries (int) – Max retry attempts on failure
  retry_delay (float) – Seconds between retries
- Returns:
  Decorator function
- Raises:
  TypeError – If decorated function is async
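A registration decorator with these parameters, including the TypeError for async functions, could look roughly like the sketch below. It is a standalone illustration: the module-level REGISTRY dict and this task function are hypothetical and are not CPUExecutor.task itself.

```python
import inspect
from typing import Callable, Dict, Optional

# Hypothetical registry: task name -> function and retry settings.
REGISTRY: Dict[str, dict] = {}


def task(name: Optional[str] = None, max_retries: int = 3,
         retry_delay: float = 1.0) -> Callable:
    """Return a decorator that registers a sync function under a task name."""

    def decorator(func: Callable) -> Callable:
        if inspect.iscoroutinefunction(func):
            raise TypeError(f"{func.__name__} is async; CPU-bound tasks must be sync")
        REGISTRY[name or func.__name__] = {
            "func": func,
            "max_retries": max_retries,
            "retry_delay": retry_delay,
        }
        return func  # returned unchanged, so it can still be called directly

    return decorator


@task()
def sum_of_squares(n: int) -> int:
    return sum(i ** 2 for i in range(n))


@task(name="hash_value", max_retries=1)
def slow_hash(value: str) -> int:
    return hash(value)


try:
    @task()
    async def fetch(url: str) -> str:
        return url
    rejected = False
except TypeError:
    rejected = True
```

The two sync functions are registered under "sum_of_squares" and "hash_value", and the attempt to decorate the coroutine function is rejected with TypeError.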
flowrra.executors.io_executor module
I/O-bound task executor for async operations.
- class flowrra.executors.io_executor.IOExecutor(config, registry=None)
Bases: BaseTaskExecutor
Executor for I/O-bound async tasks.
Best for:
  - Network requests (HTTP, database queries)
  - File I/O operations
  - API calls
  - Any async/await operations
- __init__(config, registry=None)
Initialize I/O executor.
- Parameters:
  config (Config) – Configuration object (optional, defaults to Config())
  registry (TaskRegistry|None) – Shared TaskRegistry instance (optional, creates new if None)
Example
    # With full config
    from flowrra import Config, BrokerConfig, BackendConfig, ExecutorConfig
    from flowrra.executors import IOExecutor

    config = Config(
        broker=BrokerConfig(url='redis://localhost:6379/0'),
        backend=BackendConfig(url='redis://localhost:6379/1'),
        executor=ExecutorConfig(io_workers=8),
    )
    executor = IOExecutor(config=config)

    # With default config (uses InMemoryBackend and asyncio.PriorityQueue)
    executor = IOExecutor()
- task(name=None, max_retries=3, retry_delay=1.0)
Register an async I/O-bound task.
- Parameters:
  name (str|None) – Custom task name (defaults to function name)
  max_retries (int) – Max retry attempts on failure
  retry_delay (float) – Seconds between retries
- Returns:
  Decorator function
- Raises:
  TypeError – If decorated function is not async or is CPU-bound
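To make max_retries and retry_delay concrete, the following sketch retries an async function with a fixed delay between attempts. It assumes that max_retries counts the attempts made after the first failure and that every exception is retried; the documentation above does not confirm either detail, and run_with_retries and flaky_fetch are hypothetical names.

```python
import asyncio

calls = {"count": 0}


async def run_with_retries(func, *args, max_retries: int = 3,
                           retry_delay: float = 1.0, **kwargs):
    """Await func, retrying on any exception.

    Assumption: max_retries counts attempts made after the first one.
    """
    attempt = 0
    while True:
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted, surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)


async def flaky_fetch(url: str) -> str:
    # Fails twice, then succeeds, to exercise the retry path.
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return f"fetched {url}"


outcome = asyncio.run(
    run_with_retries(flaky_fetch, "https://example.com",
                     max_retries=3, retry_delay=0.01)
)
print(outcome, calls["count"])  # fetched https://example.com 3
```

The call succeeds on the third attempt, which is within the budget of one initial attempt plus three retries.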
Module contents
Flowrra task executors.
- class flowrra.executors.BaseTaskExecutor(config, registry=None, queue_suffix='')
- class flowrra.executors.IOExecutor(config, registry=None)
- class flowrra.executors.CPUExecutor(config, registry=None)