"""MCP Server Installer Service with LLM fallback and HITL approval.
Provides a complete install pipeline:
1. Search the 1,960 server database for matching servers
2. Extract install command from README (primary)
3. Fall back to LLM to derive install command if extraction fails
4. HITL approval before installation (configurable)
5. Install via npx/uvx/pip and verify connectivity
The service is usable standalone or integrated into IntelligentMCPAgent.
Example:
.. code-block:: python
from haive.mcp.installer_service import MCPInstallerService
service = MCPInstallerService(require_approval=True)
result = await service.search_and_install("postgres")
# Searches DB → finds server → derives install → asks approval → installs
"""
from __future__ import annotations
import asyncio
import json
import logging
import subprocess
import shutil
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable
from haive.mcp.config import MCPServerConfig, MCPTransport
from haive.mcp.documentation.doc_loader import MCPDocumentationLoader
from haive.mcp.self_query import MCPSelfQuery
logger = logging.getLogger(__name__)
[docs]
class InstallMethod(str, Enum):
"""How the install command was determined."""
README = "readme"
LLM = "llm"
FALLBACK = "fallback"
[docs]
@dataclass
class InstallPlan:
"""Plan for installing an MCP server."""
server_name: str
description: str
install_command: str
method: InstallMethod
config: dict[str, Any]
repository_url: str = ""
stars: int | None = None
confidence: float = 1.0
[docs]
@dataclass
class InstallResult:
"""Result of an installation attempt."""
success: bool
server_name: str
message: str
config: dict[str, Any] | None = None
tools_discovered: list[str] = field(default_factory=list)
[docs]
class MCPInstallerService:
"""Service for searching, configuring, and installing MCP servers.
Supports three modes for deriving install commands:
1. **README extraction** -- parses install commands from server READMEs
2. **LLM fallback** -- uses an LLM to analyze the repo and suggest install
3. **Pattern fallback** -- derives from repo name conventions
HITL approval can be enabled via ``require_approval=True``. The default
approval handler prints to stdout and waits for input. Override with a
custom ``approval_callback``.
"""
def __init__(
self,
require_approval: bool = True,
approval_callback: Callable[[InstallPlan], bool] | None = None,
llm_callback: Callable[[str, str], str] | None = None,
auto_verify: bool = True,
):
"""Initialize the installer service.
Args:
require_approval: Require human approval before install.
approval_callback: Custom approval function. Receives an
``InstallPlan`` and returns ``True`` to approve.
If ``None``, uses interactive stdin prompt.
llm_callback: Custom LLM function for install command derivation.
Receives ``(server_name, readme_content)`` and returns
an install command string. If ``None``, uses a built-in
prompt with langchain (if available).
auto_verify: Verify the command exists before installing.
"""
self.require_approval = require_approval
self.approval_callback = approval_callback
self.llm_callback = llm_callback
self.auto_verify = auto_verify
self._query = MCPSelfQuery()
self._loader = self._query.loader
@property
def server_count(self) -> int:
return self._query.server_count
# ------------------------------------------------------------------
# Search
# ------------------------------------------------------------------
[docs]
def search(self, query: str, limit: int = 10) -> list[dict[str, Any]]:
"""Search the server database."""
return self._query.search(query, limit=limit)
[docs]
def get_detail(self, name: str) -> dict[str, Any] | None:
"""Get enriched detail for a server."""
return self._query.get_server_detail(name)
# ------------------------------------------------------------------
# Install plan
# ------------------------------------------------------------------
[docs]
async def plan_install(self, server_name: str) -> InstallPlan | None:
"""Create an install plan for a server.
Tries README extraction first, then LLM fallback.
Args:
server_name: Server name (exact or partial match).
Returns:
``InstallPlan`` or ``None`` if server not found.
"""
enriched = self._loader.get_enriched_server(server_name)
if enriched is None:
return None
install_cmd = enriched.get("install_command", "")
method = InstallMethod.README
confidence = 0.9
# If no install command from README, try LLM fallback
if not install_cmd:
readme = enriched.get("readme_content", "")
install_cmd = await self._llm_derive_install(
enriched.get("name", server_name), readme
)
if install_cmd:
method = InstallMethod.LLM
confidence = 0.7
else:
# Last resort: pattern-based fallback
owner = enriched.get("repository_owner", "")
repo = enriched.get("repository_name", "")
if repo:
install_cmd = f"npx -y {repo}"
method = InstallMethod.FALLBACK
confidence = 0.4
if not install_cmd:
return None
config = self._loader.generate_server_config(server_name) or {
"command": install_cmd.split()[0],
"args": install_cmd.split()[1:],
"transport": "stdio",
}
return InstallPlan(
server_name=enriched.get("name", server_name),
description=enriched.get("description", ""),
install_command=install_cmd,
method=method,
config=config,
repository_url=enriched.get("repository_url", ""),
stars=enriched.get("stars"),
confidence=confidence,
)
# ------------------------------------------------------------------
# Approval
# ------------------------------------------------------------------
[docs]
async def approve(self, plan: InstallPlan) -> bool:
"""Request approval for an install plan.
Uses ``approval_callback`` if set, otherwise prompts on stdin.
Args:
plan: The install plan to approve.
Returns:
``True`` if approved.
"""
if not self.require_approval:
return True
if self.approval_callback:
result = self.approval_callback(plan)
if asyncio.iscoroutine(result):
return await result
return result
# Default: interactive stdin
return self._interactive_approve(plan)
@staticmethod
def _interactive_approve(plan: InstallPlan) -> bool:
"""Interactive approval via stdin."""
print(f"\n Install MCP Server")
print(f" {'=' * 40}")
print(f" Server: {plan.server_name}")
if plan.description:
print(f" Desc: {plan.description[:80]}")
print(f" Command: {plan.install_command}")
print(f" Method: {plan.method.value} (confidence: {plan.confidence:.0%})")
if plan.repository_url:
print(f" Repo: {plan.repository_url}")
print()
try:
answer = input(" Approve? [y/N] ").strip().lower()
return answer in ("y", "yes")
except (EOFError, KeyboardInterrupt):
return False
# ------------------------------------------------------------------
# Install
# ------------------------------------------------------------------
[docs]
async def install(self, plan: InstallPlan) -> InstallResult:
"""Execute an install plan.
Verifies the command is available, then attempts installation.
Args:
plan: Approved install plan.
Returns:
``InstallResult`` with success status.
"""
cmd_parts = plan.install_command.split()
binary = cmd_parts[0]
# Verify binary exists
if self.auto_verify and not shutil.which(binary):
return InstallResult(
success=False,
server_name=plan.server_name,
message=f"Command '{binary}' not found. Install it first.",
)
# Try connecting with langchain-mcp-adapters
try:
from langchain_mcp_adapters.client import MultiServerMCPClient
lc_config = {
plan.server_name: {
"command": plan.config.get("command", binary),
"args": plan.config.get("args", cmd_parts[1:]),
"transport": "stdio",
}
}
if plan.config.get("env"):
lc_config[plan.server_name]["env"] = plan.config["env"]
client = MultiServerMCPClient(lc_config)
tools = await client.get_tools()
tool_names = [t.name for t in tools]
return InstallResult(
success=True,
server_name=plan.server_name,
message=f"Connected! {len(tools)} tools available.",
config=plan.config,
tools_discovered=tool_names,
)
except Exception as e:
return InstallResult(
success=False,
server_name=plan.server_name,
message=f"Connection failed: {e}",
config=plan.config,
)
# ------------------------------------------------------------------
# Combined flow
# ------------------------------------------------------------------
[docs]
async def search_and_install(self, query: str) -> InstallResult | None:
"""Full pipeline: search → plan → approve → install.
Args:
query: Search query (e.g., ``"postgres"``, ``"filesystem"``).
Returns:
``InstallResult`` or ``None`` if no server found / rejected.
"""
# Search
results = self.search(query)
if not results:
logger.info(f"No servers found for '{query}'")
return None
# Plan for best match
plan = await self.plan_install(results[0]["name"])
if plan is None:
logger.info(f"Could not create install plan for {results[0]['name']}")
return None
# Approve
approved = await self.approve(plan)
if not approved:
return InstallResult(
success=False,
server_name=plan.server_name,
message="Installation rejected by user.",
)
# Install
return await self.install(plan)
[docs]
def search_and_install_sync(self, query: str) -> InstallResult | None:
"""Synchronous wrapper for ``search_and_install``."""
return asyncio.run(self.search_and_install(query))
# ------------------------------------------------------------------
# Generate configs (for external use)
# ------------------------------------------------------------------
[docs]
def generate_mcp_server_config(self, server_name: str) -> MCPServerConfig | None:
"""Generate an ``MCPServerConfig`` for a server.
Args:
server_name: Server name.
Returns:
``MCPServerConfig`` ready to use with ``MCPManager``.
"""
config = self._loader.generate_server_config(server_name)
if config is None:
return None
return MCPServerConfig(
name=server_name,
transport=MCPTransport.STDIO,
command=config["command"],
args=config.get("args", []),
env=config.get("env", {}),
)
[docs]
def generate_langchain_config(self, server_name: str) -> dict[str, Any] | None:
"""Generate a ``langchain-mcp-adapters`` compatible config.
Args:
server_name: Server name.
Returns:
Dict suitable for ``MultiServerMCPClient``.
"""
config = self._loader.generate_server_config(server_name)
if config is None:
return None
lc = {
server_name: {
"command": config["command"],
"args": config.get("args", []),
"transport": "stdio",
}
}
if config.get("env"):
lc[server_name]["env"] = config["env"]
return lc
[docs]
def generate_claude_desktop_config(self, server_name: str) -> dict[str, Any] | None:
"""Generate a Claude Desktop ``mcp.json`` config entry.
Args:
server_name: Server name.
Returns:
Dict in Claude Desktop format.
"""
config = self._loader.generate_server_config(server_name)
if config is None:
return None
entry: dict[str, Any] = {
"command": config["command"],
"args": config.get("args", []),
}
if config.get("env"):
entry["env"] = config["env"]
return {"mcpServers": {server_name: entry}}
# ------------------------------------------------------------------
# LLM fallback
# ------------------------------------------------------------------
async def _llm_derive_install(self, server_name: str, readme: str) -> str:
"""Use LLM to derive install command from server name and README.
Args:
server_name: Server name.
readme: README content (may be empty).
Returns:
Derived install command, or empty string.
"""
if self.llm_callback:
result = self.llm_callback(server_name, readme)
if asyncio.iscoroutine(result):
return await result
return result
# Try using langchain if available
try:
from langchain_openai import ChatOpenAI
except ImportError:
try:
from langchain_anthropic import ChatAnthropic as ChatOpenAI
except ImportError:
logger.debug("No LLM available for install command fallback")
return ""
prompt = f"""Given this MCP server, determine the single install/run command.
Server: {server_name}
Repository README (first 2000 chars):
{readme[:2000] if readme else '(no README available)'}
Rules:
- For npm packages: return "npx -y <package-name>"
- For Python packages: return "uvx <package-name>" or "pip install <package-name>"
- Return ONLY the command, nothing else. No explanation.
- If you cannot determine the command, return "UNKNOWN"
"""
try:
llm = ChatOpenAI(temperature=0)
response = await llm.ainvoke(prompt)
cmd = response.content.strip().strip("`").strip()
if cmd and cmd != "UNKNOWN" and len(cmd) < 200:
return cmd
except Exception as e:
logger.debug(f"LLM fallback failed: {e}")
return ""