"""Core management API for Flowrra applications."""
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from flowrra.task import TaskStatus


# Shared logger for the management API (same logger name the methods used
# when they created it ad hoc inside each ``except`` clause).
_logger = logging.getLogger("flowrra")


class FlowrraManager:
    """Management interface for querying Flowrra application state.

    This class provides a framework-agnostic API for querying:
    - System statistics (executor status, task counts)
    - Registered tasks
    - Pending/completed tasks
    - Scheduler state and schedules

    Args:
        app: Flowrra application instance

    Example:
        manager = FlowrraManager(app)
        stats = await manager.get_stats()
        tasks = await manager.list_registered_tasks()
    """

    def __init__(self, app: "Flowrra"):  # noqa: F821
        """Initialize manager with Flowrra app."""
        self.app = app

    # ============================================
    # System Statistics
    # ============================================

    async def get_stats(self) -> Dict[str, Any]:
        """Get comprehensive system statistics.

        Returns:
            Dictionary with system state:
            {
                "app": {"running": bool},
                "executors": {
                    "io": {"running": bool, "workers": int} | None,
                    "cpu": {"running": bool, "workers": int} | None
                },
                "tasks": {
                    "registered": int,
                    "pending": int
                },
                "scheduler": {
                    "enabled": bool,
                    "running": bool,
                    "total_schedules": int,
                    "enabled_schedules": int
                } | None
            }
        """
        return {
            "app": {"running": self.app.is_running},
            "executors": {
                "io": self._get_io_executor_stats(),
                "cpu": self._get_cpu_executor_stats(),
            },
            "tasks": {
                "registered": len(self.app.registry._tasks),
                "pending": await self._count_pending_tasks(),
            },
            "scheduler": await self._get_scheduler_stats_for_overview(),
        }

    async def _get_scheduler_stats_for_overview(self) -> Optional[Dict[str, Any]]:
        """Get scheduler stats for overview (called by get_stats).

        Returns:
            None when the app has no scheduler, otherwise a summary dict with
            "enabled" (always True), "running", "total_schedules" and
            "enabled_schedules".
        """
        scheduler = self.app.scheduler
        if not scheduler:
            return None
        all_schedules = await scheduler.backend.list_all()
        return {
            "enabled": True,
            "running": scheduler.is_running,
            "total_schedules": len(all_schedules),
            "enabled_schedules": sum(1 for s in all_schedules if s.enabled),
        }

    def _get_io_executor_stats(self) -> Optional[Dict[str, Any]]:
        """Get IO executor statistics (None if no IO executor exists)."""
        executor = self.app._io_executor
        if not executor:
            return None
        return {
            "running": executor.is_running,
            "workers": executor._io_workers,
        }

    def _get_cpu_executor_stats(self) -> Optional[Dict[str, Any]]:
        """Get CPU executor statistics (None if no CPU executor exists)."""
        executor = self.app._cpu_executor
        if not executor:
            return None
        return {
            "running": executor.is_running,
            "workers": executor._cpu_workers,
        }

    async def _count_pending_tasks(self) -> int:
        """Count pending tasks across all running executors.

        This is a best-effort figure: an executor whose queue size cannot be
        read contributes 0 instead of failing the whole statistics call.
        """
        count = 0

        io_executor = self.app._io_executor
        if io_executor and io_executor.is_running:
            try:
                if io_executor.is_broker():
                    count += await io_executor.broker.size()
                else:
                    # NOTE(review): the non-broker path reads a synchronous
                    # qsize() from the same ``broker`` attribute - presumably
                    # an in-process queue; confirm against the executor.
                    count += io_executor.broker.qsize()
            except (AttributeError, NotImplementedError):
                _logger.debug(
                    "IO executor queue size unavailable", exc_info=True
                )

        cpu_executor = self.app._cpu_executor
        if cpu_executor and cpu_executor.is_running:
            try:
                count += await cpu_executor.broker.size()
            except Exception:
                # Deliberately broad (as before), but no longer silent.
                _logger.debug(
                    "CPU executor queue size unavailable", exc_info=True
                )

        return count

    # ============================================
    # Task Queries
    # ============================================

    @staticmethod
    def _describe_task(task_name: str, task_func: Any) -> Dict[str, Any]:
        """Build the basic info dict shared by the registered-task queries."""
        return {
            "name": task_name,
            "cpu_bound": getattr(task_func, "cpu_bound", False),
            "max_retries": getattr(task_func, "max_retries", 0),
            "retry_delay": getattr(task_func, "retry_delay", 0.0),
        }

    async def list_registered_tasks(self) -> List[Dict[str, Any]]:
        """List all registered tasks.

        Returns:
            List of task information dictionaries:
            [
                {
                    "name": str,
                    "cpu_bound": bool,
                    "max_retries": int,
                    "retry_delay": float
                },
                ...
            ]
        """
        return [
            self._describe_task(task_name, task_func)
            for task_name, task_func in self.app.registry._tasks.items()
        ]

    async def get_task_info(self, task_name: str) -> Optional[Dict[str, Any]]:
        """Get detailed information about a specific task.

        Args:
            task_name: Name of the task

        Returns:
            Task information dictionary or None if not found. In addition to
            the keys returned by list_registered_tasks() it carries "module"
            and "qualname" (None when the registered object lacks them).
        """
        task_func = self.app.registry.get(task_name)
        if task_func is None:
            return None
        info = self._describe_task(task_name, task_func)
        # getattr: registered callables such as functools.partial objects do
        # not define __qualname__.
        info["module"] = getattr(task_func, "__module__", None)
        info["qualname"] = getattr(task_func, "__qualname__", None)
        return info

    def _get_result_backend(self) -> Optional[Any]:
        """Return the result backend used for task queries.

        The IO executor's ``results`` is used whenever an IO executor exists;
        the CPU executor's is consulted only when there is no IO executor.
        """
        if self.app._io_executor:
            return self.app._io_executor.results
        if self.app._cpu_executor:
            return self.app._cpu_executor.results
        return None

    @staticmethod
    def _pending_result_to_dict(result: Any) -> Dict[str, Any]:
        """Serialize a pending task result (no timing/result fields)."""
        return {
            "task_id": result.task_id,
            "task_name": result.task_name,
            "status": result.status.value,
            "submitted_at": result.submitted_at,
            "args": result.args,
            "kwargs": result.kwargs,
            "retries": result.retries,
        }

    @staticmethod
    def _task_result_to_dict(result: Any) -> Dict[str, Any]:
        """Serialize a task result, exposing result/error only when relevant."""
        status_value = result.status.value
        return {
            "task_id": result.task_id,
            "task_name": result.task_name,
            "status": status_value,
            "submitted_at": result.submitted_at,
            "started_at": result.started_at,
            "finished_at": result.finished_at,
            "args": result.args,
            "kwargs": result.kwargs,
            "result": result.result if status_value == "success" else None,
            "error": result.error if status_value == "failed" else None,
            "retries": result.retries,
        }

    async def _query_results(
        self,
        status: Any,
        limit: Optional[int],
        serialize: Any,
        label: Any,
    ) -> List[Dict[str, Any]]:
        """Query the result backend by status and serialize each row.

        Args:
            status: Status value forwarded to backend.list_by_status()
            limit: Maximum number of rows (None = no limit)
            serialize: Callable turning one backend row into a dict
            label: Human-readable status name used in the error log message

        Returns:
            Serialized rows; an empty list when there is no backend, the
            backend does not implement list_by_status(), or the query fails
            (failures are logged, never raised).
        """
        backend = self._get_result_backend()
        if backend is None:
            return []
        try:
            rows = await backend.list_by_status(status=status, limit=limit)
            return [serialize(row) for row in rows]
        except NotImplementedError:
            _logger.warning(
                "Backend %s doesn't support list_by_status()",
                backend.__class__.__name__,
            )
            return []
        except Exception as exc:
            _logger.error("Failed to list %s tasks: %s", label, exc)
            return []

    async def list_pending_tasks(
        self, limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """List pending tasks waiting for execution.

        Args:
            limit: Maximum number of tasks to return (None = no limit)

        Returns:
            List of pending task dictionaries. Ordering is whatever the result
            backend's list_by_status() yields (presumably newest first -
            confirm against the backend). Empty on any failure.
        """
        if self._get_result_backend() is None:
            return []
        try:
            from flowrra.task import TaskStatus

            pending = TaskStatus.PENDING
        except Exception as exc:
            # Same best-effort contract as the backend query itself.
            _logger.error("Failed to list pending tasks: %s", exc)
            return []
        return await self._query_results(
            pending, limit, self._pending_result_to_dict, "pending"
        )

    async def list_running_tasks(
        self, limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """List currently running tasks."""
        from flowrra.task import TaskStatus

        return await self._list_tasks_by_status(TaskStatus.RUNNING, limit)

    async def list_completed_tasks(
        self, limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """List successfully completed tasks."""
        from flowrra.task import TaskStatus

        return await self._list_tasks_by_status(TaskStatus.SUCCESS, limit)

    async def list_failed_tasks(
        self, limit: Optional[int] = None
    ) -> List[Dict[str, Any]]:
        """List failed tasks."""
        from flowrra.task import TaskStatus

        return await self._list_tasks_by_status(TaskStatus.FAILED, limit)

    async def _list_tasks_by_status(
        self,
        status: "TaskStatus",
        limit: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Internal helper to list tasks by status.

        Args:
            status: Status to filter on
            limit: Maximum number of tasks to return (None = no limit)

        Returns:
            List of task dictionaries; "result" is populated only for
            "success" rows and "error" only for "failed" rows. Empty when no
            backend is available or the query fails.
        """
        return await self._query_results(
            status,
            limit,
            self._task_result_to_dict,
            getattr(status, "value", status),
        )

    async def get_task_result(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Get result of a completed task.

        Args:
            task_id: Task identifier

        Returns:
            Task result dictionary or None if not found:
            {
                "task_id": str,
                "status": str,
                "result": Any,
                "error": str | None,
                "completed_at": datetime | None
            }
        """
        result = await self.app.get_result(task_id)
        if result is None:
            return None
        status = result.status
        return {
            "task_id": task_id,
            # Expose the plain value, like every other query in this class.
            "status": getattr(status, "value", status),
            "result": result.result,
            "error": result.error,
            # The list queries read ``finished_at`` from result objects; use
            # it here when the result carries it.
            "completed_at": getattr(result, "finished_at", None),
        }

    # ============================================
    # Scheduler Queries
    # ============================================

    @staticmethod
    def _schedule_to_dict(schedule: Any, detailed: bool = False) -> Dict[str, Any]:
        """Serialize a scheduled task.

        Args:
            schedule: Scheduled-task object from the scheduler backend
            detailed: Also include args, kwargs, max_retries and retry_delay
        """
        data = {
            "id": schedule.id,
            "task_name": schedule.task_name,
            "schedule_type": schedule.schedule_type.value,
            "schedule": schedule.schedule,
            "enabled": schedule.enabled,
            "next_run_at": schedule.next_run_at,
            "last_run_at": schedule.last_run_at,
            "priority": schedule.priority,
            "description": schedule.description,
        }
        if detailed:
            data["args"] = schedule.args
            data["kwargs"] = schedule.kwargs
            data["max_retries"] = schedule.max_retries
            data["retry_delay"] = schedule.retry_delay
        # Timestamps last, keeping the key order of the original payloads.
        data["created_at"] = schedule.created_at
        data["updated_at"] = schedule.updated_at
        return data

    async def list_schedules(
        self, enabled_only: bool = False
    ) -> List[Dict[str, Any]]:
        """List all scheduled tasks.

        Args:
            enabled_only: If True, only return enabled schedules

        Returns:
            List of schedule dictionaries (empty if no scheduler configured)
        """
        scheduler = self.app.scheduler
        if not scheduler:
            return []
        if enabled_only:
            schedules = await scheduler.backend.list_enabled()
        else:
            schedules = await scheduler.backend.list_all()
        return [self._schedule_to_dict(s) for s in schedules]

    async def get_schedule(self, schedule_id: str) -> Optional[Dict[str, Any]]:
        """Get details of a specific schedule.

        Args:
            schedule_id: Schedule identifier

        Returns:
            Schedule dictionary or None if not found/no scheduler
        """
        scheduler = self.app.scheduler
        if not scheduler:
            return None
        task = await scheduler.get_scheduled_task(schedule_id)
        if not task:
            return None
        return self._schedule_to_dict(task, detailed=True)

    async def get_scheduler_stats(self) -> Optional[Dict[str, Any]]:
        """Get scheduler statistics.

        Returns:
            Scheduler stats dictionary or None if no scheduler
        """
        scheduler = self.app.scheduler
        if not scheduler:
            return None
        all_schedules = await scheduler.backend.list_all()
        total = len(all_schedules)
        enabled = sum(1 for s in all_schedules if s.enabled)
        return {
            "running": scheduler.is_running,
            "check_interval": scheduler.check_interval,
            "total_schedules": total,
            "enabled_schedules": enabled,
            # Every schedule is either enabled or not, so one pass suffices.
            "disabled_schedules": total - enabled,
        }

    # ============================================
    # Scheduler Management
    # ============================================

    def _require_scheduler(self, message: str = "No scheduler configured") -> Any:
        """Return the app's scheduler or raise ValueError(message) if absent."""
        scheduler = self.app.scheduler
        if not scheduler:
            raise ValueError(message)
        return scheduler

    async def create_schedule_cron(
        self,
        task_name: str,
        cron: str,
        args: tuple = (),
        kwargs: Optional[dict] = None,
        enabled: bool = True,
        description: Optional[str] = None,
        priority: int = 0,
    ) -> str:
        """Create a cron-based schedule.

        Args:
            task_name: Name of registered task
            cron: Cron expression (e.g., "0 9 * * *")
            args: Positional arguments for task
            kwargs: Keyword arguments for task
            enabled: Whether schedule starts enabled
            description: Optional description
            priority: Task priority (higher = more important)

        Returns:
            Schedule ID

        Raises:
            ValueError: If no scheduler configured
        """
        scheduler = self._require_scheduler(
            "No scheduler configured. Create one with "
            "app.create_scheduler() first."
        )
        return await scheduler.schedule_cron(
            task_name=task_name,
            cron=cron,
            args=args,
            kwargs=kwargs or {},
            enabled=enabled,
            description=description,
            priority=priority,
        )

    async def create_schedule_interval(
        self,
        task_name: str,
        interval: float,
        args: tuple = (),
        kwargs: Optional[dict] = None,
        enabled: bool = True,
        description: Optional[str] = None,
        priority: int = 0,
    ) -> str:
        """Create an interval-based schedule.

        Args:
            task_name: Name of registered task
            interval: Interval in seconds
            args: Positional arguments for task
            kwargs: Keyword arguments for task
            enabled: Whether schedule starts enabled
            description: Optional description
            priority: Task priority

        Returns:
            Schedule ID

        Raises:
            ValueError: If no scheduler configured
        """
        scheduler = self._require_scheduler(
            "No scheduler configured. Create one with "
            "app.create_scheduler() first."
        )
        return await scheduler.schedule_interval(
            task_name=task_name,
            interval=interval,
            args=args,
            kwargs=kwargs or {},
            enabled=enabled,
            description=description,
            priority=priority,
        )

    async def enable_schedule(self, schedule_id: str) -> None:
        """Enable a scheduled task.

        Args:
            schedule_id: Schedule identifier

        Raises:
            ValueError: If no scheduler configured. Errors for an unknown
                schedule come from scheduler.enable_task() (presumably also
                ValueError - confirm against the scheduler).
        """
        await self._require_scheduler().enable_task(schedule_id)

    async def disable_schedule(self, schedule_id: str) -> None:
        """Disable a scheduled task.

        Args:
            schedule_id: Schedule identifier

        Raises:
            ValueError: If no scheduler configured. Errors for an unknown
                schedule come from scheduler.disable_task() (presumably also
                ValueError - confirm against the scheduler).
        """
        await self._require_scheduler().disable_task(schedule_id)

    async def delete_schedule(self, schedule_id: str) -> bool:
        """Delete a scheduled task.

        Args:
            schedule_id: Schedule identifier

        Returns:
            The value returned by scheduler.unschedule() (documented as True
            if deleted, False if not found)

        Raises:
            ValueError: If no scheduler configured
        """
        return await self._require_scheduler().unschedule(schedule_id)

    # ============================================
    # Health Check
    # ============================================

    async def health_check(self) -> Dict[str, Any]:
        """Perform health check of the Flowrra application.

        Returns:
            Health status dictionary:
            {
                "healthy": bool,
                "timestamp": datetime,
                "components": {
                    "app": {"healthy": bool, "message": str},
                    "io_executor": {"healthy": bool, "message": str} | None,
                    "cpu_executor": {"healthy": bool, "message": str} | None,
                    "scheduler": {"healthy": bool, "message": str} | None
                }
            }

            "timestamp" is a naive local-time datetime (datetime.now()).
        """
        components = {
            "app": self._check_app_health(),
            "io_executor": self._check_io_executor_health(),
            "cpu_executor": self._check_cpu_executor_health(),
            "scheduler": await self._check_scheduler_health(),
        }
        # Components that are not configured (None) do not count against
        # overall health; every configured one must report healthy.
        healthy = all(
            comp is None or comp["healthy"] for comp in components.values()
        )
        return {
            "healthy": healthy,
            "timestamp": datetime.now(),
            "components": components,
        }

    @staticmethod
    def _component_health(label: str, running: Any) -> Dict[str, Any]:
        """Build a health entry for one component from its running flag."""
        state = "is running" if running else "is not running"
        return {"healthy": running, "message": f"{label} {state}"}

    def _check_app_health(self) -> Dict[str, Any]:
        """Check app health."""
        return self._component_health("App", self.app.is_running)

    def _check_io_executor_health(self) -> Optional[Dict[str, Any]]:
        """Check IO executor health (None if no IO executor exists)."""
        executor = self.app._io_executor
        if not executor:
            return None
        return self._component_health("IO executor", executor.is_running)

    def _check_cpu_executor_health(self) -> Optional[Dict[str, Any]]:
        """Check CPU executor health (None if no CPU executor exists)."""
        executor = self.app._cpu_executor
        if not executor:
            return None
        return self._component_health("CPU executor", executor.is_running)

    async def _check_scheduler_health(self) -> Optional[Dict[str, Any]]:
        """Check scheduler health (None if no scheduler is configured)."""
        scheduler = self.app.scheduler
        if not scheduler:
            return None
        return self._component_health("Scheduler", scheduler.is_running)