dataflow.server_management

Server management framework for Haive.

This module provides a generic, type-safe server management framework using Pydantic models and validation. It serves as the foundation for managing various types of servers (MCP, HAP, Docker, etc.) across the Haive ecosystem.

Key Features:
  • Generic base classes with full type safety

  • Pydantic validation for all configurations

  • Extensible architecture for different server types

  • Smart defaults and auto-configuration

  • Comprehensive error handling with clear messages

Example

Basic usage with MCP servers:

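from typing import Optional, Type

from pydantic import Field

from haive.dataflow.server_management import BaseServerManager
from haive.mcp.servers import MCPServerConfig, MCPServerInfo

class MCPServerManager(BaseServerManager[MCPServerConfig, MCPServerInfo]):
    # Annotate the overrides; Pydantic v2 rejects un-annotated overrides of inherited fields.
    config_class: Type[MCPServerConfig] = Field(default=MCPServerConfig, exclude=True)
    info_class: Type[MCPServerInfo] = Field(default=MCPServerInfo, exclude=True)

    async def start_server(self, name: str, config: Optional[MCPServerConfig] = None) -> MCPServerInfo:
        # Launch the process and return its runtime info (implementation omitted).
        raise NotImplementedError

    # stop_server, restart_server, and health_check are also abstract and must be implemented.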

Submodules

Classes

BaseServerConfig

Base configuration all servers must have.

BaseServerInfo

Base runtime info all servers have.

BaseServerManager

Generic base class for all server managers.

ConfigLoaderProtocol

Protocol for configuration loading capabilities.

HealthMonitorProtocol

Protocol for health monitoring capabilities.

MetricsProtocol

Protocol for metrics collection capabilities.

ServerDiscoveryProtocol

Protocol for server discovery capabilities.

ServerManagerProtocol

Protocol defining the server manager interface.

ServerStatus

Universal server status enum.

Package Contents

class dataflow.server_management.BaseServerConfig(/, **data: Any)

Bases: pydantic.BaseModel

Base configuration all servers must have.

This model defines the minimum configuration required for any server type. Subclasses should extend this with type-specific fields.

name

Unique server identifier

command

Command to execute (executable + args)

description

Human-readable description

working_directory

Optional working directory for process

environment

Additional environment variables

timeout_seconds

Startup timeout in seconds

auto_restart

Whether to restart on failure

health_check_enabled

Whether to perform health checks

Example

Basic server configuration:

config = BaseServerConfig(
    name="my-server",
    command=["python", "-m", "server"],
    description="My custom server"
)

classmethod validate_command_not_empty(v: List[str]) -> List[str]

Validate command has executable and no empty strings.

classmethod validate_environment_vars(v: Dict[str, str]) -> Dict[str, str]

Validate environment variable names and values.

classmethod validate_name_format(v: str) -> str

Validate name follows naming convention.

classmethod validate_working_directory(v: str | None) -> str | None

Validate working directory if provided.
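The validators above are documented only by their one-line summaries. As a rough illustration of the kind of checks they describe, here is a standard-library sketch. The function names check_command and check_environment are hypothetical, and the exact rules (especially for environment variable names) are assumptions, not the library's actual behavior.

```python
from typing import Dict, List


def check_command(command: List[str]) -> List[str]:
    """Reject an empty command or one that contains blank parts."""
    if not command:
        raise ValueError("command must contain at least an executable")
    if any(not part.strip() for part in command):
        raise ValueError("command must not contain empty strings")
    return command


def check_environment(env: Dict[str, str]) -> Dict[str, str]:
    """Reject variable names that are empty or contain '=' (assumed rule)."""
    for key in env:
        if not key or "=" in key:
            raise ValueError(f"invalid environment variable name: {key!r}")
    return env


# A well-formed command and environment pass through unchanged.
print(check_command(["python", "-m", "server"]))
print(check_environment({"LOG_LEVEL": "debug"}))
```

In the real model these checks would run as Pydantic validators, so a failure would surface as a ValidationError rather than a bare ValueError.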

auto_restart: bool = None
command: List[str] = None
description: str = None
environment: Dict[str, str] = None
health_check_enabled: bool = None
model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str = None
timeout_seconds: int = None
working_directory: str | None = None

class dataflow.server_management.BaseServerInfo(/, **data: Any)

Bases: pydantic.BaseModel

Base runtime info all servers have.

This model represents the runtime state of a server process. It excludes the actual process handle from serialization.

name

Server name

pid

Process ID

status

Current server status

started_at

When the server was started

config_snapshot

Configuration used to start server

error_message

Last error message if any

restart_count

Number of times server has been restarted

last_health_check

Last successful health check time

uptime_seconds

Computed uptime in seconds

record_health_check(success: bool) -> None

Record health check result.

update_status(new_status: ServerStatus, error: str | None = None) -> None

Update server status with optional error message.

config_snapshot: Dict[str, Any] = None
error_message: str | None = None
property is_running: bool

Check if server process is currently running.

last_health_check: datetime.datetime | None = None
model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

name: str = None
pid: int = None
process_handle: Any | None = None
restart_count: int = None
started_at: datetime.datetime = None
status: ServerStatus = None
property uptime_seconds: float

Calculate server uptime in seconds.
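To make the runtime-state members above more concrete, the following sketch uses a plain dataclass as a stand-in for BaseServerInfo. The class name InfoSketch is hypothetical, statuses are plain strings rather than ServerStatus members, and the choices that update_status overwrites the previous error and that a failed check sets the status to "health_check_failed" are assumptions.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


def _now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class InfoSketch:
    """Hypothetical stand-in for BaseServerInfo (not the real model)."""
    name: str
    pid: int
    status: str = "starting"
    started_at: datetime = field(default_factory=_now)
    error_message: Optional[str] = None
    last_health_check: Optional[datetime] = None

    @property
    def uptime_seconds(self) -> float:
        # Uptime is derived from started_at each time it is read.
        return (_now() - self.started_at).total_seconds()

    def update_status(self, new_status: str, error: Optional[str] = None) -> None:
        # Assumed behavior: the error message is replaced on every update.
        self.status = new_status
        self.error_message = error

    def record_health_check(self, success: bool) -> None:
        if success:
            # Only successful checks move the timestamp forward.
            self.last_health_check = _now()
        else:
            self.update_status("health_check_failed", "health check failed")


info = InfoSketch(name="my-server", pid=1234)
info.update_status("running")
info.record_health_check(True)
```

Computing uptime as a property keeps it out of the stored state, which matches the "computed" wording in the attribute list.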

class dataflow.server_management.BaseServerManager(/, **data: Any)

Bases: pydantic.BaseModel, abc.ABC, Generic[ConfigT, InfoT]

Generic base class for all server managers.

This class provides a type-safe foundation for managing servers of any type. Subclasses should specify the concrete ConfigT and InfoT types.

Type Parameters:
  • ConfigT – Server configuration type (must extend BaseServerConfig)

  • InfoT – Server runtime info type (must extend BaseServerInfo)

servers

Currently running servers mapped by name

available_configs

Available server configurations

config_class

Configuration class for type validation

info_class

Info class for runtime information

auto_restart

Whether to automatically restart failed servers

max_restart_attempts

Maximum restart attempts before giving up

health_check_interval

Seconds between health checks

restart_tracking

Tracks restart attempts per server

Example

Creating a concrete server manager:

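from typing import Optional, Type

from pydantic import Field

# MyConfig and MyInfo are your own BaseServerConfig / BaseServerInfo subclasses.
class MyServerManager(BaseServerManager[MyConfig, MyInfo]):
    # Annotate the overrides; Pydantic v2 rejects un-annotated overrides of inherited fields.
    config_class: Type[MyConfig] = Field(default=MyConfig, exclude=True)
    info_class: Type[MyInfo] = Field(default=MyInfo, exclude=True)

    async def start_server(self, name: str, config: Optional[MyConfig] = None) -> MyInfo:
        # Launch the process and return its runtime info (implementation omitted).
        raise NotImplementedError

add_config(name: str, config: ConfigT | Dict[str, Any]) -> ConfigT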

Add or update a server configuration.

Parameters:
  • name – Server name

  • config – Configuration object or dict

Returns:

Validated configuration object

Raises:

ValidationError – If configuration is invalid
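Because add_config accepts either a configuration object or a plain dict, it has to normalize its input before storing it. The sketch below shows one way that normalization could look using only the standard library. ConfigSketch, coerce_config, and the module-level add_config function are hypothetical stand-ins, and the sketch raises TypeError where the real method is documented to raise ValidationError.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Type, TypeVar, Union


@dataclass
class ConfigSketch:
    """Hypothetical stand-in for a BaseServerConfig subclass."""
    name: str
    command: List[str] = field(default_factory=list)


C = TypeVar("C")


def coerce_config(config_class: Type[C], config: Union[C, Dict[str, Any]]) -> C:
    """Return config as a config_class instance, building it from a dict if needed."""
    if isinstance(config, dict):
        return config_class(**config)
    if not isinstance(config, config_class):
        raise TypeError(f"expected {config_class.__name__} or dict")
    return config


def add_config(store: Dict[str, Any], config_class: Type[C], name: str,
               config: Union[C, Dict[str, Any]]) -> C:
    """Validate the config and add it to the store, replacing any existing entry."""
    validated = coerce_config(config_class, config)
    store[name] = validated
    return validated


store: Dict[str, Any] = {}
cfg = add_config(store, ConfigSketch, "my-server",
                 {"name": "my-server", "command": ["python", "-m", "server"]})
```

Returning the validated object lets callers keep working with a typed configuration even when they passed in a dict.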

async cleanup() -> None

Clean up resources and stop all servers.

get_config(name: str) -> ConfigT | None

Get server configuration by name.

Parameters:

name – Server name

Returns:

Configuration object or None if not found

get_server_info(name: str) -> InfoT | None

Get runtime information for a server.

Parameters:

name – Server name

Returns:

Server info or None if not running

get_stats() -> Dict[str, Any]

Get server manager statistics.

Returns:

Dictionary with stats including:
  • total_configured: Number of configured servers

  • total_running: Number of running servers

  • servers_by_status: Count by status

  • restart_counts: Restart attempts per server

Return type:

Dict[str, Any]
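The four documented keys can be derived from the manager's configuration, server, and restart-tracking mappings. The following is a minimal sketch of that aggregation, assuming statuses are available as plain strings and that total_running counts only servers whose status is "running". The function name build_stats is hypothetical.

```python
from collections import Counter
from typing import Any, Dict


def build_stats(configs: Dict[str, Any], statuses: Dict[str, str],
                restart_tracking: Dict[str, int]) -> Dict[str, Any]:
    """Assemble a stats dictionary with the four documented keys."""
    return {
        "total_configured": len(configs),
        # Assumption: only servers in the "running" state are counted here.
        "total_running": sum(1 for s in statuses.values() if s == "running"),
        "servers_by_status": dict(Counter(statuses.values())),
        # Copy so callers cannot mutate the manager's own tracking dict.
        "restart_counts": dict(restart_tracking),
    }


stats = build_stats(
    configs={"a": {}, "b": {}, "c": {}},
    statuses={"a": "running", "b": "error"},
    restart_tracking={"b": 2},
)
```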

abstractmethod async health_check(name: str) -> bool

Check if server is healthy.

Parameters:

name – Server name

Returns:

True if healthy, False otherwise

is_running(name: str) -> bool

Check if server is running.

Parameters:

name – Server name

Returns:

True if server is running

list_servers(status: dataflow.server_management.models.ServerStatus | None = None) -> List[str]

List servers by status.

Parameters:

status – Optional status filter

Returns:

List of server names

model_post_init(__context: Any) -> None

Additional initialization after validation.

remove_config(name: str) -> bool

Remove a server configuration.

Parameters:

name – Server name

Returns:

True if removed, False if not found

Raises:

RuntimeError – If server is currently running
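The three documented outcomes of remove_config (True, False, or RuntimeError) can be summarized in a few lines. This is a sketch under assumptions: the free function and its configs and running arguments are hypothetical, and checking the running state before checking existence is a guess about ordering.

```python
from typing import Any, Dict, Set


def remove_config(configs: Dict[str, Any], running: Set[str], name: str) -> bool:
    """Remove a stored configuration unless its server is still running."""
    if name in running:
        raise RuntimeError(f"cannot remove config for running server {name!r}")
    # pop() returns None when the name is absent, which maps to False.
    return configs.pop(name, None) is not None


configs = {"a": {"command": ["python"]}, "b": {"command": ["node"]}}
running = {"b"}
removed = remove_config(configs, running, "a")
missing = remove_config(configs, running, "zzz")
```

Stopping the server first and then removing its configuration avoids the RuntimeError path.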

abstractmethod async restart_server(name: str) -> InfoT

Restart a server.

Parameters:

name – Server name

Returns:

New server runtime information

async start_health_monitoring(name: str) -> None

Start health monitoring for a server.

Parameters:

name – Server name

abstractmethod async start_server(name: str, config: ConfigT | None = None) -> InfoT

Start a server with given configuration.

Parameters:
  • name – Server name

  • config – Optional configuration override

Returns:

Server runtime information

Raises:

async stop_health_monitoring(name: str) -> None

Stop health monitoring for a server.

Parameters:

name – Server name
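The health_check_tasks attribute (a mapping of server name to asyncio.Task) suggests that monitoring runs as one background task per server. The sketch below illustrates that pattern with a hypothetical MonitorSketch class. The loop body, the results dictionary, and the cancellation handling are assumptions and may differ from the real manager.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class MonitorSketch:
    """Hypothetical helper; the real manager's internals may differ."""

    def __init__(self, check: Callable[[str], Awaitable[bool]], interval: float) -> None:
        self.check = check
        self.interval = interval
        self.tasks: Dict[str, asyncio.Task] = {}
        self.results: Dict[str, bool] = {}

    async def _loop(self, name: str) -> None:
        # Run the check, remember the result, then wait for the next round.
        while True:
            self.results[name] = await self.check(name)
            await asyncio.sleep(self.interval)

    async def start_health_monitoring(self, name: str) -> None:
        if name not in self.tasks:  # avoid duplicate loops for one server
            self.tasks[name] = asyncio.create_task(self._loop(name))

    async def stop_health_monitoring(self, name: str) -> None:
        task = self.tasks.pop(name, None)
        if task is not None:
            task.cancel()
            try:
                await task  # wait until the cancellation has been processed
            except asyncio.CancelledError:
                pass


async def demo() -> MonitorSketch:
    async def always_healthy(name: str) -> bool:
        return True

    monitor = MonitorSketch(always_healthy, interval=0.01)
    await monitor.start_health_monitoring("my-server")
    await asyncio.sleep(0.03)
    await monitor.stop_health_monitoring("my-server")
    return monitor


monitor = asyncio.run(demo())
```

Keeping the tasks in a dictionary keyed by server name makes stopping idempotent: stopping a server that is not monitored is simply a no-op.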

abstractmethod async stop_server(name: str, force: bool = False) -> bool

Stop a running server.

Parameters:
  • name – Server name

  • force – Force stop if true

Returns:

True if stopped successfully

classmethod validate_config_types(v: Dict[str, Any], info: pydantic.ValidationInfo) -> Dict[str, Any]

Ensure all configs are the correct type.

classmethod validate_info_types(v: Dict[str, Any], info: pydantic.ValidationInfo) -> Dict[str, Any]

Ensure all server info objects are the correct type.

validate_restart_policy() -> BaseServerManager

Validate restart configuration consistency.

validate_server_consistency() -> BaseServerManager

Ensure running servers have configurations.

auto_restart: bool = None
available_configs: Dict[str, ConfigT] = None
config_class: Type[ConfigT] = None
health_check_interval: int = None
health_check_tasks: Dict[str, asyncio.Task] = None
info_class: Type[InfoT] = None
max_restart_attempts: int = None
model_config

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

restart_tracking: Dict[str, int] = None
servers: Dict[str, InfoT] = None

class dataflow.server_management.ConfigLoaderProtocol

Bases: Protocol

Protocol for configuration loading capabilities.

async load_configs_from_file(path: str) -> Dict[str, dataflow.server_management.models.BaseServerConfig]

Load server configurations from a file.

async save_configs_to_file(path: str) -> None

Save current configurations to a file.

class dataflow.server_management.HealthMonitorProtocol

Bases: Protocol

Protocol for health monitoring capabilities.

async start_health_monitoring(name: str) -> None

Start health monitoring for a server.

async stop_health_monitoring(name: str) -> None

Stop health monitoring for a server.
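Since these capability interfaces are typing Protocols, a class satisfies one by providing matching methods, without inheriting from it. The sketch below redefines a local protocol with the same two methods to show this. HealthMonitorLike and InMemoryMonitor are hypothetical names, and marking the protocol runtime_checkable is an assumption made here so that isinstance can be demonstrated; the page does not say whether the real protocols are runtime-checkable.

```python
from typing import Protocol, Set, runtime_checkable


@runtime_checkable
class HealthMonitorLike(Protocol):
    """Local stand-in mirroring the two documented protocol methods."""

    async def start_health_monitoring(self, name: str) -> None: ...

    async def stop_health_monitoring(self, name: str) -> None: ...


class InMemoryMonitor:  # note: no inheritance from the protocol
    def __init__(self) -> None:
        self.monitored: Set[str] = set()

    async def start_health_monitoring(self, name: str) -> None:
        self.monitored.add(name)

    async def stop_health_monitoring(self, name: str) -> None:
        self.monitored.discard(name)


# isinstance() on a runtime-checkable protocol only checks that the
# methods exist; signatures are verified by a static type checker.
conforms = isinstance(InMemoryMonitor(), HealthMonitorLike)
plain_object_conforms = isinstance(object(), HealthMonitorLike)
```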

class dataflow.server_management.MetricsProtocol

Bases: Protocol

Protocol for metrics collection capabilities.

async export_metrics(exporter: str) -> None

Export metrics to external system.

get_aggregate_metrics() -> Dict[str, float]

Get aggregated metrics for all servers.

get_server_metrics(name: str) -> Dict[str, float]

Get metrics for a specific server.

class dataflow.server_management.ServerDiscoveryProtocol

Bases: Protocol

Protocol for server discovery capabilities.

async adopt_server(server_info: dataflow.server_management.models.BaseServerInfo) -> bool

Adopt an externally started server.

async discover_servers() -> List[dataflow.server_management.models.BaseServerInfo]

Discover running servers not managed by this instance.

class dataflow.server_management.ServerManagerProtocol

Bases: Protocol

Protocol defining the server manager interface.

This protocol ensures all server managers implement the required methods for managing server lifecycles, regardless of the specific server type.

add_config(name: str, config: dataflow.server_management.models.BaseServerConfig | Dict[str, Any]) -> dataflow.server_management.models.BaseServerConfig

Add or update a server configuration.

async cleanup() -> None

Clean up resources and stop all servers.

get_config(name: str) -> dataflow.server_management.models.BaseServerConfig | None

Get server configuration by name.

get_server_info(name: str) -> dataflow.server_management.models.BaseServerInfo | None

Get runtime information for a server.

get_stats() -> Dict[str, Any]

Get server manager statistics.

abstractmethod async health_check(name: str) -> bool

Check if server is healthy.

is_running(name: str) -> bool

Check if server is running.

list_servers(status: dataflow.server_management.models.ServerStatus | None = None) -> List[str]

List servers by status.

remove_config(name: str) -> bool

Remove a server configuration.

abstractmethod async restart_server(name: str) -> dataflow.server_management.models.BaseServerInfo

Restart a server.

abstractmethod async start_server(name: str, config: dataflow.server_management.models.BaseServerConfig | None = None) -> dataflow.server_management.models.BaseServerInfo

Start a server with given configuration.

abstractmethod async stop_server(name: str, force: bool = False) -> bool

Stop a running server.

available_configs: Dict[str, dataflow.server_management.models.BaseServerConfig]
servers: Dict[str, dataflow.server_management.models.BaseServerInfo]

class dataflow.server_management.ServerStatus

Bases: str, enum.Enum

Universal server status enum.

ERROR = 'error'
HEALTH_CHECK_FAILED = 'health_check_failed'
RESTARTING = 'restarting'
RUNNING = 'running'
STARTING = 'starting'
STOPPED = 'stopped'
STOPPING = 'stopping'
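Because the enum derives from both str and enum.Enum, its members compare equal to their string values and serialize to JSON without a custom encoder. The sketch below redefines the enum locally with the member values listed above so that it runs on its own; in real code you would import ServerStatus from the package instead.

```python
import json
from enum import Enum


class ServerStatus(str, Enum):
    """Local copy of the documented members, for illustration only."""
    ERROR = "error"
    HEALTH_CHECK_FAILED = "health_check_failed"
    RESTARTING = "restarting"
    RUNNING = "running"
    STARTING = "starting"
    STOPPED = "stopped"
    STOPPING = "stopping"


# Members compare equal to plain strings because of the str mixin.
is_equal = ServerStatus.RUNNING == "running"

# A raw string (for example, read from a config file) maps back to a member.
parsed = ServerStatus("stopped")

# The json module treats members as strings and emits their values.
payload = json.dumps({"status": ServerStatus.RUNNING})
```

Use .value when building log messages or labels, since the result of str() and format() on mixed-in enums has changed between Python versions.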