flowrra.brokers package

Submodules

flowrra.brokers.base module

Base broker interface for task queueing.

class flowrra.brokers.base.BaseBroker

Bases: ABC

Abstract base class for task queue brokers.

Brokers handle task queueing and distribution to workers. Unlike backends (which store results), brokers manage the task queue.

abstractmethod async push(task)

Push a task to the queue.

Parameters:

task (Task) – Task to queue

Return type:

None

abstractmethod async pop(timeout=None)

Pop a task from the queue.

Parameters:

timeout (float | None) – Maximum time to wait for a task (None = wait indefinitely)

Return type:

Task | None

Returns:

The task if one is available, or None if the timeout expires first

abstractmethod async size()

Get the number of tasks in the queue.

Return type:

int

Returns:

Number of pending tasks

async close()

Close broker connections and clean up resources.

Optional method for brokers that need cleanup.

Return type:

None
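To make the interface concrete, here is a minimal sketch of a single-process broker that satisfies the four methods above. It does not import flowrra: the Task dataclass and the BaseBroker class below are local stand-ins written only for this example, and InMemoryBroker is a hypothetical name, not a class shipped with the package.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Task:
    """Hypothetical stand-in for flowrra's Task; only the fields used here."""
    id: str
    name: str
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


class BaseBroker(ABC):
    """Local mirror of the documented interface, so the sketch runs alone."""

    @abstractmethod
    async def push(self, task: Task) -> None: ...

    @abstractmethod
    async def pop(self, timeout: Optional[float] = None) -> Optional[Task]: ...

    @abstractmethod
    async def size(self) -> int: ...

    async def close(self) -> None:
        """Optional hook; this stand-in default does nothing."""


class InMemoryBroker(BaseBroker):
    """Hypothetical single-process broker backed by asyncio.Queue."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    async def push(self, task: Task) -> None:
        await self._queue.put(task)

    async def pop(self, timeout: Optional[float] = None) -> Optional[Task]:
        if timeout is None:
            return await self._queue.get()  # wait indefinitely
        try:
            return await asyncio.wait_for(self._queue.get(), timeout)
        except asyncio.TimeoutError:
            return None  # timeout expired with no task

    async def size(self) -> int:
        return self._queue.qsize()


async def demo() -> list:
    broker = InMemoryBroker()
    await broker.push(Task(id="1", name="first"))
    await broker.push(Task(id="2", name="second"))
    pending = await broker.size()
    first = await broker.pop(timeout=0.1)
    second = await broker.pop(timeout=0.1)
    empty = await broker.pop(timeout=0.05)  # queue is empty, so this times out
    await broker.close()
    return [pending, first.id, second.id, empty]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The sketch leaves close() at its default because an in-memory queue holds no external resources; a broker that owns sockets or connection pools would override it.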

flowrra.brokers.factory module

Factory for creating broker instances from connection strings.

flowrra.brokers.factory.get_broker(broker, **kwargs)

Create a broker from a connection string, or return an existing instance.

This factory function provides a convenient way to create brokers from connection strings while maintaining support for custom broker instances. Additional keyword arguments are passed to the broker constructor.

Parameters:
  • broker (BaseBroker | str | None) – A BaseBroker instance, a connection string, or None

  • **kwargs – Additional broker-specific configuration options (max_connections, etc.)

Return type:

BaseBroker | None

Returns:

A BaseBroker instance (the one passed in, or one created from the connection string), or None if broker is None

Raises:

BackendError – If URL scheme is unsupported or broker creation fails

Examples

    # Redis connection string
    broker = get_broker("redis://localhost:6379/0")
    broker = get_broker("redis://localhost:6379/0", max_connections=100)

    # None returns None (use asyncio.PriorityQueue)
    broker = get_broker(None)  # Returns None

    # Custom broker instance passthrough
    broker = get_broker(custom_broker_instance)
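The three cases described for get_broker (None, an existing instance, a connection string) can be sketched as a small dispatch on the URL scheme. This is an illustration only, not the package's source: get_broker_sketch, FakeRedisBroker, and the _BROKERS registry are hypothetical names, and BackendError and BaseBroker are redefined locally so the block runs without flowrra installed.

```python
from urllib.parse import urlparse


class BackendError(Exception):
    """Stand-in for the exception named in the docs above."""


class BaseBroker:
    """Stand-in base class so the sketch runs without flowrra installed."""


class FakeRedisBroker(BaseBroker):
    """Hypothetical placeholder; records its arguments instead of connecting."""

    def __init__(self, url, **kwargs):
        self.url = url
        self.options = kwargs


# Hypothetical scheme-to-class registry; the real factory may differ.
_BROKERS = {"redis": FakeRedisBroker}


def get_broker_sketch(broker, **kwargs):
    if broker is None:
        return None  # caller falls back to its in-process queue
    if isinstance(broker, BaseBroker):
        return broker  # custom instance is passed through untouched
    if not isinstance(broker, str):
        raise BackendError(f"Unsupported broker type: {type(broker).__name__}")
    scheme = urlparse(broker).scheme
    try:
        broker_cls = _BROKERS[scheme]
    except KeyError:
        raise BackendError(f"Unsupported broker scheme: {scheme!r}") from None
    # Extra keyword arguments are forwarded to the broker constructor.
    return broker_cls(broker, **kwargs)
```

Keeping the scheme lookup in a dictionary means that supporting another broker would only require one new registry entry, though whether the real factory is organized this way is an assumption.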

flowrra.brokers.redis module

Redis-based broker for distributed task queueing.

class flowrra.brokers.redis.RedisBroker(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)

Bases: BaseBroker

Redis-based task queue broker.

Uses Redis lists (LPUSH/BRPOP) for reliable task queueing. Tasks are serialized as JSON and pushed to a Redis list.

Features:
  • Cross-process task distribution

  • Blocking pop with timeout support

  • Automatic connection pooling

  • Priority-based queueing (using task priority)

Parameters:
  • url (str) – Redis connection string (e.g., ‘redis://localhost:6379/0’)

  • max_connections (int) – Maximum connections in pool (default: 50)

  • socket_timeout (float) – Socket timeout in seconds (default: 5.0)

  • retry_on_timeout (bool) – Retry on connection timeout (default: True)

  • queue_key (str) – Redis key for task queue (default: ‘flowrra:queue’)

  • **kwargs (Any) – Additional options passed to redis.asyncio.from_url()

Example

    broker = RedisBroker('redis://localhost:6379/0')

    # Push task
    task = Task(id='123', name='my_task', args=(1,), kwargs={})
    await broker.push(task)

    # Pop task (blocking)
    task = await broker.pop(timeout=5.0)

__init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)

async push(task)

Push a task to the Redis queue.

Tasks are serialized as JSON and pushed to the left of the list (LPUSH). Priority is handled by using separate queues or sorting logic.

Parameters:

task (Task) – Task to queue

Raises:

BackendError – If Redis operation fails

Return type:

None
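The push description mentions separate queues as one way to handle priority. The sketch below illustrates that option only, and it is not taken from the flowrra source: the key names are hypothetical, and plain deques stand in for Redis lists. It relies on one documented Redis behavior, namely that BRPOP given several keys checks them in the order listed and pops from the first non-empty one.

```python
from collections import deque

# Hypothetical key layout: one list per priority level, highest first.
PRIORITY_KEYS = [
    "flowrra:queue:high",
    "flowrra:queue:normal",
    "flowrra:queue:low",
]

lists = {key: deque() for key in PRIORITY_KEYS}  # stand-in for Redis lists


def lpush(key, value):
    """Add at the head of the list, the same end Redis LPUSH uses."""
    lists[key].appendleft(value)


def rpop_first(keys):
    """Non-blocking analogue of BRPOP over several keys, checked in order."""
    for key in keys:
        if lists[key]:
            return key, lists[key].pop()  # tail of the list, like RPOP
    return None  # every list is empty; real BRPOP would block here
```

With this layout a low-priority task waits until every higher-priority list is empty, so sustained high-priority traffic can starve the lower levels.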

async pop(timeout=None)

Pop a task from the Redis queue.

Uses BRPOP (blocking right pop) to wait for tasks efficiently.

Parameters:

timeout (float | None) – Maximum time to wait in seconds (None = wait indefinitely)

Return type:

Task | None

Returns:

The task if one is available, or None if the timeout expires first

Raises:

BackendError – If Redis operation fails
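Pushing on the left with LPUSH and popping on the right with BRPOP gives first-in, first-out order. The following sketch shows that ordering and the JSON round trip without a Redis server: a deque stands in for the Redis list, tasks are plain dictionaries rather than flowrra Task objects, and the function names are placeholders chosen for this example.

```python
import json
from collections import deque

queue = deque()  # stand-in for the Redis list stored at the queue key


def push(task: dict) -> None:
    """Serialize to JSON and add at the head, as LPUSH would."""
    queue.appendleft(json.dumps(task))


def pop():
    """Remove from the tail, as BRPOP would, and decode the JSON payload."""
    if not queue:
        return None  # the real BRPOP would block until the timeout here
    return json.loads(queue.pop())
```

One consequence of JSON serialization is worth keeping in mind: JSON has no tuple type, so a task pushed with args=(1,) decodes with args equal to [1] unless the broker converts it back.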

async size()

Get the number of tasks in the queue.

Uses LLEN to get the length of the Redis list.

Return type:

int

Returns:

Number of pending tasks

Raises:

BackendError – If Redis operation fails

async close()

Close Redis connections.

Return type:

None
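A typical consumer combines pop with a timeout and close in a finally block, so connections are released even when task handling fails. The sketch below assumes only the documented pop(timeout=...) and close() methods; StubBroker and drain are hypothetical names, and the stub returns None immediately when empty instead of waiting out the timeout.

```python
import asyncio


class StubBroker:
    """Hypothetical stand-in exposing the documented pop/close methods."""

    def __init__(self, items):
        self._items = list(items)
        self.closed = False

    async def pop(self, timeout=None):
        await asyncio.sleep(0)  # yield to the event loop like real I/O would
        return self._items.pop(0) if self._items else None

    async def close(self):
        self.closed = True


async def drain(broker, handle, idle_timeout=1.0):
    """Process tasks until pop() reports that the timeout expired."""
    processed = 0
    try:
        while True:
            task = await broker.pop(timeout=idle_timeout)
            if task is None:  # nothing arrived within idle_timeout
                break
            await handle(task)
            processed += 1
    finally:
        await broker.close()  # release connections even if handle() raises
    return processed
```

A long-running worker would usually keep looping after a None result rather than exiting; stopping on the first idle timeout keeps this example finite.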

Module contents

Flowrra task queue brokers.

Brokers handle task queueing and distribution (separate from result storage backends).

Available brokers:
  • RedisBroker: Distributed task queue using Redis lists

class flowrra.brokers.BaseBroker

Bases: ABC

Abstract base class for task queue brokers.

Brokers handle task queueing and distribution to workers. Unlike backends (which store results), brokers manage the task queue.

abstractmethod async push(task)

Push a task to the queue.

Parameters:

task (Task) – Task to queue

Return type:

None

abstractmethod async pop(timeout=None)

Pop a task from the queue.

Parameters:

timeout (float | None) – Maximum time to wait for a task (None = wait indefinitely)

Return type:

Task | None

Returns:

The task if one is available, or None if the timeout expires first

abstractmethod async size()

Get the number of tasks in the queue.

Return type:

int

Returns:

Number of pending tasks

async close()

Close broker connections and clean up resources.

Optional method for brokers that need cleanup.

Return type:

None

class flowrra.brokers.RedisBroker(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)

Bases: BaseBroker

Redis-based task queue broker.

Uses Redis lists (LPUSH/BRPOP) for reliable task queueing. Tasks are serialized as JSON and pushed to a Redis list.

Features:
  • Cross-process task distribution

  • Blocking pop with timeout support

  • Automatic connection pooling

  • Priority-based queueing (using task priority)

Parameters:
  • url (str) – Redis connection string (e.g., ‘redis://localhost:6379/0’)

  • max_connections (int) – Maximum connections in pool (default: 50)

  • socket_timeout (float) – Socket timeout in seconds (default: 5.0)

  • retry_on_timeout (bool) – Retry on connection timeout (default: True)

  • queue_key (str) – Redis key for task queue (default: ‘flowrra:queue’)

  • **kwargs (Any) – Additional options passed to redis.asyncio.from_url()

Example

    broker = RedisBroker('redis://localhost:6379/0')

    # Push task
    task = Task(id='123', name='my_task', args=(1,), kwargs={})
    await broker.push(task)

    # Pop task (blocking)
    task = await broker.pop(timeout=5.0)

__init__(url, max_connections=50, socket_timeout=5.0, retry_on_timeout=True, queue_key='flowrra:queue', **kwargs)

async push(task)

Push a task to the Redis queue.

Tasks are serialized as JSON and pushed to the left of the list (LPUSH). Priority is handled by using separate queues or sorting logic.

Parameters:

task (Task) – Task to queue

Raises:

BackendError – If Redis operation fails

Return type:

None

async pop(timeout=None)

Pop a task from the Redis queue.

Uses BRPOP (blocking right pop) to wait for tasks efficiently.

Parameters:

timeout (float | None) – Maximum time to wait in seconds (None = wait indefinitely)

Return type:

Task | None

Returns:

The task if one is available, or None if the timeout expires first

Raises:

BackendError – If Redis operation fails

async size()

Get the number of tasks in the queue.

Uses LLEN to get the length of the Redis list.

Return type:

int

Returns:

Number of pending tasks

Raises:

BackendError – If Redis operation fails

async close()

Close Redis connections.

Return type:

None