flowrra.backends package

Submodules

flowrra.backends.base module

class flowrra.backends.base.BaseResultBackend[source]

Bases: ABC

Abstract base for result storage backends.

Implementations must provide:
  • store(): Persist a task result

  • get(): Retrieve a task result by ID

  • wait_for(): Block until a task completes

abstractmethod async store(task_id, result)[source]

Store a task result.

Parameters:
  • task_id (str) – Unique task identifier

  • result (TaskResult) – TaskResult object to store

Return type:

None

abstractmethod async get(task_id)[source]

Retrieve a task result.

Parameters:

task_id (str) – Unique task identifier

Return type:

TaskResult | None

Returns:

TaskResult if found, None otherwise

abstractmethod async wait_for(task_id, timeout)[source]

Wait for a task to complete.

Parameters:
  • task_id (str) – Unique task identifier

  • timeout (float | None) – Maximum seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult when task completes

Raises:

asyncio.TimeoutError – If timeout exceeded

async delete(task_id)[source]

Delete a task result. Optional to implement.

Return type:

bool

Returns:

True if deleted, False if not found

async clear()[source]

Clear all results. Optional to implement.

Return type:

int

Returns:

Number of results cleared

async list_by_status(status, limit=None, offset=0)[source]

List tasks by status with optional pagination.

Optional to implement. Backends that don’t support querying should raise NotImplementedError (default behavior).

Parameters:
  • status (TaskStatus) – Task status to filter by (PENDING, RUNNING, SUCCESS, FAILED, RETRYING)

  • limit (int | None) – Maximum number of results to return (None = no limit)

  • offset (int) – Number of results to skip (for pagination)

Return type:

list[TaskResult]

Returns:

List of TaskResult objects matching the status, ordered by submitted_at DESC (most recent first). Empty list if no matches found.

Raises:

NotImplementedError – If backend doesn’t support status queries

Note

  • Results are ordered by submitted_at DESC (newest first)

  • If submitted_at is None, those tasks are ordered last

  • InMemoryBackend and RedisBackend both support this operation
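
The ordering and pagination rules in the note above can be sketched in a few lines. This is an illustration only, not Flowrra's implementation: the Result dataclass and the list_by_status helper below are hypothetical stand-ins for TaskResult and the backend method, and statuses are plain strings rather than TaskStatus members.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Result:
    """Hypothetical stand-in for TaskResult (only the fields used here)."""
    task_id: str
    status: str
    submitted_at: Optional[datetime] = None


def list_by_status(results: List[Result], status: str,
                   limit: Optional[int] = None, offset: int = 0) -> List[Result]:
    """Filter by status, newest first, entries without submitted_at last."""
    matching = [r for r in results if r.status == status]
    # Sort only the dated entries; undated ones are appended afterwards.
    dated = sorted(
        (r for r in matching if r.submitted_at is not None),
        key=lambda r: r.submitted_at,
        reverse=True,
    )
    undated = [r for r in matching if r.submitted_at is None]
    ordered = dated + undated
    # limit=None means "no limit", so the slice runs to the end.
    end = None if limit is None else offset + limit
    return ordered[offset:end]


results = [
    Result("a", "SUCCESS", datetime(2024, 1, 1)),
    Result("b", "SUCCESS", None),
    Result("c", "SUCCESS", datetime(2024, 1, 3)),
    Result("d", "FAILED", datetime(2024, 1, 2)),
]
page = list_by_status(results, "SUCCESS", limit=2, offset=1)
print([r.task_id for r in page])  # second and third SUCCESS entries
```

Splitting dated and undated entries avoids comparing None with datetime values, which would raise a TypeError during sorting.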

flowrra.backends.factory module

Backend factory for creating backends from connection strings.

flowrra.backends.factory.get_backend(backend, **kwargs)[source]

Create a backend from a connection string or return existing instance.

This factory function provides a convenient way to create backends from connection strings while maintaining support for custom backend instances. Additional keyword arguments are passed to the backend constructor.

Parameters:
  • backend (BaseResultBackend | str | None) – Either a BaseResultBackend instance, a connection string, or None

  • **kwargs – Additional backend-specific configuration options (ttl, max_connections, etc.)

Return type:

BaseResultBackend

Returns:

BaseResultBackend instance

Raises:

BackendError – If URL scheme is unsupported or backend creation fails

Examples

    # Redis connection string (recommended for production)
    backend = get_backend("redis://localhost:6379/0")
    backend = get_backend("rediss://localhost:6379/0", ttl=3600)  # With TTL

    # None returns InMemoryBackend (internal default)
    backend = get_backend(None)  # Returns InMemoryBackend()

    # Custom backend instance passthrough (advanced)
    backend = get_backend(custom_backend_instance)
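
The three cases in the examples above (None, an existing instance, a connection string) suggest a simple dispatch. The sketch below shows one way such a factory could be structured; it is not Flowrra's source. The classes are stand-ins that never open a connection, get_backend_sketch is a hypothetical name, and the set of accepted schemes is an assumption taken from the examples.

```python
from urllib.parse import urlsplit


class BackendError(Exception):
    """Stand-in for Flowrra's BackendError."""


class FakeMemoryBackend:
    """Stand-in for InMemoryBackend."""


class FakeRedisBackend:
    """Stand-in for RedisBackend; records its arguments instead of connecting."""

    def __init__(self, url, **kwargs):
        self.url = url
        self.options = kwargs


# Assumption: only the schemes that appear in the get_backend examples.
SUPPORTED_SCHEMES = {"redis", "rediss"}


def get_backend_sketch(backend, **kwargs):
    if backend is None:
        return FakeMemoryBackend()      # default when nothing is configured
    if not isinstance(backend, str):
        return backend                  # existing instance passes through unchanged
    scheme = urlsplit(backend).scheme
    if scheme not in SUPPORTED_SCHEMES:
        raise BackendError(f"Unsupported backend URL scheme: {scheme!r}")
    return FakeRedisBackend(backend, **kwargs)  # kwargs such as ttl are forwarded


redis_like = get_backend_sketch("rediss://localhost:6379/0", ttl=3600)
print(type(redis_like).__name__, redis_like.options)
```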

flowrra.backends.memory module

In-memory result backend for single-process use.

class flowrra.backends.memory.InMemoryBackend[source]

Bases: BaseResultBackend

In-memory result storage.

Best for:
  • Development and testing

  • Single-process applications

  • When persistence isn’t needed

Limitations:
  • Results lost on restart

  • Not shared across processes

__init__()[source]

async store(task_id, result)[source]

Store a task result.

Parameters:
  • task_id (str) – Unique task identifier

  • result (TaskResult) – TaskResult object to store

Return type:

None

async get(task_id)[source]

Retrieve a task result.

Parameters:

task_id (str) – Unique task identifier

Return type:

TaskResult | None

Returns:

TaskResult if found, None otherwise

async wait_for(task_id, timeout)[source]

Wait for a task to complete.

Parameters:
  • task_id (str) – Unique task identifier

  • timeout (float | None) – Maximum seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult when task completes

Raises:

asyncio.TimeoutError – If timeout exceeded

async delete(task_id)[source]

Delete a task result. Optional to implement.

Return type:

bool

Returns:

True if deleted, False if not found

async clear()[source]

Clear all results. Optional to implement.

Return type:

int

Returns:

Number of results cleared

async list_by_status(status, limit=None, offset=0)[source]

List tasks by status with optional pagination.

Return type:

list[TaskResult]

flowrra.backends.redis module

Redis-based result backend for distributed task execution.

class flowrra.backends.redis.RedisBackend(url, ttl=None, **kwargs)[source]

Bases: BaseResultBackend

Redis-based result backend for distributed task execution.

Features:
  • Cross-process result sharing

  • Persistent storage

  • Pub/sub for wait notifications

  • Automatic connection pooling

  • TTL support for result expiration

Connection Patterns:
  • Basic: redis://localhost:6379/0

  • With password: redis://:password@localhost:6379/0

  • With username: redis://username:password@localhost:6379/0

  • SSL: rediss://localhost:6379/0

  • Unix socket: unix:///path/to/socket

Parameters:
  • url (str) – Redis connection URL

  • ttl (int | None) – Optional TTL in seconds for result expiration (None = no expiration)

  • **kwargs (Any) – Additional options passed to redis.asyncio.from_url()

Example

    backend = RedisBackend("redis://localhost:6379/0")
    backend = RedisBackend("redis://localhost:6379/0", ttl=3600)  # 1 hour expiration
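
To see which parts of each connection pattern carry the credentials, host, and path, the URLs can be split with the standard library. This only illustrates the URL anatomy; it is not how RedisBackend or redis-py parses the string, and the describe helper is a hypothetical name used for the example.

```python
from urllib.parse import urlsplit

# The connection patterns listed for RedisBackend.
urls = [
    "redis://localhost:6379/0",
    "redis://:password@localhost:6379/0",
    "redis://username:password@localhost:6379/0",
    "rediss://localhost:6379/0",
    "unix:///path/to/socket",
]


def describe(url):
    """Break a connection URL into its components using urllib only."""
    parts = urlsplit(url)
    return {
        "scheme": parts.scheme,               # redis, rediss (TLS), or unix
        "username": parts.username or None,   # empty string normalised to None
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        "path": parts.path,                   # "/0" here, or the socket path
    }


for url in urls:
    print(describe(url))
```

For the redis:// and rediss:// forms the path component ("/0") is the part that selects the database, while for unix:// it is the socket path.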

__init__(url, ttl=None, **kwargs)[source]

async store(task_id, result)[source]

Store a task result.

Parameters:
  • task_id (str) – Unique task identifier

  • result (TaskResult) – TaskResult object to store

Return type:

None

async get(task_id)[source]

Retrieve a task result.

Parameters:

task_id (str) – Unique task identifier

Return type:

TaskResult | None

Returns:

TaskResult if found, None otherwise

async wait_for(task_id, timeout=10)[source]

Wait for a task to complete using Redis pub/sub.

Parameters:
  • task_id (str) – Unique task identifier

  • timeout (float | None) – Maximum seconds to wait (default is 10 sec)

Return type:

TaskResult

Returns:

TaskResult when task completes

Raises:

asyncio.TimeoutError – If timeout exceeded

async delete(task_id)[source]

Delete a task result from Redis.

Parameters:

task_id (str) – Unique task identifier

Return type:

bool

Returns:

True if deleted, False if not found

async clear()[source]

Clear all Flowrra task results from Redis.

Return type:

int

Returns:

Number of results cleared

async list_by_status(status, limit=None, offset=0)[source]

List tasks by status with optional pagination.

Return type:

list[TaskResult]

async close()[source]

Return type:

None

__len__()[source]

Not supported for Redis backend.

Return type:

int

Module contents

Flowrra result storage backends.

Available backends:
  • InMemoryBackend: Default, single-process, non-persistent (internal use only)

  • RedisBackend: Distributed, persistent (supports connection strings)

Factory function:
  • get_backend(): Create a Redis backend from a connection string, or pass through an existing backend instance

Usage:

    from flowrra import IOExecutor, CPUExecutor, Config, ExecutorConfig, BackendConfig

    # IOExecutor: No backend needed (InMemoryBackend used internally)
    config = Config(executor=ExecutorConfig(io_workers=4))
    io_executor = IOExecutor(config=config)

    # CPUExecutor: Redis connection string (recommended for production)
    config = Config(
        backend=BackendConfig(url="redis://localhost:6379/0"),
        executor=ExecutorConfig(cpu_workers=4),
    )
    cpu_executor = CPUExecutor(config=config)

Creating custom backends:

Subclass BaseResultBackend and implement:
  • store(task_id, result)

  • get(task_id) -> TaskResult | None

  • wait_for(task_id, timeout) -> TaskResult

Example:

    class PostgresBackend(BaseResultBackend):
        async def store(self, task_id, result): ...
        async def get(self, task_id): ...
        async def wait_for(self, task_id, timeout): ...
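
For a fuller picture of the three required methods, here is a minimal runnable sketch of a dictionary-backed backend. So that it runs without Flowrra installed, it defines its own stand-ins: the TaskResult dataclass and the BaseResultBackend class below are simplified assumptions that mirror only the signatures documented here, and DictBackend is a hypothetical name. It also treats "a result was stored" as "the task completed", which a real backend may need to refine by checking the task status.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class TaskResult:
    """Simplified stand-in; the real TaskResult has its own fields."""
    task_id: str
    value: Any = None


class BaseResultBackend(ABC):
    """Stand-in mirroring the three abstract methods documented above."""

    @abstractmethod
    async def store(self, task_id: str, result: TaskResult) -> None: ...

    @abstractmethod
    async def get(self, task_id: str) -> Optional[TaskResult]: ...

    @abstractmethod
    async def wait_for(self, task_id: str, timeout: Optional[float]) -> TaskResult: ...


class DictBackend(BaseResultBackend):
    def __init__(self) -> None:
        self._results: Dict[str, TaskResult] = {}
        self._events: Dict[str, asyncio.Event] = {}

    def _event(self, task_id: str) -> asyncio.Event:
        # One event per task, created on first use by a writer or a waiter.
        return self._events.setdefault(task_id, asyncio.Event())

    async def store(self, task_id, result):
        self._results[task_id] = result
        self._event(task_id).set()  # wake anything blocked in wait_for()

    async def get(self, task_id):
        return self._results.get(task_id)

    async def wait_for(self, task_id, timeout):
        # asyncio.wait_for raises asyncio.TimeoutError when the timeout
        # expires; timeout=None waits indefinitely.
        await asyncio.wait_for(self._event(task_id).wait(), timeout)
        return self._results[task_id]


async def main():
    backend = DictBackend()

    async def finish_later():
        await asyncio.sleep(0.01)
        await backend.store("t1", TaskResult("t1", value=42))

    producer = asyncio.create_task(finish_later())
    result = await backend.wait_for("t1", timeout=1.0)
    await producer
    try:
        await backend.wait_for("missing", timeout=0.01)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return result, timed_out


result, timed_out = asyncio.run(main())
print(result, timed_out)
```

In real code the stand-in base class would be replaced by an import of BaseResultBackend from flowrra.backends.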

class flowrra.backends.BaseResultBackend[source]

Bases: ABC

Abstract base for result storage backends.

Implementations must provide:
  • store(): Persist a task result

  • get(): Retrieve a task result by ID

  • wait_for(): Block until a task completes

abstractmethod async store(task_id, result)[source]

Store a task result.

Parameters:
  • task_id (str) – Unique task identifier

  • result (TaskResult) – TaskResult object to store

Return type:

None

abstractmethod async get(task_id)[source]

Retrieve a task result.

Parameters:

task_id (str) – Unique task identifier

Return type:

TaskResult | None

Returns:

TaskResult if found, None otherwise

abstractmethod async wait_for(task_id, timeout)[source]

Wait for a task to complete.

Parameters:
  • task_id (str) – Unique task identifier

  • timeout (float | None) – Maximum seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult when task completes

Raises:

asyncio.TimeoutError – If timeout exceeded

async delete(task_id)[source]

Delete a task result. Optional to implement.

Return type:

bool

Returns:

True if deleted, False if not found

async clear()[source]

Clear all results. Optional to implement.

Return type:

int

Returns:

Number of results cleared

async list_by_status(status, limit=None, offset=0)[source]

List tasks by status with optional pagination.

Optional to implement. Backends that don’t support querying should raise NotImplementedError (default behavior).

Parameters:
  • status (TaskStatus) – Task status to filter by (PENDING, RUNNING, SUCCESS, FAILED, RETRYING)

  • limit (int | None) – Maximum number of results to return (None = no limit)

  • offset (int) – Number of results to skip (for pagination)

Return type:

list[TaskResult]

Returns:

List of TaskResult objects matching the status, ordered by submitted_at DESC (most recent first). Empty list if no matches found.

Raises:

NotImplementedError – If backend doesn’t support status queries

Note

  • Results are ordered by submitted_at DESC (newest first)

  • If submitted_at is None, those tasks are ordered last

  • InMemoryBackend and RedisBackend both support this operation

class flowrra.backends.InMemoryBackend[source]

Bases: BaseResultBackend

In-memory result storage.

Best for:
  • Development and testing

  • Single-process applications

  • When persistence isn’t needed

Limitations:
  • Results lost on restart

  • Not shared across processes

__init__()[source]

async store(task_id, result)[source]

Store a task result.

Parameters:
  • task_id (str) – Unique task identifier

  • result (TaskResult) – TaskResult object to store

Return type:

None

async get(task_id)[source]

Retrieve a task result.

Parameters:

task_id (str) – Unique task identifier

Return type:

TaskResult | None

Returns:

TaskResult if found, None otherwise

async wait_for(task_id, timeout)[source]

Wait for a task to complete.

Parameters:
  • task_id (str) – Unique task identifier

  • timeout (float | None) – Maximum seconds to wait (None = wait forever)

Return type:

TaskResult

Returns:

TaskResult when task completes

Raises:

asyncio.TimeoutError – If timeout exceeded

async delete(task_id)[source]

Delete a task result. Optional to implement.

Return type:

bool

Returns:

True if deleted, False if not found

async clear()[source]

Clear all results. Optional to implement.

Return type:

int

Returns:

Number of results cleared

async list_by_status(status, limit=None, offset=0)[source]

List tasks by status with optional pagination.

Return type:

list[TaskResult]

flowrra.backends.get_backend(backend, **kwargs)[source]

Create a backend from a connection string or return existing instance.

This factory function provides a convenient way to create backends from connection strings while maintaining support for custom backend instances. Additional keyword arguments are passed to the backend constructor.

Parameters:
  • backend (BaseResultBackend | str | None) – Either a BaseResultBackend instance, a connection string, or None

  • **kwargs – Additional backend-specific configuration options (ttl, max_connections, etc.)

Return type:

BaseResultBackend

Returns:

BaseResultBackend instance

Raises:

BackendError – If URL scheme is unsupported or backend creation fails

Examples

    # Redis connection string (recommended for production)
    backend = get_backend("redis://localhost:6379/0")
    backend = get_backend("rediss://localhost:6379/0", ttl=3600)  # With TTL

    # None returns InMemoryBackend (internal default)
    backend = get_backend(None)  # Returns InMemoryBackend()

    # Custom backend instance passthrough (advanced)
    backend = get_backend(custom_backend_instance)

class flowrra.backends.RedisBackend(url, ttl=None, **kwargs)[source]

Bases: BaseResultBackend

Redis-based result backend for distributed task execution.

Features:
  • Cross-process result sharing

  • Persistent storage

  • Pub/sub for wait notifications

  • Automatic connection pooling

  • TTL support for result expiration

Connection Patterns:
  • Basic: redis://localhost:6379/0

  • With password: redis://:password@localhost:6379/0

  • With username: redis://username:password@localhost:6379/0

  • SSL: rediss://localhost:6379/0

  • Unix socket: unix:///path/to/socket

Parameters:
  • url (str) – Redis connection URL

  • ttl (int | None) – Optional TTL in seconds for result expiration (None = no expiration)

  • **kwargs (Any) – Additional options passed to redis.asyncio.from_url()

Example

    backend = RedisBackend("redis://localhost:6379/0")
    backend = RedisBackend("redis://localhost:6379/0", ttl=3600)  # 1 hour expiration

__init__(url, ttl=None, **kwargs)[source]

async store(task_id, result)[source]

Store a task result.

Parameters:
  • task_id (str) – Unique task identifier

  • result (TaskResult) – TaskResult object to store

Return type:

None

async get(task_id)[source]

Retrieve a task result.

Parameters:

task_id (str) – Unique task identifier

Return type:

TaskResult | None

Returns:

TaskResult if found, None otherwise

async wait_for(task_id, timeout=10)[source]

Wait for a task to complete using Redis pub/sub.

Parameters:
  • task_id (str) – Unique task identifier

  • timeout (float | None) – Maximum seconds to wait (default is 10 sec)

Return type:

TaskResult

Returns:

TaskResult when task completes

Raises:

asyncio.TimeoutError – If timeout exceeded

async delete(task_id)[source]

Delete a task result from Redis.

Parameters:

task_id (str) – Unique task identifier

Return type:

bool

Returns:

True if deleted, False if not found

async clear()[source]

Clear all Flowrra task results from Redis.

Return type:

int

Returns:

Number of results cleared

async list_by_status(status, limit=None, offset=0)[source]

List tasks by status with optional pagination.

Return type:

list[TaskResult]

async close()[source]

Return type:

None

__len__()[source]

Not supported for Redis backend.

Return type:

int