#!/usr/bin/env python3
"""Comprehensive MCP Server Manager.
This script provides a complete management interface for MCP servers including:
- Discovery from multiple sources
- Installation using various methods
- Configuration management
- Health monitoring
- Updates and maintenance
Usage:
python mcp_manager.py discover --auto-install
python mcp_manager.py install --servers filesystem github postgres
python mcp_manager.py install --categories official core
python mcp_manager.py list --status all
python mcp_manager.py health-check
python mcp_manager.py update --all
"""
import asyncio
import json
import logging
import time
import click
from haive.mcp.downloader.core import GeneralMCPDownloader, ServerConfig
# Setup logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
[docs]
class MCPServerManager:
"""Comprehensive MCP Server Manager."""
def __init__(self, config_file: str | None = None, install_dir: str | None = None):
""" Init .
Args:
config_file: [TODO: Add description]
install_dir: [TODO: Add description]
"""
self.downloader = GeneralMCPDownloader(config_file, install_dir)
self.install_dir = self.downloader.install_dir # type: ignore
self.status_file = self.install_dir / "server_status.json"
[docs]
async def discover_all_sources(
self, auto_install: bool = False, limit: int | None = None
) -> dict:
"""Discover MCP servers from all configured sources."""
click.echo("🔍 Discovering MCP servers from all sources...")
discovered = {}
# Use the downloader's discovery method
for source in self.downloader.patterns.get("discovery_sources", []): # type: ignore
click.echo(f" Checking: {source}")
try:
servers = await self.downloader.discover_servers_from_registry(source) # type: ignore
discovered[source] = servers
click.echo(f" Found: {len(servers)} servers")
except Exception as e:
click.echo(f" Error: {e}")
# Flatten and deduplicate
all_servers = []
seen_names = set()
for source, servers in discovered.items():
for server in servers:
name = server.get("name", "")
if name and name not in seen_names:
server["discovered_from"] = source
all_servers.append(server)
seen_names.add(name)
if limit:
all_servers = all_servers[:limit]
click.echo("\n📊 Discovery Summary:")
click.echo(f" Sources checked: {len(discovered)}")
click.echo(f" Unique servers found: {len(all_servers)}")
# Save discovery results
discovery_file = self.install_dir / "discovered_servers.json"
with open(discovery_file, "w") as f:
json.dump(
{
"discovered_at": time.strftime("%Y-%m-%d %H:%M:%S"),
"sources": list(discovered.keys()),
"servers": all_servers,
},
f,
indent=2,
)
click.echo(f" Discovery results saved to: {discovery_file}")
if auto_install:
click.echo("\n🚀 Auto-installing discovered servers...")
return await self.install_discovered_servers(all_servers)
return {"discovered": all_servers, "discovery_file": str(discovery_file)}
[docs]
async def install_discovered_servers(self, servers: list[dict]) -> dict:
"""Install servers from discovery results."""
# Add discovered servers to downloader config
for server_data in servers:
# Determine appropriate template based on server info
template = self._determine_template(server_data)
server = ServerConfig(
name=server_data.get("name", ""),
template=template,
source=server_data.get("source", ""),
variables=server_data.get("variables", {}),
tags=set(server_data.get("tags", [])),
version=server_data.get("version"), # Add missing version parameter
)
# Check if server already exists
existing_names = [s.name for s in self.downloader.servers]
if server.name not in existing_names:
self.downloader.servers.append(server)
# Install all servers
result = await self.downloader.download_servers()
return result.model_dump() # Convert DownloadResult to dict
def _determine_template(self, server_data: dict) -> str:
"""Determine the best template for a discovered server."""
source = server_data.get("source", "")
tags = server_data.get("tags", [])
if "npm" in tags or "npm" in source:
if "@modelcontextprotocol" in server_data.get("variables", {}).get(
"package", ""
):
return "npm_official"
if "@" in server_data.get("variables", {}).get("package", ""):
return "npm_scoped"
return "npm_community"
if "git" in tags or "github" in tags:
return "git_repo"
if "docker" in tags:
return "docker_image"
if "pip" in tags or "python" in tags:
return "pypi_package"
return "npm_community" # Default fallback
[docs]
async def install_servers(
self, server_names: list[str] | None = None, categories: list[str] | None = None
) -> dict:
"""Install specific servers or categories."""
if server_names:
click.echo(f"📦 Installing servers: {', '.join(server_names)}")
elif categories:
click.echo(f"📦 Installing categories: {', '.join(categories)}")
else:
click.echo("📦 Installing all enabled servers")
result = await self.downloader.download_servers(
server_names=server_names, categories=categories
)
# Update status tracking
await self._update_server_status(result.model_dump())
return result.model_dump() # Convert DownloadResult to dict
[docs]
async def list_servers(self, status_filter: str = "all") -> dict:
"""List servers with their status."""
servers_info = []
status_data = self._load_server_status()
for server in self.downloader.servers:
template = self.downloader.templates.get(server.template, {})
server_status = status_data.get(server.name, {})
info = {
"name": server.name,
"template": server.template,
"category": getattr(template, "category", "unknown"),
"enabled": server.enabled,
"tags": list(server.tags) if hasattr(server, "tags") else [],
"installation_method": getattr(
template, "installation_method", "unknown"
),
"status": server_status.get("status", "unknown"),
"last_check": server_status.get("last_check"),
"last_success": server_status.get("last_success"),
}
# Filter by status
if status_filter != "all":
if status_filter == "installed" and info["status"] != "installed":
continue
if (
(status_filter == "failed" and info["status"] != "failed")
or (status_filter == "enabled" and not info["enabled"])
or (status_filter == "disabled" and info["enabled"])
):
continue
servers_info.append(info)
return {"servers": servers_info, "total": len(servers_info)}
[docs]
async def health_check(self, server_names: list[str] | None = None) -> dict:
"""Perform health checks on servers."""
click.echo("🏥 Performing health checks...")
servers_to_check = []
if server_names:
servers_to_check = [
s for s in self.downloader.servers if s.name in server_names
]
else:
servers_to_check = [s for s in self.downloader.servers if s.enabled]
results = []
for server in servers_to_check:
template = self.downloader.templates.get(server.template)
if not template:
continue
click.echo(f" Checking: {server.name}")
# Find appropriate installer for verification
installer = None
for inst in self.downloader.installers:
if await inst.can_handle(server, template):
installer = inst
break
if installer:
try:
is_healthy = await installer.verify(
server, template, self.install_dir
)
status = "healthy" if is_healthy else "unhealthy"
except Exception as e:
status = "error"
click.echo(f" Error: {e}")
else:
status = "no_installer"
results.append(
{
"server": server.name,
"status": status,
"checked_at": time.strftime("%Y-%m-%d %H:%M:%S"),
}
)
click.echo(f" Status: {status}")
# Update status file
await self._update_health_status(results)
# Summary
healthy = len([r for r in results if r["status"] == "healthy"])
total = len(results)
click.echo("\n📊 Health Check Summary:")
click.echo(f" Healthy: {healthy}/{total}")
click.echo(
f" Health rate: {healthy / total * 100:.1f}%"
if total > 0
else " No servers checked"
)
return {"results": results, "healthy": healthy, "total": total}
[docs]
async def update_servers(
self, server_names: list[str] | None = None, force: bool = False
) -> dict:
"""Update servers to latest versions."""
click.echo("🔄 Updating servers...")
# For now, this re-installs servers (future: implement proper update logic)
return await self.install_servers(server_names)
def _load_server_status(self) -> dict:
"""Load server status from file."""
if self.status_file.exists():
try:
with open(self.status_file) as f:
return json.load(f)
except BaseException:
pass
return {}
async def _update_server_status(self, install_result: dict):
"""Update server status after installation."""
status_data = self._load_server_status()
# Update successful installations
for server_info in install_result.get("successful_servers", []):
server_name = server_info["server"]
status_data[server_name] = {
"status": "installed",
"last_success": time.strftime("%Y-%m-%d %H:%M:%S"),
"last_check": time.strftime("%Y-%m-%d %H:%M:%S"),
"install_result": server_info["result"],
}
# Update failed installations
for server_info in install_result.get("failed_servers", []):
server_name = server_info["server"]
status_data[server_name] = {
"status": "failed",
"last_check": time.strftime("%Y-%m-%d %H:%M:%S"),
"error": server_info["error"],
}
# Save status
with open(self.status_file, "w") as f:
json.dump(status_data, f, indent=2)
async def _update_health_status(self, health_results: list[dict]):
"""Update health check results."""
status_data = self._load_server_status()
for result in health_results:
server_name = result["server"]
if server_name not in status_data:
status_data[server_name] = {}
status_data[server_name]["health_status"] = result["status"]
status_data[server_name]["last_health_check"] = result["checked_at"]
# Save status
with open(self.status_file, "w") as f:
json.dump(status_data, f, indent=2)
# CLI Interface
@click.group()
@click.option("--config", help="Configuration file path")
@click.option("--install-dir", help="Installation directory")
@click.pass_context
def cli(ctx, config, install_dir):
"""MCP Server Manager - Comprehensive management for Model Context Protocol servers."""
ctx.ensure_object(dict)
ctx.obj["manager"] = MCPServerManager(config, install_dir)
@cli.command()
@click.option(
"--auto-install", is_flag=True, help="Automatically install discovered servers"
)
@click.option("--limit", type=int, help="Limit number of servers to discover")
@click.pass_context
def discover(ctx, auto_install, limit):
"""Discover MCP servers from all configured sources."""
manager = ctx.obj["manager"]
result = asyncio.run(manager.discover_all_sources(auto_install, limit))
if not auto_install:
click.echo(f"\n📋 Discovered {len(result['discovered'])} servers")
click.echo(f"💾 Results saved to: {result['discovery_file']}")
click.echo("\nUse 'mcp_manager.py install' to install discovered servers")
@cli.command()
@click.option("--servers", multiple=True, help="Specific servers to install")
@click.option("--categories", multiple=True, help="Server categories to install")
@click.option("--all", "install_all", is_flag=True, help="Install all enabled servers")
@click.pass_context
def install(ctx, servers, categories, install_all):
"""Install MCP servers."""
manager = ctx.obj["manager"]
server_names = list(servers) if servers else None
category_list = list(categories) if categories else None
if not server_names and not category_list and not install_all:
click.echo("Please specify --servers, --categories, or --all")
return
result = asyncio.run(manager.install_servers(server_names, category_list))
click.echo("\n📊 Installation Results:")
click.echo(f" Total: {result['total']}")
click.echo(f" Successful: {result['successful']} ({result['success_rate']:.1f}%)")
click.echo(f" Failed: {result['failed']}")
@cli.command()
@click.option(
"--status",
type=click.Choice(["all", "installed", "failed", "enabled", "disabled"]),
default="all",
help="Filter servers by status",
)
@click.option(
"--format",
"output_format",
type=click.Choice(["table", "json"]),
default="table",
help="Output format",
)
@click.pass_context
def list_servers(ctx, status, output_format):
"""List MCP servers and their status."""
manager = ctx.obj["manager"]
result = asyncio.run(manager.list_servers(status))
if output_format == "json":
click.echo(json.dumps(result, indent=2))
else:
# Table format
servers = result["servers"]
if not servers:
click.echo("No servers found")
return
click.echo(f"\n📋 MCP Servers ({result['total']} total)")
click.echo("=" * 80)
for server in servers:
status_emoji = {"installed": "✅", "failed": "❌", "unknown": "❓"}.get(
server["status"], "⚪"
)
enabled_emoji = "🟢" if server["enabled"] else "🔴"
click.echo(
f"{status_emoji} {enabled_emoji} {server['name']:<25} {server['category']:<15} {
server['installation_method']
}"
)
@cli.command()
@click.option("--servers", multiple=True, help="Specific servers to check")
@click.pass_context
def health_check(ctx, servers):
"""Perform health checks on MCP servers."""
manager = ctx.obj["manager"]
server_names = list(servers) if servers else None
asyncio.run(manager.health_check(server_names))
@cli.command()
@click.option("--servers", multiple=True, help="Specific servers to update")
@click.option("--all", "update_all", is_flag=True, help="Update all servers")
@click.option("--force", is_flag=True, help="Force update even if current")
@click.pass_context
def update(ctx, servers, update_all, force):
"""Update MCP servers to latest versions."""
manager = ctx.obj["manager"]
server_names = list(servers) if servers else None
if not server_names and not update_all:
click.echo("Please specify --servers or --all")
return
result = asyncio.run(manager.update_servers(server_names, force))
click.echo("\n📊 Update Results:")
click.echo(f" Updated: {result['successful']}")
click.echo(f" Failed: {result['failed']}")
@cli.command()
@click.pass_context
def config(ctx):
"""Show current configuration."""
manager = ctx.obj["manager"]
click.echo("📋 Current Configuration")
click.echo("=" * 50)
click.echo(f"Install Directory: {manager.install_dir}")
click.echo(f"Templates: {len(manager.downloader.templates)}")
click.echo(f"Servers: {len(manager.downloader.servers)}")
click.echo(
f"Discovery Sources: {len(manager.downloader.patterns.get('discovery_sources', []))}"
)
if __name__ == "__main__":
cli()