dataflow.mcp.clientΒΆ
MCP Client Integration for haive-dataflow.
This module provides integration between MCP servers and the Haive framework through LangChain MCP adapters and the dataflow registry system.
The client handles: - Connection management to multiple MCP servers - Tool loading and registration from MCP servers - Integration with LangGraph workflows - Health monitoring and error handling
- Classes:
MCPClient: Main client for MCP server integration MCPToolProvider: Provider for MCP tools in the registry MCPServerAdapter: Adapter for individual MCP servers
AttributesΒΆ
ClassesΒΆ
Client for managing MCP server connections and tools. |
|
Adapter for individual MCP servers. |
|
Provider for registering MCP tools in the dataflow registry. |
Module ContentsΒΆ
- class dataflow.mcp.client.MCPClient(registry_system=None)ΒΆ
Client for managing MCP server connections and tools.
This class provides a high-level interface for connecting to MCP servers, loading tools, and integrating with the Haive dataflow registry system.
- registry_systemΒΆ
Reference to the dataflow registry
- mcp_clientΒΆ
Underlying MultiServerMCPClient instance
- connected_serversΒΆ
Dictionary of connected server configurations
- available_toolsΒΆ
Cache of available tools from all servers
Examples
Basic usage:
from haive.dataflow.mcp import MCPClient from haive.dataflow import registry_system
client = MCPClient(registry_system) await client.initialize_from_registry()
# Get available tools tools = await client.get_available_tools()
# Execute a tool result = await client.execute_tool(βread_fileβ, {βpathβ: β/path/to/fileβ})
- async check_server_health(server_name: str) haive.dataflow.registry.models.MCPServerHealthΒΆ
Check health of a specific MCP server.
- Parameters:
server_name β Name of the server to check
- Returns:
Server health information
- async connect_to_servers(server_configs: dict[str, haive.dataflow.registry.models.MCPServerConfig]) boolΒΆ
Connect to specific MCP servers.
- Parameters:
server_configs β Dictionary of server name to configuration
- Returns:
True if connection successful, False otherwise
- async execute_tool(tool_name: str, parameters: dict[str, Any]) AnyΒΆ
Execute a specific MCP tool.
- Parameters:
tool_name β Name of the tool to execute
parameters β Tool parameters
- Returns:
Tool execution result
- async get_available_tools() list[Any]ΒΆ
Get all available tools from connected MCP servers.
- Returns:
List of LangChain Tool objects
- async get_server_health_status() dict[str, haive.dataflow.registry.models.MCPServerHealth]ΒΆ
Get health status for all connected servers.
- Returns:
Dictionary of server name to health status
- async initialize_from_registry() boolΒΆ
Initialize MCP client with servers from the registry.
- Returns:
True if initialization successful, False otherwise
- registry_system = NoneΒΆ
- class dataflow.mcp.client.MCPServerAdapter(config: haive.dataflow.registry.models.MCPServerConfig)ΒΆ
Adapter for individual MCP servers.
This class provides a consistent interface for working with individual MCP servers, handling connection, tool execution, and health monitoring.
- async connect() boolΒΆ
Connect to the MCP server.
- Returns:
True if connection successful, False otherwise
- async disconnect()ΒΆ
Disconnect from the MCP server.
- async execute_tool(tool_name: str, parameters: dict[str, Any]) AnyΒΆ
Execute a tool on this server.
- Parameters:
tool_name β Name of the tool to execute
parameters β Tool parameters
- Returns:
Tool execution result
- async get_available_tools() list[str]ΒΆ
Get list of available tools on this server.
- Returns:
List of tool names
- get_health_status() haive.dataflow.registry.models.MCPServerHealthΒΆ
Get current health status.
- Returns:
Current health status
- configΒΆ
- is_connected = FalseΒΆ
- last_health_check: datetime.datetime | None = NoneΒΆ
- class dataflow.mcp.client.MCPToolProvider(mcp_client: MCPClient, registry_system=None)ΒΆ
Provider for registering MCP tools in the dataflow registry.
This class handles the discovery and registration of tools from MCP servers into the Haive dataflow registry system for broader discovery and use.
- async discover_and_register_tools() list[str]ΒΆ
Discover MCP tools and register them in the dataflow registry.
- Returns:
List of registry IDs for registered tools
- mcp_clientΒΆ
- registry_system = NoneΒΆ
- dataflow.mcp.client.LANGCHAIN_MCP_AVAILABLE = TrueΒΆ
- dataflow.mcp.client.MCP_SDK_AVAILABLE = TrueΒΆ
- dataflow.mcp.client.loggerΒΆ