r"""MCP server testing and validation tools.
This module provides tools for testing MCP server configurations, validating
connections, and ensuring servers are working correctly before use in production.
The testing system provides:
- Connection testing for individual servers
- Batch testing for multiple servers
- Health monitoring and diagnostics
- Performance benchmarking
- Configuration validation
Classes:
MCPServerTester: Main testing interface
TestResult: Results from server testing
HealthMonitor: Continuous health monitoring
Example:
Testing server configurations:
.. code-block:: python
from haive.mcp.tools import MCPServerTester
from haive.mcp.config import MCPServerConfig
# Test individual server
tester = MCPServerTester()
server_config = MCPServerConfig(
name="test_server",
transport="stdio",
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"]
)
result = await tester.test_server(server_config)
if result.success:
print(f"✅ {server_config.name} is working")
else:
print(f"❌ {server_config.name} failed: {result.error}")
# Test multiple servers
results = await tester.test_multiple_servers([server_config])
# Monitor health
monitor = tester.create_health_monitor()
await monitor.start_monitoring([server_config])
Note:
Server testing requires the actual MCP packages to be installed
and may need appropriate environment variables.
"""
import asyncio
import contextlib
import logging
import os
import time
from dataclasses import dataclass
from typing import Any
from langchain_mcp_adapters.client import MultiServerMCPClient
from haive.mcp.config import MCPConfig, MCPServerConfig
logger = logging.getLogger(__name__)
[docs]
@dataclass
class TestResult:
"""Result from testing an MCP server."""
server_name: str
success: bool
response_time: float # seconds
error: str | None = None
tools_discovered: int = 0
capabilities_found: list[str] = None
warnings: list[str] = None
def __post_init__(self):
""" Post Init .
"""
if self.capabilities_found is None:
self.capabilities_found = []
if self.warnings is None:
self.warnings = []
[docs]
@dataclass
class HealthStatus:
"""Health status for a server."""
server_name: str
healthy: bool
last_check: float
consecutive_failures: int = 0
average_response_time: float = 0.0
uptime_percentage: float = 100.0
[docs]
class HealthMonitor:
"""Continuous health monitoring for MCP servers."""
def __init__(self, check_interval: int = 60):
"""Initialize health monitor.
Args:
check_interval: Seconds between health checks
"""
self.check_interval = check_interval
self.monitoring = False
self.health_status: dict[str, HealthStatus] = {}
self.monitor_task = None
[docs]
async def start_monitoring(self, servers: list[MCPServerConfig]):
"""Start monitoring the specified servers.
Args:
servers: List of server configurations to monitor
"""
self.monitoring = True
# Initialize health status
for server in servers:
self.health_status[server.name] = HealthStatus(
server_name=server.name, healthy=True, last_check=time.time()
)
# Start monitoring task
self.monitor_task = asyncio.create_task(self._monitor_loop(servers))
logger.info(f"Started monitoring {len(servers)} servers")
[docs]
async def stop_monitoring(self):
"""Stop health monitoring."""
self.monitoring = False
if self.monitor_task:
self.monitor_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await self.monitor_task
logger.info("Stopped health monitoring")
async def _monitor_loop(self, servers: list[MCPServerConfig]):
"""Main monitoring loop."""
tester = MCPServerTester()
while self.monitoring:
try:
# Test all servers
results = await tester.test_multiple_servers(servers, timeout=30)
# Update health status
for result in results:
if result.server_name in self.health_status:
status = self.health_status[result.server_name]
status.last_check = time.time()
if result.success:
status.healthy = True
status.consecutive_failures = 0
# Update average response time
if status.average_response_time == 0:
status.average_response_time = result.response_time
else:
status.average_response_time = (
status.average_response_time * 0.8
+ result.response_time * 0.2
)
else:
status.consecutive_failures += 1
if status.consecutive_failures >= 3:
status.healthy = False
# Wait before next check
await asyncio.sleep(self.check_interval)
except Exception as e:
logger.exception(f"Error in monitoring loop: {e}")
await asyncio.sleep(self.check_interval)
[docs]
def get_health_report(self) -> dict[str, HealthStatus]:
"""Get current health status for all monitored servers."""
return self.health_status.copy()
[docs]
def get_unhealthy_servers(self) -> list[str]:
"""Get list of unhealthy server names."""
return [
name for name, status in self.health_status.items() if not status.healthy
]
[docs]
class MCPServerTester:
"""Main testing interface for MCP servers."""
def __init__(self):
"""Initialize server tester."""
self.test_history: list[TestResult] = []
[docs]
async def test_server(
self, server_config: MCPServerConfig, timeout: int = 60
) -> TestResult:
"""Test a single MCP server configuration.
Args:
server_config: Server configuration to test
timeout: Timeout in seconds for the test
Returns:
TestResult with test outcome
"""
start_time = time.time()
try:
# Import MCP client dependencies
try:
from langchain_mcp_adapters import MCPAdapter
except ImportError:
return TestResult(
server_name=server_config.name,
success=False,
response_time=0.0,
error="MCP client dependencies not available (install langchain-mcp-adapters)",
)
# Create test configuration
test_config = {
server_config.name: self._create_client_config(server_config)
}
# Test connection
client = None
try:
client = MultiServerMCPClient(test_config)
# Try to get tools as a connection test
tools = await asyncio.wait_for(
client.get_tools(server_name=server_config.name), timeout=timeout
)
response_time = time.time() - start_time
# Analyze discovered tools
tools_count = len(tools) if tools else 0
capabilities = self._analyze_capabilities(tools, server_config)
warnings = self._check_for_warnings(server_config, tools)
result = TestResult(
server_name=server_config.name,
success=True,
response_time=response_time,
tools_discovered=tools_count,
capabilities_found=capabilities,
warnings=warnings,
)
finally:
# Clean up client
if client and hasattr(client, "close"):
with contextlib.suppress(Exception):
await client.close()
except TimeoutError:
result = TestResult(
server_name=server_config.name,
success=False,
response_time=timeout,
error=f"Server did not respond within {timeout}s",
)
except Exception as e:
result = TestResult(
server_name=server_config.name,
success=False,
response_time=time.time() - start_time,
error=str(e),
)
# Store result in history
self.test_history.append(result)
return result
[docs]
async def test_multiple_servers(
self, servers: list[MCPServerConfig], timeout: int = 60, parallel: bool = True
) -> list[TestResult]:
"""Test multiple servers.
Args:
servers: List of server configurations to test
timeout: Timeout per server in seconds
parallel: Whether to test servers in parallel
Returns:
List of TestResult objects
"""
if parallel:
# Test in parallel for speed
tasks = [self.test_server(server, timeout) for server in servers]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Handle exceptions
final_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
final_results.append(
TestResult(
server_name=servers[i].name,
success=False,
response_time=0.0,
error=f"Test failed with exception: {result}",
)
)
else:
final_results.append(result)
return final_results
# Test sequentially
results = []
for server in servers:
result = await self.test_server(server, timeout)
results.append(result)
return results
[docs]
async def test_config(self, config: MCPConfig) -> dict[str, TestResult]:
"""Test an entire MCP configuration.
Args:
config: MCP configuration to test
Returns:
Dictionary mapping server names to test results
"""
servers = list(config.servers.values())
results = await self.test_multiple_servers(servers)
return {result.server_name: result for result in results}
def _create_client_config(self, server_config: MCPServerConfig) -> dict[str, Any]:
"""Create client configuration from server config."""
config = {}
if server_config.transport == "stdio":
config["command"] = server_config.command
config["args"] = server_config.args
config["transport"] = "stdio"
elif server_config.transport in ["sse", "streamable_http"]:
config["url"] = server_config.url
config["transport"] = server_config.transport
# Add environment variables
if server_config.env:
config["env"] = server_config.env
return config
def _analyze_capabilities(
self, tools: list[Any], server_config: MCPServerConfig
) -> list[str]:
"""Analyze discovered tools to determine capabilities."""
capabilities = set()
if not tools:
return list(capabilities)
# Analyze tool names for capability hints
for tool in tools:
tool_name = getattr(tool, "name", str(tool)).lower()
# File system capabilities
if any(
keyword in tool_name
for keyword in ["file", "read", "write", "directory"]
):
capabilities.add("filesystem")
# Database capabilities
if any(
keyword in tool_name
for keyword in ["query", "sql", "database", "table"]
):
capabilities.add("database")
# Web capabilities
if any(keyword in tool_name for keyword in ["http", "fetch", "web", "api"]):
capabilities.add("web")
# Git capabilities
if any(
keyword in tool_name for keyword in ["git", "repo", "commit", "branch"]
):
capabilities.add("git")
# Search capabilities
if any(keyword in tool_name for keyword in ["search", "find", "lookup"]):
capabilities.add("search")
return list(capabilities)
def _check_for_warnings(
self, server_config: MCPServerConfig, tools: list[Any]
) -> list[str]:
"""Check for potential issues and generate warnings."""
warnings = []
# Check if no tools were discovered
if not tools:
warnings.append("No tools discovered - server may not be fully configured")
# Check for missing environment variables
if server_config.env:
missing_vars = [var for var in server_config.env if var not in os.environ]
if missing_vars:
warnings.append(
f"Environment variables not set: {', '.join(missing_vars)}"
)
# Check for common authentication requirements
server_name = server_config.name.lower()
if (
any(
service in server_name
for service in ["github", "gmail", "notion", "calendar"]
)
and not server_config.env
):
warnings.append(
"Server likely requires authentication but no env vars configured"
)
return warnings
[docs]
def create_health_monitor(self, check_interval: int = 60) -> HealthMonitor:
"""Create a health monitor instance.
Args:
check_interval: Seconds between health checks
Returns:
HealthMonitor instance
"""
return HealthMonitor(check_interval)
[docs]
def get_test_history(self) -> list[TestResult]:
"""Get history of all test results."""
return self.test_history.copy()
[docs]
def get_success_rate(self, server_name: str | None = None) -> float:
"""Get success rate for a server or overall.
Args:
server_name: Specific server name, or None for overall
Returns:
Success rate as percentage (0.0 to 100.0)
"""
if server_name:
relevant_tests = [
test for test in self.test_history if test.server_name == server_name
]
else:
relevant_tests = self.test_history
if not relevant_tests:
return 0.0
successful = sum(1 for test in relevant_tests if test.success)
return (successful / len(relevant_tests)) * 100.0
[docs]
def generate_test_report(self) -> dict[str, Any]:
"""Generate a comprehensive test report.
Returns:
Dictionary with test statistics and details
"""
if not self.test_history:
return {"error": "No tests performed yet"}
# Group by server
by_server = {}
for test in self.test_history:
if test.server_name not in by_server:
by_server[test.server_name] = []
by_server[test.server_name].append(test)
# Calculate statistics
total_tests = len(self.test_history)
successful_tests = sum(1 for test in self.test_history if test.success)
server_stats = {}
for server_name, tests in by_server.items():
successful = sum(1 for test in tests if test.success)
avg_response_time = sum(test.response_time for test in tests) / len(tests)
server_stats[server_name] = {
"total_tests": len(tests),
"successful": successful,
"success_rate": (successful / len(tests)) * 100.0,
"average_response_time": avg_response_time,
"last_test": tests[-1].success if tests else False,
}
return {
"summary": {
"total_tests": total_tests,
"successful_tests": successful_tests,
"overall_success_rate": (successful_tests / total_tests) * 100.0,
"servers_tested": len(by_server),
},
"server_statistics": server_stats,
"recommendations": self._generate_recommendations(by_server),
}
def _generate_recommendations(
self, by_server: dict[str, list[TestResult]]
) -> list[str]:
"""Generate recommendations based on test results."""
recommendations = []
for server_name, tests in by_server.items():
latest_test = tests[-1]
success_rate = sum(1 for test in tests if test.success) / len(tests)
if success_rate < 0.5:
recommendations.append(
f"Server '{server_name}' has low success rate ({success_rate:.1%}) - check configuration"
)
if latest_test.response_time > 30:
recommendations.append(
f"Server '{server_name}' has slow response time ({latest_test.response_time:.1f}s)"
)
if latest_test.warnings:
recommendations.append(
f"Server '{server_name}' has warnings: {'; '.join(latest_test.warnings)}"
)
return recommendations