"""Base interface for scheduler storage backends."""
from abc import ABC, abstractmethod
from datetime import datetime
from typing import List
from flowrra.scheduler.models import ScheduledTask, ScheduleType
[docs]
class BaseSchedulerBackend(ABC):
"""Abstract base class for scheduler storage backends.
Backends are responsible for persisting scheduled task definitions
and providing query capabilities for the scheduler service.
"""
[docs]
@abstractmethod
async def create(self, task: ScheduledTask) -> None:
"""Create a new scheduled task.
Args:
task: ScheduledTask to store
Raises:
ValueError: If task with same ID already exists
"""
pass
[docs]
@abstractmethod
async def update(self, task: ScheduledTask) -> None:
"""Update an existing scheduled task.
Args:
task: ScheduledTask with updated fields
Raises:
ValueError: If task doesn't exist
"""
pass
[docs]
@abstractmethod
async def delete(self, task_id: str) -> bool:
"""Delete a scheduled task.
Args:
task_id: ID of task to delete
Returns:
True if deleted, False if not found
"""
pass
[docs]
@abstractmethod
async def get(self, task_id: str) -> ScheduledTask | None:
"""Get a scheduled task by ID.
Args:
task_id: Task ID to retrieve
Returns:
ScheduledTask if found, None otherwise
"""
pass
[docs]
@abstractmethod
async def list_all(self) -> List[ScheduledTask]:
"""List all scheduled tasks.
Returns:
List of all ScheduledTask objects
"""
pass
[docs]
@abstractmethod
async def list_enabled(self) -> List[ScheduledTask]:
"""List only enabled scheduled tasks.
Returns:
List of enabled ScheduledTask objects
"""
pass
[docs]
@abstractmethod
async def list_due(self, now: datetime | None = None) -> List[ScheduledTask]:
"""List tasks that are due for execution.
Args:
now: Current time (defaults to datetime.now())
Returns:
List of ScheduledTask objects due for execution
"""
pass
[docs]
@abstractmethod
async def update_run_times(
self, task_id: str, last_run: datetime, next_run: datetime
) -> None:
"""Update task run times after execution.
Args:
task_id: ID of the task
last_run: Timestamp of last execution
next_run: Timestamp of next scheduled execution
Raises:
ValueError: If task doesn't exist
"""
pass
[docs]
@abstractmethod
async def clear(self) -> int:
"""Clear all scheduled tasks.
Returns:
Number of tasks deleted
"""
pass
[docs]
@abstractmethod
async def find_by_definition(
self,
task_name: str,
schedule_type: ScheduleType,
schedule: str,
args: tuple = (),
kwargs: dict | None = None,
) -> ScheduledTask | None:
"""Find exact schedule match by complete definition.
Used for idempotency - finds schedules with identical parameters.
Args:
task_name: Task name to match
schedule_type: Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule: Schedule expression/value
args: Task arguments tuple
kwargs: Task keyword arguments dict
Returns:
Matching ScheduledTask if found, None otherwise
"""
pass
[docs]
@abstractmethod
async def close(self) -> None:
"""Close backend connections and cleanup resources."""
pass