import os
from dataclasses import dataclass, field
from typing import Any
[docs]
@dataclass
class BrokerConfig:
url: str
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
queue_key: str | None = None
[docs]
def __post_init__(self):
"""Validate broker configuration."""
if not self.url:
raise ValueError("Broker URL is required")
if self.max_connections < 1:
raise ValueError("max_connections must be at least 1")
if self.socket_timeout < 0:
raise ValueError("socket_timeout must be non-negative")
[docs]
def create_broker(self, queue_suffix: str = "") -> "BaseBroker":
"""Create broker instance from this configuration.
Args:
queue_suffix: Optional suffix for queue key (e.g., ":io" or ":cpu")
Returns:
Broker instance
Raises:
BrokerError: If URL scheme is unsupported or creation fails
"""
from flowrra.brokers.factory import get_broker
queue_key = self.queue_key if self.queue_key else f"flowrra:queue{queue_suffix}"
return get_broker(
self.url,
max_connections=self.max_connections,
socket_timeout=self.socket_timeout,
retry_on_timeout=self.retry_on_timeout,
queue_key=queue_key,
)
[docs]
@dataclass
class BackendConfig:
url: str
ttl: int | None = None
max_connections: int = 50
socket_timeout: float = 5.0
retry_on_timeout: bool = True
[docs]
def __post_init__(self):
"""Validate backend configuration."""
if not self.url:
raise ValueError("Backend URL is required")
if self.ttl is not None and self.ttl < 1:
raise ValueError("ttl must be at least 1 second")
if self.max_connections < 1:
raise ValueError("max_connections must be at least 1")
if self.socket_timeout < 0:
raise ValueError("socket_timeout must be non-negative")
[docs]
def create_backend(self) -> "BaseResultBackend":
"""Create backend instance from this configuration.
Returns:
Backend instance
Raises:
BackendError: If URL scheme is unsupported or creation fails
"""
from flowrra.backends.factory import get_backend
return get_backend(
self.url,
ttl=self.ttl,
max_connections=self.max_connections,
socket_timeout=self.socket_timeout,
retry_on_timeout=self.retry_on_timeout,
)
[docs]
@dataclass
class ExecutorConfig:
io_workers: int = 4
cpu_workers: int = os.cpu_count() or 1
max_queue_size: int = 1000
max_retries: int = 3
retry_delay: float = 1.0
[docs]
def __post_init__(self):
"""Validate executor configuration."""
if self.io_workers < 1:
raise ValueError("io_workers must be at least 1")
if self.cpu_workers < 1:
raise ValueError("cpu_workers must be at least 1")
if self.max_queue_size < 1:
raise ValueError("max_queue_size must be at least 1")
if self.max_retries < 0:
raise ValueError("max_retries must be non-negative")
if self.retry_delay < 0:
raise ValueError("retry_delay must be non-negative")
[docs]
@dataclass
class SchedulerConfig:
"""Scheduler configuration for persistent task scheduling.
Args:
database_url: Database URL for scheduler storage
- SQLite: "sqlite:///path/to/schedule.db" (default: None for .flowrra_schedule.db)
- PostgreSQL: "postgresql://user:pass@host/db"
- MySQL: "mysql://user:pass@host/db"
check_interval: How often to check for due tasks (seconds)
enabled: Whether scheduler is enabled
"""
database_url: str | None = None
check_interval: float = 60.0
enabled: bool = False
[docs]
def __post_init__(self):
"""Validate scheduler configuration."""
if self.check_interval <= 0:
raise ValueError("check_interval must be positive")
[docs]
def create_backend(self) -> "BaseSchedulerBackend":
"""Create scheduler backend from configuration.
Returns:
Scheduler backend instance
"""
from flowrra.scheduler.backends import get_scheduler_backend
return get_scheduler_backend(self.database_url)
[docs]
@dataclass
class Config:
"""Main Flowrra configuration aggregating component configs.
This class brings together broker, backend, executor, and scheduler configurations
into a single, structured configuration object. All components are optional
with sensible defaults.
Args:
broker: Broker configuration for task queueing (optional, uses asyncio.PriorityQueue if None)
backend: Backend configuration for result storage (optional)
executor: Executor configuration (optional, defaults to ExecutorConfig())
scheduler: Scheduler configuration (optional, disabled by default)
"""
broker: BrokerConfig | None = None
backend: BackendConfig | None = None
executor: ExecutorConfig = field(default_factory=ExecutorConfig)
scheduler: SchedulerConfig | None = None
[docs]
def __post_init__(self):
"""Ensure executor config exists."""
if self.executor is None:
self.executor = ExecutorConfig()
[docs]
def create_broker(self, queue_suffix: str = "") -> "BaseBroker | None":
"""Create broker instance from configuration.
Args:
queue_suffix: Optional suffix for queue key (e.g., ":io" or ":cpu")
Returns:
Broker instance if configured, None otherwise
"""
from flowrra.brokers.factory import get_broker
if self.broker is not None:
return self.broker.create_broker(queue_suffix=queue_suffix)
else:
return get_broker(None)
[docs]
def create_backend(self) -> "BaseResultBackend":
"""Create backend instance from configuration.
Returns:
Backend instance if configured, InMemory otherwise
"""
from flowrra.backends.factory import get_backend
if self.backend is not None:
return self.backend.create_backend()
else:
return get_backend(None)
[docs]
def create_scheduler_backend(self):
"""Create scheduler backend from configuration.
Returns:
Scheduler backend instance if configured, None otherwise
"""
if self.scheduler is not None and self.scheduler.enabled:
return self.scheduler.create_backend()
return None
[docs]
@classmethod
def from_env(cls, prefix: str = "FLOWRRA_") -> "Config":
"""Load configuration from environment variables using mappings."""
env_cache: dict[str, str | None] = {}
def get_env(key: str, default: Any = None, type_cast: type = str) -> Any:
env_key = f"{prefix}{key.upper()}"
if env_key not in env_cache:
env_cache[env_key] = os.getenv(env_key)
value = env_cache[env_key]
if value is None:
return default
if type_cast == bool:
return value.lower() in ('true', '1', 'yes', 'on')
elif type_cast == int:
return int(value)
elif type_cast == float:
return float(value)
else:
return value
broker_map = {
"url": ("broker_url", str, None),
"max_connections": ("broker_max_connections", int, 50),
"socket_timeout": ("broker_socket_timeout", float, 5.0),
"retry_on_timeout": ("broker_retry_on_timeout", bool, True),
}
backend_map = {
"url": ("backend_url", str, None),
"ttl": ("backend_ttl", int, None),
"max_connections": ("backend_max_connections", int, 50),
"socket_timeout": ("backend_socket_timeout", float, 5.0),
"retry_on_timeout": ("backend_retry_on_timeout", bool, True),
}
executor_map = {
"io_workers": ("executor_io_workers", int, 4),
"cpu_workers": ("executor_cpu_workers", int, None),
"max_queue_size": ("executor_max_queue_size", int, 1000),
"max_retries": ("executor_max_retries", int, 3),
"retry_delay": ("executor_retry_delay", float, 1.0),
}
scheduler_map = {
"database_url": ("scheduler_database_url", str, None),
"check_interval": ("scheduler_check_interval", float, 60.0),
"enabled": ("scheduler_enabled", bool, False),
}
def build_config(map_def):
kwargs = {}
for field, (env_name, type_cast, default) in map_def.items():
val = get_env(env_name, default=default, type_cast=type_cast)
kwargs[field] = val
return kwargs
broker_kwargs = build_config(broker_map)
broker = BrokerConfig(**broker_kwargs) if broker_kwargs["url"] else None
backend_kwargs = build_config(backend_map)
backend = BackendConfig(**backend_kwargs) if backend_kwargs["url"] else None
executor_kwargs = build_config(executor_map)
executor = ExecutorConfig(**executor_kwargs)
scheduler_kwargs = build_config(scheduler_map)
scheduler = SchedulerConfig(**scheduler_kwargs) if scheduler_kwargs["enabled"] else None
return cls(broker=broker, backend=backend, executor=executor, scheduler=scheduler)