from abc import ABC, abstractmethod
from flowrra.task import TaskResult, TaskStatus
class BaseResultBackend(ABC):
    """Abstract base for result storage backends.

    Implementations must provide:

    - ``store()``: persist a task result
    - ``get()``: retrieve a task result by ID
    - ``wait_for()``: wait until a task completes

    ``delete()``, ``clear()`` and ``list_by_status()`` are optional; their
    default implementations raise ``NotImplementedError``.
    """

    @abstractmethod
    async def store(self, task_id: str, result: TaskResult) -> None:
        """Store a task result.

        Args:
            task_id: Unique task identifier.
            result: TaskResult object to store.
        """

    @abstractmethod
    async def get(self, task_id: str) -> TaskResult | None:
        """Retrieve a task result.

        Args:
            task_id: Unique task identifier.

        Returns:
            TaskResult if found, None otherwise.
        """

    @abstractmethod
    async def wait_for(self, task_id: str, timeout: float | None) -> TaskResult:
        """Wait for a task to complete.

        Args:
            task_id: Unique task identifier.
            timeout: Maximum seconds to wait (None = wait forever).

        Returns:
            TaskResult when the task completes.

        Raises:
            asyncio.TimeoutError: If the timeout is exceeded.
        """

    async def delete(self, task_id: str) -> bool:
        """Delete a task result. Optional to implement.

        Args:
            task_id: Unique task identifier.

        Returns:
            True if deleted, False if not found.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this
                method.
        """
        raise NotImplementedError("delete() not supported by this backend")

    async def clear(self) -> int:
        """Clear all results. Optional to implement.

        Returns:
            Number of results cleared.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this
                method.
        """
        raise NotImplementedError("clear() not supported by this backend")

    async def list_by_status(
        self,
        status: TaskStatus,
        limit: int | None = None,
        offset: int = 0,
    ) -> list[TaskResult]:
        """List tasks by status with optional pagination.

        Optional to implement. Backends that don't support querying should
        raise NotImplementedError (the default behavior).

        Args:
            status: Task status to filter by (PENDING, RUNNING, SUCCESS,
                FAILED, RETRYING).
            limit: Maximum number of results to return (None = no limit).
            offset: Number of results to skip (for pagination).

        Returns:
            List of TaskResult objects matching the status, ordered by
            submitted_at DESC (most recent first). Empty list if no matches
            are found.

        Raises:
            NotImplementedError: If the backend doesn't support status
                queries.

        Note:
            - Results are ordered by submitted_at DESC (newest first).
            - If submitted_at is None, those tasks are ordered last.
            - InMemoryBackend and RedisBackend both support this operation.
        """
        # The concrete class name is included so callers can tell which
        # backend lacks the capability.
        raise NotImplementedError(
            f"list_by_status() not supported by {self.__class__.__name__}"
        )