# Source code for haive.mcp.tools.server_selector

r"""Intelligent MCP server selection and filtering tools for AI agents.

This module provides sophisticated tools for filtering, selecting, and recommending
MCP servers based on various criteria including prefixes, capabilities, task analysis,
and performance metrics. Designed to help AI agents make intelligent decisions about
which servers to use for specific tasks.

The server selector provides:
    - Namespace/prefix-based filtering for organized server groups
    - Capability-based server recommendations
    - Task analysis for automatic server selection
    - Performance-aware server ranking
    - Interactive selection interfaces
    - Smart server combinations and workflows

Classes:
    MCPServerSelector: Main class for intelligent server selection
    ServerFilter: Flexible filtering system for servers
    TaskAnalyzer: Analyzes tasks to recommend appropriate servers
    ServerRecommender: Provides smart server recommendations

Example:
    Basic server selection:

    .. code-block:: python

        from haive.mcp.tools import MCPServerSelector
        from haive.mcp.documentation import MCPDocumentationLoader

        # Create selector with all available servers
        loader = MCPDocumentationLoader()
        all_servers = list(loader.load_all_mcp_documents().values())

        selector = MCPServerSelector(all_servers)

        # Filter by prefix (organization/namespace)
        anthropic_servers = selector.filter_by_prefix("anthropic/")
        openai_servers = selector.filter_by_prefix("openai/")

        # Get recommendations for a task
        task = "I need to analyze a GitHub repository for security issues"
        recommendations = selector.recommend_for_task(task)

        # Interactive selection
        chosen_servers = await selector.interactive_select(
            "Choose servers for code analysis",
            categories=["development", "security"]
        )

Note:
    This system is designed to make server selection intelligent and context-aware,
    reducing the need for manual server configuration.
"""

import logging
import re
from dataclasses import dataclass
from typing import Any

from haive.mcp.config import MCPConfig, MCPServerConfig, MCPTransport
from haive.mcp.documentation.doc_loader import MCPDocumentationLoader

logger = logging.getLogger(__name__)


@dataclass
class ServerScore:
    """Relevance score for one recommended server, with the evidence behind it.

    Attributes:
        server_name: Name of the server that was scored.
        score: Accumulated relevance score.
        reasons: Human-readable explanations, one per contribution to the score.
        capabilities_match: Capabilities that matched for this server.
        category_match: Whether the server's category matched.
        prefix_match: Whether the server's name prefix matched.
    """

    server_name: str
    score: float
    reasons: list[str]
    capabilities_match: list[str]
    category_match: bool
    prefix_match: bool
@dataclass
class TaskRequirements:
    """Requirements extracted from a natural-language task description.

    Attributes:
        keywords: Keywords taken from the task text.
        required_capabilities: Capabilities the task appears to need.
        preferred_categories: Server categories, most relevant first.
        suggested_servers: Names of specific servers suggested for the task.
        complexity_score: Numeric estimate of how involved the task is.
    """

    keywords: list[str]
    required_capabilities: list[str]
    preferred_categories: list[str]
    suggested_servers: list[str]
    complexity_score: float
[docs] class ServerFilter: """Flexible filtering system for MCP servers.""" def __init__(self, servers: list[dict[str, Any]]): """Initialize filter with server list. Args: servers: List of server documentation dictionaries """ self.servers = servers self._build_indices() def _build_indices(self): """Build search indices for efficient filtering.""" self.by_prefix = {} self.by_category = {} self.by_capability = {} for server in self.servers: metadata = server.get("metadata", {}) name = metadata.get("name", "") category = metadata.get("category", "") # Build prefix index if "/" in name: prefix = name.split("/")[0] + "/" if prefix not in self.by_prefix: self.by_prefix[prefix] = [] self.by_prefix[prefix].append(server) # Build category index if category: if category not in self.by_category: self.by_category[category] = [] self.by_category[category].append(server) # Build capability index (from description) description = metadata.get("description", "").lower() for word in description.split(): if len(word) > 3: # Skip short words if word not in self.by_capability: self.by_capability[word] = [] self.by_capability[word].append(server)
[docs] def filter_by_prefix(self, prefix: str) -> list[dict[str, Any]]: """Filter servers by name prefix (namespace). Args: prefix: Prefix to filter by (e.g., "anthropic/", "openai/") Returns: List of servers matching the prefix """ if not prefix.endswith("/"): prefix += "/" return self.by_prefix.get(prefix, [])
[docs] def filter_by_category(self, category: str) -> list[dict[str, Any]]: """Filter servers by category. Args: category: Category to filter by Returns: List of servers in the category """ return self.by_category.get(category, [])
[docs] def filter_by_capability_keyword(self, keyword: str) -> list[dict[str, Any]]: """Filter servers by capability keyword in description. Args: keyword: Keyword to search for Returns: List of servers mentioning the keyword """ keyword = keyword.lower() matching = [] for server in self.servers: metadata = server.get("metadata", {}) description = metadata.get("description", "").lower() readme = server.get("readme_content", "").lower() if keyword in description or keyword in readme: matching.append(server) return matching
[docs] def filter_by_multiple_criteria( self, prefixes: list[str] | None = None, categories: list[str] | None = None, keywords: list[str] | None = None, exclude_prefixes: list[str] | None = None, ) -> list[dict[str, Any]]: """Filter servers by multiple criteria. Args: prefixes: List of prefixes to include categories: List of categories to include keywords: List of capability keywords exclude_prefixes: List of prefixes to exclude Returns: List of servers matching all criteria """ # Start with all servers candidates = set(range(len(self.servers))) # Apply prefix filters if prefixes: prefix_matches = set() for prefix in prefixes: for server in self.filter_by_prefix(prefix): prefix_matches.add(self.servers.index(server)) candidates &= prefix_matches # Apply category filters if categories: category_matches = set() for category in categories: for server in self.filter_by_category(category): category_matches.add(self.servers.index(server)) candidates &= category_matches # Apply keyword filters if keywords: keyword_matches = set() for keyword in keywords: for server in self.filter_by_capability_keyword(keyword): keyword_matches.add(self.servers.index(server)) candidates &= keyword_matches # Apply exclusions if exclude_prefixes: for prefix in exclude_prefixes: excluded = set() for server in self.filter_by_prefix(prefix): excluded.add(self.servers.index(server)) candidates -= excluded return [self.servers[i] for i in candidates]
class TaskAnalyzer:
    """Analyzes task descriptions to determine server requirements.

    Matching is word-aware rather than raw substring search, so short
    patterns such as "ai" or "pr" no longer fire on unrelated words like
    "email" or "process". See :meth:`_pattern_matches` for the exact rule.

    Attributes:
        capability_patterns: Maps a capability name to the phrases that
            indicate it.
        server_patterns: Maps a server name to the phrases that indicate it.
    """

    # Patterns this short must match a whole word (optionally pluralised);
    # longer patterns may also match as the beginning of a word.
    _WHOLE_WORD_MAX_LEN = 3

    # Category credited for each detected capability. Capabilities that are
    # not listed here do not contribute to any category.
    _CATEGORY_BY_CAPABILITY = {
        "filesystem": "development",
        "database": "development",
        "git": "development",
        "web": "utilities",
        "search": "utilities",
        "ai": "utilities",
        "calendar": "productivity",
        "email": "productivity",
    }

    def __init__(self):
        """Initialize task analyzer."""
        self.capability_patterns = {
            "filesystem": ["file", "directory", "read", "write", "path", "folder"],
            "database": [
                "database",
                "sql",
                "query",
                "table",
                "postgres",
                "mysql",
                "sqlite",
            ],
            "git": [
                "git",
                "github",
                "repository",
                "repo",
                "commit",
                "branch",
                "pull request",
            ],
            "web": ["http", "web", "api", "fetch", "download", "scrape", "url"],
            "search": ["search", "find", "lookup", "query", "index"],
            "calendar": ["calendar", "schedule", "appointment", "meeting", "event"],
            "email": ["email", "mail", "send", "inbox", "message"],
            "image": ["image", "picture", "photo", "generate", "edit", "vision"],
            "security": ["security", "vulnerability", "scan", "audit", "penetration"],
            "development": ["code", "programming", "debug", "test", "ci/cd", "deploy"],
            "documentation": ["docs", "documentation", "readme", "wiki", "help"],
            "time": ["time", "date", "timestamp", "timezone", "duration"],
            "ai": ["ai", "ml", "model", "predict", "train", "inference"],
        }

        self.server_patterns = {
            "github": ["github", "git", "repository", "repo", "issue", "pr"],
            "filesystem": ["file", "directory", "path", "folder", "local"],
            "postgres": ["postgres", "postgresql", "database", "sql"],
            "brave-search": ["search", "web search", "internet", "lookup"],
            "fetch": ["http", "download", "web", "api", "url"],
            "sqlite": ["sqlite", "database", "local database"],
            "time": ["time", "date", "timezone", "timestamp"],
            "memory": ["memory", "cache", "store", "remember"],
            "arxiv": ["arxiv", "research", "papers", "academic"],
            "everart": ["image", "art", "generate", "picture"],
            "gmail": ["gmail", "email", "mail", "send"],
            "google-calendar": ["calendar", "schedule", "appointment"],
            "notion": ["notion", "notes", "wiki", "knowledge"],
        }

    @classmethod
    def _pattern_matches(cls, pattern: str, text: str) -> bool:
        """Tell whether ``pattern`` occurs in ``text`` as a word, not a fragment.

        A match must begin at a word boundary. Patterns of up to
        ``_WHOLE_WORD_MAX_LEN`` characters must also end at one, allowing only
        a plural "s" ("pr" matches "prs" but not "process"). Longer patterns
        may be followed by more letters, so "file" still matches "files" and
        "deploy" still matches "deployment".

        Args:
            pattern: Lowercase word or phrase to look for
            text: Lowercased text to search

        Returns:
            True if the pattern is present under the rule above
        """
        escaped = re.escape(pattern)
        if len(pattern) <= cls._WHOLE_WORD_MAX_LEN:
            regex = rf"(?<!\w){escaped}s?(?!\w)"
        else:
            regex = rf"(?<!\w){escaped}"
        return re.search(regex, text) is not None

    def _detect_capabilities(self, task_lower: str) -> list[str]:
        """Return the capabilities with at least one matching pattern, in table order."""
        return [
            capability
            for capability, patterns in self.capability_patterns.items()
            if any(self._pattern_matches(pattern, task_lower) for pattern in patterns)
        ]

    def _suggest_servers(self, task_lower: str, limit: int = 5) -> list[str]:
        """Return up to ``limit`` server names ranked by number of matching patterns."""
        ranked = []
        for server, patterns in self.server_patterns.items():
            hits = sum(
                1 for pattern in patterns if self._pattern_matches(pattern, task_lower)
            )
            if hits > 0:
                ranked.append((server, hits))
        # list.sort is stable, so servers with equal hits keep table order.
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return [server for server, _ in ranked[:limit]]

    def analyze_task(self, task_description: str) -> "TaskRequirements":
        """Analyze a task description to determine requirements.

        Args:
            task_description: Natural language task description

        Returns:
            TaskRequirements with the first ten keywords (words longer than
            two characters), the detected capabilities, the categories ranked
            by how many capabilities point to them, up to five suggested
            servers, and a complexity score capped at 10.0
        """
        task_lower = task_description.lower()

        keywords = [w for w in re.findall(r"\b\w+\b", task_lower) if len(w) > 2]

        required_capabilities = self._detect_capabilities(task_lower)

        category_scores = {}
        for capability in required_capabilities:
            category = self._CATEGORY_BY_CAPABILITY.get(capability)
            if category is not None:
                category_scores[category] = category_scores.get(category, 0) + 1
        preferred_categories = sorted(
            category_scores, key=lambda name: category_scores[name], reverse=True
        )

        suggested_servers = self._suggest_servers(task_lower)

        complexity_score = len(required_capabilities) + len(suggested_servers) * 0.5

        return TaskRequirements(
            keywords=keywords[:10],  # First 10 keywords in order of appearance
            required_capabilities=required_capabilities,
            preferred_categories=preferred_categories,
            suggested_servers=suggested_servers,
            complexity_score=min(complexity_score, 10.0),
        )
[docs] class MCPServerSelector: """Intelligent MCP server selection and recommendation system.""" def __init__(self, servers: list[dict[str, Any]] | None = None): """Initialize server selector. Args: servers: List of server documentation. If None, loads all available. """ if servers is None: loader = MCPDocumentationLoader() servers_dict = loader.load_all_mcp_documents() servers = list(servers_dict.values()) self.servers = servers self.filter = ServerFilter(servers) self.analyzer = TaskAnalyzer() # Build name to server mapping self.server_map = {} for server in servers: metadata = server.get("metadata", {}) name = metadata.get("name", "") if name: self.server_map[name] = server
[docs] def get_available_prefixes(self) -> list[str]: """Get all available server prefixes/namespaces. Returns: List of unique prefixes found in server names """ return sorted(self.filter.by_prefix.keys())
[docs] def get_available_categories(self) -> list[str]: """Get all available server categories. Returns: List of unique categories """ return sorted(self.filter.by_category.keys())
[docs] def filter_by_prefix(self, prefix: str) -> list[dict[str, Any]]: """Filter servers by prefix/namespace. Args: prefix: Prefix to filter by (e.g., "modelcontextprotocol/") Returns: List of servers with matching prefix """ return self.filter.filter_by_prefix(prefix)
[docs] def recommend_for_task( self, task_description: str, max_servers: int = 5, include_experimental: bool = False, ) -> list[ServerScore]: """Recommend servers for a specific task. Args: task_description: Natural language description of the task max_servers: Maximum number of servers to recommend include_experimental: Whether to include experimental servers Returns: List of ServerScore objects ranked by relevance """ requirements = self.analyzer.analyze_task(task_description) scores = [] for server in self.servers: metadata = server.get("metadata", {}) name = metadata.get("name", "") description = metadata.get("description", "").lower() category = metadata.get("category", "") # Skip experimental servers if not requested if not include_experimental and "experimental" in name.lower(): continue score = 0.0 reasons = [] capabilities_match = [] category_match = False prefix_match = False # Score based on suggested servers if any(suggested in name for suggested in requirements.suggested_servers): score += 5.0 reasons.append("Directly mentioned in task analysis") # Score based on capability keywords for capability in requirements.required_capabilities: if capability in description: score += 2.0 capabilities_match.append(capability) reasons.append(f"Provides {capability} capability") # Score based on category match if category in requirements.preferred_categories: score += 1.0 category_match = True reasons.append(f"Matches preferred category: {category}") # Score based on keyword overlap task_keywords = set(requirements.keywords) desc_keywords = set(description.split()) overlap = len(task_keywords & desc_keywords) if overlap > 0: score += overlap * 0.5 reasons.append(f"Keyword overlap: {overlap} matches") # Bonus for well-known/stable servers if name.startswith("modelcontextprotocol/"): score += 0.5 prefix_match = True reasons.append("Official ModelContextProtocol server") if score > 0: scores.append( ServerScore( server_name=name, score=score, reasons=reasons, 
capabilities_match=capabilities_match, category_match=category_match, prefix_match=prefix_match, ) ) # Sort by score and return top results scores.sort(key=lambda x: x.score, reverse=True) return scores[:max_servers]
[docs] async def interactive_select( self, prompt: str = "Select MCP servers to use:", categories: list[str] | None = None, prefixes: list[str] | None = None, max_selections: int | None = None, ) -> list[str]: """Interactive server selection interface. Args: prompt: Prompt to display to user categories: Filter by categories prefixes: Filter by prefixes max_selections: Maximum number of selections allowed Returns: List of selected server names """ # Filter servers based on criteria if categories or prefixes: candidates = self.filter.filter_by_multiple_criteria( prefixes=prefixes, categories=categories ) else: candidates = self.servers if not candidates: return [] # Display available servers for _i, server in enumerate(candidates, 1): metadata = server.get("metadata", {}) name = metadata.get("name", "Unknown") metadata.get("description", "No description") metadata.get("category", "Uncategorized") # Get user selections selections = [] while True: try: user_input = input( f"\nEnter server numbers (1-{len(candidates)}) separated by commas, or 'done': " ).strip() if user_input.lower() == "done": break if user_input.lower() == "all": selections = list(range(len(candidates))) break # Parse selections numbers = [int(x.strip()) for x in user_input.split(",") if x.strip()] valid_numbers = [n for n in numbers if 1 <= n <= len(candidates)] if valid_numbers: selections.extend( [n - 1 for n in valid_numbers] ) # Convert to 0-based selections = list(set(selections)) # Remove duplicates if max_selections and len(selections) >= max_selections: selections = selections[:max_selections] break for idx in selections: name = candidates[idx].get("metadata", {}).get("name", "Unknown") except (ValueError, IndexError): pass except KeyboardInterrupt: return [] # Return selected server names selected_names = [] for idx in selections: name = candidates[idx].get("metadata", {}).get("name") if name: selected_names.append(name) return selected_names
[docs] def create_config_for_selection( self, selected_servers: list[str], lazy_init: bool = True ) -> MCPConfig: """Create MCPConfig for selected servers. Args: selected_servers: List of server names to include lazy_init: Whether to use lazy initialization Returns: MCPConfig with selected servers """ # Use documentation loader to get setup info and create configs loader = MCPDocumentationLoader() servers = {} for server_name in selected_servers: if server_name in self.server_map: server_doc = self.server_map[server_name] setup_info = loader.extract_setup_info(server_doc) # Create MCPServerConfig config = self._create_server_config_from_setup(setup_info) if config: servers[config.name] = config return MCPConfig( enabled=True, servers=servers, lazy_init=lazy_init, auto_discover=False, categories=None, required_capabilities=None, on_server_connected=None, on_server_failed=None, on_tool_discovered=None, )
def _create_server_config_from_setup( self, setup_info: dict[str, Any] ) -> MCPServerConfig | None: """Create MCPServerConfig from setup information.""" try: # Determine transport and connection info transport = MCPTransport.STDIO # Default command = None args = [] url = None # Extract from installation steps for step in setup_info.get("installation", []): if "npx" in step: command = "npx" # Extract package name parts = step.split() idx = parts.index("npx") if "npx" in parts else -1 if idx >= 0 and idx + 1 < len(parts): args = parts[idx + 1 :] elif "http" in step: # URL-based server transport = MCPTransport.SSE url = step return MCPServerConfig( name=setup_info.get("name", "unknown").replace("/", "_"), transport=transport, command=command, args=args, url=url, capabilities=setup_info.get("capabilities", []), category=setup_info.get("category", ""), description=setup_info.get("description", ""), env=setup_info.get("configuration", {}), api_key=setup_info.get("api_key"), health_check_interval=setup_info.get("health_check_interval", 60), ) except Exception as e: logger.exception(f"Failed to create server config: {e}") return None
[docs] def get_selection_summary(self, selected_servers: list[str]) -> dict[str, Any]: """Get summary of selected servers. Args: selected_servers: List of selected server names Returns: Summary dictionary with statistics and details """ categories = {} total_capabilities = set() prefixes = set() server_details = [] for name in selected_servers: if name in self.server_map: server = self.server_map[name] metadata = server.get("metadata", {}) category = metadata.get("category", "Uncategorized") categories[category] = categories.get(category, 0) + 1 # Extract capabilities from description (simple heuristic) description = metadata.get("description", "").lower() if "file" in description: total_capabilities.add("filesystem") if "database" in description or "sql" in description: total_capabilities.add("database") if "web" in description or "http" in description: total_capabilities.add("web") if "git" in description: total_capabilities.add("git") if "/" in name: prefixes.add(name.split("/")[0]) server_details.append( { "name": name, "category": category, "description": metadata.get("description", ""), } ) return { "total_servers": len(selected_servers), "categories": categories, "capabilities": list(total_capabilities), "prefixes": list(prefixes), "servers": server_details, }