flowrra.brokers package
Submodules
flowrra.brokers.base module
Base broker interface for task queueing.
- class flowrra.brokers.base.BaseBroker[source]
Bases:
ABCAbstract base class for task queue brokers.
Brokers handle task queueing and distribution to workers. Unlike backends (which store results), brokers manage the task queue.
- abstractmethod async push(task)[source]
Push a task to the queue.
- Parameters:
task (
Task) – Task to queue- Return type:
None
- abstractmethod async pop(timeout=None)[source]
Pop a task from the queue.
- Parameters:
timeout (
float|None) – Maximum time to wait for a task (None = wait indefinitely)- Return type:
Task|None- Returns:
Task if available, None if timeout
flowrra.brokers.factory module
Factory for creating broker instances from connection strings.
- flowrra.brokers.factory.get_broker(broker, **kwargs)[source]
Create a broker from a connection string or return existing instance.
This factory function provides a convenient way to create brokers from connection strings while maintaining support for custom broker instances. Additional keyword arguments are passed to the broker constructor.
- Parameters:
broker (
BaseBroker|str|None) – Either a BaseBroker instance, a connection string, or None**kwargs – Additional broker-specific configuration options (max_connections, etc.)
- Return type:
BaseBroker|None- Returns:
BaseBroker instance if provided, None if broker is None
- Raises:
BackendError – If URL scheme is unsupported or broker creation fails
Examples
# Redis connection string broker = get_broker(“redis://localhost:6379/0”) broker = get_broker(“redis://localhost:6379/0”, max_connections=100)
# None returns None (use asyncio.PriorityQueue) broker = get_broker(None) # Returns None
# Custom broker instance passthrough broker = get_broker(custom_broker_instance)
flowrra.brokers.redis module
Redis-based broker for distributed task queueing.
- class flowrra.brokers.redis.RedisBroker(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)[source]
Bases:
BaseBrokerRedis-based task queue broker.
Uses Redis lists (LPUSH/BRPOP) for reliable task queueing. Tasks are serialized as JSON and pushed to a Redis list.
Features: - Cross-process task distribution - Blocking pop with timeout support - Automatic connection pooling - Priority-based queueing (using task priority)
- Parameters:
url (
str) – Redis connection string (e.g., ‘redis://localhost:6379/0’)max_connections (
int) – Maximum connections in pool (default: 50)socket_timeout (
float) – Socket timeout in seconds (default: 5.0)retry_on_timeout (
bool) – Retry on connection timeout (default: True)queue_key (
str) – Redis key for task queue (default: ‘flowrra:queue’)**kwargs (
Any) – Additional options passed to redis.asyncio.from_url()
Example
broker = RedisBroker(‘redis://localhost:6379/0’)
# Push task task = Task(id=’123’, name=’my_task’, args=(1,), kwargs={}) await broker.push(task)
# Pop task (blocking) task = await broker.pop(timeout=5.0)
- __init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)[source]
- async push(task)[source]
Push a task to the Redis queue.
Tasks are serialized as JSON and pushed to the left of the list (LPUSH). Priority is handled by using separate queues or sorting logic.
- Parameters:
task (
Task) – Task to queue- Raises:
BackendError – If Redis operation fails
- Return type:
None
- async pop(timeout=None)[source]
Pop a task from the Redis queue.
Uses BRPOP (blocking right pop) to wait for tasks efficiently.
- Parameters:
timeout (
float|None) – Maximum time to wait in seconds (None = wait indefinitely)- Return type:
Task|None- Returns:
Task if available, None if timeout
- Raises:
BackendError – If Redis operation fails
- async size()[source]
Get the number of tasks in the queue.
Uses LLEN to get the length of the Redis list.
- Return type:
int- Returns:
Number of pending tasks
- Raises:
BackendError – If Redis operation fails
Module contents
Flowrra task queue brokers.
Brokers handle task queueing and distribution (separate from result storage backends).
- Available brokers:
RedisBroker: Distributed task queue using Redis lists
- class flowrra.brokers.BaseBroker[source]
Bases:
ABCAbstract base class for task queue brokers.
Brokers handle task queueing and distribution to workers. Unlike backends (which store results), brokers manage the task queue.
- abstractmethod async push(task)[source]
Push a task to the queue.
- Parameters:
task (
Task) – Task to queue- Return type:
None
- abstractmethod async pop(timeout=None)[source]
Pop a task from the queue.
- Parameters:
timeout (
float|None) – Maximum time to wait for a task (None = wait indefinitely)- Return type:
Task|None- Returns:
Task if available, None if timeout
- class flowrra.brokers.RedisBroker(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)[source]
Bases:
BaseBrokerRedis-based task queue broker.
Uses Redis lists (LPUSH/BRPOP) for reliable task queueing. Tasks are serialized as JSON and pushed to a Redis list.
Features: - Cross-process task distribution - Blocking pop with timeout support - Automatic connection pooling - Priority-based queueing (using task priority)
- Parameters:
url (
str) – Redis connection string (e.g., ‘redis://localhost:6379/0’)max_connections (
int) – Maximum connections in pool (default: 50)socket_timeout (
float) – Socket timeout in seconds (default: 5.0)retry_on_timeout (
bool) – Retry on connection timeout (default: True)queue_key (
str) – Redis key for task queue (default: ‘flowrra:queue’)**kwargs (
Any) – Additional options passed to redis.asyncio.from_url()
Example
broker = RedisBroker(‘redis://localhost:6379/0’)
# Push task task = Task(id=’123’, name=’my_task’, args=(1,), kwargs={}) await broker.push(task)
# Pop task (blocking) task = await broker.pop(timeout=5.0)
- __init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)[source]
- async push(task)[source]
Push a task to the Redis queue.
Tasks are serialized as JSON and pushed to the left of the list (LPUSH). Priority is handled by using separate queues or sorting logic.
- Parameters:
task (
Task) – Task to queue- Raises:
BackendError – If Redis operation fails
- Return type:
None
- async pop(timeout=None)[source]
Pop a task from the Redis queue.
Uses BRPOP (blocking right pop) to wait for tasks efficiently.
- Parameters:
timeout (
float|None) – Maximum time to wait in seconds (None = wait indefinitely)- Return type:
Task|None- Returns:
Task if available, None if timeout
- Raises:
BackendError – If Redis operation fails
- async size()[source]
Get the number of tasks in the queue.
Uses LLEN to get the length of the Redis list.
- Return type:
int- Returns:
Number of pending tasks
- Raises:
BackendError – If Redis operation fails