from typing import Callable, TYPE_CHECKING
from flowrra.config import Config, BrokerConfig, BackendConfig
from flowrra.executors.io_executor import IOExecutor
from flowrra.executors.cpu_executor import CPUExecutor
from flowrra.registry import TaskRegistry
from flowrra.task import TaskResult
if TYPE_CHECKING:
from flowrra.scheduler import Scheduler
[docs]
class Flowrra:
"""Unified Flowrra application for task execution.
Automatically routes tasks to appropriate executors based on task type.
Usage:
# Using Config
config = Config(
broker=BrokerConfig(url='redis://localhost:6379/0'),
backend=BackendConfig(url='redis://localhost:6379/1')
)
app = Flowrra(config=config)
# Using from_urls()
app = Flowrra.from_urls(
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
# I/O-bound task (default)
@app.task()
async def fetch_data(url: str):
return await fetch(url)
# CPU-bound task
@app.task(cpu_bound=True)
def heavy_compute(n: int):
return sum(i**2 for i in range(n))
"""
[docs]
def __init__(self, config: Config | None = None):
if config is None:
config = Config()
self._config = config
self._registry = TaskRegistry()
self._io_executor: IOExecutor | None = None
self._cpu_executor: CPUExecutor | None = None
self._scheduler: "Scheduler | None" = None
self._running = False
def _init_executor(self, cpu_bound: bool):
if cpu_bound:
if self._cpu_executor is None:
if self._config.backend is None:
raise ValueError("CPU-bound tasks require backend for cross-process results")
self._cpu_executor = CPUExecutor(config=self._config, registry=self._registry)
return self._cpu_executor
else:
if self._io_executor is None:
self._io_executor = IOExecutor(config=self._config, registry=self._registry)
return self._io_executor
[docs]
@classmethod
def from_urls(
cls,
broker: str | None = None,
backend: str | None = None,
) -> "Flowrra":
"""Convenience method to create Flowrra from URLs.
Args:
broker: Broker connection URL (optional, None = asyncio.PriorityQueue)
backend: Backend connection URL (optional, None = InMemoryBackend for IO, required for CPU)
Returns:
Flowrra instance
"""
broker_config = BrokerConfig(url=broker) if broker else None
backend_config = BackendConfig(url=backend) if backend else None
config = Config(broker=broker_config, backend=backend_config)
return cls(config=config)
[docs]
def task(
self,
name: str | None = None,
cpu_bound: bool = False,
**kwargs
):
"""Register a task with automatic executor routing.
Args:
name: Custom task name (defaults to function name)
cpu_bound: If True, uses CPUExecutor; if False, uses IOExecutor (default)
Returns:
Decorator function
"""
executor = self._cpu_executor if cpu_bound else self._io_executor
if executor is None:
executor = self._init_executor(cpu_bound)
# Register task in executor registry
decorator = executor.task(name=name, **kwargs)
# Attach executor reference to decorated function for easy submit
def wrapper(func):
func._executor = executor
return decorator(func)
return wrapper
[docs]
async def submit(
self,
task_func: Callable,
*args,
**kwargs
) -> str:
"""Submit a task for execution.
Args:
task_func: Registered task function
*args: Positional arguments for the task
**kwargs: Keyword arguments for the task
Returns:
task_id: Unique identifier for tracking
"""
executor = getattr(task_func, "_executor", self._io_executor)
if executor is None:
raise RuntimeError("Task function is not registered with this app")
return await executor.submit(task_func, *args, **kwargs)
[docs]
async def wait_for_result(
self,
task_id: str,
timeout: float | None = None
) -> TaskResult:
"""Wait for a task to complete and return its result.
Args:
task_id: Task identifier
timeout: Max seconds to wait (None = wait forever)
Returns:
TaskResult with status, result, and error information
"""
if self._io_executor and self._io_executor.is_running:
result = await self._io_executor.get_result(task_id)
if result:
if result.is_complete:
return result
return await self._io_executor.wait_for_result(task_id, timeout=timeout)
if self._cpu_executor and self._cpu_executor.is_running:
result = await self._cpu_executor.get_result(task_id)
if result:
if result.is_complete:
return result
return await self._cpu_executor.wait_for_result(task_id, timeout=timeout)
raise ValueError(f"Task {task_id} not found in any executor")
[docs]
async def get_result(self, task_id: str) -> TaskResult | None:
if self._io_executor:
result = await self._io_executor.get_result(task_id)
if result:
return result
if self._cpu_executor:
result = await self._cpu_executor.get_result(task_id)
if result:
return result
return None
[docs]
async def start(self):
if self._running:
return
self._running = True
if self._io_executor:
await self._io_executor.start()
if self._cpu_executor:
await self._cpu_executor.start()
if self._scheduler:
await self._scheduler.start()
[docs]
def create_scheduler(
self,
backend: "BaseSchedulerBackend | str | None" = None,
check_interval: float = 60.0
) -> "Scheduler":
"""Create a scheduler integrated with this app's executors.
The scheduler is automatically registered with the app and will start/stop
with the app lifecycle. Access it via app.scheduler property or use
FlowrraManager for comprehensive schedule management.
Args:
backend: Scheduler backend (URL string or instance)
- None: Uses default SQLite backend (.flowrra_schedule.db)
- String: Database URL (e.g., "postgresql://localhost/db")
- Instance: Custom BaseSchedulerBackend instance
check_interval: How often to check for due tasks (seconds)
Returns:
Scheduler instance configured with this app's executors
Example:
app = Flowrra.from_urls()
@app.task()
async def send_report():
return "Report sent"
# Create integrated scheduler (auto-registered)
scheduler = app.create_scheduler()
# Schedule tasks
await scheduler.schedule_cron(
task_name="send_report",
cron="0 9 * * *"
)
# Start app (automatically starts scheduler too)
await app.start()
# Or use FlowrraManager for comprehensive management
from flowrra.management import FlowrraManager
manager = FlowrraManager(app)
schedules = await manager.list_schedules()
"""
from flowrra.scheduler import Scheduler
from flowrra.scheduler.backends import get_scheduler_backend
if isinstance(backend, str):
scheduler_backend = get_scheduler_backend(backend)
elif backend is None:
scheduler_backend = get_scheduler_backend()
else:
scheduler_backend = backend
scheduler = Scheduler(
backend=scheduler_backend,
registry=self.registry,
check_interval=check_interval,
io_executor=self._io_executor,
cpu_executor=self._cpu_executor
)
# Store reference for management access
self._scheduler = scheduler
return scheduler
[docs]
async def stop(self, wait: bool = True, timeout: float | None = 30.0):
"""Stop the Flowrra application and all executors.
Args:
wait: If True, wait for pending tasks to complete
timeout: Max seconds to wait for pending tasks
"""
if not self._running:
return
self._running = False
if self._io_executor:
await self._io_executor.stop(wait=wait, timeout=timeout)
if self._cpu_executor:
await self._cpu_executor.stop(wait=wait, timeout=timeout)
if self._scheduler:
await self._scheduler.stop()
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
@property
def is_running(self) -> bool:
return self._running
@property
def registry(self):
"""Get the shared task registry.
Returns:
TaskRegistry instance
Note:
Both IOExecutor and CPUExecutor share the same registry instance
"""
return self._registry
@property
def scheduler(self) -> "Scheduler | None":
"""Get the registered scheduler instance.
Returns:
Scheduler instance if created via create_scheduler(), None otherwise
Note:
The scheduler is optional. It only exists if create_scheduler()
was called.
"""
return self._scheduler