Source code for flowrra.scheduler.models

"""Scheduler models for persistent task scheduling."""

from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any


class ScheduleType(Enum):
    """Enumerates the kinds of schedule a task can be registered with.

    Every member carries a lowercase string value, so a member can be
    recovered from its value with ``ScheduleType("cron")`` and turned back
    into a plain string with ``member.value``.
    """

    # Schedule described by a cron expression.
    CRON = "cron"

    # Schedule that repeats on a fixed interval.
    INTERVAL = "interval"

    # Schedule presumably meant to fire a single time.
    ONE_TIME = "one_time"
@dataclass
class ScheduledTask:
    """A task scheduled for execution.

    Attributes:
        id: Unique identifier for scheduled task
        task_name: Name of the registered task to execute
        schedule_type: Type of schedule (cron, interval, one_time)
        schedule: Schedule definition (cron expression or interval in seconds)
        args: Positional arguments for task execution
        kwargs: Keyword arguments for task execution
        enabled: Whether the scheduled task is active
        last_run_at: Timestamp of last execution
        next_run_at: Timestamp of next scheduled execution
        created_at: When the scheduled task was created
        updated_at: When the scheduled task was last updated
        description: Optional description of the task
        max_retries: Maximum retry attempts on failure
        retry_delay: Delay between retries in seconds
        priority: Task priority (higher = more important)
    """

    id: str
    task_name: str
    schedule_type: ScheduleType
    schedule: str
    args: tuple = field(default_factory=tuple)
    kwargs: dict = field(default_factory=dict)
    enabled: bool = True
    last_run_at: datetime | None = None
    next_run_at: datetime | None = None
    # datetime.now() without a tz argument yields a naive local timestamp.
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    description: str | None = None
    max_retries: int = 3
    retry_delay: float = 1.0
    priority: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for storage.

        Timestamps are rendered as ISO 8601 strings (``None`` when unset),
        ``schedule_type`` as its enum value, and ``args`` as a list.

        Returns:
            A new dictionary; its ``args`` list and ``kwargs`` dict are
            shallow copies, so editing them does not alter this task.
        """
        return {
            "id": self.id,
            "task_name": self.task_name,
            "schedule_type": self.schedule_type.value,
            "schedule": self.schedule,
            "args": list(self.args),
            # Copy so callers mutating the payload cannot change the task.
            "kwargs": dict(self.kwargs),
            "enabled": self.enabled,
            "last_run_at": (
                self.last_run_at.isoformat()
                if self.last_run_at is not None
                else None
            ),
            "next_run_at": (
                self.next_run_at.isoformat()
                if self.next_run_at is not None
                else None
            ),
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "description": self.description,
            "max_retries": self.max_retries,
            "retry_delay": self.retry_delay,
            "priority": self.priority,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ScheduledTask":
        """Create from dictionary.

        Args:
            data: Mapping in the shape produced by ``to_dict``. The keys
                ``id``, ``task_name``, ``schedule_type``, ``schedule``,
                ``created_at`` and ``updated_at`` are required; every other
                key falls back to the field's default when absent.

        Returns:
            A new ``ScheduledTask`` that shares no mutable containers with
            ``data``.

        Raises:
            KeyError: If a required key is missing.
            ValueError: If ``schedule_type`` is not a valid ``ScheduleType``
                value or a timestamp is not a valid ISO 8601 string.
        """

        def _optional_timestamp(key: str) -> datetime | None:
            # Missing, None and empty-string values all mean "not set".
            raw = data.get(key)
            return datetime.fromisoformat(raw) if raw else None

        return cls(
            id=data["id"],
            task_name=data["task_name"],
            schedule_type=ScheduleType(data["schedule_type"]),
            schedule=data["schedule"],
            # ``or`` also covers an explicit null stored for the key, which
            # would otherwise raise TypeError (args) or leave kwargs as None.
            args=tuple(data.get("args") or ()),
            # Copy so the task does not alias the caller's dictionary.
            kwargs=dict(data.get("kwargs") or {}),
            enabled=data.get("enabled", True),
            last_run_at=_optional_timestamp("last_run_at"),
            next_run_at=_optional_timestamp("next_run_at"),
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            description=data.get("description"),
            max_retries=data.get("max_retries", 3),
            retry_delay=data.get("retry_delay", 1.0),
            priority=data.get("priority", 0),
        )