flowrra.executors package

Submodules

flowrra.executors.base module

Base executor class with common functionality.

class flowrra.executors.base.BaseTaskExecutor(config, registry=None, queue_suffix='')[source]

Bases: ABC

Abstract base class for task executors.

Provides common functionality for task submission, queue management, worker lifecycle, and result storage. Subclasses must implement task execution logic specific to I/O-bound or CPU-bound tasks.

__init__(config, registry=None, queue_suffix='')[source]

Initialize base executor.

Parameters:
  • config (Config) – Configuration object (optional, defaults to Config())

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

  • queue_suffix (str) – Queue suffix for broker (e.g., “:io” or “:cpu”)

property is_running: bool

Check if executor is currently running.

task(*args, **kwargs)[source]

Decorator shortcut for self.registry.task()

async submit(task_func, *args, priority=0, **kwargs)[source]

Submit a task for execution.

Parameters:
  • task_func (Callable) – Registered task function

  • *args – Positional arguments for the task

  • priority (int) – Lower number = higher priority (keyword-only)

  • **kwargs – Keyword arguments for the task

Returns:

task_id – Unique identifier for tracking the submitted task

Raises:

abstractmethod async start()[source]

Start the executor and worker pool.

Subclasses must implement this to initialize any resources (e.g., worker tasks, process pools) needed for execution.

abstractmethod async stop(wait=True, timeout=30.0)[source]

Stop the executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks

Subclasses must implement this to clean up resources (e.g., cancel workers, shutdown process pools).
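A minimal sketch of how a subclass might honor wait and timeout when shutting down asyncio worker tasks. The helper name stop_workers and its return value are hypothetical; flowrra's own implementation may differ.

```python
import asyncio


async def stop_workers(workers, wait=True, timeout=30.0):
    """Hypothetical helper: return True if every worker finished on its own."""
    if wait:
        # Give pending work up to `timeout` seconds (None = no limit).
        _done, pending = await asyncio.wait(workers, timeout=timeout)
    else:
        pending = set(workers)
    # Cancel whatever is still running and let the cancellations settle.
    for worker in pending:
        worker.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return not pending


async def _demo():
    fast = asyncio.create_task(asyncio.sleep(0.01))
    slow = asyncio.create_task(asyncio.sleep(10))
    finished = await stop_workers([fast, slow], wait=True, timeout=0.1)
    return finished, fast.done(), slow.cancelled()


demo_result = asyncio.run(_demo())
print(demo_result)  # (False, True, True)
```

Here the fast worker completes within the timeout, while the slow one is cancelled once the timeout expires.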

async wait_for_result(task_id, timeout=None)[source]

Wait for a task to complete and return its result.

Parameters:
  • task_id (str) – Task identifier

  • timeout (float | None) – Max seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult with status, result, and error information

async get_result(task_id)[source]

Get task result without waiting.

Parameters:

task_id (str) – Task identifier

Return type:

TaskResult | None

Returns:

TaskResult if available, None otherwise
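The difference between wait_for_result() and get_result() can be illustrated with a small in-memory store. This is a sketch only: ResultStore and its methods are hypothetical, and raising asyncio.TimeoutError on timeout is an assumption rather than documented flowrra behavior.

```python
import asyncio


class ResultStore:
    """Hypothetical in-memory store, not flowrra's backend."""

    def __init__(self):
        self._results = {}
        self._events = {}

    def _event(self, task_id):
        return self._events.setdefault(task_id, asyncio.Event())

    def set(self, task_id, value):
        self._results[task_id] = value
        self._event(task_id).set()

    def get(self, task_id):
        # Non-blocking lookup: None when no result is stored yet.
        return self._results.get(task_id)

    async def wait(self, task_id, timeout=None):
        # Blocks until the result arrives; timeout=None waits forever.
        await asyncio.wait_for(self._event(task_id).wait(), timeout)
        return self._results[task_id]


async def _demo():
    store = ResultStore()
    before = store.get("t1")
    # Simulate a worker storing the result shortly afterwards.
    asyncio.get_running_loop().call_later(0.01, store.set, "t1", 42)
    after = await store.wait("t1", timeout=1.0)
    try:
        await store.wait("missing", timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return before, after, timed_out


demo_result = asyncio.run(_demo())
print(demo_result)  # (None, 42, True)
```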

async __aenter__()[source]

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)[source]

Async context manager exit.
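A sketch of the usual pattern behind these two methods, assuming that entry calls start() and exit calls stop(). SketchExecutor is a stand-in class written for illustration, not flowrra's code.

```python
import asyncio


class SketchExecutor:
    """Stand-in for an executor; not flowrra's real class."""

    def __init__(self):
        self._running = False

    @property
    def is_running(self) -> bool:
        return self._running

    async def start(self):
        self._running = True

    async def stop(self, wait=True, timeout=30.0):
        self._running = False

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Runs even when the body raised; returning None lets the
        # exception propagate to the caller.
        await self.stop()


async def _demo():
    executor = SketchExecutor()
    states = [executor.is_running]
    try:
        async with executor:
            states.append(executor.is_running)
            raise RuntimeError("task body failed")
    except RuntimeError:
        pass
    states.append(executor.is_running)
    return states


states = asyncio.run(_demo())
print(states)  # [False, True, False]
```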

flowrra.executors.cpu_executor module

CPU-bound task executor using ProcessPoolExecutor.

class flowrra.executors.cpu_executor.CPUExecutor(config, registry=None)[source]

Bases: BaseTaskExecutor

Executor for CPU-bound sync tasks using ProcessPoolExecutor.

Best for:

  • Heavy computations

  • Data processing

  • Image/video processing

  • Scientific calculations

  • Cryptographic operations

Important:

  • Tasks must be regular (sync) functions, NOT async

  • Requires explicit backend for cross-process result sharing

  • Tasks must be picklable (no local functions)
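The picklability requirement follows from how process pools ship work to child processes: the function is serialized by reference to its module-level name. The check below is a standalone sketch using only the standard library; is_picklable and the two example functions are hypothetical names.

```python
import pickle


def module_level_task(n):
    # Defined at module top level, so pickle can refer to it by name.
    return sum(i ** 2 for i in range(n))


def make_local_task():
    def local_task(n):
        # Defined inside another function: pickle cannot locate it by name.
        return n

    return local_task


def is_picklable(func):
    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


# Expected to be True when this file is run as a script or imported as a module.
print(is_picklable(module_level_task))
print(is_picklable(make_local_task()))  # False
```

The same restriction applies to lambdas, which also cannot be looked up by name.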

Usage:

    import asyncio

    from flowrra import Config, BackendConfig, ExecutorConfig
    from flowrra.executors import CPUExecutor

    config = Config(
        backend=BackendConfig(url="redis://localhost:6379/0"),
        executor=ExecutorConfig(cpu_workers=4),
    )
    executor = CPUExecutor(config=config)

    @executor.task()
    def heavy_computation(n: int):
        # CPU-intensive work
        return sum(i ** 2 for i in range(n))

    async def main():
        # The context manager handles executor startup and shutdown
        async with executor:
            task_id = await executor.submit(heavy_computation, 1000000)
            result = await executor.wait_for_result(task_id)

    asyncio.run(main())

__init__(config, registry=None)[source]

Initialize CPU executor.

Parameters:
  • config (Config) – Config object (REQUIRED) with backend configuration

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

Raises:

ValueError – If config is None or config lacks backend

Example

    from flowrra import Config, BackendConfig, ExecutorConfig
    from flowrra.executors import CPUExecutor

    config = Config(
        backend=BackendConfig(url="redis://localhost:6379/0"),
        executor=ExecutorConfig(cpu_workers=4, max_queue_size=1000),
    )
    executor = CPUExecutor(config=config)

task(name=None, max_retries=3, retry_delay=1.0)[source]

Register a sync CPU-bound task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

Returns:

Decorator function

Raises:

TypeError – If decorated function is async
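A sketch of how a registration decorator could reject coroutine functions, using inspect.iscoroutinefunction from the standard library. The decorator sync_task and the attributes it sets are hypothetical; the real CPUExecutor.task() stores tasks in a TaskRegistry.

```python
import inspect


def sync_task(name=None, max_retries=3, retry_delay=1.0):
    """Hypothetical decorator factory mirroring the documented signature."""

    def decorator(func):
        if inspect.iscoroutinefunction(func):
            raise TypeError(
                f"{func.__name__} is async; CPU tasks must be regular functions"
            )
        # Attach the task metadata to the function for illustration.
        func.task_name = name or func.__name__
        func.max_retries = max_retries
        func.retry_delay = retry_delay
        return func

    return decorator


@sync_task(max_retries=5)
def square_sum(n):
    return sum(i ** 2 for i in range(n))


try:

    @sync_task()
    async def fetch():
        return None

    rejected = False
except TypeError:
    rejected = True

print(square_sum.task_name, square_sum.max_retries, rejected)  # square_sum 5 True
```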

async start()[source]

Start the CPU executor and worker pool.

async stop(wait=True, timeout=30.0)[source]

Stop the CPU executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks (default: 30.0)

flowrra.executors.io_executor module

I/O-bound task executor for async operations.

class flowrra.executors.io_executor.IOExecutor(config, registry=None)[source]

Bases: BaseTaskExecutor

Executor for I/O-bound async tasks.

Best for:

  • Network requests (HTTP, database queries)

  • File I/O operations

  • API calls

  • Any async/await operations

__init__(config, registry=None)[source]

Initialize I/O executor.

Parameters:
  • config (Config) – Configuration object (optional, defaults to Config())

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

Example

    # With full config
    from flowrra import Config, BrokerConfig, BackendConfig, ExecutorConfig
    from flowrra.executors import IOExecutor

    config = Config(
        broker=BrokerConfig(url='redis://localhost:6379/0'),
        backend=BackendConfig(url='redis://localhost:6379/1'),
        executor=ExecutorConfig(io_workers=8),
    )
    executor = IOExecutor(config=config)

    # With default config (uses InMemoryBackend and asyncio.PriorityQueue)
    executor = IOExecutor()
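With the default configuration, tasks are queued in an asyncio.PriorityQueue, and submit() documents that a lower priority number means higher priority. The sketch below shows that ordering with the standard library alone. The tuple layout and the counter used as a tie-breaker are assumptions for illustration, not flowrra's internal format.

```python
import asyncio
import itertools


async def drain_in_priority_order(submissions):
    queue = asyncio.PriorityQueue()
    counter = itertools.count()  # tie-breaker keeps FIFO order for equal priorities
    for priority, name in submissions:
        queue.put_nowait((priority, next(counter), name))
    order = []
    while not queue.empty():
        _priority, _seq, name = queue.get_nowait()
        order.append(name)
    return order


order = asyncio.run(
    drain_in_priority_order(
        [(5, "report"), (0, "urgent"), (5, "cleanup"), (1, "email")]
    )
)
print(order)  # ['urgent', 'email', 'report', 'cleanup']
```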

is_broker()[source]

Check if executor uses a broker for task queueing.

Return type:

bool

task(name=None, max_retries=3, retry_delay=1.0)[source]

Register an async I/O-bound task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

Returns:

Decorator function

Raises:

TypeError – If decorated function is not async or is CPU-bound
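The max_retries and retry_delay parameters suggest a retry loop around each task call. The following is a sketch under the assumption that max_retries counts attempts after the first failure and that the delay is fixed; run_with_retries and make_flaky are hypothetical helpers.

```python
import asyncio


async def run_with_retries(func, *args, max_retries=3, retry_delay=1.0, **kwargs):
    """Hypothetical helper: call func, retrying up to max_retries times."""
    attempt = 0
    while True:
        try:
            return await func(*args, **kwargs)
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            await asyncio.sleep(retry_delay)


def make_flaky(failures):
    """Build a coroutine function that fails `failures` times, then succeeds."""
    state = {"calls": 0}

    async def flaky():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("transient failure")
        return state["calls"]

    return flaky


# Fails twice, succeeds on the third call.
calls_needed = asyncio.run(
    run_with_retries(make_flaky(2), max_retries=3, retry_delay=0.01)
)

# One retry is not enough for a task that fails five times.
try:
    asyncio.run(run_with_retries(make_flaky(5), max_retries=1, retry_delay=0.01))
    exhausted = False
except ConnectionError:
    exhausted = True

print(calls_needed, exhausted)  # 3 True
```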

async start()[source]

Start the I/O executor and worker pool.

async stop(wait=True, timeout=30.0)[source]

Stop the I/O executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks (default: 30.0)

Module contents

Flowrra task executors.

class flowrra.executors.BaseTaskExecutor(config, registry=None, queue_suffix='')[source]

Bases: ABC

Abstract base class for task executors.

Provides common functionality for task submission, queue management, worker lifecycle, and result storage. Subclasses must implement task execution logic specific to I/O-bound or CPU-bound tasks.

__init__(config, registry=None, queue_suffix='')[source]

Initialize base executor.

Parameters:
  • config (Config) – Configuration object (optional, defaults to Config())

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

  • queue_suffix (str) – Queue suffix for broker (e.g., “:io” or “:cpu”)

property is_running: bool

Check if executor is currently running.

task(*args, **kwargs)[source]

Decorator shortcut for self.registry.task()

async submit(task_func, *args, priority=0, **kwargs)[source]

Submit a task for execution.

Parameters:
  • task_func (Callable) – Registered task function

  • *args – Positional arguments for the task

  • priority (int) – Lower number = higher priority (keyword-only)

  • **kwargs – Keyword arguments for the task

Returns:

task_id – Unique identifier for tracking the submitted task

Raises:

abstractmethod async start()[source]

Start the executor and worker pool.

Subclasses must implement this to initialize any resources (e.g., worker tasks, process pools) needed for execution.

abstractmethod async stop(wait=True, timeout=30.0)[source]

Stop the executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks

Subclasses must implement this to clean up resources (e.g., cancel workers, shutdown process pools).

async wait_for_result(task_id, timeout=None)[source]

Wait for a task to complete and return its result.

Parameters:
  • task_id (str) – Task identifier

  • timeout (float | None) – Max seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult with status, result, and error information

async get_result(task_id)[source]

Get task result without waiting.

Parameters:

task_id (str) – Task identifier

Return type:

TaskResult | None

Returns:

TaskResult if available, None otherwise

async __aenter__()[source]

Async context manager entry.

async __aexit__(exc_type, exc_val, exc_tb)[source]

Async context manager exit.

class flowrra.executors.IOExecutor(config, registry=None)[source]

Bases: BaseTaskExecutor

Executor for I/O-bound async tasks.

Best for:

  • Network requests (HTTP, database queries)

  • File I/O operations

  • API calls

  • Any async/await operations

__init__(config, registry=None)[source]

Initialize I/O executor.

Parameters:
  • config (Config) – Configuration object (optional, defaults to Config())

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

Example

    # With full config
    from flowrra import Config, BrokerConfig, BackendConfig, ExecutorConfig
    from flowrra.executors import IOExecutor

    config = Config(
        broker=BrokerConfig(url='redis://localhost:6379/0'),
        backend=BackendConfig(url='redis://localhost:6379/1'),
        executor=ExecutorConfig(io_workers=8),
    )
    executor = IOExecutor(config=config)

    # With default config (uses InMemoryBackend and asyncio.PriorityQueue)
    executor = IOExecutor()

is_broker()[source]

Check if executor uses a broker for task queueing.

Return type:

bool

task(name=None, max_retries=3, retry_delay=1.0)[source]

Register an async I/O-bound task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

Returns:

Decorator function

Raises:

TypeError – If decorated function is not async or is CPU-bound

async start()[source]

Start the I/O executor and worker pool.

async stop(wait=True, timeout=30.0)[source]

Stop the I/O executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks (default: 30.0)

class flowrra.executors.CPUExecutor(config, registry=None)[source]

Bases: BaseTaskExecutor

Executor for CPU-bound sync tasks using ProcessPoolExecutor.

Best for:

  • Heavy computations

  • Data processing

  • Image/video processing

  • Scientific calculations

  • Cryptographic operations

Important:

  • Tasks must be regular (sync) functions, NOT async

  • Requires explicit backend for cross-process result sharing

  • Tasks must be picklable (no local functions)

Usage:

    import asyncio

    from flowrra import Config, BackendConfig, ExecutorConfig
    from flowrra.executors import CPUExecutor

    config = Config(
        backend=BackendConfig(url="redis://localhost:6379/0"),
        executor=ExecutorConfig(cpu_workers=4),
    )
    executor = CPUExecutor(config=config)

    @executor.task()
    def heavy_computation(n: int):
        # CPU-intensive work
        return sum(i ** 2 for i in range(n))

    async def main():
        # The context manager handles executor startup and shutdown
        async with executor:
            task_id = await executor.submit(heavy_computation, 1000000)
            result = await executor.wait_for_result(task_id)

    asyncio.run(main())

__init__(config, registry=None)[source]

Initialize CPU executor.

Parameters:
  • config (Config) – Config object (REQUIRED) with backend configuration

  • registry (TaskRegistry | None) – Shared TaskRegistry instance (optional, creates new if None)

Raises:

ValueError – If config is None or config lacks backend

Example

    from flowrra import Config, BackendConfig, ExecutorConfig
    from flowrra.executors import CPUExecutor

    config = Config(
        backend=BackendConfig(url="redis://localhost:6379/0"),
        executor=ExecutorConfig(cpu_workers=4, max_queue_size=1000),
    )
    executor = CPUExecutor(config=config)

task(name=None, max_retries=3, retry_delay=1.0)[source]

Register a sync CPU-bound task.

Parameters:
  • name (str | None) – Custom task name (defaults to function name)

  • max_retries (int) – Max retry attempts on failure

  • retry_delay (float) – Seconds between retries

Returns:

Decorator function

Raises:

TypeError – If decorated function is async

async start()[source]

Start the CPU executor and worker pool.

async stop(wait=True, timeout=30.0)[source]

Stop the CPU executor.

Parameters:
  • wait (bool) – If True, wait for pending tasks to complete

  • timeout (float | None) – Max seconds to wait for pending tasks (default: 30.0)