flowrra.management package

Submodules

flowrra.management.manager module

Core management API for Flowrra applications.

class flowrra.management.manager.FlowrraManager(app)[source]

Bases: object

Management interface for querying Flowrra application state.

This class provides a framework-agnostic API for querying: - System statistics (executor status, task counts) - Registered tasks - Pending/completed tasks - Scheduler state and schedules

Parameters:

app (Flowrra) – Flowrra application instance

Example

manager = FlowrraManager(app) stats = await manager.get_stats() tasks = await manager.list_registered_tasks()

__init__(app)[source]

Initialize manager with Flowrra app.

async get_stats()[source]

Get comprehensive system statistics.

Returns:

{

“app”: {“running”: bool}, “executors”: {

”io”: {“running”: bool, “workers”: int} | None, “cpu”: {“running”: bool, “workers”: int} | None

}, “tasks”: {

”registered”: int, “pending”: int

}, “scheduler”: {

”enabled”: bool, “schedules”: int

} | None

}

Return type:

Dictionary with system state

async list_registered_tasks()[source]

List all registered tasks.

Returns:

[
{

“name”: str, “cpu_bound”: bool, “max_retries”: int, “retry_delay”: float

]

Return type:

List of task information dictionaries

async get_task_info(task_name)[source]

Get detailed information about a specific task.

Parameters:

task_name (str) – Name of the task

Return type:

Optional[Dict[str, Any]]

Returns:

Task information dictionary or None if not found

async list_pending_tasks(limit=None)[source]

List pending tasks waiting for execution.

Parameters:

limit (Optional[int]) – Maximum number of tasks to return (None = no limit)

Return type:

List[Dict[str, Any]]

Returns:

List of pending task dictionaries, ordered by submission time (newest first)

async list_running_tasks(limit=None)[source]

List currently running tasks.

Return type:

List[Dict[str, Any]]

async list_completed_tasks(limit=None)[source]

List successfully completed tasks.

Return type:

List[Dict[str, Any]]

async list_failed_tasks(limit=None)[source]

List failed tasks.

Return type:

List[Dict[str, Any]]

async get_task_result(task_id)[source]

Get result of a completed task.

Parameters:

task_id (str) – Task identifier

Returns:

{

“task_id”: str, “status”: str, “result”: Any, “error”: str | None, “completed_at”: datetime | None

}

Return type:

Task result dictionary or None if not found

async list_schedules(enabled_only=False)[source]

List all scheduled tasks.

Parameters:

enabled_only (bool) – If True, only return enabled schedules

Return type:

List[Dict[str, Any]]

Returns:

List of schedule dictionaries (empty if no scheduler configured)

async get_schedule(schedule_id)[source]

Get details of a specific schedule.

Parameters:

schedule_id (str) – Schedule identifier

Return type:

Optional[Dict[str, Any]]

Returns:

Schedule dictionary or None if not found/no scheduler

async get_scheduler_stats()[source]

Get scheduler statistics.

Return type:

Optional[Dict[str, Any]]

Returns:

Scheduler stats dictionary or None if no scheduler

async create_schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]

Create a cron-based schedule.

Parameters:
  • task_name (str) – Name of registered task

  • cron (str) – Cron expression (e.g., “0 9 * * *”)

  • args (tuple) – Positional arguments for task

  • kwargs (Optional[dict]) – Keyword arguments for task

  • enabled (bool) – Whether schedule starts enabled

  • description (Optional[str]) – Optional description

  • priority (int) – Task priority (higher = more important)

Return type:

str

Returns:

Schedule ID

Raises:

ValueError – If no scheduler configured

async create_schedule_interval(task_name, interval, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]

Create an interval-based schedule.

Parameters:
  • task_name (str) – Name of registered task

  • interval (float) – Interval in seconds

  • args (tuple) – Positional arguments for task

  • kwargs (Optional[dict]) – Keyword arguments for task

  • enabled (bool) – Whether schedule starts enabled

  • description (Optional[str]) – Optional description

  • priority (int) – Task priority

Return type:

str

Returns:

Schedule ID

Raises:

ValueError – If no scheduler configured

async enable_schedule(schedule_id)[source]

Enable a scheduled task.

Parameters:

schedule_id (str) – Schedule identifier

Raises:

ValueError – If no scheduler configured or schedule not found

Return type:

None

async disable_schedule(schedule_id)[source]

Disable a scheduled task.

Parameters:

schedule_id (str) – Schedule identifier

Raises:

ValueError – If no scheduler configured or schedule not found

Return type:

None

async delete_schedule(schedule_id)[source]

Delete a scheduled task.

Parameters:

schedule_id (str) – Schedule identifier

Return type:

bool

Returns:

True if deleted, False if not found

Raises:

ValueError – If no scheduler configured

async health_check()[source]

Perform health check of the Flowrra application.

Returns:

{

“healthy”: bool, “timestamp”: datetime, “components”: {

”app”: {“healthy”: bool, “message”: str}, “io_executor”: {“healthy”: bool, “message”: str} | None, “cpu_executor”: {“healthy”: bool, “message”: str} | None

}

}

Return type:

Health status dictionary

Module contents

Management API for querying Flowrra state.

This module provides a pure Python API for querying and managing Flowrra applications. It has NO web framework dependencies and can be used by: - CLI tools - Web UI adapters (FastAPI, Flask, Django) - Monitoring scripts - Testing utilities

Example

from flowrra import Flowrra from flowrra.management import FlowrraManager

app = Flowrra.from_urls() manager = FlowrraManager(app)

# Query system state stats = await manager.get_stats() tasks = await manager.list_registered_tasks() schedules = await manager.list_schedules()

class flowrra.management.FlowrraManager(app)[source]

Bases: object

Management interface for querying Flowrra application state.

This class provides a framework-agnostic API for querying: - System statistics (executor status, task counts) - Registered tasks - Pending/completed tasks - Scheduler state and schedules

Parameters:

app (Flowrra) – Flowrra application instance

Example

manager = FlowrraManager(app) stats = await manager.get_stats() tasks = await manager.list_registered_tasks()

__init__(app)[source]

Initialize manager with Flowrra app.

async get_stats()[source]

Get comprehensive system statistics.

Returns:

{

“app”: {“running”: bool}, “executors”: {

”io”: {“running”: bool, “workers”: int} | None, “cpu”: {“running”: bool, “workers”: int} | None

}, “tasks”: {

”registered”: int, “pending”: int

}, “scheduler”: {

”enabled”: bool, “schedules”: int

} | None

}

Return type:

Dictionary with system state

async list_registered_tasks()[source]

List all registered tasks.

Returns:

[
{

“name”: str, “cpu_bound”: bool, “max_retries”: int, “retry_delay”: float

]

Return type:

List of task information dictionaries

async get_task_info(task_name)[source]

Get detailed information about a specific task.

Parameters:

task_name (str) – Name of the task

Return type:

Optional[Dict[str, Any]]

Returns:

Task information dictionary or None if not found

async list_pending_tasks(limit=None)[source]

List pending tasks waiting for execution.

Parameters:

limit (Optional[int]) – Maximum number of tasks to return (None = no limit)

Return type:

List[Dict[str, Any]]

Returns:

List of pending task dictionaries, ordered by submission time (newest first)

async list_running_tasks(limit=None)[source]

List currently running tasks.

Return type:

List[Dict[str, Any]]

async list_completed_tasks(limit=None)[source]

List successfully completed tasks.

Return type:

List[Dict[str, Any]]

async list_failed_tasks(limit=None)[source]

List failed tasks.

Return type:

List[Dict[str, Any]]

async get_task_result(task_id)[source]

Get result of a completed task.

Parameters:

task_id (str) – Task identifier

Returns:

{

“task_id”: str, “status”: str, “result”: Any, “error”: str | None, “completed_at”: datetime | None

}

Return type:

Task result dictionary or None if not found

async list_schedules(enabled_only=False)[source]

List all scheduled tasks.

Parameters:

enabled_only (bool) – If True, only return enabled schedules

Return type:

List[Dict[str, Any]]

Returns:

List of schedule dictionaries (empty if no scheduler configured)

async get_schedule(schedule_id)[source]

Get details of a specific schedule.

Parameters:

schedule_id (str) – Schedule identifier

Return type:

Optional[Dict[str, Any]]

Returns:

Schedule dictionary or None if not found/no scheduler

async get_scheduler_stats()[source]

Get scheduler statistics.

Return type:

Optional[Dict[str, Any]]

Returns:

Scheduler stats dictionary or None if no scheduler

async create_schedule_cron(task_name, cron, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]

Create a cron-based schedule.

Parameters:
  • task_name (str) – Name of registered task

  • cron (str) – Cron expression (e.g., “0 9 * * *”)

  • args (tuple) – Positional arguments for task

  • kwargs (Optional[dict]) – Keyword arguments for task

  • enabled (bool) – Whether schedule starts enabled

  • description (Optional[str]) – Optional description

  • priority (int) – Task priority (higher = more important)

Return type:

str

Returns:

Schedule ID

Raises:

ValueError – If no scheduler configured

async create_schedule_interval(task_name, interval, args=(), kwargs=None, enabled=True, description=None, priority=0)[source]

Create an interval-based schedule.

Parameters:
  • task_name (str) – Name of registered task

  • interval (float) – Interval in seconds

  • args (tuple) – Positional arguments for task

  • kwargs (Optional[dict]) – Keyword arguments for task

  • enabled (bool) – Whether schedule starts enabled

  • description (Optional[str]) – Optional description

  • priority (int) – Task priority

Return type:

str

Returns:

Schedule ID

Raises:

ValueError – If no scheduler configured

async enable_schedule(schedule_id)[source]

Enable a scheduled task.

Parameters:

schedule_id (str) – Schedule identifier

Raises:

ValueError – If no scheduler configured or schedule not found

Return type:

None

async disable_schedule(schedule_id)[source]

Disable a scheduled task.

Parameters:

schedule_id (str) – Schedule identifier

Raises:

ValueError – If no scheduler configured or schedule not found

Return type:

None

async delete_schedule(schedule_id)[source]

Delete a scheduled task.

Parameters:

schedule_id (str) – Schedule identifier

Return type:

bool

Returns:

True if deleted, False if not found

Raises:

ValueError – If no scheduler configured

async health_check()[source]

Perform health check of the Flowrra application.

Returns:

{

“healthy”: bool, “timestamp”: datetime, “components”: {

”app”: {“healthy”: bool, “message”: str}, “io_executor”: {“healthy”: bool, “message”: str} | None, “cpu_executor”: {“healthy”: bool, “message”: str} | None

}

}

Return type:

Health status dictionary