flowrra.ui package
Subpackages
- flowrra.ui.base package
- Submodules
- flowrra.ui.base.adapter module
BaseUIAdapterBaseUIAdapter.__init__()BaseUIAdapter.get_routes()BaseUIAdapter.templates_dirBaseUIAdapter.static_dirBaseUIAdapter.get_dashboard_data()BaseUIAdapter.get_tasks_page_data()BaseUIAdapter.get_schedules_page_data()BaseUIAdapter.format_datetime()BaseUIAdapter.format_duration()BaseUIAdapter.get_status_color()BaseUIAdapter.get_stats()BaseUIAdapter.health_check()BaseUIAdapter.list_tasks()BaseUIAdapter.get_task()BaseUIAdapter.list_schedules()BaseUIAdapter.get_schedule()BaseUIAdapter.create_schedule_cron()BaseUIAdapter.enable_schedule()BaseUIAdapter.disable_schedule()BaseUIAdapter.delete_schedule()
- flowrra.ui.base.formatter module
- flowrra.ui.base.schedule_service module
- flowrra.ui.base.ui_service module
- Module contents
BaseUIAdapterBaseUIAdapter.__init__()BaseUIAdapter.get_routes()BaseUIAdapter.templates_dirBaseUIAdapter.static_dirBaseUIAdapter.get_dashboard_data()BaseUIAdapter.get_tasks_page_data()BaseUIAdapter.get_schedules_page_data()BaseUIAdapter.format_datetime()BaseUIAdapter.format_duration()BaseUIAdapter.get_status_color()BaseUIAdapter.get_stats()BaseUIAdapter.health_check()BaseUIAdapter.list_tasks()BaseUIAdapter.get_task()BaseUIAdapter.list_schedules()BaseUIAdapter.get_schedule()BaseUIAdapter.create_schedule_cron()BaseUIAdapter.enable_schedule()BaseUIAdapter.disable_schedule()BaseUIAdapter.delete_schedule()
FormatterUIServiceScheduleService
Submodules
flowrra.ui.django module
flowrra.ui.django_websocket module
Django WebSocket Consumer for Flowrra
Provides real-time task updates via WebSocket using Django-Channels.
This module is optional and requires Django-Channels to be installed. Users who don’t want WebSocket support can rely on the frontend’s polling fallback mechanism (15-second intervals).
- Usage Example:
- Install Django-Channels:
pip install channels channels-redis
- Configure ASGI in your Django settings.py:
ASGI_APPLICATION = ‘your_project.asgi.application’ CHANNEL_LAYERS = {
- ‘default’: {
‘BACKEND’: ‘channels_redis.core.RedisChannelLayer’, ‘CONFIG’: {
‘hosts’: [(‘127.0.0.1’, 6379)],
},
},
}
- Create/update your routing.py:
from channels.routing import ProtocolTypeRouter, URLRouter from django.core.asgi import get_asgi_application from flowrra.ui.django_websocket import websocket_urlpatterns
- application = ProtocolTypeRouter({
‘http’: get_asgi_application(), ‘websocket’: URLRouter(websocket_urlpatterns),
})
- Deploy with an ASGI server:
daphne -b 0.0.0.0 -p 8000 your_project.asgi:application # or uvicorn your_project.asgi:application –host 0.0.0.0 –port 8000
Without WebSocket setup, the frontend automatically falls back to polling.
- class flowrra.ui.django_websocket.FlowrraConsumer(*args, **kwargs)[source]
Bases:
AsyncWebsocketConsumerWebSocket consumer for real-time Flowrra task updates.
This consumer subscribes to the global event_bus and forwards task update events to connected WebSocket clients.
- Architecture:
Uses an asyncio.Queue to decouple event_bus from WebSocket send
Subscriber function puts events into the queue
Separate task reads from queue and sends to WebSocket
Prevents blocking the event_bus if WebSocket is slow
- Events Format:
- {
‘type’: ‘task.update’, ‘task’: {
‘task_id’: str, ‘status’: ‘pending|running|success|failed|retrying’, ‘task_name’: str, ‘result’: Any, ‘error’: str | None, ‘submitted_at’: ISO datetime, ‘started_at’: ISO datetime, ‘finished_at’: ISO datetime, ‘retries’: int, # … additional task fields
}
}
flowrra.ui.fastapi module
flowrra.ui.flask module
flowrra.ui.quart_websocket module
Quart WebSocket Support for Flowrra
Provides real-time task updates via WebSocket for Flask/Quart applications.
Note: This requires Quart, which is an async-compatible reimplementation of Flask. Standard Flask does not support WebSockets natively.
- Installation:
pip install quart
- Migration from Flask to Quart:
Quart is API-compatible with Flask, making migration straightforward: - Replace ‘from flask import Flask’ with ‘from quart import Quart’ - Replace ‘flask run’ with ‘quart run’ or use hypercorn/uvicorn - Most Flask extensions work with Quart equivalents
- Usage Example:
from quart import Quart from flowrra import Flowrra from flowrra.ui.quart_websocket import setup_websocket
app = Quart(__name__) flowrra = Flowrra.from_urls()
# Setup WebSocket endpoint setup_websocket(app)
# Run with ASGI server # quart run # or: hypercorn app:app # or: uvicorn app:app
Without Quart/WebSocket, the frontend falls back to polling every 15 seconds.
- async flowrra.ui.quart_websocket.websocket_endpoint()[source]
Quart WebSocket endpoint for Flowrra real-time task updates.
This function should be registered as a WebSocket route in your Quart app.
- Architecture:
Uses an asyncio.Queue to decouple event_bus from WebSocket send
Subscriber function puts events into the queue
Separate task reads from queue and sends to WebSocket
Prevents blocking the event_bus if WebSocket is slow
- Events Format:
- {
‘type’: ‘task.update’, ‘task’: {
‘task_id’: str, ‘status’: ‘pending|running|success|failed|retrying’, ‘task_name’: str, ‘result’: Any, ‘error’: str | None, ‘submitted_at’: ISO datetime, ‘started_at’: ISO datetime, ‘finished_at’: ISO datetime, ‘retries’: int, # … additional task fields
}
}
- Raises:
ImportError – If Quart is not installed
- flowrra.ui.quart_websocket.setup_websocket(app, path='/flowrra/ws')[source]
Setup WebSocket endpoint in a Quart application.
This is a convenience function to register the WebSocket route.
- Parameters:
app (
None) – Quart application instancepath (
str) – WebSocket endpoint path (default: ‘/flowrra/ws’)
Example
from quart import Quart from flowrra.ui.quart_websocket import setup_websocket
app = Quart(__name__) setup_websocket(app)
- Raises:
ImportError – If Quart is not installed
Module contents
UI adapters for mounting Flowrra into web frameworks.
This module provides optional UI adapters that allow mounting Flowrra’s management interface into existing web applications. The adapters are framework-specific and have NO dependencies in the core Flowrra package.
- Available Adapters:
FastAPI: flowrra.ui.fastapi.create_router()
Flask: flowrra.ui.flask.create_blueprint()
Django: flowrra.ui.django.get_urls()
Important
Core Flowrra has ZERO web framework dependencies
UI adapters are optional imports
- Users install only what they need:
pip install flowrra[ui-fastapi] pip install flowrra[ui-flask] pip install flowrra[ui-django]
- Example (FastAPI):
from fastapi import FastAPI from flowrra import Flowrra from flowrra.ui.fastapi import create_router
app = FastAPI() flowrra = Flowrra.from_urls()
# Mount Flowrra UI app.include_router(
create_router(flowrra), prefix=”/flowrra”
)
- Example (Flask):
from flask import Flask from flowrra import Flowrra from flowrra.ui.flask import create_blueprint
app = Flask(__name__) flowrra = Flowrra.from_urls()
# Mount Flowrra UI app.register_blueprint(
create_blueprint(flowrra), url_prefix=”/flowrra”
)
- class flowrra.ui.BaseUIAdapter(flowrra_app)[source]
Bases:
ABCBase adapter for framework-specific UI implementations.
Framework-specific adapters (FastAPI, Flask, Django) inherit from this class and implement get_routes() to provide their routing logic.
- __init__(flowrra_app)[source]
Initialize adapter with services.
- Parameters:
flowrra_app (
Flowrra) – Flowrra application instance
- abstractmethod get_routes()[source]
Return framework-specific routes.
- Returns:
FastAPI: APIRouter
Flask: Blueprint
Django: List[URLPattern]
- Return type:
Framework-specific router/blueprint/urls object
- property templates_dir: Path
Get templates directory path.
- Returns:
Path to templates directory
- property static_dir: Path
Get static files directory path.
- Returns:
Path to static files directory
- async get_dashboard_data()[source]
Get data for dashboard page.
- Return type:
Dict[str,Any]- Returns:
Dictionary with dashboard data
- async get_tasks_page_data(status=None, limit=50)[source]
Get data for tasks page.
- Parameters:
status (
Optional[str]) – Filter by status (pending/running/success/failed)limit (
int) – Maximum tasks to return
- Return type:
Dict[str,Any]- Returns:
Dictionary with tasks data
- async get_schedules_page_data(enabled_only=False)[source]
Get data for schedules page.
- Parameters:
enabled_only (
bool) – If True, only return enabled schedules- Return type:
Dict[str,Any]- Returns:
Dictionary with schedules data
- static format_datetime(dt)[source]
Format datetime for display.
- Parameters:
dt (
Any) – Datetime object or ISO string- Return type:
str- Returns:
Formatted datetime string
- static format_duration(seconds)[source]
Format duration in seconds to human-readable string.
- Parameters:
seconds (
float) – Duration in seconds- Return type:
str- Returns:
Formatted duration (e.g., “2h 30m”, “45s”)
- static get_status_color(status)[source]
Get color class for task status.
- Parameters:
status (
str) – Task status string- Return type:
str- Returns:
CSS color class name
- async get_stats()[source]
API endpoint: Get system statistics.
- Return type:
Dict[str,Any]- Returns:
System statistics dictionary
- async health_check()[source]
API endpoint: Health check.
- Return type:
Dict[str,Any]- Returns:
Health status dictionary
- async list_tasks()[source]
API endpoint: List registered tasks.
- Return type:
List[Dict[str,Any]]- Returns:
List of registered tasks
- async get_task(task_name)[source]
API endpoint: Get specific task info.
- Parameters:
task_name (
str) – Task name- Return type:
Optional[Dict[str,Any]]- Returns:
Task information or None
- async list_schedules(enabled_only=False)[source]
API endpoint: List schedules.
- Parameters:
enabled_only (
bool) – If True, only return enabled schedules- Return type:
List[Dict[str,Any]]- Returns:
List of schedules
- async get_schedule(schedule_id)[source]
API endpoint: Get specific schedule.
- Parameters:
schedule_id (
str) – Schedule ID- Return type:
Optional[Dict[str,Any]]- Returns:
Schedule details or None
- async create_schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]
API endpoint: Create cron schedule.
- Parameters:
task_name (
str) – Task namecron (
str) – Cron expressionargs (
tuple) – Task argumentskwargs (
Optional[dict]) – Task keyword argumentsenabled (
bool) – Start enableddescription (
Optional[str]) – Optional descriptionpriority (
int) – Task priority
- Return type:
Dict[str,str]- Returns:
Dictionary with schedule_id
- async enable_schedule(schedule_id)[source]
API endpoint: Enable schedule.
- Parameters:
schedule_id (
str) – Schedule ID- Return type:
Dict[str,str]- Returns:
Success message
- class flowrra.ui.Formatter[source]
Bases:
objectStatic utility class for formatting values in the UI.
Provides consistent formatting across all UI adapters: - Datetime formatting - Duration formatting - Status color mapping
- static format_datetime(dt)[source]
Format datetime for display.
- Parameters:
dt (
Any) – Datetime object or ISO string- Return type:
str- Returns:
Formatted datetime string
- class flowrra.ui.UIService(manager)[source]
Bases:
objectService for preparing data for UI pages.
Handles data aggregation and transformation for: - Dashboard page - Tasks page - Schedules page
This service acts as a bridge between FlowrraManager (raw data) and UI templates (formatted data).
- __init__(manager)[source]
Initialize UI service.
- Parameters:
manager (
FlowrraManager) – FlowrraManager instance for data access
- async get_dashboard_data()[source]
Get data for dashboard page.
- Returns:
- {
“stats”: {…}, “recent_failed_tasks”: […], “schedules”: […], “total_schedules”: int
}
- Return type:
Dictionary with dashboard data
- class flowrra.ui.ScheduleService(manager)[source]
Bases:
objectService for managing schedules via the UI.
Provides schedule CRUD operations: - List schedules - Get schedule details - Create schedules - Enable/disable schedules - Delete schedules
This service wraps FlowrraManager schedule operations and provides a consistent API for UI adapters.
- __init__(manager)[source]
Initialize schedule service.
- Parameters:
manager (
FlowrraManager) – FlowrraManager instance for schedule operations
- async list_schedules(enabled_only=False)[source]
List schedules.
- Parameters:
enabled_only (
bool) – If True, only return enabled schedules- Return type:
List[Dict[str,Any]]- Returns:
List of schedules
- async get_schedule(schedule_id)[source]
Get specific schedule.
- Parameters:
schedule_id (
str) – Schedule ID- Return type:
Optional[Dict[str,Any]]- Returns:
Schedule details or None
- async create_schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]
Create cron schedule.
- Parameters:
task_name (
str) – Task namecron (
str) – Cron expressionargs (
tuple) – Task argumentskwargs (
Optional[dict]) – Task keyword argumentsenabled (
bool) – Start enableddescription (
Optional[str]) – Optional descriptionpriority (
int) – Task priority
- Return type:
Dict[str,str]- Returns:
Dictionary with schedule_id
- async enable_schedule(schedule_id)[source]
Enable schedule.
- Parameters:
schedule_id (
str) – Schedule ID- Return type:
Dict[str,str]- Returns:
Success message