dataflow.mcp.health

MCP Health Monitoring for haive-dataflow.

This module provides health monitoring and management capabilities for MCP servers, including connection status tracking, performance metrics, and automatic recovery.

Classes:

MCPHealthMonitor: Main health monitoring service

MCPHealthChecker: Individual server health checker

Attributes

Classes

MCPHealthChecker

Health checker for individual MCP servers.

MCPHealthMonitor

Health monitoring service for MCP servers.

Module Contents

class dataflow.mcp.health.MCPHealthChecker(server_name: str, server_config: haive.dataflow.registry.models.MCPServerConfig)

Health checker for individual MCP servers.

This class handles health checking for a single MCP server including connection testing, response time measurement, and recovery attempts.
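The description above names three responsibilities: connection testing, response time measurement, and failure tracking. The following is a minimal sketch of how such a checker might combine them; it is not the library's implementation. `SketchHealthChecker`, `ProbeResult`, the `probe` callable, and the `timeout` parameter are hypothetical stand-ins, and the real `MCPServerHealth` model may carry different fields. Only the three counter attributes mirror names documented for this class.

```python
from __future__ import annotations

import asyncio
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Awaitable, Callable


@dataclass
class ProbeResult:
    """Hypothetical result type; the real MCPServerHealth model may differ."""

    healthy: bool
    response_time_ms: float
    error: str | None = None


class SketchHealthChecker:
    """Stand-in that mirrors the documented counters, not the library class."""

    def __init__(self, server_name: str, probe: Callable[[], Awaitable[None]]):
        self.server_name = server_name
        self.probe = probe  # hypothetical: any coroutine that raises on failure
        self.consecutive_failures = 0
        self.error_count = 0
        self.last_successful_check: datetime | None = None

    async def check_health(self, timeout: float = 5.0) -> ProbeResult:
        start = time.perf_counter()
        try:
            # Bound the probe so a hung server counts as a failure.
            await asyncio.wait_for(self.probe(), timeout=timeout)
        except Exception as exc:  # timeout or connection error
            self.consecutive_failures += 1
            self.error_count += 1
            elapsed_ms = (time.perf_counter() - start) * 1000
            return ProbeResult(False, elapsed_ms, repr(exc))
        # A success resets the streak but keeps the lifetime error count.
        self.consecutive_failures = 0
        self.last_successful_check = datetime.now()
        elapsed_ms = (time.perf_counter() - start) * 1000
        return ProbeResult(True, elapsed_ms)
```

In this sketch `consecutive_failures` resets on success while `error_count` only grows, which is one plausible reading of the two attribute names; the source does not state the actual semantics.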

async attempt_recovery()

Attempt to recover the server connection.

async check_health() -> haive.dataflow.registry.models.MCPServerHealth

Perform health check on the server.

Returns:

Current health status

consecutive_failures = 0
error_count = 0
last_successful_check: datetime.datetime | None = None
server_config
server_name
class dataflow.mcp.health.MCPHealthMonitor(mcp_client=None, monitoring_interval: int = 30)

Health monitoring service for MCP servers.

This class provides comprehensive health monitoring for MCP servers, including:

- Periodic health checks
- Performance metric tracking
- Automatic recovery attempts
- Health status reporting

mcp_client

Reference to the MCP client

health_checkers

Dictionary of server health checkers

monitoring_interval

Seconds between health checks

is_monitoring

Whether monitoring is currently active

Examples

monitor = MCPHealthMonitor(mcp_client)
await monitor.start_monitoring()

# Get health status
status = await monitor.get_health_summary()
print(f"Healthy servers: {status['healthy_count']}")

async check_all_servers() -> dict[str, haive.dataflow.registry.models.MCPServerHealth]

Perform health check on all servers.

Returns:

Dictionary of server name to health status
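The source does not say whether the servers are checked one after another or concurrently. As an illustration only, the sketch below shows one common way to build a name-to-status dictionary with `asyncio.gather`. `check_all` and the `HealthCheck` alias are hypothetical, and a plain `bool` stands in for `MCPServerHealth`.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical stand-in for a checker's check_health coroutine.
HealthCheck = Callable[[], Awaitable[bool]]


async def check_all(checks: dict[str, HealthCheck]) -> dict[str, bool]:
    """Run every check concurrently; a raised exception counts as unhealthy."""
    names = list(checks)
    results = await asyncio.gather(
        *(checks[name]() for name in names),
        return_exceptions=True,  # one failing server must not abort the others
    )
    return {name: result is True for name, result in zip(names, results)}


async def main() -> None:
    async def ok() -> bool:
        return True

    async def broken() -> bool:
        raise ConnectionError("server unreachable")

    statuses = await check_all({"search": ok, "files": broken})
    for name, healthy in statuses.items():
        print(f"{name}: {'healthy' if healthy else 'unhealthy'}")


if __name__ == "__main__":
    asyncio.run(main())
```

Passing `return_exceptions=True` keeps a single unreachable server from cancelling the remaining checks, at the cost of having to inspect each result individually.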

async get_health_summary() -> dict[str, any]

Get summary of health status across all servers.

Returns:

Summary dictionary with health metrics
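The exact keys of the summary are not listed here; only `healthy_count` appears, in the class example. The sketch below shows how a summary of that shape could be derived from per-server results. `summarize`, `total_servers`, and `unhealthy_servers` are hypothetical names chosen for illustration, and a `bool` again stands in for the real health model.

```python
def summarize(statuses: dict[str, bool]) -> dict[str, object]:
    """Aggregate per-server results into a summary dictionary."""
    healthy = sum(1 for ok in statuses.values() if ok)
    return {
        "healthy_count": healthy,  # key used in the class example
        "total_servers": len(statuses),  # hypothetical key
        "unhealthy_servers": sorted(  # hypothetical key
            name for name, ok in statuses.items() if not ok
        ),
    }


summary = summarize({"search": True, "files": False, "git": True})
print(f"Healthy servers: {summary['healthy_count']} of {summary['total_servers']}")
```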

async recover_failed_servers() -> list[str]

Attempt to recover failed servers.

Returns:

List of server names that were successfully recovered
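Since `attempt_recovery()` is documented without a return value, one way to decide whether a server was "successfully recovered" is to re-run its health check afterwards. The sketch below assumes that approach; it is not confirmed by the source. `FakeChecker` and `recover_failed` are hypothetical stand-ins that expose only the two documented coroutine methods.

```python
import asyncio


class FakeChecker:
    """Hypothetical stand-in exposing the two documented coroutine methods."""

    def __init__(self, healthy: bool, recoverable: bool):
        self.healthy = healthy
        self.recoverable = recoverable

    async def check_health(self) -> bool:
        return self.healthy

    async def attempt_recovery(self) -> None:
        # Simulate a reconnect that only works for some servers.
        if self.recoverable:
            self.healthy = True


async def recover_failed(checkers: dict[str, FakeChecker]) -> list[str]:
    """Try to recover each unhealthy server and return the names that came back."""
    recovered: list[str] = []
    for name, checker in checkers.items():
        if await checker.check_health():
            continue  # already healthy, nothing to recover
        await checker.attempt_recovery()
        if await checker.check_health():  # confirm recovery with a fresh check
            recovered.append(name)
    return recovered


if __name__ == "__main__":
    fleet = {
        "search": FakeChecker(healthy=True, recoverable=False),
        "files": FakeChecker(healthy=False, recoverable=True),
        "git": FakeChecker(healthy=False, recoverable=False),
    }
    print(asyncio.run(recover_failed(fleet)))
```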

async start_monitoring()

Start health monitoring for all connected servers.

async stop_monitoring()

Stop health monitoring.
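A start/stop pair like this is often backed by a background task that runs a check, sleeps for `monitoring_interval` seconds, and repeats. The sketch below illustrates that pattern under stated assumptions: `SketchMonitor`, its `_task` attribute, and the injected `check_all` coroutine are hypothetical, and the library's actual scheduling mechanism is not described in the source.

```python
import asyncio
from typing import Awaitable, Callable


class SketchMonitor:
    """Stand-in showing a periodic monitoring loop, not the library class."""

    def __init__(
        self,
        check_all: Callable[[], Awaitable[object]],  # hypothetical injected check
        monitoring_interval: float = 30,
    ):
        self.check_all = check_all
        self.monitoring_interval = monitoring_interval
        self.is_monitoring = False
        self._task = None  # background asyncio.Task while monitoring is active

    async def _loop(self) -> None:
        while self.is_monitoring:
            await self.check_all()
            await asyncio.sleep(self.monitoring_interval)

    async def start_monitoring(self) -> None:
        if self.is_monitoring:
            return  # calling start twice must not spawn a second loop
        self.is_monitoring = True
        self._task = asyncio.create_task(self._loop())

    async def stop_monitoring(self) -> None:
        self.is_monitoring = False
        if self._task is not None:
            self._task.cancel()  # interrupt the sleep instead of waiting it out
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
```

Cancelling the task in `stop_monitoring` means shutdown does not have to wait for the current sleep to finish, which matters with the default 30-second interval.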

health_checkers: dict[str, MCPHealthChecker]
is_monitoring = False
mcp_client = None
monitoring_interval = 30
dataflow.mcp.health.logger