flowrra.management package
Submodules
flowrra.management.manager module
Core management API for Flowrra applications.
- class flowrra.management.manager.FlowrraManager(app)[source]
Bases: object
Management interface for querying Flowrra application state.
This class provides a framework-agnostic API for querying:
- System statistics (executor status, task counts)
- Registered tasks
- Pending/completed tasks
- Scheduler state and schedules
- Parameters:
app (Flowrra) – Flowrra application instance
Example
    manager = FlowrraManager(app)
    stats = await manager.get_stats()
    tasks = await manager.list_registered_tasks()
- async get_stats()[source]
Get comprehensive system statistics.
- Returns:
Dictionary with system state, in the form:
    {
        "app": {"running": bool},
        "executors": {
            "io": {"running": bool, "workers": int} | None,
            "cpu": {"running": bool, "workers": int} | None
        },
        "tasks": {"registered": int, "pending": int},
        "scheduler": {"enabled": bool, "schedules": int} | None
    }
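Because each executor entry and the scheduler entry may be None, callers have to guard before indexing into them. The following is a minimal sketch of a formatter over the dictionary shape documented above. `summarize_stats` is a hypothetical helper, not part of Flowrra, and the sample values are invented for illustration.

```python
from typing import Any, Dict


def summarize_stats(stats: Dict[str, Any]) -> str:
    """Render a get_stats()-shaped dictionary as a one-line summary.

    Hypothetical helper (not part of Flowrra). It relies only on the
    dictionary shape documented for get_stats().
    """
    parts = ["app=" + ("running" if stats["app"]["running"] else "stopped")]

    # Each executor entry is documented as either a dict or None.
    for name in ("io", "cpu"):
        executor = stats["executors"].get(name)
        if executor is None:
            parts.append(f"{name}=absent")
        else:
            state = "running" if executor["running"] else "stopped"
            parts.append(f"{name}={state}({executor['workers']} workers)")

    tasks = stats["tasks"]
    parts.append(f"tasks={tasks['registered']} registered/{tasks['pending']} pending")

    # The scheduler entry is documented as either a dict or None.
    scheduler = stats.get("scheduler")
    if scheduler is None:
        parts.append("scheduler=absent")
    else:
        state = "enabled" if scheduler["enabled"] else "disabled"
        parts.append(f"scheduler={state}({scheduler['schedules']} schedules)")

    return ", ".join(parts)


# Sample data invented for illustration; real values come from get_stats().
sample_stats = {
    "app": {"running": True},
    "executors": {"io": {"running": True, "workers": 4}, "cpu": None},
    "tasks": {"registered": 3, "pending": 1},
    "scheduler": None,
}
print(summarize_stats(sample_stats))
```

In real use the argument would be the result of `await manager.get_stats()`.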
- async list_registered_tasks()[source]
List all registered tasks.
- Returns:
List of task information dictionaries, in the form:
    [
        {"name": str, "cpu_bound": bool, "max_retries": int, "retry_delay": float}
    ]
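As a small illustration of consuming this list, the sketch below partitions task names by the documented `cpu_bound` flag. `split_by_cpu_bound` is a hypothetical helper, and the task names and retry settings are invented sample data.

```python
from typing import Any, Dict, List, Tuple


def split_by_cpu_bound(tasks: List[Dict[str, Any]]) -> Tuple[List[str], List[str]]:
    """Return (cpu_bound_names, other_names) from list_registered_tasks() output.

    Hypothetical helper (not part of Flowrra); uses only the documented
    "name" and "cpu_bound" keys.
    """
    cpu_bound = sorted(t["name"] for t in tasks if t["cpu_bound"])
    others = sorted(t["name"] for t in tasks if not t["cpu_bound"])
    return cpu_bound, others


# Sample data invented for illustration.
sample_tasks = [
    {"name": "resize_image", "cpu_bound": True, "max_retries": 3, "retry_delay": 1.0},
    {"name": "send_email", "cpu_bound": False, "max_retries": 5, "retry_delay": 2.5},
]
print(split_by_cpu_bound(sample_tasks))
```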
- async get_task_info(task_name)[source]
Get detailed information about a specific task.
- Parameters:
task_name (str) – Name of the task
- Return type:
Optional[Dict[str, Any]]
- Returns:
Task information dictionary, or None if not found
- async list_pending_tasks(limit=None)[source]
List pending tasks waiting for execution.
- Parameters:
limit (Optional[int]) – Maximum number of tasks to return (None = no limit)
- Return type:
List[Dict[str, Any]]
- Returns:
List of pending task dictionaries, ordered by submission time (newest first)
- async list_running_tasks(limit=None)[source]
List currently running tasks.
- Return type:
List[Dict[str,Any]]
- async list_completed_tasks(limit=None)[source]
List successfully completed tasks.
- Return type:
List[Dict[str,Any]]
- async get_task_result(task_id)[source]
Get result of a completed task.
- Parameters:
task_id (str) – Task identifier
- Returns:
Task result dictionary, or None if not found. The dictionary has the form:
    {
        "task_id": str,
        "status": str,
        "result": Any,
        "error": str | None,
        "completed_at": datetime | None
    }
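A common pattern is to poll for a result until it appears. The sketch below assumes that `get_task_result()` keeps returning None until a result has been stored; the source only states "None if not found", so treat this as an assumption. `wait_for_result` and `FakeManager` are hypothetical names, and the `"success"` status string is an invented placeholder because the documented type is just `str`.

```python
import asyncio
import time
from typing import Any, Dict, Optional


async def wait_for_result(
    manager: Any,
    task_id: str,
    timeout: float = 5.0,
    poll_interval: float = 0.1,
) -> Optional[Dict[str, Any]]:
    """Poll manager.get_task_result() until it returns a dict or time runs out.

    Assumption: get_task_result() returns None until a result is stored.
    Returns None if the timeout elapses first.
    """
    deadline = time.monotonic() + timeout
    while True:
        result = await manager.get_task_result(task_id)
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            return None
        await asyncio.sleep(poll_interval)


class FakeManager:
    """Stand-in for FlowrraManager so the sketch runs without Flowrra."""

    def __init__(self, ready_after: int) -> None:
        self.calls = 0
        self.ready_after = ready_after

    async def get_task_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        # Report "not found" until the configured number of calls is reached.
        self.calls += 1
        if self.calls < self.ready_after:
            return None
        return {
            "task_id": task_id,
            "status": "success",  # invented placeholder value
            "result": 42,
            "error": None,
            "completed_at": None,
        }


outcome = asyncio.run(
    wait_for_result(FakeManager(ready_after=3), "task-1", poll_interval=0.01)
)
print(outcome["result"])
```

Polling against a monotonic deadline keeps the wait bounded even if the system clock changes while waiting.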
- async list_schedules(enabled_only=False)[source]
List all scheduled tasks.
- Parameters:
enabled_only (bool) – If True, only return enabled schedules
- Return type:
List[Dict[str, Any]]
- Returns:
List of schedule dictionaries (empty if no scheduler is configured)
- async get_schedule(schedule_id)[source]
Get details of a specific schedule.
- Parameters:
schedule_id (str) – Schedule identifier
- Return type:
Optional[Dict[str, Any]]
- Returns:
Schedule dictionary, or None if the schedule is not found or no scheduler is configured
- async get_scheduler_stats()[source]
Get scheduler statistics.
- Return type:
Optional[Dict[str, Any]]
- Returns:
Scheduler stats dictionary, or None if no scheduler is configured
- async create_schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]
Create a cron-based schedule.
- Parameters:
task_name (str) – Name of registered task
cron (str) – Cron expression (e.g., "0 9 * * *")
args (tuple) – Positional arguments for task
kwargs (Optional[dict]) – Keyword arguments for task
enabled (bool) – Whether schedule starts enabled
description (Optional[str]) – Optional description
priority (int) – Task priority (higher = more important)
- Return type:
str
- Returns:
Schedule ID
- Raises:
ValueError – If no scheduler is configured
- async create_schedule_interval(task_name, interval, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]
Create an interval-based schedule.
- Parameters:
task_name (str) – Name of registered task
interval (float) – Interval in seconds
args (tuple) – Positional arguments for task
kwargs (Optional[dict]) – Keyword arguments for task
enabled (bool) – Whether schedule starts enabled
description (Optional[str]) – Optional description
priority (int) – Task priority
- Return type:
str
- Returns:
Schedule ID
- Raises:
ValueError – If no scheduler is configured
- async enable_schedule(schedule_id)[source]
Enable a scheduled task.
- Parameters:
schedule_id (str) – Schedule identifier
- Raises:
ValueError – If no scheduler is configured or the schedule is not found
- Return type:
None
- async disable_schedule(schedule_id)[source]
Disable a scheduled task.
- Parameters:
schedule_id (str) – Schedule identifier
- Raises:
ValueError – If no scheduler is configured or the schedule is not found
- Return type:
None
- async delete_schedule(schedule_id)[source]
Delete a scheduled task.
- Parameters:
schedule_id (str) – Schedule identifier
- Return type:
bool
- Returns:
True if deleted, False if not found
- Raises:
ValueError – If no scheduler is configured
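The schedule methods share one failure mode: they raise ValueError when no scheduler is configured. The sketch below shows one way to replace an interval schedule while handling that case. `replace_interval_schedule`, `FakeManager`, and `NoSchedulerManager` are hypothetical names; the two stand-in classes only mimic the documented signatures so the example runs without Flowrra, and the schedule ID format is invented.

```python
import asyncio
from typing import Any, Dict, Optional


async def replace_interval_schedule(
    manager: Any, old_id: Optional[str], task_name: str, interval: float
) -> Optional[str]:
    """Delete old_id (if given) and create a fresh interval schedule.

    Returns the new schedule ID, or None if ValueError is raised, which is
    documented as the "no scheduler configured" case.
    """
    try:
        if old_id is not None:
            # delete_schedule() returns False for an unknown ID; ignored here.
            await manager.delete_schedule(old_id)
        return await manager.create_schedule_interval(task_name, interval)
    except ValueError:
        return None


class FakeManager:
    """In-memory stand-in that mimics the documented schedule methods."""

    def __init__(self) -> None:
        self.schedules: Dict[str, Dict[str, Any]] = {}
        self._next = 1

    async def create_schedule_interval(self, task_name, interval, args=(),
                                       kwargs=None, enabled=True,
                                       description=None, priority=0) -> str:
        schedule_id = f"sched-{self._next}"  # invented ID format
        self._next += 1
        self.schedules[schedule_id] = {"task_name": task_name, "interval": interval}
        return schedule_id

    async def delete_schedule(self, schedule_id: str) -> bool:
        return self.schedules.pop(schedule_id, None) is not None


class NoSchedulerManager:
    """Stand-in for an application without a scheduler."""

    async def create_schedule_interval(self, task_name, interval, **options) -> str:
        raise ValueError("no scheduler configured")

    async def delete_schedule(self, schedule_id: str) -> bool:
        raise ValueError("no scheduler configured")


async def demo():
    manager = FakeManager()
    first = await replace_interval_schedule(manager, None, "cleanup", 60.0)
    second = await replace_interval_schedule(manager, first, "cleanup", 30.0)
    missing = await replace_interval_schedule(NoSchedulerManager(), None, "cleanup", 60.0)
    return first, second, sorted(manager.schedules), missing


demo_result = asyncio.run(demo())
print(demo_result)
```

Returning None instead of propagating the exception is a design choice for callers that treat scheduling as optional; callers that require a scheduler would let the ValueError surface.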
- async health_check()[source]
Perform health check of the Flowrra application.
- Returns:
Health status dictionary, in the form:
    {
        "healthy": bool,
        "timestamp": datetime,
        "components": {
            "app": {"healthy": bool, "message": str},
            "io_executor": {"healthy": bool, "message": str} | None,
            "cpu_executor": {"healthy": bool, "message": str} | None
        }
    }
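Since the result contains a datetime and optional components, exposing it through a monitoring script or web adapter needs a little glue. The sketch below turns a dictionary of the documented shape into a framework-neutral status code and JSON body. `health_to_response` and `failing_components` are hypothetical helpers, the 200/503 mapping is a convention chosen for this example, and the sample data (including the timezone-aware timestamp) is invented.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, List, Tuple


def _encode(value: Any) -> str:
    """JSON fallback encoder: datetimes become ISO 8601 strings."""
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Cannot serialize {type(value).__name__}")


def failing_components(health: Dict[str, Any]) -> List[str]:
    """Names of components that are present (not None) and report unhealthy."""
    return sorted(
        name
        for name, component in health["components"].items()
        if component is not None and not component["healthy"]
    )


def health_to_response(health: Dict[str, Any]) -> Tuple[int, str]:
    """Map a health_check()-shaped dict to (status_code, json_body)."""
    status = 200 if health["healthy"] else 503
    return status, json.dumps(health, default=_encode, sort_keys=True)


# Sample data invented for illustration.
sample_health = {
    "healthy": False,
    "timestamp": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc),
    "components": {
        "app": {"healthy": True, "message": "running"},
        "io_executor": {"healthy": False, "message": "stopped"},
        "cpu_executor": None,
    },
}
status, body = health_to_response(sample_health)
print(status, failing_components(sample_health))
```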
Module contents
Management API for querying Flowrra state.
This module provides a pure Python API for querying and managing Flowrra applications. It has no web framework dependencies and can be used by:
- CLI tools
- Web UI adapters (FastAPI, Flask, Django)
- Monitoring scripts
- Testing utilities
Example
    from flowrra import Flowrra
    from flowrra.management import FlowrraManager

    app = Flowrra.from_urls()
    manager = FlowrraManager(app)

    # Query system state
    stats = await manager.get_stats()
    tasks = await manager.list_registered_tasks()
    schedules = await manager.list_schedules()
- class flowrra.management.FlowrraManager(app)[source]
The package exposes FlowrraManager at this path as well. Its parameters and methods are identical to those of flowrra.management.manager.FlowrraManager, documented above.