dataflow.server_management.base¶

Generic base server manager with full type safety.

This module provides the abstract base class for all server managers in Haive. It uses Python generics to ensure type safety while providing common functionality.

Attributes¶

Classes¶

BaseServerManager

Generic base class for all server managers.

ServerLifecycle

Protocol for server lifecycle operations.

Module Contents¶

class dataflow.server_management.base.BaseServerManager(/, **data: Any)¶

Bases: pydantic.BaseModel, abc.ABC, Generic[ConfigT, InfoT]

Generic base class for all server managers.

This class provides a type-safe foundation for managing servers of any type. Subclasses should specify the concrete ConfigT and InfoT types.

Type Parameters:

  • ConfigT – Server configuration type (must extend BaseServerConfig)

  • InfoT – Server runtime info type (must extend BaseServerInfo)
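The bounds on the two type parameters can be pictured roughly as follows. This is a minimal stdlib-only sketch, not the module's actual source: `BaseServerConfig` and `BaseServerInfo` are empty stand-in classes here, and `SketchManager`, `HttpConfig`, and `HttpInfo` are hypothetical names used only for illustration.

```python
from typing import Dict, Generic, TypeVar


class BaseServerConfig:
    """Stand-in for the real base config class (assumption)."""


class BaseServerInfo:
    """Stand-in for the real base info class (assumption)."""


# Bound TypeVars: a static type checker rejects type arguments that
# do not subclass the given base.
ConfigT = TypeVar("ConfigT", bound=BaseServerConfig)
InfoT = TypeVar("InfoT", bound=BaseServerInfo)


class SketchManager(Generic[ConfigT, InfoT]):
    """Hypothetical generic manager holding configs and runtime info."""

    def __init__(self) -> None:
        self.available_configs: Dict[str, ConfigT] = {}
        self.servers: Dict[str, InfoT] = {}


class HttpConfig(BaseServerConfig):
    def __init__(self, port: int) -> None:
        self.port = port


class HttpInfo(BaseServerInfo):
    def __init__(self, pid: int) -> None:
        self.pid = pid


# Parameterizing the manager fixes both types for static checking.
manager: SketchManager[HttpConfig, HttpInfo] = SketchManager()
manager.available_configs["web"] = HttpConfig(port=8080)
```

The bounds are enforced by the type checker only; at runtime the real class relies on its validators for the same guarantee.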

servers¶

Currently running servers mapped by name

available_configs¶

Available server configurations

config_class¶

Configuration class for type validation

info_class¶

Info class for runtime information

auto_restart¶

Whether to automatically restart failed servers

max_restart_attempts¶

Maximum restart attempts before giving up

health_check_interval¶

Seconds between health checks

restart_tracking¶

Tracks restart attempts per server

Example

Creating a concrete server manager:

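from typing import Optional, Type
from pydantic import Field
from dataflow.server_management.base import BaseServerManager

# MyConfig and MyInfo stand for your own config and info subclasses.
class MyServerManager(BaseServerManager[MyConfig, MyInfo]):
    # Field overrides need type annotations under pydantic v2.
    config_class: Type[MyConfig] = Field(default=MyConfig, exclude=True)
    info_class: Type[MyInfo] = Field(default=MyInfo, exclude=True)

    async def start_server(self, name: str, config: Optional[MyConfig] = None) -> MyInfo:
        # Launch the server here; the other abstract methods must be implemented too.
        raise NotImplementedError

add_config(name: str, config: ConfigT | Dict[str, Any]) → ConfigT¶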

Add or update a server configuration.

Parameters:
  • name – Server name

  • config – Configuration object or dict

Returns:

Validated configuration object

Raises:

ValidationError – If configuration is invalid
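The "object or dict" behaviour described above might look roughly like the sketch below. It is a stdlib-only illustration under stated assumptions: `DemoConfig` is a hypothetical dataclass standing in for a pydantic config model, and it raises `ValueError` or `TypeError` where the real method raises pydantic's `ValidationError`.

```python
from dataclasses import dataclass
from typing import Any, Dict, Union


@dataclass
class DemoConfig:
    """Hypothetical config type; the real one extends BaseServerConfig."""

    command: str
    port: int = 8000

    def __post_init__(self) -> None:
        # Stand-in for the field validation pydantic would perform.
        if not isinstance(self.port, int) or not 0 < self.port < 65536:
            raise ValueError(f"invalid port: {self.port!r}")


available_configs: Dict[str, DemoConfig] = {}


def add_config(name: str, config: Union[DemoConfig, Dict[str, Any]]) -> DemoConfig:
    """Coerce dict input into the config class, then store it by name."""
    if isinstance(config, dict):
        config = DemoConfig(**config)  # raises if a field is invalid
    elif not isinstance(config, DemoConfig):
        raise TypeError(f"expected DemoConfig or dict, got {type(config).__name__}")
    available_configs[name] = config  # an existing name is overwritten
    return config
```

Because validation happens before the assignment, a rejected configuration never replaces a previously stored one.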

async cleanup() → None¶

Clean up resources and stop all servers.

get_config(name: str) → ConfigT | None¶

Get server configuration by name.

Parameters:

name – Server name

Returns:

Configuration object or None if not found

get_server_info(name: str) → InfoT | None¶

Get runtime information for a server.

Parameters:

name – Server name

Returns:

Server info or None if not running

get_stats() → Dict[str, Any]¶

Get server manager statistics.

Returns:

Dictionary with stats including:

  • total_configured: Number of configured servers

  • total_running: Number of running servers

  • servers_by_status: Count by status

  • restart_counts: Restart attempts per server
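A dictionary with the four documented keys could be assembled along these lines. This is a sketch under assumptions, not the library's code: the `ServerStatus` members are hypothetical (the real enum lives in dataflow.server_management.models), and `servers` is simplified to map each name directly to a status rather than to an info object.

```python
from collections import Counter
from enum import Enum
from typing import Any, Dict


class ServerStatus(str, Enum):
    """Hypothetical status values; the real enum may differ."""

    RUNNING = "running"
    STOPPED = "stopped"


def get_stats(
    available_configs: Dict[str, Any],
    servers: Dict[str, ServerStatus],
    restart_tracking: Dict[str, int],
) -> Dict[str, Any]:
    """Build the four documented keys from plain dictionaries."""
    by_status = Counter(status.value for status in servers.values())
    running = sum(1 for status in servers.values() if status is ServerStatus.RUNNING)
    return {
        "total_configured": len(available_configs),
        "total_running": running,
        "servers_by_status": dict(by_status),
        "restart_counts": dict(restart_tracking),  # copy, so callers cannot mutate state
    }


stats = get_stats(
    available_configs={"web": {}, "db": {}, "cache": {}},
    servers={"web": ServerStatus.RUNNING, "db": ServerStatus.STOPPED},
    restart_tracking={"db": 2},
)
```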

abstractmethod async health_check(name: str) → bool¶

Check if server is healthy.

Parameters:

name – Server name

Returns:

True if healthy, False otherwise

is_running(name: str) → bool¶

Check if server is running.

Parameters:

name – Server name

Returns:

True if server is running

list_servers(status: dataflow.server_management.models.ServerStatus | None = None) → List[str]¶

List servers by status.

Parameters:

status – Optional status filter

Returns:

List of server names
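The optional filter can be sketched as below. The assumptions are that omitting `status` returns every known server, and that each server's status is available by name; the `ServerStatus` members shown are hypothetical.

```python
from enum import Enum
from typing import Dict, List, Optional


class ServerStatus(Enum):
    """Hypothetical status values; the real enum may differ."""

    RUNNING = "running"
    FAILED = "failed"


def list_servers(
    servers: Dict[str, ServerStatus],
    status: Optional[ServerStatus] = None,
) -> List[str]:
    """Return all names, or only those whose status matches the filter."""
    if status is None:
        return list(servers)
    return [name for name, current in servers.items() if current is status]
```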

model_post_init(__context: Any) → None¶

Additional initialization after validation.

remove_config(name: str) → bool¶

Remove a server configuration.

Parameters:

name – Server name

Returns:

True if removed, False if not found

Raises:

RuntimeError – If server is currently running
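The three documented outcomes (removed, not found, refused while running) can be illustrated with a small stdlib-only sketch. The function below takes its state as explicit arguments for the sake of the example, and the order of the two checks is an assumption.

```python
from typing import Any, Dict, Set


def remove_config(name: str, available_configs: Dict[str, Any], running: Set[str]) -> bool:
    """Drop a stored config unless its server is still running."""
    if name in running:
        raise RuntimeError(f"cannot remove config for running server {name!r}")
    if name not in available_configs:
        return False
    del available_configs[name]
    return True
```

Refusing to remove the configuration of a running server keeps the manager consistent with the rule that every running server has a configuration.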

abstractmethod async restart_server(name: str) → InfoT¶

Restart a server.

Parameters:

name – Server name

Returns:

New server runtime information

async start_health_monitoring(name: str) → None¶

Start health monitoring for a server.

Parameters:

name – Server name

abstractmethod async start_server(name: str, config: ConfigT | None = None) → InfoT¶

Start a server with given configuration.

Parameters:
  • name – Server name

  • config – Optional configuration override

Returns:

Server runtime information

Raises:

async stop_health_monitoring(name: str) → None¶

Stop health monitoring for a server.

Parameters:

name – Server name
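One plausible way the monitoring pair fits together with health_check_interval, max_restart_attempts, restart_tracking, and health_check_tasks is a per-server asyncio task. The sketch below is an assumption about the design, not the library's implementation: the signatures are widened to take the `check` and `restart` callables and the settings explicitly so that the example is self-contained.

```python
import asyncio
from typing import Awaitable, Callable, Dict

health_check_tasks: Dict[str, asyncio.Task] = {}

AsyncCheck = Callable[[str], Awaitable[bool]]
AsyncRestart = Callable[[str], Awaitable[object]]


async def _monitor(
    name: str,
    check: AsyncCheck,
    restart: AsyncRestart,
    interval: float,
    max_restart_attempts: int,
    restart_tracking: Dict[str, int],
) -> None:
    """Poll the server; restart it on failure until the attempt limit is hit."""
    while True:
        await asyncio.sleep(interval)
        if await check(name):
            continue
        attempts = restart_tracking.get(name, 0)
        if attempts >= max_restart_attempts:
            return  # give up after too many restarts
        restart_tracking[name] = attempts + 1
        await restart(name)


async def start_health_monitoring(
    name: str,
    check: AsyncCheck,
    restart: AsyncRestart,
    interval: float,
    max_restart_attempts: int,
    restart_tracking: Dict[str, int],
) -> None:
    """Create one background task per server; a second call is a no-op."""
    if name not in health_check_tasks:
        health_check_tasks[name] = asyncio.create_task(
            _monitor(name, check, restart, interval, max_restart_attempts, restart_tracking)
        )


async def stop_health_monitoring(name: str) -> None:
    """Cancel the server's task, if any, and wait for it to finish."""
    task = health_check_tasks.pop(name, None)
    if task is None:
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
```

Awaiting the cancelled task before returning ensures that no health check is still in flight once `stop_health_monitoring` completes.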

abstractmethod async stop_server(name: str, force: bool = False) → bool¶

Stop a running server.

Parameters:
  • name – Server name

  • force – If True, force the server to stop

Returns:

True if stopped successfully

classmethod validate_config_types(v: Dict[str, Any], info: pydantic.ValidationInfo) → Dict[str, Any]¶

Ensure all configs are the correct type.

classmethod validate_info_types(v: Dict[str, Any], info: pydantic.ValidationInfo) → Dict[str, Any]¶

Ensure all server info objects are the correct type.

validate_restart_policy() → BaseServerManager¶

Validate restart configuration consistency.

validate_server_consistency() → BaseServerManager¶

Ensure running servers have configurations.

auto_restart: bool = None¶
available_configs: Dict[str, ConfigT] = None¶
config_class: Type[ConfigT] = None¶
health_check_interval: int = None¶
health_check_tasks: Dict[str, asyncio.Task] = None¶
info_class: Type[InfoT] = None¶
max_restart_attempts: int = None¶
model_config¶

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

restart_tracking: Dict[str, int] = None¶
servers: Dict[str, InfoT] = None¶
class dataflow.server_management.base.ServerLifecycle¶

Bases: Protocol

Protocol for server lifecycle operations.

async health_check(name: str) → bool¶

Check if server is healthy.

async restart_server(name: str) → Any¶

Restart a server.

async start_server(name: str, config: Any | None = None) → Any¶

Start a server with given configuration.

async stop_server(name: str, force: bool = False) → bool¶

Stop a running server.
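Because ServerLifecycle is a Protocol, any object with these four coroutine methods satisfies it structurally, without inheriting from it. The sketch below redeclares the protocol from the signatures listed above so that it runs on its own. `InMemoryServers` and `run_once` are hypothetical names, and the `runtime_checkable` decorator is added for the demonstration only; whether the real protocol carries it is not stated here.

```python
import asyncio
from typing import Any, Optional, Protocol, Set, runtime_checkable


@runtime_checkable  # added for this demo; an assumption about the real class
class ServerLifecycle(Protocol):
    async def start_server(self, name: str, config: Optional[Any] = None) -> Any: ...
    async def stop_server(self, name: str, force: bool = False) -> bool: ...
    async def restart_server(self, name: str) -> Any: ...
    async def health_check(self, name: str) -> bool: ...


class InMemoryServers:
    """Hypothetical implementation; note that it does not subclass the protocol."""

    def __init__(self) -> None:
        self.running: Set[str] = set()

    async def start_server(self, name: str, config: Optional[Any] = None) -> Any:
        self.running.add(name)
        return name

    async def stop_server(self, name: str, force: bool = False) -> bool:
        if name not in self.running:
            return False
        self.running.discard(name)
        return True

    async def restart_server(self, name: str) -> Any:
        await self.stop_server(name)
        return await self.start_server(name)

    async def health_check(self, name: str) -> bool:
        return name in self.running


async def run_once(lifecycle: ServerLifecycle, name: str) -> bool:
    """Start a server, probe it once, then stop it again."""
    await lifecycle.start_server(name)
    healthy = await lifecycle.health_check(name)
    await lifecycle.stop_server(name)
    return healthy
```

Code written against the protocol, such as `run_once`, accepts a BaseServerManager subclass and a test double alike.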

dataflow.server_management.base.ConfigT¶
dataflow.server_management.base.InfoT¶
dataflow.server_management.base.logger¶