Source code for flowrra.ui.django_websocket

"""
Django WebSocket Consumer for Flowrra

Provides real-time task updates via WebSocket using Django-Channels.

This module is optional and requires Django-Channels to be installed.
Users who don't want WebSocket support can rely on the frontend's
polling fallback mechanism (15-second intervals).

Usage Example:
    1. Install Django-Channels:
        pip install channels channels-redis

    2. Configure ASGI in your Django settings.py:
        ASGI_APPLICATION = 'your_project.asgi.application'
        CHANNEL_LAYERS = {
            'default': {
                'BACKEND': 'channels_redis.core.RedisChannelLayer',
                'CONFIG': {
                    'hosts': [('127.0.0.1', 6379)],
                },
            },
        }

    3. Create/update your routing.py:
        from channels.routing import ProtocolTypeRouter, URLRouter
        from django.core.asgi import get_asgi_application
        from flowrra.ui.django_websocket import websocket_urlpatterns

        application = ProtocolTypeRouter({
            'http': get_asgi_application(),
            'websocket': URLRouter(websocket_urlpatterns),
        })

    4. Deploy with an ASGI server:
        daphne -b 0.0.0.0 -p 8000 your_project.asgi:application
        # or
        uvicorn your_project.asgi:application --host 0.0.0.0 --port 8000

Without WebSocket setup, the frontend automatically falls back to polling.
"""

import asyncio
import json
import logging
from typing import Optional

try:
    from channels.generic.websocket import AsyncWebsocketConsumer
    from django.urls import path
    CHANNELS_AVAILABLE = True
except ImportError:
    CHANNELS_AVAILABLE = False
    # Provide a dummy base class when channels is not installed
    class AsyncWebsocketConsumer:
        pass


from flowrra.events import event_bus

logger = logging.getLogger(__name__)


[docs] class FlowrraConsumer(AsyncWebsocketConsumer): """ WebSocket consumer for real-time Flowrra task updates. This consumer subscribes to the global event_bus and forwards task update events to connected WebSocket clients. Architecture: - Uses an asyncio.Queue to decouple event_bus from WebSocket send - Subscriber function puts events into the queue - Separate task reads from queue and sends to WebSocket - Prevents blocking the event_bus if WebSocket is slow Events Format: { 'type': 'task.update', 'task': { 'task_id': str, 'status': 'pending|running|success|failed|retrying', 'task_name': str, 'result': Any, 'error': str | None, 'submitted_at': ISO datetime, 'started_at': ISO datetime, 'finished_at': ISO datetime, 'retries': int, # ... additional task fields } } """
[docs] def __init__(self, *args, **kwargs): if not CHANNELS_AVAILABLE: raise ImportError( "Django-Channels is required for WebSocket support. " "Install it with: pip install channels" ) super().__init__(*args, **kwargs) self.queue: Optional[asyncio.Queue] = None self.subscriber: Optional[callable] = None self.forward_task: Optional[asyncio.Task] = None
[docs] async def connect(self): """ Handle WebSocket connection. Creates a queue for this connection, subscribes to the event_bus, and starts a task to forward events from queue to WebSocket. """ await self.accept() self.queue = asyncio.Queue() async def subscriber(event: dict): if self.queue: await self.queue.put(event) self.subscriber = subscriber event_bus.subscribe(subscriber) self.forward_task = asyncio.create_task(self._forward_events()) logger.info("WebSocket connected: %s", self.channel_name)
[docs] async def disconnect(self, close_code): """ Handle WebSocket disconnection. Stops the forwarding task, unsubscribes from event_bus, and cleans up resources. """ if self.forward_task and not self.forward_task.done(): self.forward_task.cancel() try: await self.forward_task except asyncio.CancelledError: pass if self.subscriber and self.subscriber in event_bus._subscribers: event_bus._subscribers.remove(self.subscriber) self.queue = None self.subscriber = None self.forward_task = None logger.info("WebSocket disconnected: %s (code: %s)", self.channel_name, close_code)
async def _forward_events(self): """ Forward events from queue to WebSocket. This runs in a separate task to decouple event_bus emission from WebSocket send operations. """ try: while True: event = await self.queue.get() await self.send(text_data=json.dumps(event)) except asyncio.CancelledError: # Normal cancellation during disconnect pass except Exception as e: logger.error("Error forwarding event: %s", e, exc_info=True)
if CHANNELS_AVAILABLE: websocket_urlpatterns = [ path('flowrra/ws', FlowrraConsumer.as_asgi()), ] else: # Provide empty list when channels is not available websocket_urlpatterns = [] __all__ = ['FlowrraConsumer', 'websocket_urlpatterns', 'CHANNELS_AVAILABLE']