Source code for flowrra.ui.quart_websocket

"""
Quart WebSocket Support for Flowrra

Provides real-time task updates via WebSocket for Flask/Quart applications.

Note: This requires Quart, which is an async-compatible reimplementation of Flask.
Standard Flask does not support WebSockets natively.

Installation:
    pip install quart

Migration from Flask to Quart:
    Quart is API-compatible with Flask, making migration straightforward:
    - Replace 'from flask import Flask' with 'from quart import Quart'
    - Replace 'flask run' with 'quart run' or use hypercorn/uvicorn
    - Most Flask extensions work with Quart equivalents

Usage Example:
    from quart import Quart
    from flowrra import Flowrra
    from flowrra.ui.quart_websocket import setup_websocket

    app = Quart(__name__)
    flowrra = Flowrra.from_urls()

    # Setup WebSocket endpoint
    setup_websocket(app)

    # Run with ASGI server
    # quart run
    # or: hypercorn app:app
    # or: uvicorn app:app

Without Quart/WebSocket, the frontend falls back to polling every 15 seconds.
"""

import asyncio
import json
import logging
from typing import Optional

try:
    from quart import websocket, Quart
    QUART_AVAILABLE = True
except ImportError:
    QUART_AVAILABLE = False
    websocket = None
    Quart = None

from flowrra.events import event_bus

logger = logging.getLogger(__name__)


[docs] async def websocket_endpoint(): """ Quart WebSocket endpoint for Flowrra real-time task updates. This function should be registered as a WebSocket route in your Quart app. Architecture: - Uses an asyncio.Queue to decouple event_bus from WebSocket send - Subscriber function puts events into the queue - Separate task reads from queue and sends to WebSocket - Prevents blocking the event_bus if WebSocket is slow Events Format: { 'type': 'task.update', 'task': { 'task_id': str, 'status': 'pending|running|success|failed|retrying', 'task_name': str, 'result': Any, 'error': str | None, 'submitted_at': ISO datetime, 'started_at': ISO datetime, 'finished_at': ISO datetime, 'retries': int, # ... additional task fields } } Raises: ImportError: If Quart is not installed """ if not QUART_AVAILABLE: raise ImportError( "Quart is required for WebSocket support. " "Install it with: pip install quart" ) queue: asyncio.Queue[dict] = asyncio.Queue() async def subscriber(event: dict): await queue.put(event) event_bus.subscribe(subscriber) # Create task to forward events from queue to WebSocket async def forward_events(): try: while True: event = await queue.get() await websocket.send(json.dumps(event)) except asyncio.CancelledError: pass except Exception as e: logger.error("Error forwarding event: %s", e, exc_info=True) forward_task = asyncio.create_task(forward_events()) logger.info("WebSocket connected") try: while True: try: await websocket.receive() except Exception: break finally: forward_task.cancel() try: await forward_task except asyncio.CancelledError: pass if subscriber in event_bus._subscribers: event_bus._subscribers.remove(subscriber) logger.info("WebSocket disconnected")
[docs] def setup_websocket(app: 'Quart', path: str = '/flowrra/ws'): """ Setup WebSocket endpoint in a Quart application. This is a convenience function to register the WebSocket route. Args: app: Quart application instance path: WebSocket endpoint path (default: '/flowrra/ws') Example: from quart import Quart from flowrra.ui.quart_websocket import setup_websocket app = Quart(__name__) setup_websocket(app) Raises: ImportError: If Quart is not installed """ if not QUART_AVAILABLE: raise ImportError( "Quart is required for WebSocket support. " "Install it with: pip install quart" ) @app.websocket(path) async def flowrra_ws(): await websocket_endpoint() logger.info("WebSocket endpoint registered at %s", path)
__all__ = ['websocket_endpoint', 'setup_websocket', 'QUART_AVAILABLE']