dataflow.server_management
Server management framework for Haive.
This module provides a generic, type-safe server management framework using Pydantic models and validation. It serves as the foundation for managing various types of servers (MCP, HAP, Docker, etc.) across the Haive ecosystem.
Key Features:
- Generic base classes with full type safety
- Pydantic validation for all configurations
- Extensible architecture for different server types
- Smart defaults and auto-configuration
- Comprehensive error handling with clear messages
Example
Basic usage with MCP servers:
from typing import Optional, Type

from pydantic import Field

from haive.dataflow.server_management import BaseServerManager
from haive.mcp.servers import MCPServerConfig, MCPServerInfo


class MCPServerManager(BaseServerManager[MCPServerConfig, MCPServerInfo]):
    # Pydantic v2 requires a type annotation on overridden fields.
    config_class: Type[MCPServerConfig] = Field(default=MCPServerConfig, exclude=True)
    info_class: Type[MCPServerInfo] = Field(default=MCPServerInfo, exclude=True)

    async def start_server(self, name: str, config: Optional[MCPServerConfig] = None) -> MCPServerInfo:
        # Implementation
        pass
Submodules
Classes
BaseServerConfig | Base configuration all servers must have.
BaseServerInfo | Base runtime info all servers have.
BaseServerManager | Generic base class for all server managers.
ConfigLoaderProtocol | Protocol for configuration loading capabilities.
HealthMonitorProtocol | Protocol for health monitoring capabilities.
MetricsProtocol | Protocol for metrics collection capabilities.
ServerDiscoveryProtocol | Protocol for server discovery capabilities.
ServerManagerProtocol | Protocol defining the server manager interface.
ServerStatus | Universal server status enum.
Package Contents
- class dataflow.server_management.BaseServerConfig(/, **data: Any)
Bases: pydantic.BaseModel
Base configuration all servers must have.
This model defines the minimum configuration required for any server type. Subclasses should extend this with type-specific fields.
- name
Unique server identifier
- command
Command to execute (executable + args)
- description
Human-readable description
- working_directory
Optional working directory for process
- environment
Additional environment variables
- timeout_seconds
Startup timeout in seconds
- auto_restart
Whether to restart on failure
- health_check_enabled
Whether to perform health checks
Example
Basic server configuration:
config = BaseServerConfig(
    name="my-server",
    command=["python", "-m", "server"],
    description="My custom server",
)
- classmethod validate_command_not_empty(v: List[str]) -> List[str]
Validate that the command has an executable and contains no empty strings.
- classmethod validate_environment_vars(v: Dict[str, str]) -> Dict[str, str]
Validate environment variable names and values.
- classmethod validate_working_directory(v: str | None) -> str | None
Validate working directory if provided.
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
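The validator descriptions above can be illustrated with a small, dependency-free sketch. The function names `check_command` and `check_environment` and the identifier pattern for variable names are assumptions made for illustration; the actual rules applied by `BaseServerConfig` may be stricter or looser.

```python
import re
from typing import Dict, List

# Assumed naming rule for this sketch; the real validator may differ.
_ENV_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def check_command(command: List[str]) -> List[str]:
    """Reject an empty command or one containing blank entries."""
    if not command:
        raise ValueError("command must include an executable")
    if any(not part.strip() for part in command):
        raise ValueError("command must not contain empty strings")
    return command


def check_environment(env: Dict[str, str]) -> Dict[str, str]:
    """Reject names that are not identifiers and values that are not strings."""
    for key, value in env.items():
        if not _ENV_NAME.match(key):
            raise ValueError(f"invalid environment variable name: {key!r}")
        if not isinstance(value, str):
            raise ValueError(f"value for {key!r} must be a string")
    return env
```

In a Pydantic model, checks of this kind would typically live in field validators so that they run whenever a configuration is constructed.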
- class dataflow.server_management.BaseServerInfo(/, **data: Any)
Bases: pydantic.BaseModel
Base runtime info all servers have.
This model represents the runtime state of a server process. It excludes the actual process handle from serialization.
- name
Server name
- pid
Process ID
- status
Current server status
- started_at
When the server was started
- config_snapshot
Configuration used to start server
- error_message
Last error message, if any
- restart_count
Number of times server has been restarted
- last_health_check
Last successful health check time
- uptime_seconds
Computed uptime in seconds
- update_status(new_status: ServerStatus, error: str | None = None) -> None
Update server status with optional error message.
- last_health_check: datetime.datetime | None = None
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- started_at: datetime.datetime = None
- status: ServerStatus = None
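The runtime-info behavior described above (a computed uptime and a status update that carries an optional error) might look roughly like the following. This is a standard-library stand-in, not the Haive model: `Status`, its two members, and `InfoSketch` are hypothetical, and the choice to overwrite `error_message` on every update is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional


class Status(str, Enum):
    # Hypothetical members; the real ServerStatus values are not listed here.
    RUNNING = "running"
    FAILED = "failed"


@dataclass
class InfoSketch:
    name: str
    pid: int
    status: Status = Status.RUNNING
    started_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    error_message: Optional[str] = None

    @property
    def uptime_seconds(self) -> float:
        """Seconds elapsed since started_at."""
        return (datetime.now(timezone.utc) - self.started_at).total_seconds()

    def update_status(self, new_status: Status, error: Optional[str] = None) -> None:
        """Record the new status and any accompanying error text."""
        self.status = new_status
        self.error_message = error
```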
- class dataflow.server_management.BaseServerManager(/, **data: Any)
Bases: pydantic.BaseModel, abc.ABC, Generic[ConfigT, InfoT]
Generic base class for all server managers.
This class provides a type-safe foundation for managing servers of any type. Subclasses should specify the concrete ConfigT and InfoT types.
- Type Parameters:
ConfigT: Server configuration type (must extend BaseServerConfig)
InfoT: Server runtime info type (must extend BaseServerInfo)
- servers
Currently running servers mapped by name
- available_configs
Available server configurations
- config_class
Configuration class for type validation
- info_class
Info class for runtime information
- auto_restart
Whether to automatically restart failed servers
- max_restart_attempts
Maximum restart attempts before giving up
- health_check_interval
Seconds between health checks
- restart_tracking
Tracks restart attempts per server
Example
Creating a concrete server manager:
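from typing import Optional, Type

from pydantic import Field


class MyServerManager(BaseServerManager[MyConfig, MyInfo]):
    # Pydantic v2 requires a type annotation on overridden fields.
    config_class: Type[MyConfig] = Field(default=MyConfig, exclude=True)
    info_class: Type[MyInfo] = Field(default=MyInfo, exclude=True)

    async def start_server(self, name: str, config: Optional[MyConfig] = None) -> MyInfo:
        # Implementation
        pass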
- add_config(name: str, config: ConfigT | Dict[str, Any]) -> ConfigT
Add or update a server configuration.
- Parameters:
name – Server name
config – Configuration object or dict
- Returns:
Validated configuration object
- Raises:
ValidationError – If configuration is invalid
- get_config(name: str) -> ConfigT | None
Get server configuration by name.
- Parameters:
name – Server name
- Returns:
Configuration object or None if not found
- get_server_info(name: str) -> InfoT | None
Get runtime information for a server.
- Parameters:
name – Server name
- Returns:
Server info or None if not running
- get_stats() -> Dict[str, Any]
Get server manager statistics.
- Returns:
Dictionary with stats including:
total_configured: Number of configured servers
total_running: Number of running servers
servers_by_status: Count by status
restart_counts: Restart attempts per server
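As a rough illustration of the four documented keys, the sketch below assembles a comparable dictionary from plain mappings. The helper `build_stats` and its parameters are hypothetical, and it assumes that every entry in the running-servers mapping counts toward `total_running`; the real method may compute these values differently.

```python
from collections import Counter
from typing import Any, Dict


def build_stats(
    available_configs: Dict[str, Any],
    server_status: Dict[str, str],
    restart_tracking: Dict[str, int],
) -> Dict[str, Any]:
    """Assemble a stats dictionary with the four documented keys."""
    return {
        "total_configured": len(available_configs),
        "total_running": len(server_status),
        # Count how many tracked servers share each status value.
        "servers_by_status": dict(Counter(server_status.values())),
        "restart_counts": dict(restart_tracking),
    }
```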
- abstractmethod async health_check(name: str) -> bool
Check if server is healthy.
- Parameters:
name – Server name
- Returns:
True if healthy, False otherwise
- is_running(name: str) -> bool
Check if server is running.
- Parameters:
name – Server name
- Returns:
True if server is running
- list_servers(status: dataflow.server_management.models.ServerStatus | None = None) -> List[str]
List servers, optionally filtered by status.
- Parameters:
status – Optional status filter
- Returns:
List of server names
- remove_config(name: str) -> bool
Remove a server configuration.
- Parameters:
name – Server name
- Returns:
True if removed, False if not found
- Raises:
RuntimeError – If server is currently running
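The return and error contract documented for `remove_config` can be sketched with two plain dictionaries standing in for `available_configs` and `servers`. The free function below is a hypothetical stand-in, not the method itself.

```python
from typing import Any, Dict


def remove_config(
    available_configs: Dict[str, Any], servers: Dict[str, Any], name: str
) -> bool:
    """Drop a configuration unless its server is still running."""
    if name in servers:
        raise RuntimeError(f"cannot remove config for running server {name!r}")
    if name not in available_configs:
        return False
    del available_configs[name]
    return True
```

Callers that want to remove a running server's configuration would stop the server first, then retry the removal.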
- abstractmethod async restart_server(name: str) -> InfoT
Restart a server.
- Parameters:
name – Server name
- Returns:
New server runtime information
- async start_health_monitoring(name: str) -> None
Start health monitoring for a server.
- Parameters:
name – Server name
- abstractmethod async start_server(name: str, config: ConfigT | None = None) -> InfoT
Start a server with given configuration.
- Parameters:
name – Server name
config – Optional configuration override
- Returns:
Server runtime information
- Raises:
ValueError – If no configuration found
RuntimeError – If server already running
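Because `start_server` is abstract, each subclass decides how a process is actually launched. The sketch below shows only the documented error contract using an in-memory stand-in; `StartSketch`, the dictionary-shaped info object, and the order of the two checks are assumptions, and no real process is started.

```python
import asyncio
from typing import Any, Dict, Optional


class StartSketch:
    """In-memory stand-in; no real process is launched."""

    def __init__(self) -> None:
        self.available_configs: Dict[str, Any] = {}
        self.servers: Dict[str, Any] = {}

    async def start_server(self, name: str, config: Optional[Any] = None) -> Any:
        if name in self.servers:
            raise RuntimeError(f"server {name!r} is already running")
        # An explicit config overrides the stored one.
        resolved = config if config is not None else self.available_configs.get(name)
        if resolved is None:
            raise ValueError(f"no configuration found for {name!r}")
        await asyncio.sleep(0)  # placeholder for the real launch step
        info = {"name": name, "config_snapshot": resolved}
        self.servers[name] = info
        return info
```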
- async stop_health_monitoring(name: str) -> None
Stop health monitoring for a server.
- Parameters:
name – Server name
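One plausible shape for the start/stop monitoring pair is a per-server `asyncio` task stored in a mapping such as `health_check_tasks`. The class below is a minimal sketch under that assumption; `MonitorSketch`, the `unhealthy` list, and the injected `check` callable are hypothetical and do not reflect how the base class reacts to a failed check.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


class MonitorSketch:
    def __init__(self, check: Callable[[str], Awaitable[bool]], interval: float) -> None:
        self.check = check
        self.interval = interval
        self.health_check_tasks: Dict[str, asyncio.Task] = {}
        self.unhealthy: List[str] = []

    async def _loop(self, name: str) -> None:
        # Poll until the task is cancelled by stop_health_monitoring.
        while True:
            if not await self.check(name):
                self.unhealthy.append(name)
            await asyncio.sleep(self.interval)

    async def start_health_monitoring(self, name: str) -> None:
        if name not in self.health_check_tasks:
            self.health_check_tasks[name] = asyncio.create_task(self._loop(name))

    async def stop_health_monitoring(self, name: str) -> None:
        task = self.health_check_tasks.pop(name, None)
        if task is not None:
            task.cancel()
            # Wait for the cancellation to finish without re-raising it.
            await asyncio.gather(task, return_exceptions=True)
```

Keeping one task per server name makes starting idempotent and lets stopping be a simple cancel-and-await.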
- abstractmethod async stop_server(name: str, force: bool = False) -> bool
Stop a running server.
- Parameters:
name – Server name
force – Force stop if True
- Returns:
True if stopped successfully
- classmethod validate_config_types(v: Dict[str, Any], info: pydantic.ValidationInfo) -> Dict[str, Any]
Ensure all configs are the correct type.
- classmethod validate_info_types(v: Dict[str, Any], info: pydantic.ValidationInfo) -> Dict[str, Any]
Ensure all server info objects are the correct type.
- validate_restart_policy() -> BaseServerManager
Validate restart configuration consistency.
- validate_server_consistency() -> BaseServerManager
Ensure running servers have configurations.
- config_class: Type[ConfigT] = None
- health_check_tasks: Dict[str, asyncio.Task] = None
- info_class: Type[InfoT] = None
- model_config
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class dataflow.server_management.ConfigLoaderProtocol
Bases: Protocol
Protocol for configuration loading capabilities.
- async load_configs_from_file(path: str) -> Dict[str, dataflow.server_management.models.BaseServerConfig]
Load server configurations from a file.
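A class satisfies this protocol by providing an async `load_configs_from_file` method. The sketch below assumes a JSON file whose top-level object maps server names to configuration dictionaries; that file layout and the `JsonConfigLoader` name are illustrative assumptions, and it returns plain dictionaries rather than `BaseServerConfig` instances to stay dependency-free.

```python
import asyncio
import json
from pathlib import Path
from typing import Any, Dict


class JsonConfigLoader:
    """Reads a JSON object that maps server names to config dictionaries."""

    async def load_configs_from_file(self, path: str) -> Dict[str, Dict[str, Any]]:
        # Read in a worker thread so the event loop is not blocked.
        text = await asyncio.to_thread(Path(path).read_text, encoding="utf-8")
        raw = json.loads(text)
        if not isinstance(raw, dict):
            raise ValueError("expected a JSON object keyed by server name")
        # Fill in the name from the key when the entry omits it.
        return {name: {"name": name, **entry} for name, entry in raw.items()}
```

A real implementation would likely pass each dictionary through the manager's configuration class so that validation errors surface at load time.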
- class dataflow.server_management.HealthMonitorProtocol
Bases: Protocol
Protocol for health monitoring capabilities.
- class dataflow.server_management.MetricsProtocol
Bases: Protocol
Protocol for metrics collection capabilities.
- class dataflow.server_management.ServerDiscoveryProtocol
Bases: Protocol
Protocol for server discovery capabilities.
- async adopt_server(server_info: dataflow.server_management.models.BaseServerInfo) -> bool
Adopt an externally started server.
- async discover_servers() -> List[dataflow.server_management.models.BaseServerInfo]
Discover running servers not managed by this instance.
- class dataflow.server_management.ServerManagerProtocol
Bases: Protocol
Protocol defining the server manager interface.
This protocol ensures all server managers implement the required methods for managing server lifecycles, regardless of the specific server type.
- add_config(name: str, config: dataflow.server_management.models.BaseServerConfig | Dict[str, Any]) -> dataflow.server_management.models.BaseServerConfig
Add or update a server configuration.
- get_config(name: str) -> dataflow.server_management.models.BaseServerConfig | None
Get server configuration by name.
- get_server_info(name: str) -> dataflow.server_management.models.BaseServerInfo | None
Get runtime information for a server.
- list_servers(status: dataflow.server_management.models.ServerStatus | None = None) -> List[str]
List servers, optionally filtered by status.
- abstractmethod async restart_server(name: str) -> dataflow.server_management.models.BaseServerInfo
Restart a server.
- abstractmethod async start_server(name: str, config: dataflow.server_management.models.BaseServerConfig | None = None) -> dataflow.server_management.models.BaseServerInfo
Start a server with given configuration.
- available_configs: Dict[str, dataflow.server_management.models.BaseServerConfig]
- servers: Dict[str, dataflow.server_management.models.BaseServerInfo]
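Protocols of this kind rely on structural typing: a class conforms by having the right members, without inheriting from the protocol. The sketch below demonstrates the idea with a reduced, hypothetical `ManagerLike` protocol covering two of the methods above. Whether the real `ServerManagerProtocol` is decorated with `runtime_checkable` is not stated here, so the `isinstance` check is an assumption of this example only.

```python
from typing import Any, Dict, List, Optional, Protocol, runtime_checkable


@runtime_checkable
class ManagerLike(Protocol):
    """Reduced, hypothetical subset of the manager interface."""

    def get_config(self, name: str) -> Optional[Any]: ...

    def list_servers(self, status: Optional[Any] = None) -> List[str]: ...


class InMemoryManager:
    """Conforms structurally; it does not inherit from ManagerLike."""

    def __init__(self) -> None:
        self.available_configs: Dict[str, Any] = {"demo": {"command": ["python"]}}
        self.servers: Dict[str, Any] = {}

    def get_config(self, name: str) -> Optional[Any]:
        return self.available_configs.get(name)

    def list_servers(self, status: Optional[Any] = None) -> List[str]:
        # The status filter is ignored in this sketch.
        return sorted(self.servers)


def describe(manager: ManagerLike) -> str:
    """Accept any object that offers the protocol's methods."""
    return f"{len(manager.list_servers())} running"
```

Code written against the protocol, like `describe`, works with any manager type that offers the same methods.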