Source code for flowrra.brokers.redis

"""Redis-based broker for distributed task queueing."""

import json
import asyncio
from typing import Any

try:
    import redis.asyncio as redis
except ImportError:
    redis = None

from flowrra.brokers.base import BaseBroker
from flowrra.task import Task
from flowrra.exceptions import BackendError


[docs] class RedisBroker(BaseBroker): """Redis-based task queue broker. Uses Redis lists (LPUSH/BRPOP) for reliable task queueing. Tasks are serialized as JSON and pushed to a Redis list. Features: - Cross-process task distribution - Blocking pop with timeout support - Automatic connection pooling - Priority-based queueing (using task priority) Args: url: Redis connection string (e.g., 'redis://localhost:6379/0') max_connections: Maximum connections in pool (default: 50) socket_timeout: Socket timeout in seconds (default: 5.0) retry_on_timeout: Retry on connection timeout (default: True) queue_key: Redis key for task queue (default: 'flowrra:queue') **kwargs: Additional options passed to redis.asyncio.from_url() Example: broker = RedisBroker('redis://localhost:6379/0') # Push task task = Task(id='123', name='my_task', args=(1,), kwargs={}) await broker.push(task) # Pop task (blocking) task = await broker.pop(timeout=5.0) """
[docs] def __init__( self, url: str, max_connections: int = 50, socket_timeout: float = 5.0, retry_on_timeout: bool = True, queue_key: str = "flowrra:queue", **kwargs: Any ): if redis is None: raise ImportError( "Redis broker requires redis package. " "Install with: pip install flowrra[redis]" ) self._url = url self._max_connections = max_connections self._socket_timeout = socket_timeout self._retry_on_timeout = retry_on_timeout self._queue_key = queue_key self._redis: redis.Redis | None = None self._kwargs = kwargs
async def _ensure_connected(self) -> None: """Lazy connection initialization.""" if self._redis is None: try: self._redis = await redis.from_url( self._url, decode_responses=True, max_connections=self._max_connections, socket_timeout=self._socket_timeout, retry_on_timeout=self._retry_on_timeout, **self._kwargs ) except Exception as e: raise BackendError(f"Failed to connect to Redis broker: {e}") from e
[docs] async def push(self, task: Task) -> None: """Push a task to the Redis queue. Tasks are serialized as JSON and pushed to the left of the list (LPUSH). Priority is handled by using separate queues or sorting logic. Args: task: Task to queue Raises: BackendError: If Redis operation fails """ await self._ensure_connected() try: task_data = { "id": task.id, "name": task.name, "args": task.args, "kwargs": task.kwargs, "max_retries": task.max_retries, "retry_delay": task.retry_delay, "current_retry": task.current_retry, "priority": task.priority, "cpu_bound": task.cpu_bound, } task_json = json.dumps(task_data) await self._redis.lpush(self._queue_key, task_json) except Exception as e: raise BackendError(f"Failed to push task to Redis queue: {e}") from e
[docs] async def pop(self, timeout: float | None = None) -> Task | None: """Pop a task from the Redis queue. Uses BRPOP (blocking right pop) to wait for tasks efficiently. Args: timeout: Maximum time to wait in seconds (None = wait indefinitely) Returns: Task if available, None if timeout Raises: BackendError: If Redis operation fails """ await self._ensure_connected() try: # timeout=0 means wait indefinitely redis_timeout = int(timeout) if timeout is not None else 0 result = await self._redis.brpop(self._queue_key, timeout=redis_timeout) if result is None: return None _, task_json = result task_data = json.loads(task_json) task = Task( id=task_data["id"], name=task_data["name"], args=tuple(task_data["args"]), kwargs=task_data["kwargs"], max_retries=task_data["max_retries"], retry_delay=task_data["retry_delay"], current_retry=task_data["current_retry"], priority=task_data["priority"], cpu_bound=task_data.get("cpu_bound", False), # Default to False for backward compatibility ) return task except asyncio.TimeoutError: return None except Exception as e: raise BackendError(f"Failed to pop task from Redis queue: {e}") from e
[docs] async def size(self) -> int: """Get the number of tasks in the queue. Uses LLEN to get the length of the Redis list. Returns: Number of pending tasks Raises: BackendError: If Redis operation fails """ await self._ensure_connected() try: size = await self._redis.llen(self._queue_key) return size except Exception as e: raise BackendError(f"Failed to get queue size from Redis: {e}") from e
[docs] async def close(self) -> None: """Close Redis connections.""" if self._redis: await self._redis.aclose() self._redis = None