Source code for flowrra.scheduler.backends.postgresql

"""PostgreSQL backend for scheduler storage.

This backend uses asyncpg for high-performance async PostgreSQL access,
ideal for distributed/production deployments.
"""

import asyncio
import json
from datetime import datetime
from typing import List

from flowrra.scheduler.backends.base import BaseSchedulerBackend
from flowrra.scheduler.models import ScheduledTask, ScheduleType


class PostgreSQLSchedulerBackend(BaseSchedulerBackend):
    """PostgreSQL-based scheduler backend.

    Features:
        - High-performance async operations via asyncpg
        - Connection pooling
        - Atomic single-statement writes (e.g. insert-if-absent on create)
        - JSONB for efficient JSON storage and queries
        - Good for distributed/production deployments

    Args:
        url: PostgreSQL connection URL
            Format: postgresql://user:pass@host:port/dbname
    """

    def __init__(self, url: str):
        """Initialize PostgreSQL backend.

        No connection is opened here; the pool and schema are created lazily
        on the first database operation.

        Args:
            url: PostgreSQL connection URL

        Note:
            A missing asyncpg package is reported as ``ImportError`` on the
            first database operation, not by this constructor.
        """
        self.url = url
        self._pool = None
        # Serializes first-use initialization so concurrent callers do not
        # each create (and leak) their own pool.
        self._connect_lock = asyncio.Lock()

    async def _ensure_connected(self):
        """Return the connection pool, creating it and the schema on first use.

        Raises:
            ImportError: If asyncpg is not installed.
        """
        if self._pool is not None:
            return self._pool
        async with self._connect_lock:
            # Re-check: another coroutine may have finished initialization
            # while this one was waiting for the lock.
            if self._pool is None:
                try:
                    import asyncpg
                except ImportError as e:
                    raise ImportError(
                        "PostgreSQL backend requires asyncpg package. "
                        "Install with: pip install flowrra[postgresql]"
                    ) from e
                pool = await asyncpg.create_pool(self.url)
                try:
                    await self._create_schema(pool)
                except BaseException:
                    # Don't leak connections or keep a pool whose schema was
                    # never created; the next call retries from scratch.
                    pool.terminate()
                    raise
                # Publish only after the schema exists so no caller can run
                # queries against a missing table.
                self._pool = pool
        return self._pool

    async def _create_schema(self, pool=None) -> None:
        """Create the ``scheduled_tasks`` table and its indexes if missing.

        Args:
            pool: Pool to run the DDL on. Defaults to ``self._pool`` for
                callers that set the pool attribute before calling this.
        """
        if pool is None:
            pool = self._pool
        async with pool.acquire() as conn:
            await conn.execute(
                """
                CREATE TABLE IF NOT EXISTS scheduled_tasks (
                    id TEXT PRIMARY KEY,
                    task_name TEXT NOT NULL,
                    schedule_type TEXT NOT NULL,
                    schedule TEXT NOT NULL,
                    args JSONB NOT NULL,
                    kwargs JSONB NOT NULL,
                    enabled BOOLEAN NOT NULL DEFAULT TRUE,
                    last_run_at TIMESTAMP,
                    next_run_at TIMESTAMP,
                    created_at TIMESTAMP NOT NULL,
                    updated_at TIMESTAMP NOT NULL,
                    description TEXT,
                    max_retries INTEGER NOT NULL DEFAULT 3,
                    retry_delay REAL NOT NULL DEFAULT 1.0,
                    priority INTEGER NOT NULL DEFAULT 0
                )
                """
            )
            await conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_enabled ON scheduled_tasks(enabled)"
            )
            await conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_next_run ON scheduled_tasks(next_run_at)"
            )
            # Index for idempotency checks (find_by_definition)
            await conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_task_definition ON scheduled_tasks(task_name, schedule_type, schedule)"
            )

    async def create(self, task: ScheduledTask) -> None:
        """Create a new scheduled task.

        Args:
            task: Task to insert; ``args``/``kwargs`` are stored as JSONB.

        Raises:
            ValueError: If a task with the same ID already exists.
        """
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            # ON CONFLICT folds the existence check into the insert, so a
            # concurrent duplicate yields ValueError rather than a raw
            # unique-violation error.
            status = await conn.execute(
                """
                INSERT INTO scheduled_tasks (
                    id, task_name, schedule_type, schedule, args, kwargs,
                    enabled, last_run_at, next_run_at, created_at, updated_at,
                    description, max_retries, retry_delay, priority
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
                ON CONFLICT (id) DO NOTHING
                """,
                task.id,
                task.task_name,
                task.schedule_type.value,
                task.schedule,
                json.dumps(list(task.args)),
                json.dumps(task.kwargs),
                task.enabled,
                task.last_run_at,
                task.next_run_at,
                task.created_at,
                task.updated_at,
                task.description,
                task.max_retries,
                task.retry_delay,
                task.priority,
            )
        # Status is "INSERT <oid> <rows>"; zero rows means the ID was taken.
        if status.split()[-1] == "0":
            raise ValueError(f"Scheduled task with ID '{task.id}' already exists")

    async def update(self, task: ScheduledTask) -> None:
        """Update an existing scheduled task.

        Sets ``task.updated_at`` to the current local time before writing.

        Raises:
            ValueError: If no task with ``task.id`` exists.
        """
        pool = await self._ensure_connected()
        task.updated_at = datetime.now()
        async with pool.acquire() as conn:
            result = await conn.execute(
                """
                UPDATE scheduled_tasks SET
                    task_name = $1, schedule_type = $2, schedule = $3,
                    args = $4, kwargs = $5, enabled = $6,
                    last_run_at = $7, next_run_at = $8, updated_at = $9,
                    description = $10, max_retries = $11, retry_delay = $12,
                    priority = $13
                WHERE id = $14
                """,
                task.task_name,
                task.schedule_type.value,
                task.schedule,
                json.dumps(list(task.args)),
                json.dumps(task.kwargs),
                task.enabled,
                task.last_run_at,
                task.next_run_at,
                task.updated_at,
                task.description,
                task.max_retries,
                task.retry_delay,
                task.priority,
                task.id,
            )
        if result == "UPDATE 0":
            raise ValueError(f"Scheduled task with ID '{task.id}' not found")

    async def delete(self, task_id: str) -> bool:
        """Delete a scheduled task.

        Returns:
            True if a row was deleted, False if no task had that ID.
        """
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            result = await conn.execute(
                "DELETE FROM scheduled_tasks WHERE id = $1", task_id
            )
        return result != "DELETE 0"

    async def get(self, task_id: str) -> ScheduledTask | None:
        """Get a scheduled task by ID, or None if it does not exist."""
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            row = await conn.fetchrow(
                "SELECT * FROM scheduled_tasks WHERE id = $1", task_id
            )
        if row is None:
            return None
        return self._row_to_task(row)

    async def list_all(self) -> List[ScheduledTask]:
        """List all scheduled tasks, oldest ``created_at`` first."""
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            rows = await conn.fetch("SELECT * FROM scheduled_tasks ORDER BY created_at")
        return [self._row_to_task(row) for row in rows]

    async def list_enabled(self) -> List[ScheduledTask]:
        """List only enabled scheduled tasks, ordered by ``next_run_at``."""
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            rows = await conn.fetch(
                "SELECT * FROM scheduled_tasks WHERE enabled = TRUE ORDER BY next_run_at"
            )
        return [self._row_to_task(row) for row in rows]

    async def list_due(self, now: datetime | None = None) -> List[ScheduledTask]:
        """List enabled tasks whose ``next_run_at`` is at or before ``now``.

        Tasks with a NULL ``next_run_at`` never match. Results are ordered by
        priority (highest first), then by ``next_run_at``.

        Args:
            now: Cut-off time; defaults to ``datetime.now()`` (naive local time).
        """
        if now is None:
            now = datetime.now()
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            rows = await conn.fetch(
                """
                SELECT * FROM scheduled_tasks
                WHERE enabled = TRUE AND next_run_at <= $1
                ORDER BY priority DESC, next_run_at
                """,
                now,
            )
        return [self._row_to_task(row) for row in rows]

    async def update_run_times(
        self, task_id: str, last_run: datetime, next_run: datetime
    ) -> None:
        """Update task run times after execution.

        Raises:
            ValueError: If no task with ``task_id`` exists.
        """
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            result = await conn.execute(
                """
                UPDATE scheduled_tasks
                SET last_run_at = $1, next_run_at = $2, updated_at = $3
                WHERE id = $4
                """,
                last_run,
                next_run,
                datetime.now(),
                task_id,
            )
        if result == "UPDATE 0":
            raise ValueError(f"Scheduled task with ID '{task_id}' not found")

    async def clear(self) -> int:
        """Delete all scheduled tasks and return how many were removed."""
        pool = await self._ensure_connected()
        async with pool.acquire() as conn:
            result = await conn.execute("DELETE FROM scheduled_tasks")
        # Command status has the form "DELETE <count>".
        return int(result.split()[-1]) if result else 0

    async def find_by_definition(
        self,
        task_name: str,
        schedule_type: ScheduleType,
        schedule: str,
        args: tuple = (),
        kwargs: dict | None = None,
    ) -> ScheduledTask | None:
        """Find exact schedule match by complete definition.

        Used for idempotency - finds schedules with identical parameters.
        ``args`` and ``kwargs`` are compared as JSONB values, so key order and
        JSON string escaping do not affect the match.

        Args:
            task_name: Task name to match
            schedule_type: Type of schedule (CRON, INTERVAL, ONE_TIME)
            schedule: Schedule expression/value
            args: Task arguments tuple
            kwargs: Task keyword arguments dict (None is treated as {})

        Returns:
            Matching ScheduledTask if found, None otherwise
        """
        pool = await self._ensure_connected()
        args_json = json.dumps(list(args))
        kwargs_json = json.dumps(kwargs or {})

        async with pool.acquire() as conn:
            # Compare as jsonb rather than text: PostgreSQL re-serializes
            # stored jsonb (sorted keys, unescaped non-ASCII), so a textual
            # comparison with json.dumps output can miss identical values.
            row = await conn.fetchrow(
                """
                SELECT * FROM scheduled_tasks
                WHERE task_name = $1
                  AND schedule_type = $2
                  AND schedule = $3
                  AND args = $4::jsonb
                  AND kwargs = $5::jsonb
                LIMIT 1
                """,
                task_name,
                schedule_type.value,
                schedule,
                args_json,
                kwargs_json,
            )
        if row is None:
            return None
        return self._row_to_task(row)

    async def close(self) -> None:
        """Close the database connection pool if one was created."""
        if self._pool is not None:
            await self._pool.close()
            self._pool = None

    def _row_to_task(self, row) -> ScheduledTask:
        """Convert a ``scheduled_tasks`` row to a ScheduledTask.

        Expects ``args``/``kwargs`` to arrive as JSON text, which is what this
        code decodes with ``json.loads``.
        """
        return ScheduledTask(
            id=row["id"],
            task_name=row["task_name"],
            schedule_type=ScheduleType(row["schedule_type"]),
            schedule=row["schedule"],
            args=tuple(json.loads(row["args"])),
            kwargs=json.loads(row["kwargs"]),
            enabled=row["enabled"],
            last_run_at=row["last_run_at"],
            next_run_at=row["next_run_at"],
            created_at=row["created_at"],
            updated_at=row["updated_at"],
            description=row["description"],
            max_retries=row["max_retries"],
            retry_delay=row["retry_delay"],
            priority=row["priority"],
        )