"""MySQL backend for scheduler storage.
This backend uses aiomysql for async MySQL access,
ideal for distributed/production deployments.
"""
import json
from datetime import datetime
from typing import List
from urllib.parse import urlparse
from flowrra.scheduler.backends.base import BaseSchedulerBackend
from flowrra.scheduler.models import ScheduledTask, ScheduleType
[docs]
class MySQLSchedulerBackend(BaseSchedulerBackend):
"""MySQL-based scheduler backend.
Features:
- High-performance async operations via aiomysql
- Connection pooling
- Transaction support
- JSON column support
- Good for distributed/production deployments
- Compatible with MariaDB
Args:
url: MySQL connection URL
Format: mysql://user:pass@host:port/dbname
"""
[docs]
def __init__(self, url: str):
"""Initialize MySQL backend.
Args:
url: MySQL connection URL
Raises:
ImportError: If aiomysql is not installed
"""
self.url = url
self._pool = None
async def _ensure_connected(self):
"""Ensure database connection pool exists."""
if self._pool is None:
try:
import aiomysql
except ImportError as e:
raise ImportError(
"MySQL backend requires aiomysql package. "
"Install with: pip install flowrra[mysql]"
) from e
parsed = urlparse(self.url)
self._pool = await aiomysql.create_pool(
host=parsed.hostname or "localhost",
port=parsed.port or 3306,
user=parsed.username or "root",
password=parsed.password or "",
db=parsed.path.lstrip("/") if parsed.path else "flowrra",
autocommit=False,
)
await self._create_schema()
return self._pool
async def _create_schema(self) -> None:
"""Create database schema if it doesn't exist."""
pool = self._pool
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"""
CREATE TABLE IF NOT EXISTS scheduled_tasks (
id VARCHAR(255) PRIMARY KEY,
task_name VARCHAR(255) NOT NULL,
schedule_type VARCHAR(50) NOT NULL,
schedule TEXT NOT NULL,
args JSON NOT NULL,
kwargs JSON NOT NULL,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
last_run_at DATETIME,
next_run_at DATETIME,
created_at DATETIME NOT NULL,
updated_at DATETIME NOT NULL,
description TEXT,
max_retries INT NOT NULL DEFAULT 3,
retry_delay FLOAT NOT NULL DEFAULT 1.0,
priority INT NOT NULL DEFAULT 0,
INDEX idx_enabled (enabled),
INDEX idx_next_run (next_run_at),
INDEX idx_task_definition (task_name, schedule_type, schedule(255))
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
)
await conn.commit()
[docs]
async def create(self, task: ScheduledTask) -> None:
"""Create a new scheduled task."""
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
# Check if exists
await cursor.execute(
"SELECT 1 FROM scheduled_tasks WHERE id = %s", (task.id,)
)
exists = await cursor.fetchone()
if exists:
raise ValueError(f"Scheduled task with ID '{task.id}' already exists")
await cursor.execute(
"""
INSERT INTO scheduled_tasks (
id, task_name, schedule_type, schedule, args, kwargs,
enabled, last_run_at, next_run_at, created_at, updated_at,
description, max_retries, retry_delay, priority
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
task.id,
task.task_name,
task.schedule_type.value,
task.schedule,
json.dumps(list(task.args)),
json.dumps(task.kwargs),
task.enabled,
task.last_run_at,
task.next_run_at,
task.created_at,
task.updated_at,
task.description,
task.max_retries,
task.retry_delay,
task.priority,
),
)
await conn.commit()
[docs]
async def update(self, task: ScheduledTask) -> None:
"""Update an existing scheduled task."""
pool = await self._ensure_connected()
task.updated_at = datetime.now()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"""
UPDATE scheduled_tasks
SET task_name = %s, schedule_type = %s, schedule = %s, args = %s, kwargs = %s,
enabled = %s, last_run_at = %s, next_run_at = %s, updated_at = %s,
description = %s, max_retries = %s, retry_delay = %s, priority = %s
WHERE id = %s
""",
(
task.task_name,
task.schedule_type.value,
task.schedule,
json.dumps(list(task.args)),
json.dumps(task.kwargs),
task.enabled,
task.last_run_at,
task.next_run_at,
task.updated_at,
task.description,
task.max_retries,
task.retry_delay,
task.priority,
task.id,
),
)
if cursor.rowcount == 0:
raise ValueError(f"Scheduled task with ID '{task.id}' not found")
await conn.commit()
[docs]
async def delete(self, task_id: str) -> bool:
"""Delete a scheduled task."""
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"DELETE FROM scheduled_tasks WHERE id = %s", (task_id,)
)
deleted = cursor.rowcount > 0
await conn.commit()
return deleted
[docs]
async def get(self, task_id: str) -> ScheduledTask | None:
"""Get a scheduled task by ID."""
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"SELECT * FROM scheduled_tasks WHERE id = %s", (task_id,)
)
row = await cursor.fetchone()
if row is None:
return None
# Get column names
columns = [desc[0] for desc in cursor.description]
return self._row_to_task(dict(zip(columns, row)))
[docs]
async def list_all(self) -> List[ScheduledTask]:
"""List all scheduled tasks."""
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("SELECT * FROM scheduled_tasks ORDER BY created_at")
rows = await cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return [self._row_to_task(dict(zip(columns, row))) for row in rows]
[docs]
async def list_enabled(self) -> List[ScheduledTask]:
"""List only enabled scheduled tasks."""
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"SELECT * FROM scheduled_tasks WHERE enabled = TRUE ORDER BY next_run_at"
)
rows = await cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return [self._row_to_task(dict(zip(columns, row))) for row in rows]
[docs]
async def list_due(self, now: datetime | None = None) -> List[ScheduledTask]:
"""List tasks that are due for execution."""
if now is None:
now = datetime.now()
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"""
SELECT * FROM scheduled_tasks
WHERE enabled = TRUE AND next_run_at <= %s
ORDER BY priority DESC, next_run_at
""",
(now,),
)
rows = await cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return [self._row_to_task(dict(zip(columns, row))) for row in rows]
[docs]
async def update_run_times(
self, task_id: str, last_run: datetime, next_run: datetime
) -> None:
"""Update task run times after execution."""
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"""
UPDATE scheduled_tasks
SET last_run_at = %s, next_run_at = %s, updated_at = %s
WHERE id = %s
""",
(last_run, next_run, datetime.now(), task_id),
)
if cursor.rowcount == 0:
raise ValueError(f"Scheduled task with ID '{task_id}' not found")
await conn.commit()
[docs]
async def clear(self) -> int:
"""Clear all scheduled tasks."""
pool = await self._ensure_connected()
async with pool.acquire() as conn:
async with conn.cursor() as cursor:
await cursor.execute("DELETE FROM scheduled_tasks")
count = cursor.rowcount
await conn.commit()
return count
[docs]
async def find_by_definition(
self,
task_name: str,
schedule_type: ScheduleType,
schedule: str,
args: tuple = (),
kwargs: dict | None = None,
) -> ScheduledTask | None:
"""Find exact schedule match by complete definition.
Used for idempotency - finds schedules with identical parameters.
Args:
task_name: Task name to match
schedule_type: Type of schedule (CRON, INTERVAL, ONE_TIME)
schedule: Schedule expression/value
args: Task arguments tuple
kwargs: Task keyword arguments dict
Returns:
Matching ScheduledTask if found, None otherwise
"""
pool = await self._ensure_connected()
# Normalize kwargs
normalized_kwargs = kwargs or {}
# Serialize args and kwargs for comparison
args_json = json.dumps(list(args))
kwargs_json = json.dumps(normalized_kwargs)
import aiomysql
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cursor:
await cursor.execute(
"""
SELECT * FROM scheduled_tasks
WHERE task_name = %s
AND schedule_type = %s
AND schedule = %s
AND args = %s
AND kwargs = %s
LIMIT 1
""",
(task_name, schedule_type.value, schedule, args_json, kwargs_json),
)
row = await cursor.fetchone()
if row is None:
return None
return self._row_to_task(row)
[docs]
async def close(self) -> None:
"""Close database connection pool."""
if self._pool is not None:
self._pool.close()
await self._pool.wait_closed()
self._pool = None
def _row_to_task(self, row: dict) -> ScheduledTask:
"""Convert MySQL row dict to ScheduledTask."""
return ScheduledTask(
id=row["id"],
task_name=row["task_name"],
schedule_type=ScheduleType(row["schedule_type"]),
schedule=row["schedule"],
args=tuple(json.loads(row["args"])),
kwargs=json.loads(row["kwargs"]),
enabled=bool(row["enabled"]),
last_run_at=row["last_run_at"],
next_run_at=row["next_run_at"],
created_at=row["created_at"],
updated_at=row["updated_at"],
description=row["description"],
max_retries=row["max_retries"],
retry_delay=row["retry_delay"],
priority=row["priority"],
)