Source code for flowrra.events

import asyncio
import logging
from typing import Callable, Awaitable

logger = logging.getLogger(__name__)


[docs] class EventBus: """ Thread-safe event bus for broadcasting task events. This implementation uses asyncio.Lock for thread-safe subscriber management and emits events concurrently to all subscribers with error isolation. Features: - Thread-safe subscribe/unsubscribe operations - Concurrent event emission to all subscribers - Error isolation (one subscriber's error doesn't affect others) - Async-safe list iteration Example: async def my_subscriber(event: dict): print(f"Received: {event}") event_bus.subscribe(my_subscriber) await event_bus.emit({'type': 'task.update', 'task': {...}}) await event_bus.unsubscribe(my_subscriber) """
[docs] def __init__(self): self._subscribers: list[Callable[[dict], Awaitable[None]]] = [] self._lock = asyncio.Lock()
[docs] def subscribe(self, fn: Callable[[dict], Awaitable[None]]): """ Subscribe to events (sync method for backward compatibility). Note: This is synchronous but modifies shared state. For async contexts, consider using async_subscribe instead. Args: fn: Async function that receives event dict """ self._subscribers.append(fn)
[docs] async def async_subscribe(self, fn: Callable[[dict], Awaitable[None]]): """ Subscribe to events (async-safe). Args: fn: Async function that receives event dict """ async with self._lock: self._subscribers.append(fn)
[docs] async def unsubscribe(self, fn: Callable[[dict], Awaitable[None]]): """ Unsubscribe from events. Args: fn: The subscriber function to remove """ async with self._lock: if fn in self._subscribers: self._subscribers.remove(fn)
[docs] async def emit(self, event: dict): """ Emit event to all subscribers concurrently. Events are delivered to all subscribers in parallel using asyncio.gather. If a subscriber raises an exception, it's logged but doesn't affect others. Args: event: Event dict to broadcast """ # Get snapshot of subscribers under lock to avoid modification during iteration async with self._lock: subscribers = list(self._subscribers) # Emit to all subscribers concurrently if subscribers: tasks = [ asyncio.create_task(self._safe_emit(fn, event)) for fn in subscribers ] await asyncio.gather(*tasks, return_exceptions=True)
async def _safe_emit(self, fn: Callable[[dict], Awaitable[None]], event: dict): """ Safely emit event to a single subscriber with error handling. Args: fn: Subscriber function event: Event dict """ try: await fn(event) except Exception as e: # Log error but don't crash - isolate subscriber failures logger.error("Error in subscriber %s: %s", fn.__name__, e, exc_info=True)
event_bus = EventBus()