flowrra.ui package

Subpackages

Submodules

flowrra.ui.django module

flowrra.ui.django_websocket module

Django WebSocket Consumer for Flowrra

Provides real-time task updates via WebSocket using Django-Channels.

This module is optional and requires Django-Channels to be installed. Users who don’t want WebSocket support can rely on the frontend’s polling fallback mechanism (15-second intervals).

Usage Example:
  1. Install Django-Channels:

    pip install channels channels-redis

  2. Configure ASGI in your Django settings.py:

    ASGI_APPLICATION = ‘your_project.asgi.application’ CHANNEL_LAYERS = {

    ‘default’: {

    ‘BACKEND’: ‘channels_redis.core.RedisChannelLayer’, ‘CONFIG’: {

    ‘hosts’: [(‘127.0.0.1’, 6379)],

    },

    },

    }

  3. Create/update your routing.py:

    from channels.routing import ProtocolTypeRouter, URLRouter from django.core.asgi import get_asgi_application from flowrra.ui.django_websocket import websocket_urlpatterns

    application = ProtocolTypeRouter({

    ‘http’: get_asgi_application(), ‘websocket’: URLRouter(websocket_urlpatterns),

    })

  4. Deploy with an ASGI server:

    daphne -b 0.0.0.0 -p 8000 your_project.asgi:application # or uvicorn your_project.asgi:application –host 0.0.0.0 –port 8000

Without WebSocket setup, the frontend automatically falls back to polling.

class flowrra.ui.django_websocket.FlowrraConsumer(*args, **kwargs)[source]

Bases: AsyncWebsocketConsumer

WebSocket consumer for real-time Flowrra task updates.

This consumer subscribes to the global event_bus and forwards task update events to connected WebSocket clients.

Architecture:
  • Uses an asyncio.Queue to decouple event_bus from WebSocket send

  • Subscriber function puts events into the queue

  • Separate task reads from queue and sends to WebSocket

  • Prevents blocking the event_bus if WebSocket is slow

Events Format:
{

‘type’: ‘task.update’, ‘task’: {

‘task_id’: str, ‘status’: ‘pending|running|success|failed|retrying’, ‘task_name’: str, ‘result’: Any, ‘error’: str | None, ‘submitted_at’: ISO datetime, ‘started_at’: ISO datetime, ‘finished_at’: ISO datetime, ‘retries’: int, # … additional task fields

}

}

__init__(*args, **kwargs)[source]
async connect()[source]

Handle WebSocket connection.

Creates a queue for this connection, subscribes to the event_bus, and starts a task to forward events from queue to WebSocket.

async disconnect(close_code)[source]

Handle WebSocket disconnection.

Stops the forwarding task, unsubscribes from event_bus, and cleans up resources.

flowrra.ui.fastapi module

flowrra.ui.flask module

flowrra.ui.quart_websocket module

Quart WebSocket Support for Flowrra

Provides real-time task updates via WebSocket for Flask/Quart applications.

Note: This requires Quart, which is an async-compatible reimplementation of Flask. Standard Flask does not support WebSockets natively.

Installation:

pip install quart

Migration from Flask to Quart:

Quart is API-compatible with Flask, making migration straightforward: - Replace ‘from flask import Flask’ with ‘from quart import Quart’ - Replace ‘flask run’ with ‘quart run’ or use hypercorn/uvicorn - Most Flask extensions work with Quart equivalents

Usage Example:

from quart import Quart from flowrra import Flowrra from flowrra.ui.quart_websocket import setup_websocket

app = Quart(__name__) flowrra = Flowrra.from_urls()

# Setup WebSocket endpoint setup_websocket(app)

# Run with ASGI server # quart run # or: hypercorn app:app # or: uvicorn app:app

Without Quart/WebSocket, the frontend falls back to polling every 15 seconds.

async flowrra.ui.quart_websocket.websocket_endpoint()[source]

Quart WebSocket endpoint for Flowrra real-time task updates.

This function should be registered as a WebSocket route in your Quart app.

Architecture:
  • Uses an asyncio.Queue to decouple event_bus from WebSocket send

  • Subscriber function puts events into the queue

  • Separate task reads from queue and sends to WebSocket

  • Prevents blocking the event_bus if WebSocket is slow

Events Format:
{

‘type’: ‘task.update’, ‘task’: {

‘task_id’: str, ‘status’: ‘pending|running|success|failed|retrying’, ‘task_name’: str, ‘result’: Any, ‘error’: str | None, ‘submitted_at’: ISO datetime, ‘started_at’: ISO datetime, ‘finished_at’: ISO datetime, ‘retries’: int, # … additional task fields

}

}

Raises:

ImportError – If Quart is not installed

flowrra.ui.quart_websocket.setup_websocket(app, path='/flowrra/ws')[source]

Setup WebSocket endpoint in a Quart application.

This is a convenience function to register the WebSocket route.

Parameters:
  • app (None) – Quart application instance

  • path (str) – WebSocket endpoint path (default: ‘/flowrra/ws’)

Example

from quart import Quart from flowrra.ui.quart_websocket import setup_websocket

app = Quart(__name__) setup_websocket(app)

Raises:

ImportError – If Quart is not installed

Module contents

UI adapters for mounting Flowrra into web frameworks.

This module provides optional UI adapters that allow mounting Flowrra’s management interface into existing web applications. The adapters are framework-specific and have NO dependencies in the core Flowrra package.

Available Adapters:
  • FastAPI: flowrra.ui.fastapi.create_router()

  • Flask: flowrra.ui.flask.create_blueprint()

  • Django: flowrra.ui.django.get_urls()

Important

  • Core Flowrra has ZERO web framework dependencies

  • UI adapters are optional imports

  • Users install only what they need:

    pip install flowrra[ui-fastapi] pip install flowrra[ui-flask] pip install flowrra[ui-django]

Example (FastAPI):

from fastapi import FastAPI from flowrra import Flowrra from flowrra.ui.fastapi import create_router

app = FastAPI() flowrra = Flowrra.from_urls()

# Mount Flowrra UI app.include_router(

create_router(flowrra), prefix=”/flowrra”

)

Example (Flask):

from flask import Flask from flowrra import Flowrra from flowrra.ui.flask import create_blueprint

app = Flask(__name__) flowrra = Flowrra.from_urls()

# Mount Flowrra UI app.register_blueprint(

create_blueprint(flowrra), url_prefix=”/flowrra”

)

class flowrra.ui.BaseUIAdapter(flowrra_app)[source]

Bases: ABC

Base adapter for framework-specific UI implementations.

Framework-specific adapters (FastAPI, Flask, Django) inherit from this class and implement get_routes() to provide their routing logic.

__init__(flowrra_app)[source]

Initialize adapter with services.

Parameters:

flowrra_app (Flowrra) – Flowrra application instance

abstractmethod get_routes()[source]

Return framework-specific routes.

Returns:

  • FastAPI: APIRouter

  • Flask: Blueprint

  • Django: List[URLPattern]

Return type:

Framework-specific router/blueprint/urls object

property templates_dir: Path

Get templates directory path.

Returns:

Path to templates directory

property static_dir: Path

Get static files directory path.

Returns:

Path to static files directory

async get_dashboard_data()[source]

Get data for dashboard page.

Return type:

Dict[str, Any]

Returns:

Dictionary with dashboard data

async get_tasks_page_data(status=None, limit=50)[source]

Get data for tasks page.

Parameters:
  • status (Optional[str]) – Filter by status (pending/running/success/failed)

  • limit (int) – Maximum tasks to return

Return type:

Dict[str, Any]

Returns:

Dictionary with tasks data

async get_schedules_page_data(enabled_only=False)[source]

Get data for schedules page.

Parameters:

enabled_only (bool) – If True, only return enabled schedules

Return type:

Dict[str, Any]

Returns:

Dictionary with schedules data

static format_datetime(dt)[source]

Format datetime for display.

Parameters:

dt (Any) – Datetime object or ISO string

Return type:

str

Returns:

Formatted datetime string

static format_duration(seconds)[source]

Format duration in seconds to human-readable string.

Parameters:

seconds (float) – Duration in seconds

Return type:

str

Returns:

Formatted duration (e.g., “2h 30m”, “45s”)

static get_status_color(status)[source]

Get color class for task status.

Parameters:

status (str) – Task status string

Return type:

str

Returns:

CSS color class name

async get_stats()[source]

API endpoint: Get system statistics.

Return type:

Dict[str, Any]

Returns:

System statistics dictionary

async health_check()[source]

API endpoint: Health check.

Return type:

Dict[str, Any]

Returns:

Health status dictionary

async list_tasks()[source]

API endpoint: List registered tasks.

Return type:

List[Dict[str, Any]]

Returns:

List of registered tasks

async get_task(task_name)[source]

API endpoint: Get specific task info.

Parameters:

task_name (str) – Task name

Return type:

Optional[Dict[str, Any]]

Returns:

Task information or None

async list_schedules(enabled_only=False)[source]

API endpoint: List schedules.

Parameters:

enabled_only (bool) – If True, only return enabled schedules

Return type:

List[Dict[str, Any]]

Returns:

List of schedules

async get_schedule(schedule_id)[source]

API endpoint: Get specific schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Optional[Dict[str, Any]]

Returns:

Schedule details or None

async create_schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]

API endpoint: Create cron schedule.

Parameters:
  • task_name (str) – Task name

  • cron (str) – Cron expression

  • args (tuple) – Task arguments

  • kwargs (Optional[dict]) – Task keyword arguments

  • enabled (bool) – Start enabled

  • description (Optional[str]) – Optional description

  • priority (int) – Task priority

Return type:

Dict[str, str]

Returns:

Dictionary with schedule_id

async enable_schedule(schedule_id)[source]

API endpoint: Enable schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Dict[str, str]

Returns:

Success message

async disable_schedule(schedule_id)[source]

API endpoint: Disable schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Dict[str, str]

Returns:

Success message

async delete_schedule(schedule_id)[source]

API endpoint: Delete schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Dict[str, Any]

Returns:

Success status

class flowrra.ui.Formatter[source]

Bases: object

Static utility class for formatting values in the UI.

Provides consistent formatting across all UI adapters: - Datetime formatting - Duration formatting - Status color mapping

static format_datetime(dt)[source]

Format datetime for display.

Parameters:

dt (Any) – Datetime object or ISO string

Return type:

str

Returns:

Formatted datetime string

static format_duration(seconds)[source]

Format duration in seconds to human-readable string.

Parameters:

seconds (float) – Duration in seconds

Return type:

str

Returns:

Formatted duration (e.g., “2h 30m”, “45s”)

static get_status_color(status)[source]

Get color class for task status.

Parameters:

status (str) – Task status string

Return type:

str

Returns:

CSS color class name

class flowrra.ui.UIService(manager)[source]

Bases: object

Service for preparing data for UI pages.

Handles data aggregation and transformation for: - Dashboard page - Tasks page - Schedules page

This service acts as a bridge between FlowrraManager (raw data) and UI templates (formatted data).

__init__(manager)[source]

Initialize UI service.

Parameters:

manager (FlowrraManager) – FlowrraManager instance for data access

async get_dashboard_data()[source]

Get data for dashboard page.

Returns:

{

“stats”: {…}, “recent_failed_tasks”: […], “schedules”: […], “total_schedules”: int

}

Return type:

Dictionary with dashboard data

async get_tasks_page_data(status=None, limit=200)[source]

Get data for tasks page.

Parameters:
  • status (Optional[str]) – Filter by status (pending/running/success/failed)

  • limit (int) – Maximum tasks to return (default: 200)

Return type:

Dict[str, Any]

Returns:

Dictionary with tasks data

async get_schedules_page_data(enabled_only=False)[source]

Get data for schedules page.

Parameters:

enabled_only (bool) – If True, only return enabled schedules

Return type:

Dict[str, Any]

Returns:

Dictionary with schedules data

class flowrra.ui.ScheduleService(manager)[source]

Bases: object

Service for managing schedules via the UI.

Provides schedule CRUD operations: - List schedules - Get schedule details - Create schedules - Enable/disable schedules - Delete schedules

This service wraps FlowrraManager schedule operations and provides a consistent API for UI adapters.

__init__(manager)[source]

Initialize schedule service.

Parameters:

manager (FlowrraManager) – FlowrraManager instance for schedule operations

async list_schedules(enabled_only=False)[source]

List schedules.

Parameters:

enabled_only (bool) – If True, only return enabled schedules

Return type:

List[Dict[str, Any]]

Returns:

List of schedules

async get_schedule(schedule_id)[source]

Get specific schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Optional[Dict[str, Any]]

Returns:

Schedule details or None

async create_schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]

Create cron schedule.

Parameters:
  • task_name (str) – Task name

  • cron (str) – Cron expression

  • args (tuple) – Task arguments

  • kwargs (Optional[dict]) – Task keyword arguments

  • enabled (bool) – Start enabled

  • description (Optional[str]) – Optional description

  • priority (int) – Task priority

Return type:

Dict[str, str]

Returns:

Dictionary with schedule_id

async enable_schedule(schedule_id)[source]

Enable schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Dict[str, str]

Returns:

Success message

async disable_schedule(schedule_id)[source]

Disable schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Dict[str, str]

Returns:

Success message

async delete_schedule(schedule_id)[source]

Delete schedule.

Parameters:

schedule_id (str) – Schedule ID

Return type:

Dict[str, Any]

Returns:

Success status