flowrra.backends package
Submodules
flowrra.backends.base module
- class flowrra.backends.base.BaseResultBackend
Bases: ABC
Abstract base for result storage backends.
Implementations must provide:
- store(): Persist a task result
- get(): Retrieve a task result by ID
- wait_for(): Block until a task completes
- abstractmethod async store(task_id, result)
Store a task result.
- Parameters:
task_id (str) – Unique task identifier
result (TaskResult) – TaskResult object to store
- Return type:
None
- abstractmethod async get(task_id)
Retrieve a task result.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
TaskResult | None
- Returns:
TaskResult if found, None otherwise
- abstractmethod async wait_for(task_id, timeout)
Wait for a task to complete.
- Parameters:
task_id (str) – Unique task identifier
timeout (float | None) – Maximum seconds to wait (None = wait forever)
- Return type:
TaskResult
- Returns:
TaskResult when task completes
- Raises:
asyncio.TimeoutError – If timeout exceeded
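The timeout contract above can be handled on the caller's side roughly as follows. This is a minimal sketch, not flowrra code: `NeverCompletesBackend` and `wait_or_default` are hypothetical names, and the stand-in backend only imitates the documented behavior of raising asyncio.TimeoutError when the timeout is exceeded.

```python
import asyncio


class NeverCompletesBackend:
    """Hypothetical stand-in exposing only wait_for(); not part of flowrra."""

    async def wait_for(self, task_id, timeout):
        # The event is never set, so the wait can only end by timing out.
        # asyncio.wait_for raises asyncio.TimeoutError once `timeout` elapses;
        # timeout=None would wait indefinitely.
        never_set = asyncio.Event()
        await asyncio.wait_for(never_set.wait(), timeout)


async def wait_or_default(backend, task_id, timeout, default=None):
    """Return the awaited result, or `default` if the wait times out."""
    try:
        return await backend.wait_for(task_id, timeout)
    except asyncio.TimeoutError:
        return default


outcome = asyncio.run(
    wait_or_default(NeverCompletesBackend(), "task-1", 0.05, default="timed out")
)
```

Catching the exception in one helper keeps the "no result yet" case explicit instead of letting the timeout propagate through every call site.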
- async delete(task_id)
Delete a task result. Optional to implement.
- Return type:
bool
- Returns:
True if deleted, False if not found
- async clear()
Clear all results. Optional to implement.
- Return type:
int
- Returns:
Number of results cleared
- async list_by_status(status, limit=None, offset=0)
List tasks by status with optional pagination.
Optional to implement. Backends that don’t support querying should raise NotImplementedError (default behavior).
- Parameters:
status (TaskStatus) – Task status to filter by (PENDING, RUNNING, SUCCESS, FAILED, RETRYING)
limit (int | None) – Maximum number of results to return (None = no limit)
offset (int) – Number of results to skip (for pagination)
- Return type:
list[TaskResult]
- Returns:
List of TaskResult objects matching the status, ordered by submitted_at DESC (most recent first). Empty list if no matches found.
- Raises:
NotImplementedError – If backend doesn’t support status queries
Note
- Results are ordered by submitted_at DESC (newest first)
- If submitted_at is None, those tasks are ordered last
- InMemoryBackend and RedisBackend both support this operation
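The ordering and pagination rules described for list_by_status() can be illustrated with a small self-contained sketch. `FakeResult` and `select_by_status` are hypothetical names used only for this illustration; the real TaskResult has fields that are not shown here, and the built-in backends may implement the query differently.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class FakeResult:
    """Hypothetical stand-in for TaskResult with only the fields used here."""
    task_id: str
    status: str
    submitted_at: Optional[datetime]


def select_by_status(results, status, limit=None, offset=0):
    """Filter by status, order newest first (None last), then paginate."""
    matching = [r for r in results if r.status == status]
    dated = sorted(
        (r for r in matching if r.submitted_at is not None),
        key=lambda r: r.submitted_at,
        reverse=True,
    )
    undated = [r for r in matching if r.submitted_at is None]
    ordered = dated + undated
    # limit=None means "no limit": slice to the end of the list.
    end = None if limit is None else offset + limit
    return ordered[offset:end]


results = [
    FakeResult("a", "SUCCESS", datetime(2024, 1, 1)),
    FakeResult("b", "SUCCESS", None),
    FakeResult("c", "SUCCESS", datetime(2024, 1, 3)),
    FakeResult("d", "FAILED", datetime(2024, 1, 2)),
]
page = [r.task_id for r in select_by_status(results, "SUCCESS", limit=2, offset=1)]
```

With this data the full SUCCESS ordering is c, a, b, so skipping one entry and taking two yields a and b.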
flowrra.backends.factory module
Backend factory for creating backends from connection strings.
- flowrra.backends.factory.get_backend(backend, **kwargs)
Create a backend from a connection string or return an existing instance.
This factory function creates backends from connection strings while still accepting custom backend instances. Additional keyword arguments are passed to the backend constructor.
- Parameters:
backend (BaseResultBackend | str | None) – Either a BaseResultBackend instance, a connection string, or None
**kwargs – Additional backend-specific configuration options (ttl, max_connections, etc.)
- Return type:
BaseResultBackend
- Returns:
BaseResultBackend instance
- Raises:
BackendError – If URL scheme is unsupported or backend creation fails
Examples
# Redis connection string (recommended for production)
backend = get_backend("redis://localhost:6379/0")
backend = get_backend("rediss://localhost:6379/0", ttl=3600)  # With TTL
# None returns InMemoryBackend (internal default)
backend = get_backend(None)  # Returns InMemoryBackend()
# Custom backend instance passthrough (advanced)
backend = get_backend(custom_backend_instance)
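The three input cases handled by the factory (None, an existing instance, a connection string) could be dispatched along these lines. This is an illustrative sketch under stated assumptions, not the flowrra implementation: `make_backend`, `BaseBackend`, `MemoryBackend`, and `UrlBackend` are hypothetical stand-ins, `BackendError` is redefined locally, and accepting only the redis and rediss schemes is an assumption based on the examples above.

```python
from urllib.parse import urlsplit


class BackendError(Exception):
    """Local stand-in for the documented BackendError."""


class BaseBackend:
    """Hypothetical stand-in for BaseResultBackend."""


class MemoryBackend(BaseBackend):
    """Hypothetical stand-in for the in-memory default."""


class UrlBackend(BaseBackend):
    """Hypothetical stand-in for a backend built from a connection string."""

    def __init__(self, url, **kwargs):
        self.url = url
        self.options = kwargs


def make_backend(backend, **kwargs):
    if backend is None:
        return MemoryBackend()  # default when nothing is configured
    if isinstance(backend, BaseBackend):
        return backend  # passthrough for ready-made instances
    if isinstance(backend, str):
        scheme = urlsplit(backend).scheme
        if scheme in ("redis", "rediss"):  # assumed set of supported schemes
            return UrlBackend(backend, **kwargs)
        raise BackendError(f"Unsupported backend URL scheme: {scheme!r}")
    raise BackendError(f"Unsupported backend type: {type(backend).__name__}")


default_backend = make_backend(None)
redis_like = make_backend("rediss://localhost:6379/0", ttl=3600)
```

Checking for an existing instance before parsing strings keeps custom backends working without the factory needing to know anything about them.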
flowrra.backends.memory module
In-memory result backend for single-process use.
- class flowrra.backends.memory.InMemoryBackend
Bases: BaseResultBackend
In-memory result storage.
Best for:
- Development and testing
- Single-process applications
- When persistence isn’t needed
Limitations:
- Results lost on restart
- Not shared across processes
- async store(task_id, result)
Store a task result.
- Parameters:
task_id (str) – Unique task identifier
result (TaskResult) – TaskResult object to store
- Return type:
None
- async get(task_id)
Retrieve a task result.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
TaskResult | None
- Returns:
TaskResult if found, None otherwise
- async wait_for(task_id, timeout)
Wait for a task to complete.
- Parameters:
task_id (str) – Unique task identifier
timeout (float | None) – Maximum seconds to wait (None = wait forever)
- Return type:
TaskResult
- Returns:
TaskResult when task completes
- Raises:
asyncio.TimeoutError – If timeout exceeded
- async delete(task_id)
Delete a task result. Optional to implement.
- Return type:
bool
- Returns:
True if deleted, False if not found
- async clear()
Clear all results. Optional to implement.
- Return type:
int
- Returns:
Number of results cleared
- async list_by_status(status, limit=None, offset=0)
List tasks by status with optional pagination.
- Return type:
list[TaskResult]
flowrra.backends.redis module
Redis-based result backend for distributed task execution.
- class flowrra.backends.redis.RedisBackend(url, ttl=None, **kwargs)
Bases: BaseResultBackend
Redis-based result backend for distributed task execution.
Features:
- Cross-process result sharing
- Persistent storage
- Pub/sub for wait notifications
- Automatic connection pooling
- TTL support for result expiration
Connection Patterns:
- Basic: redis://localhost:6379/0
- With password: redis://:password@localhost:6379/0
- With username: redis://username:password@localhost:6379/0
- SSL: rediss://localhost:6379/0
- Unix socket: unix:///path/to/socket
- Parameters:
url (str) – Redis connection URL
ttl (int | None) – Optional TTL in seconds for result expiration (None = no expiration)
**kwargs (Any) – Additional options passed to redis.asyncio.from_url()
Example
backend = RedisBackend("redis://localhost:6379/0")
backend = RedisBackend("redis://localhost:6379/0", ttl=3600)  # 1 hour expiration
- async store(task_id, result)
Store a task result.
- Parameters:
task_id (str) – Unique task identifier
result (TaskResult) – TaskResult object to store
- Return type:
None
- async get(task_id)
Retrieve a task result.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
TaskResult | None
- Returns:
TaskResult if found, None otherwise
- async wait_for(task_id, timeout=10)
Wait for a task to complete using Redis pub/sub.
- Parameters:
task_id (str) – Unique task identifier
timeout (float | None) – Maximum seconds to wait (default: 10 seconds)
- Return type:
TaskResult
- Returns:
TaskResult when task completes
- Raises:
asyncio.TimeoutError – If timeout exceeded
- async delete(task_id)
Delete a task result from Redis.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
bool
- Returns:
True if deleted, False if not found
- async clear()
Clear all Flowrra task results from Redis.
- Return type:
int
- Returns:
Number of results cleared
- async list_by_status(status, limit=None, offset=0)
List tasks by status with optional pagination.
- Return type:
list[TaskResult]
Module contents
Flowrra result storage backends.
- Available backends:
InMemoryBackend: Default, single-process, non-persistent (internal use only)
RedisBackend: Distributed, persistent (supports connection strings)
- Factory function:
get_backend(): Create a Redis backend from a connection string, or pass through any existing backend instance
- Usage:
from flowrra import IOExecutor, CPUExecutor, Config, ExecutorConfig, BackendConfig
# IOExecutor: No backend needed (InMemoryBackend used internally)
config = Config(executor=ExecutorConfig(io_workers=4))
io_executor = IOExecutor(config=config)
# CPUExecutor: Redis connection string (recommended for production)
config = Config(
    backend=BackendConfig(url="redis://localhost:6379/0"),
    executor=ExecutorConfig(cpu_workers=4),
)
cpu_executor = CPUExecutor(config=config)
- Creating custom backends:
Subclass BaseResultBackend and implement:
- store(task_id, result)
- get(task_id) -> TaskResult | None
- wait_for(task_id, timeout) -> TaskResult
- Example:
class PostgresBackend(BaseResultBackend):
    async def store(self, task_id, result): ...
    async def get(self, task_id): ...
    async def wait_for(self, task_id, timeout): ...
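To make the three required methods concrete, here is a minimal runnable sketch of a dictionary-backed backend. It assumes nothing about flowrra internals: `ResultBackendBase` is a hypothetical stand-in that mirrors only the three abstract methods, `DictBackend` is an illustrative name, and plain dicts take the place of TaskResult objects. As a simplification, every store() call wakes waiters; a real backend would presumably signal only once the task reaches a finished state.

```python
import asyncio
from abc import ABC, abstractmethod


class ResultBackendBase(ABC):
    """Hypothetical stand-in mirroring the three required methods."""

    @abstractmethod
    async def store(self, task_id, result): ...

    @abstractmethod
    async def get(self, task_id): ...

    @abstractmethod
    async def wait_for(self, task_id, timeout): ...


class DictBackend(ResultBackendBase):
    def __init__(self):
        self._results = {}
        self._events = {}

    def _event(self, task_id):
        # One event per task, created on first use by either side.
        return self._events.setdefault(task_id, asyncio.Event())

    async def store(self, task_id, result):
        self._results[task_id] = result
        self._event(task_id).set()  # wake anything blocked in wait_for()

    async def get(self, task_id):
        return self._results.get(task_id)  # None when the ID is unknown

    async def wait_for(self, task_id, timeout):
        # Raises asyncio.TimeoutError if nothing is stored within `timeout`.
        await asyncio.wait_for(self._event(task_id).wait(), timeout)
        return self._results[task_id]


async def demo():
    backend = DictBackend()
    waiter = asyncio.create_task(backend.wait_for("task-1", timeout=1.0))
    await asyncio.sleep(0)  # let the waiter start before the result arrives
    await backend.store("task-1", {"value": 42})
    return await waiter, await backend.get("missing")


waited, missing = asyncio.run(demo())
```

Because the base class declares the methods abstract, a subclass that omits any of the three cannot be instantiated, which surfaces an incomplete backend at construction time rather than at first use.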
- class flowrra.backends.BaseResultBackend
Bases: ABC
Abstract base for result storage backends.
Implementations must provide:
- store(): Persist a task result
- get(): Retrieve a task result by ID
- wait_for(): Block until a task completes
- abstractmethod async store(task_id, result)
Store a task result.
- Parameters:
task_id (str) – Unique task identifier
result (TaskResult) – TaskResult object to store
- Return type:
None
- abstractmethod async get(task_id)
Retrieve a task result.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
TaskResult | None
- Returns:
TaskResult if found, None otherwise
- abstractmethod async wait_for(task_id, timeout)
Wait for a task to complete.
- Parameters:
task_id (str) – Unique task identifier
timeout (float | None) – Maximum seconds to wait (None = wait forever)
- Return type:
TaskResult
- Returns:
TaskResult when task completes
- Raises:
asyncio.TimeoutError – If timeout exceeded
- async delete(task_id)
Delete a task result. Optional to implement.
- Return type:
bool
- Returns:
True if deleted, False if not found
- async clear()
Clear all results. Optional to implement.
- Return type:
int
- Returns:
Number of results cleared
- async list_by_status(status, limit=None, offset=0)
List tasks by status with optional pagination.
Optional to implement. Backends that don’t support querying should raise NotImplementedError (default behavior).
- Parameters:
status (TaskStatus) – Task status to filter by (PENDING, RUNNING, SUCCESS, FAILED, RETRYING)
limit (int | None) – Maximum number of results to return (None = no limit)
offset (int) – Number of results to skip (for pagination)
- Return type:
list[TaskResult]
- Returns:
List of TaskResult objects matching the status, ordered by submitted_at DESC (most recent first). Empty list if no matches found.
- Raises:
NotImplementedError – If backend doesn’t support status queries
Note
- Results are ordered by submitted_at DESC (newest first)
- If submitted_at is None, those tasks are ordered last
- InMemoryBackend and RedisBackend both support this operation
- class flowrra.backends.InMemoryBackend
Bases: BaseResultBackend
In-memory result storage.
Best for:
- Development and testing
- Single-process applications
- When persistence isn’t needed
Limitations:
- Results lost on restart
- Not shared across processes
- async store(task_id, result)
Store a task result.
- Parameters:
task_id (str) – Unique task identifier
result (TaskResult) – TaskResult object to store
- Return type:
None
- async get(task_id)
Retrieve a task result.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
TaskResult | None
- Returns:
TaskResult if found, None otherwise
- async wait_for(task_id, timeout)
Wait for a task to complete.
- Parameters:
task_id (str) – Unique task identifier
timeout (float | None) – Maximum seconds to wait (None = wait forever)
- Return type:
TaskResult
- Returns:
TaskResult when task completes
- Raises:
asyncio.TimeoutError – If timeout exceeded
- async delete(task_id)
Delete a task result. Optional to implement.
- Return type:
bool
- Returns:
True if deleted, False if not found
- async clear()
Clear all results. Optional to implement.
- Return type:
int
- Returns:
Number of results cleared
- async list_by_status(status, limit=None, offset=0)
List tasks by status with optional pagination.
- Return type:
list[TaskResult]
- flowrra.backends.get_backend(backend, **kwargs)
Create a backend from a connection string or return an existing instance.
This factory function creates backends from connection strings while still accepting custom backend instances. Additional keyword arguments are passed to the backend constructor.
- Parameters:
backend (BaseResultBackend | str | None) – Either a BaseResultBackend instance, a connection string, or None
**kwargs – Additional backend-specific configuration options (ttl, max_connections, etc.)
- Return type:
BaseResultBackend
- Returns:
BaseResultBackend instance
- Raises:
BackendError – If URL scheme is unsupported or backend creation fails
Examples
# Redis connection string (recommended for production)
backend = get_backend("redis://localhost:6379/0")
backend = get_backend("rediss://localhost:6379/0", ttl=3600)  # With TTL
# None returns InMemoryBackend (internal default)
backend = get_backend(None)  # Returns InMemoryBackend()
# Custom backend instance passthrough (advanced)
backend = get_backend(custom_backend_instance)
- class flowrra.backends.RedisBackend(url, ttl=None, **kwargs)
Bases: BaseResultBackend
Redis-based result backend for distributed task execution.
Features:
- Cross-process result sharing
- Persistent storage
- Pub/sub for wait notifications
- Automatic connection pooling
- TTL support for result expiration
Connection Patterns:
- Basic: redis://localhost:6379/0
- With password: redis://:password@localhost:6379/0
- With username: redis://username:password@localhost:6379/0
- SSL: rediss://localhost:6379/0
- Unix socket: unix:///path/to/socket
- Parameters:
url (str) – Redis connection URL
ttl (int | None) – Optional TTL in seconds for result expiration (None = no expiration)
**kwargs (Any) – Additional options passed to redis.asyncio.from_url()
Example
backend = RedisBackend("redis://localhost:6379/0")
backend = RedisBackend("redis://localhost:6379/0", ttl=3600)  # 1 hour expiration
- async store(task_id, result)
Store a task result.
- Parameters:
task_id (str) – Unique task identifier
result (TaskResult) – TaskResult object to store
- Return type:
None
- async get(task_id)
Retrieve a task result.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
TaskResult | None
- Returns:
TaskResult if found, None otherwise
- async wait_for(task_id, timeout=10)
Wait for a task to complete using Redis pub/sub.
- Parameters:
task_id (str) – Unique task identifier
timeout (float | None) – Maximum seconds to wait (default: 10 seconds)
- Return type:
TaskResult
- Returns:
TaskResult when task completes
- Raises:
asyncio.TimeoutError – If timeout exceeded
- async delete(task_id)
Delete a task result from Redis.
- Parameters:
task_id (str) – Unique task identifier
- Return type:
bool
- Returns:
True if deleted, False if not found
- async clear()
Clear all Flowrra task results from Redis.
- Return type:
int
- Returns:
Number of results cleared
- async list_by_status(status, limit=None, offset=0)
List tasks by status with optional pagination.
- Return type:
list[TaskResult]