"""I/O-bound task executor for async operations."""
import asyncio
import logging
from datetime import datetime
from flowrra.executors.base import BaseTaskExecutor
from flowrra.registry import TaskRegistry
from flowrra.task import Task, TaskResult, TaskStatus
from flowrra.config import Config
logger = logging.getLogger("flowrra")
[docs]
class IOExecutor(BaseTaskExecutor):
    """Executor for I/O-bound async tasks.

    Best for:
        - Network requests (HTTP, database queries)
        - File I/O operations
        - API calls
        - Any async/await operations
    """

    def __init__(
        self,
        config: Config | None = None,
        registry: TaskRegistry | None = None,
    ) -> None:
        """Initialize I/O executor.

        Args:
            config: Configuration object (optional, defaults to Config())
            registry: Shared TaskRegistry instance (optional, creates new if None)

        Example:
            # With full config
            from flowrra import Config, BrokerConfig, BackendConfig, ExecutorConfig

            config = Config(
                broker=BrokerConfig(url='redis://localhost:6379/0'),
                backend=BackendConfig(url='redis://localhost:6379/1'),
                executor=ExecutorConfig(io_workers=8)
            )
            executor = IOExecutor(config=config)

            # With default config (uses InMemoryBackend and asyncio.PriorityQueue)
            executor = IOExecutor()
        """
        # The documented ``IOExecutor()`` call used to raise TypeError because
        # ``config`` had no default; fall back to a default Config so the
        # signature matches the documented contract.
        if config is None:
            config = Config()

        # Read the worker count before delegating to the base class, exactly
        # as before, so a malformed config fails before any base-class setup.
        io_workers = config.executor.io_workers
        super().__init__(config=config, registry=registry, queue_suffix=":io")
        self._io_workers = io_workers
        self._config = config

    def is_broker(self) -> bool:
        """Check if executor uses a broker for task queueing.

        Returns:
            True when ``self.broker`` is set (not None), False otherwise.
        """
        return self.broker is not None

    def task(self, name: str | None = None, max_retries: int = 3, retry_delay: float = 1.0):
        """Register an async I/O-bound task.

        Thin wrapper around ``self.registry.task`` that always passes
        ``cpu_bound=False``.

        Args:
            name: Custom task name (defaults to function name)
            max_retries: Max retry attempts on failure
            retry_delay: Seconds between retries

        Returns:
            Decorator function

        Raises:
            TypeError: If decorated function is not async or is CPU-bound
        """
        return self.registry.task(
            name=name,
            cpu_bound=False,
            max_retries=max_retries,
            retry_delay=retry_delay,
        )

    async def _execute_task(self, task: Task, worker_id: int) -> None:
        """Execute an async I/O-bound task with retry logic.

        Records a RUNNING result, awaits the registered coroutine function,
        then records SUCCESS, RETRYING (and re-queues the task) or FAILED.

        Args:
            task: Task to execute
            worker_id: ID of worker executing the task
        """
        task_func = self.registry.get(task.name)

        result = TaskResult(
            task_id=task.id,
            task_name=task.name,
            status=TaskStatus.RUNNING,
            started_at=datetime.now(),
            retries=task.current_retry,
        )
        await self._store_and_emit(result)
        logger.info(f"Worker-{worker_id} running {task.name}[{task.id[:8]}]")

        try:
            output = await task_func(*task.args, **task.kwargs)

            result.status = TaskStatus.SUCCESS
            result.result = output
            result.finished_at = datetime.now()
            await self._store_and_emit(result)
            logger.info(f"Task {task.name}[{task.id[:8]}] succeeded")
        except Exception as exc:
            if task.current_retry < task.max_retries:
                task.current_retry += 1
                result.status = TaskStatus.RETRYING
                result.retries = task.current_retry
                await self._store_and_emit(result)
                logger.warning(
                    f"Task {task.name}[{task.id[:8]}] failed, "
                    f"retry {task.current_retry}/{task.max_retries} in {task.retry_delay}s"
                )

                # The delay is served inside this worker before re-queueing,
                # so the worker is occupied for ``retry_delay`` seconds.
                await asyncio.sleep(task.retry_delay)

                # Re-queue task for retry: through the broker when one is
                # configured, otherwise through the local queue.
                if self.broker is not None:
                    await self.broker.push(task)
                else:
                    await self._queue.put(task)
            else:
                # Retries exhausted: record the terminal failure.
                result.status = TaskStatus.FAILED
                result.error = str(exc)
                result.finished_at = datetime.now()
                await self._store_and_emit(result)
                logger.error(f"Task {task.name}[{task.id[:8]}] failed: {exc}")

    async def start(self) -> None:
        """Start the I/O executor and worker pool.

        Does nothing if the executor is already running. Otherwise spawns
        ``self._io_workers`` asyncio tasks, each running ``self._worker(i)``.
        """
        if self._running:
            return

        self._running = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self._io_workers)
        ]
        logger.info(f'IOExecutor started: io_workers={self._io_workers}')

    async def stop(self, wait: bool = True, timeout: float | None = 30.0) -> None:
        """Stop the I/O executor.

        Does nothing if the executor is not running.

        Args:
            wait: If True, wait for pending tasks to complete
            timeout: Max seconds to wait for pending tasks (default: 30.0);
                None waits without a time limit
        """
        if not self._running:
            return

        self._running = False

        # Draining only applies when a local queue exists; a timeout is
        # logged and shutdown proceeds regardless.
        if wait and self._queue is not None:
            try:
                await asyncio.wait_for(self._queue.join(), timeout=timeout)
            except asyncio.TimeoutError:
                logger.warning("Shutdown timeout, cancelling workers")

        # Workers are always cancelled; gather(return_exceptions=True)
        # absorbs the resulting CancelledError from each of them.
        for worker in self._workers:
            worker.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers.clear()

        logger.info("IOExecutor stopped")