# Source code for haive.mcp.discovery.analyzer

"""MCP server analyzer for component discovery integration.

This module provides analysis capabilities for discovering and configuring MCP
servers from various sources. It can analyze dictionaries, objects, and files
to extract valid MCP server configurations.

The analyzer supports:
    - Dictionary-based configurations (JSON/YAML)
    - Object attribute extraction
    - Directory scanning for config files
    - Registry file parsing
    - Component info generation for discovery system

Classes:
    MCPServerAnalyzer: Main analyzer for MCP server discovery

Examples:
    Analyzing server configurations:

    .. code-block:: python

        from pathlib import Path

        from haive.mcp.discovery import MCPServerAnalyzer

        analyzer = MCPServerAnalyzer()

        # Analyze a dictionary config
        config_dict = {
            "name": "my_server",
            "command": "npx",
            "args": ["-y", "@myorg/mcp-server"],
            "capabilities": ["data_access"]
        }

        server_config = analyzer.analyze(config_dict)
        if server_config:
            print(f"Found server: {server_config.name}")

        # Discover from directory
        configs = analyzer.discover_from_directory(Path("./mcp_configs"))
        print(f"Discovered {len(configs)} servers")

Note:
    The analyzer is designed to be flexible and handle various configuration
    formats commonly used for MCP servers.
"""

import json
import logging
from pathlib import Path
from typing import Any

import yaml

from haive.mcp.config import MCPServerConfig, MCPTransport

logger = logging.getLogger(__name__)


class MCPServerAnalyzer:
    """Analyzer for discovering and analyzing MCP servers.

    MCPServerAnalyzer identifies MCP server definitions coming from
    dictionaries, arbitrary objects, configuration files, and registry files,
    and converts them to standardized ``MCPServerConfig`` instances. It can
    also build the component-info dictionary used to register a server with
    the component discovery system (see :meth:`create_component_info`).

    Attributes:
        discovered_servers: Dictionary of servers found during analysis.
            It starts empty and none of the methods defined on this class
            write to it, so it is presumably filled by callers (keyed by
            server name -- confirm against the calling code).

    Examples:
        Basic server analysis:

        .. code-block:: python

            analyzer = MCPServerAnalyzer()

            # Check if object can be analyzed
            if analyzer.can_analyze(my_object):
                config = analyzer.analyze(my_object)
                if config:
                    print(f"Server: {config.name}")
                    print(f"Transport: {config.transport}")
                    print(f"Capabilities: {config.capabilities}")
    """

    # Attribute names probed on arbitrary objects, grouped by the config key
    # they feed. Aliases are tried in order; the first non-None value wins.
    _ATTRIBUTE_ALIASES = {
        "command": ("command", "cmd", "executable"),
        "args": ("args", "arguments", "params"),
        "url": ("url", "endpoint", "server_url"),
        "name": ("name", "id", "identifier"),
        "transport": ("transport", "protocol"),
        "capabilities": ("capabilities", "features", "supported_operations"),
    }

    def __init__(self):
        """Initialize the MCP server analyzer.

        Sets up the analyzer with an empty dictionary to track discovered
        servers during analysis operations.
        """
        self.discovered_servers: dict[str, MCPServerConfig] = {}

    def can_analyze(self, obj: Any) -> bool:
        """Check if an object is an MCP server or configuration.

        Detection criteria:
            - Dictionary containing at least one of the keys ``command``,
              ``url`` or ``transport``
            - Any other object whose class name contains ``"MCP"`` (which
              also covers names containing ``"MCPServer"``)

        Note that a dictionary is judged by its keys only; its class name is
        never consulted.

        Args:
            obj: Object to check for MCP characteristics.

        Returns:
            bool: True if the object appears to be MCP-related.

        Examples:
            Checking various objects:

            .. code-block:: python

                # Dictionary config
                config = {"command": "npx", "args": [...]}
                assert analyzer.can_analyze(config) == True

                # Non-MCP object
                assert analyzer.can_analyze({"foo": "bar"}) == False

                # MCP server instance
                server = MCPServer(...)
                assert analyzer.can_analyze(server) == True
        """
        if isinstance(obj, dict):
            # MCP server config pattern
            return any(key in obj for key in ("command", "url", "transport"))

        cls = getattr(obj, "__class__", None)
        return cls is not None and "MCP" in cls.__name__

    def analyze(self, obj: Any, source: str | None = None) -> MCPServerConfig | None:
        """Analyze an object and extract MCP server configuration.

        Dispatch order:
            1. ``dict`` -> :meth:`_analyze_dict_config`
            2. ``MCPServerConfig`` instance -> returned unchanged
            3. Object exposing ``to_mcp_config`` -> result of calling it
            4. Anything else -> :meth:`_analyze_object`

        Args:
            obj: Object to analyze.
            source: Optional source identifier (file path, registry key,
                etc.). Used to derive a name when the config has none.

        Returns:
            Optional[MCPServerConfig]: Extracted configuration if successful,
            None if the object cannot be converted. Any exception raised
            while analyzing is logged and turned into None.

        Examples:
            Analyzing different object types:

            .. code-block:: python

                # From dictionary
                config = analyzer.analyze({
                    "name": "test",
                    "command": "test-server",
                    "capabilities": ["test"]
                })

                # From custom object
                class MyServer:
                    def to_mcp_config(self):
                        return MCPServerConfig(name="custom", ...)

                server = MyServer()
                config = analyzer.analyze(server)
        """
        try:
            if isinstance(obj, dict):
                return self._analyze_dict_config(obj, source)

            if isinstance(obj, MCPServerConfig):
                return obj

            if hasattr(obj, "to_mcp_config"):
                # Custom MCP server with conversion method
                return obj.to_mcp_config()

            # Try to extract config from object attributes
            return self._analyze_object(obj, source)
        except Exception as e:
            logger.exception("Failed to analyze MCP server: %s", e)
            return None

    def _analyze_dict_config(
        self, config: dict[str, Any], source: str | None = None
    ) -> MCPServerConfig | None:
        """Analyze a dictionary configuration.

        Transport detection:
            - ``command`` set -> stdio transport
            - ``url`` set -> the ``transport`` value if given, otherwise SSE
            - Neither set -> invalid configuration (returns None)

        A key whose value is ``None`` (e.g. JSON ``null``) counts as not set,
        so ``{"command": null, "url": "..."}`` is treated as a URL-based
        server instead of a stdio server without a command.

        Name resolution: the ``name`` key, else the stem of ``source`` with
        ``-`` and spaces replaced by ``_``, else ``"unnamed_mcp_server"``.

        Args:
            config: Dictionary containing server configuration.
            source: Optional source identifier for naming.

        Returns:
            Optional[MCPServerConfig]: Valid configuration, or None when no
            transport can be determined or building the config fails (the
            failure is logged).
        """
        try:
            command = config.get("command")
            url = config.get("url")

            # Determine transport type
            if command is not None:
                transport = MCPTransport.STDIO
            elif url is not None:
                transport = config.get("transport") or MCPTransport.SSE
            else:
                return None

            # Extract name, falling back to the source file stem
            name = config.get("name")
            if not name and source:
                name = Path(source).stem.replace("-", "_").replace(" ", "_")
            if not name:
                name = "unnamed_mcp_server"

            return MCPServerConfig(
                name=name,
                transport=transport,
                command=command,
                args=config.get("args", []),
                url=url,
                env=config.get("env", {}),
                api_key=config.get("api_key"),
                category=config.get("category"),
                description=config.get("description"),
                capabilities=config.get("capabilities", []),
                timeout=config.get("timeout", 30),
                retry_attempts=config.get("retry_attempts", 3),
                auto_start=config.get("auto_start", True),
                health_check_interval=config.get("health_check_interval"),
            )
        except Exception as e:
            logger.exception("Failed to analyze dict config: %s", e)
            return None

    def _analyze_object(
        self, obj: Any, source: str | None = None
    ) -> MCPServerConfig | None:
        """Analyze an arbitrary object for MCP server configuration.

        Builds a plain dictionary from the object's attributes and hands it
        to :meth:`_analyze_dict_config`.

        Attribute mappings (first alias with a non-None value wins):
            - command: "command", "cmd", "executable"
            - args: "args", "arguments", "params"
            - url: "url", "endpoint", "server_url"
            - name: "name", "id", "identifier"
            - transport: "transport", "protocol"
            - capabilities: "capabilities", "features", "supported_operations"

        Args:
            obj: Object to analyze for MCP attributes.
            source: Optional source identifier.

        Returns:
            Optional[MCPServerConfig]: Extracted configuration, or None when
            no mapped attribute is found or extraction fails.
        """
        try:
            extracted: dict[str, Any] = {}
            for target, aliases in self._ATTRIBUTE_ALIASES.items():
                for alias in aliases:
                    value = getattr(obj, alias, None)
                    if value is not None:
                        extracted[target] = value
                        break

            if not extracted:
                return None
            return self._analyze_dict_config(extracted, source)
        except Exception as e:
            logger.exception("Failed to analyze object: %s", e)
            return None

    @staticmethod
    def _load_config_file(config_file: Path) -> Any:
        """Parse one config file, as JSON for ``.json`` and as YAML otherwise."""
        # Explicit UTF-8 so decoding does not depend on the platform locale.
        with open(config_file, encoding="utf-8") as f:
            if config_file.suffix.lower() == ".json":
                return json.load(f)
            return yaml.safe_load(f)

    def _collect_configs(self, data: Any, source: str) -> list[MCPServerConfig]:
        """Turn parsed file content (one mapping or a list) into configs."""
        if isinstance(data, dict):
            candidates = [data]
        elif isinstance(data, list):
            candidates = data
        else:
            return []

        found = []
        for candidate in candidates:
            if not self.can_analyze(candidate):
                continue
            config = self.analyze(candidate, source)
            if config:
                found.append(config)
        return found

    def discover_from_directory(self, directory: Path) -> list[MCPServerConfig]:
        """Discover MCP server configurations from a directory.

        Recursively searches a directory for MCP server configuration files.
        All JSON files are processed first, then all YAML files; within each
        group files are visited in sorted path order so results are
        reproducible.

        File patterns:
            - ``**/*.json``: JSON configuration files
            - ``**/*.yaml``: YAML configuration files

        Configuration formats (both file types):
            - Single server: ``{"command": "...", "capabilities": [...]}``
            - Multiple servers: ``[{...}, {...}, ...]``

        Args:
            directory: Directory path to search recursively.

        Returns:
            List[MCPServerConfig]: All valid configurations found.

        Examples:
            Discovering from a config directory:

            .. code-block:: python

                configs_dir = Path("~/.mcp/configs").expanduser()
                servers = analyzer.discover_from_directory(configs_dir)

                for server in servers:
                    print(f"Found: {server.name} ({server.transport})")

        Note:
            Files that cannot be read or parsed are skipped with debug
            logging. PyYAML is imported at module level by this file, so it
            is a hard requirement rather than an optional extra.
        """
        root = Path(directory)
        discovered: list[MCPServerConfig] = []

        for pattern in ("**/*.json", "**/*.yaml"):
            for config_file in sorted(root.glob(pattern)):
                try:
                    data = self._load_config_file(config_file)
                    discovered.extend(
                        self._collect_configs(data, str(config_file))
                    )
                except Exception as e:
                    # Best-effort scan: one bad file must not stop discovery.
                    logger.debug("Failed to read %s: %s", config_file, e)

        return discovered

    def discover_from_registry(
        self, registry_path: Path | None = None
    ) -> list[MCPServerConfig]:
        """Discover MCP servers from a registry file.

        Loads MCP server configurations from a JSON registry file. If no
        path is provided, the first existing default location is used.

        Registry format::

            {
                "servers": {
                    "server_name": {
                        "command": "...",
                        "capabilities": [...]
                    }
                }
            }

        When the top-level object has no ``"servers"`` key, the top-level
        object itself is treated as the name-to-config mapping. The mapping
        key always becomes the server name, overriding any ``name`` inside
        the entry. Entries that are not mappings are ignored.

        Default locations checked:
            1. ``~/.mcp/registry.json``
            2. ``~/.config/mcp/servers.json``
            3. ``/etc/mcp/servers.json``

        Args:
            registry_path: Optional path to registry file. If not provided,
                searches default locations.

        Returns:
            List[MCPServerConfig]: Configurations from the registry; empty
            when no registry exists or it cannot be read.

        Examples:
            Using a custom registry:

            .. code-block:: python

                # Load from specific registry
                servers = analyzer.discover_from_registry(
                    Path("/opt/mcp/registry.json")
                )

                # Use default locations
                servers = analyzer.discover_from_registry()
        """
        if not registry_path:
            # Check default locations
            default_locations = (
                Path.home() / ".mcp" / "registry.json",
                Path.home() / ".config" / "mcp" / "servers.json",
                Path("/etc/mcp/servers.json"),
            )
            registry_path = next(
                (path for path in default_locations if path.exists()), None
            )

        if not registry_path:
            return []
        registry_path = Path(registry_path)
        if not registry_path.exists():
            return []

        try:
            with open(registry_path, encoding="utf-8") as f:
                registry = json.load(f)
        except Exception as e:
            logger.exception("Failed to read registry: %s", e)
            return []

        if not isinstance(registry, dict):
            return []

        # Registry format: {"servers": {...}}, or a bare name -> config mapping
        servers = registry.get("servers", registry)
        if not isinstance(servers, dict):
            logger.warning(
                "Ignoring registry %s: 'servers' is not a mapping", registry_path
            )
            return []

        discovered = []
        for server_name, server_config in servers.items():
            if not isinstance(server_config, dict):
                continue
            # Copy instead of mutating the parsed entry in place.
            named_config = {**server_config, "name": server_name}
            config = self.analyze(named_config, str(registry_path))
            if config:
                discovered.append(config)

        return discovered

    def create_component_info(self, server_config: MCPServerConfig) -> dict[str, Any]:
        """Create component info for registration with component discovery.

        Args:
            server_config: Server configuration to describe.

        Returns:
            dict[str, Any]: Mapping with the keys ``name``,
            ``component_type`` (always ``"mcp"``), ``capabilities``,
            ``capability_categories`` (always ``["integration"]``), ``tags``
            (``"mcp"`` plus the category when one is set), ``description``
            (defaults to ``"MCP Server: <name>"``), ``config``,
            ``transport`` and ``enabled``.
        """
        tags = ["mcp"]
        if server_config.category:
            tags.append(server_config.category)

        description = (
            server_config.description or f"MCP Server: {server_config.name}"
        )

        return {
            "name": server_config.name,
            "component_type": "mcp",
            "capabilities": server_config.capabilities,
            "capability_categories": ["integration"],
            "tags": tags,
            "description": description,
            # NOTE(review): `.dict()` looks like the pydantic v1 serializer;
            # confirm whether MCPServerConfig should use `model_dump()`.
            "config": server_config.dict(),
            "transport": server_config.transport,
            "enabled": server_config.enabled,
        }