Source code for haive.mcp.client.transport

"""MCP Transport Layer Implementation.

This module provides transport layer implementations for the MCP protocol.
Different transports handle the underlying communication mechanism between
the MCP client and server.

Supported Transports:
    - STDIO: Communication via stdin/stdout with a subprocess
    - HTTP: RESTful communication over HTTP
    - SSE: Server-sent events for streaming
    - WebSocket: Real-time bidirectional communication
    - Docker: Communication via stdin/stdout with a Docker container

The transport layer is responsible for:
    - Establishing and managing connections
    - Sending and receiving raw messages
    - Handling transport-specific encoding/decoding
    - Managing connection lifecycle
"""

import asyncio
import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Union, AsyncIterator
import subprocess
import aiohttp
from pydantic import BaseModel, Field

from .exceptions import MCPTransportError, MCPConnectionError, MCPTimeoutError

logger = logging.getLogger(__name__)


[docs] class MCPTransport(ABC): """Abstract base class for MCP transports. All MCP transports must implement this interface to provide a consistent API for the protocol layer. The transport handles the low-level communication while the protocol layer handles MCP message semantics. The transport is responsible for: - Connection establishment and teardown - Raw message sending and receiving - Transport-specific error handling - Connection state management """ def __init__(self, timeout: float = 30.0): """Initialize transport with timeout. Args: timeout: Default timeout for operations in seconds """ self.timeout = timeout self.connected = False self._connection_lock = asyncio.Lock()
[docs] @abstractmethod async def connect(self) -> None: """Establish connection to the MCP server. Raises: MCPConnectionError: If connection fails MCPTimeoutError: If connection times out """ pass
[docs] @abstractmethod async def disconnect(self) -> None: """Close connection to the MCP server. Should be idempotent and safe to call multiple times. """ pass
[docs] @abstractmethod async def send_message(self, message: Dict[str, Any]) -> None: """Send a message to the server. Args: message: JSON-RPC message to send Raises: MCPTransportError: If sending fails MCPConnectionError: If not connected """ pass
[docs] @abstractmethod async def receive_message(self) -> Dict[str, Any]: """Receive a message from the server. Returns: JSON-RPC message from server Raises: MCPTransportError: If receiving fails MCPConnectionError: If not connected MCPTimeoutError: If receive times out """ pass
async def __aenter__(self): """Async context manager entry.""" await self.connect() return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Async context manager exit.""" await self.disconnect()
[docs] class StdioTransport(MCPTransport): """STDIO transport for MCP communication. This transport communicates with an MCP server via stdin/stdout of a subprocess. This is the most common transport for MCP servers that are designed to run as command-line tools. The transport handles: - Process lifecycle management - JSON-RPC message framing over stdio - Process cleanup on disconnection - Error handling for process failures Examples: Basic usage:: transport = StdioTransport( command="npx", args=["-y", "@modelcontextprotocol/server-filesystem", "/tmp"] ) With environment variables:: transport = StdioTransport( command="python", args=["-m", "my_mcp_server"], env={"API_KEY": "secret"}, cwd="/path/to/server" ) """ def __init__( self, command: str, args: Optional[List[str]] = None, env: Optional[Dict[str, str]] = None, cwd: Optional[str] = None, timeout: float = 30.0 ): """Initialize STDIO transport. Args: command: Command to execute (e.g., "npx", "python") args: Command line arguments env: Environment variables for the process cwd: Working directory for the process timeout: Timeout for operations """ super().__init__(timeout) self.command = command self.args = args or [] self.env = env self.cwd = cwd self.process: Optional[asyncio.subprocess.Process] = None self._read_queue: Optional[asyncio.Queue] = None self._reader_task: Optional[asyncio.Task] = None
[docs] async def connect(self) -> None: """Start the MCP server process and establish stdio communication.""" async with self._connection_lock: if self.connected: return try: # Build full command full_command = [self.command] + self.args logger.info(f"Starting MCP server: {' '.join(full_command)}") # Start process self.process = await asyncio.create_subprocess_exec( *full_command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=self.env, cwd=self.cwd ) # Set up message reading self._read_queue = asyncio.Queue() self._reader_task = asyncio.create_task(self._reader_loop()) self.connected = True logger.info(f"MCP server started with PID {self.process.pid}") except Exception as e: await self._cleanup() raise MCPConnectionError( f"Failed to start MCP server process: {e}", details={"command": full_command, "error": str(e)} )
[docs] async def disconnect(self) -> None: """Stop the MCP server process and cleanup resources.""" async with self._connection_lock: if not self.connected: return await self._cleanup() self.connected = False logger.info("MCP server process stopped")
async def _cleanup(self) -> None: """Clean up process and tasks.""" # Cancel reader task if self._reader_task and not self._reader_task.done(): self._reader_task.cancel() try: await self._reader_task except asyncio.CancelledError: pass self._reader_task = None # Terminate process if self.process: try: # Try graceful shutdown first if self.process.returncode is None: self.process.terminate() try: await asyncio.wait_for(self.process.wait(), timeout=5.0) except asyncio.TimeoutError: # Force kill if graceful shutdown fails self.process.kill() await self.process.wait() except Exception as e: logger.warning(f"Error during process cleanup: {e}") finally: self.process = None self._read_queue = None
[docs] async def send_message(self, message: Dict[str, Any]) -> None: """Send JSON-RPC message to server via stdin.""" if not self.connected or not self.process or not self.process.stdin: raise MCPConnectionError("Not connected to MCP server") try: # Serialize message message_str = json.dumps(message, separators=(',', ':')) message_bytes = (message_str + '\n').encode('utf-8') # Send to process stdin self.process.stdin.write(message_bytes) await self.process.stdin.drain() logger.debug(f"Sent message: {message_str}") except Exception as e: raise MCPTransportError( f"Failed to send message: {e}", details={"message": message, "error": str(e)} )
[docs] async def receive_message(self) -> Dict[str, Any]: """Receive JSON-RPC message from server via stdout.""" if not self.connected or not self._read_queue: raise MCPConnectionError("Not connected to MCP server") try: # Wait for message with timeout message = await asyncio.wait_for( self._read_queue.get(), timeout=self.timeout ) if isinstance(message, Exception): raise message logger.debug(f"Received message: {json.dumps(message, separators=(',', ':'))}") return message except asyncio.TimeoutError: raise MCPTimeoutError( f"No message received within {self.timeout}s timeout" )
async def _reader_loop(self) -> None: """Background task to read messages from process stdout.""" if not self.process or not self.process.stdout: return try: while True: # Read line from stdout line = await self.process.stdout.readline() if not line: # Process has ended break try: # Parse JSON message line_str = line.decode('utf-8').strip() if line_str: message = json.loads(line_str) await self._read_queue.put(message) except json.JSONDecodeError as e: # Invalid JSON - put error in queue error = MCPTransportError( f"Invalid JSON from server: {e}", details={"raw_line": line_str} ) await self._read_queue.put(error) except Exception as e: # Reader error - put in queue error = MCPTransportError( f"Error reading from server: {e}", details={"error": str(e)} ) await self._read_queue.put(error)
[docs] class HttpTransport(MCPTransport): """HTTP transport for MCP communication. This transport communicates with an MCP server over HTTP using JSON-RPC over HTTP POST requests. This is useful for servers that expose HTTP APIs or for remote MCP servers. Examples: Basic HTTP transport:: transport = HttpTransport("http://localhost:8080/mcp") With authentication:: transport = HttpTransport( "https://api.example.com/mcp", headers={"Authorization": "Bearer token123"} ) """ def __init__( self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0 ): """Initialize HTTP transport. Args: url: Base URL for the MCP server headers: HTTP headers to include in requests timeout: Request timeout in seconds """ super().__init__(timeout) self.url = url self.headers = headers or {} self.session: Optional[aiohttp.ClientSession] = None
[docs] async def connect(self) -> None: """Create HTTP session.""" async with self._connection_lock: if self.connected: return self.session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.timeout), headers=self.headers ) self.connected = True logger.info(f"HTTP transport connected to {self.url}")
[docs] async def disconnect(self) -> None: """Close HTTP session.""" async with self._connection_lock: if not self.connected: return if self.session: await self.session.close() self.session = None self.connected = False logger.info("HTTP transport disconnected")
[docs] async def send_message(self, message: Dict[str, Any]) -> None: """Send JSON-RPC message via HTTP POST. Note: For HTTP transport, sending and receiving are combined in a request-response cycle. This method stores the message for the next receive_message call. """ if not self.connected or not self.session: raise MCPConnectionError("HTTP transport not connected") # Store message for request-response cycle self._pending_message = message
[docs] async def receive_message(self) -> Dict[str, Any]: """Send pending message and receive response.""" if not self.connected or not self.session: raise MCPConnectionError("HTTP transport not connected") if not hasattr(self, '_pending_message'): raise MCPTransportError("No message to send") try: async with self.session.post( self.url, json=self._pending_message, headers={"Content-Type": "application/json"} ) as response: if response.status != 200: raise MCPTransportError( f"HTTP error {response.status}: {response.reason}", details={"status": response.status, "url": self.url} ) result = await response.json() delattr(self, '_pending_message') return result except aiohttp.ClientError as e: raise MCPTransportError( f"HTTP request failed: {e}", details={"url": self.url, "error": str(e)} )
[docs] class SseTransport(MCPTransport): """Server-Sent Events transport for MCP communication. This transport uses SSE for receiving messages and HTTP POST for sending. Useful for streaming scenarios where the server needs to push messages to the client. """ def __init__( self, sse_url: str, post_url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0 ): """Initialize SSE transport. Args: sse_url: URL for SSE stream (receiving messages) post_url: URL for POST requests (sending messages) headers: HTTP headers for both requests timeout: Request timeout """ super().__init__(timeout) self.sse_url = sse_url self.post_url = post_url self.headers = headers or {} self.session: Optional[aiohttp.ClientSession] = None self._sse_task: Optional[asyncio.Task] = None self._message_queue: Optional[asyncio.Queue] = None
[docs] async def connect(self) -> None: """Establish SSE connection.""" async with self._connection_lock: if self.connected: return self.session = aiohttp.ClientSession( timeout=aiohttp.ClientTimeout(total=self.timeout), headers=self.headers ) self._message_queue = asyncio.Queue() self._sse_task = asyncio.create_task(self._sse_loop()) self.connected = True logger.info(f"SSE transport connected to {self.sse_url}")
[docs] async def disconnect(self) -> None: """Close SSE connection.""" async with self._connection_lock: if not self.connected: return # Cancel SSE task if self._sse_task and not self._sse_task.done(): self._sse_task.cancel() try: await self._sse_task except asyncio.CancelledError: pass # Close session if self.session: await self.session.close() self.session = None self.connected = False logger.info("SSE transport disconnected")
[docs] async def send_message(self, message: Dict[str, Any]) -> None: """Send message via HTTP POST.""" if not self.connected or not self.session: raise MCPConnectionError("SSE transport not connected") try: async with self.session.post( self.post_url, json=message, headers={"Content-Type": "application/json"} ) as response: if response.status != 200: raise MCPTransportError( f"HTTP error {response.status}: {response.reason}" ) except aiohttp.ClientError as e: raise MCPTransportError(f"Failed to send message: {e}")
[docs] async def receive_message(self) -> Dict[str, Any]: """Receive message from SSE stream.""" if not self.connected or not self._message_queue: raise MCPConnectionError("SSE transport not connected") try: message = await asyncio.wait_for( self._message_queue.get(), timeout=self.timeout ) if isinstance(message, Exception): raise message return message except asyncio.TimeoutError: raise MCPTimeoutError(f"No SSE message received within {self.timeout}s")
async def _sse_loop(self) -> None: """Background task to read SSE messages.""" if not self.session: return try: async with self.session.get(self.sse_url) as response: if response.status != 200: error = MCPTransportError( f"SSE connection failed: {response.status}" ) await self._message_queue.put(error) return async for line in response.content: line_str = line.decode('utf-8').strip() if line_str.startswith('data: '): data = line_str[6:] # Remove 'data: ' prefix try: message = json.loads(data) await self._message_queue.put(message) except json.JSONDecodeError as e: error = MCPTransportError(f"Invalid SSE JSON: {e}") await self._message_queue.put(error) except Exception as e: error = MCPTransportError(f"SSE stream error: {e}") await self._message_queue.put(error)
[docs] class WebSocketTransport(MCPTransport): """WebSocket transport for MCP communication. This transport provides real-time bidirectional communication over WebSocket. Useful for interactive applications and real-time scenarios. """ def __init__( self, url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0 ): """Initialize WebSocket transport. Args: url: WebSocket URL (ws:// or wss://) headers: WebSocket headers timeout: Connection timeout """ super().__init__(timeout) self.url = url self.headers = headers or {} self.session: Optional[aiohttp.ClientSession] = None self.websocket: Optional[aiohttp.ClientWebSocketResponse] = None
[docs] async def connect(self) -> None: """Establish WebSocket connection.""" async with self._connection_lock: if self.connected: return try: self.session = aiohttp.ClientSession() self.websocket = await self.session.ws_connect( self.url, headers=self.headers, timeout=self.timeout ) self.connected = True logger.info(f"WebSocket connected to {self.url}") except Exception as e: await self._cleanup() raise MCPConnectionError(f"WebSocket connection failed: {e}")
[docs] async def disconnect(self) -> None: """Close WebSocket connection.""" async with self._connection_lock: if not self.connected: return await self._cleanup() self.connected = False logger.info("WebSocket disconnected")
async def _cleanup(self) -> None: """Clean up WebSocket resources.""" if self.websocket and not self.websocket.closed: await self.websocket.close() self.websocket = None if self.session: await self.session.close() self.session = None
[docs] async def send_message(self, message: Dict[str, Any]) -> None: """Send message over WebSocket.""" if not self.connected or not self.websocket: raise MCPConnectionError("WebSocket not connected") try: await self.websocket.send_str(json.dumps(message)) except Exception as e: raise MCPTransportError(f"Failed to send WebSocket message: {e}")
[docs] async def receive_message(self) -> Dict[str, Any]: """Receive message from WebSocket.""" if not self.connected or not self.websocket: raise MCPConnectionError("WebSocket not connected") try: msg = await asyncio.wait_for( self.websocket.receive(), timeout=self.timeout ) if msg.type == aiohttp.WSMsgType.TEXT: return json.loads(msg.data) elif msg.type == aiohttp.WSMsgType.ERROR: raise MCPTransportError(f"WebSocket error: {msg.data}") elif msg.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED): raise MCPConnectionError("WebSocket connection closed") else: raise MCPTransportError(f"Unexpected WebSocket message type: {msg.type}") except asyncio.TimeoutError: raise MCPTimeoutError(f"No WebSocket message within {self.timeout}s") except json.JSONDecodeError as e: raise MCPTransportError(f"Invalid JSON in WebSocket message: {e}")
[docs] class DockerTransport(MCPTransport): """Docker transport for MCP communication. This transport runs an MCP server inside a Docker container and communicates via stdin/stdout, similar to StdioTransport but using ``docker run`` as the process wrapper. This provides isolation, reproducibility, and simplified dependency management for MCP servers. The transport handles: - Docker container lifecycle management - Volume mounting for data access - Port mapping for network services - Environment variable passing to the container - Graceful container shutdown with ``docker stop`` Examples: Basic Docker transport:: transport = DockerTransport( image="mcp/postgres", env={"POSTGRES_HOST": "host.docker.internal"}, ) With volumes and network:: transport = DockerTransport( image="mcp/filesystem", volumes=["/home/user/data:/data:ro"], network="host", env={"ALLOWED_DIRS": "/data"}, ) """ def __init__( self, image: str, env: Optional[Dict[str, str]] = None, volumes: Optional[List[str]] = None, ports: Optional[List[str]] = None, network: Optional[str] = None, docker_args: Optional[List[str]] = None, timeout: float = 30.0, ): """Initialize Docker transport. Args: image: Docker image name (e.g., ``"mcp/postgres"``) env: Environment variables to pass to the container volumes: Volume mounts (e.g., ``["/host/path:/container/path:ro"]``) ports: Port mappings (e.g., ``["8080:8080"]``) network: Docker network to attach to (e.g., ``"host"``) docker_args: Extra arguments to pass to ``docker run`` timeout: Timeout for operations in seconds """ super().__init__(timeout) self.image = image self.env = env or {} self.volumes = volumes or [] self.ports = ports or [] self.network = network self.docker_args = docker_args or [] self.process: Optional[asyncio.subprocess.Process] = None self._read_queue: Optional[asyncio.Queue] = None self._reader_task: Optional[asyncio.Task] = None self._container_id: Optional[str] = None def _build_docker_command(self) -> List[str]: """Build the ``docker run`` command.""" cmd = ["docker", "run", "--rm", "-i"] # Environment variables for key, value in self.env.items(): cmd.extend(["-e", f"{key}={value}"]) # Volume mounts for vol in self.volumes: cmd.extend(["-v", vol]) # Port mappings for port in self.ports: cmd.extend(["-p", port]) # Network if self.network: cmd.extend(["--network", self.network]) # Extra args cmd.extend(self.docker_args) # Image cmd.append(self.image) return cmd
[docs] async def connect(self) -> None: """Start the Docker container and establish stdio communication.""" async with self._connection_lock: if self.connected: return full_command = self._build_docker_command() logger.info(f"Starting MCP server in Docker: {' '.join(full_command)}") try: self.process = await asyncio.create_subprocess_exec( *full_command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) self._read_queue = asyncio.Queue() self._reader_task = asyncio.create_task(self._reader_loop()) self.connected = True logger.info( f"Docker MCP server started (PID {self.process.pid}, " f"image={self.image})" ) except FileNotFoundError: raise MCPConnectionError( "Docker is not installed or not in PATH. " "Install Docker from https://docs.docker.com/get-docker/", details={"command": full_command}, ) except Exception as e: await self._cleanup() raise MCPConnectionError( f"Failed to start Docker container: {e}", details={"command": full_command, "error": str(e)}, )
[docs] async def disconnect(self) -> None: """Stop the Docker container and clean up resources.""" async with self._connection_lock: if not self.connected: return await self._cleanup() self.connected = False logger.info(f"Docker MCP server stopped (image={self.image})")
async def _cleanup(self) -> None: """Clean up container and tasks.""" # Cancel reader task if self._reader_task and not self._reader_task.done(): self._reader_task.cancel() try: await self._reader_task except asyncio.CancelledError: pass self._reader_task = None # Terminate container process if self.process: try: if self.process.returncode is None: self.process.terminate() try: await asyncio.wait_for(self.process.wait(), timeout=10.0) except asyncio.TimeoutError: self.process.kill() await self.process.wait() except Exception as e: logger.warning(f"Error during Docker container cleanup: {e}") finally: self.process = None self._read_queue = None
[docs] async def send_message(self, message: Dict[str, Any]) -> None: """Send JSON-RPC message to the container via stdin.""" if not self.connected or not self.process or not self.process.stdin: raise MCPConnectionError("Not connected to Docker MCP server") try: message_str = json.dumps(message, separators=(",", ":")) message_bytes = (message_str + "\n").encode("utf-8") self.process.stdin.write(message_bytes) await self.process.stdin.drain() logger.debug(f"Sent message to Docker container: {message_str}") except Exception as e: raise MCPTransportError( f"Failed to send message to Docker container: {e}", details={"message": message, "error": str(e)}, )
[docs] async def receive_message(self) -> Dict[str, Any]: """Receive JSON-RPC message from the container via stdout.""" if not self.connected or not self._read_queue: raise MCPConnectionError("Not connected to Docker MCP server") try: message = await asyncio.wait_for( self._read_queue.get(), timeout=self.timeout ) if isinstance(message, Exception): raise message logger.debug( f"Received message from Docker container: " f"{json.dumps(message, separators=(',', ':'))}" ) return message except asyncio.TimeoutError: raise MCPTimeoutError( f"No message received from Docker container within {self.timeout}s" )
async def _reader_loop(self) -> None: """Background task to read messages from container stdout.""" if not self.process or not self.process.stdout: return try: while True: line = await self.process.stdout.readline() if not line: break try: line_str = line.decode("utf-8").strip() if line_str: message = json.loads(line_str) await self._read_queue.put(message) except json.JSONDecodeError as e: error = MCPTransportError( f"Invalid JSON from Docker container: {e}", details={"raw_line": line_str}, ) await self._read_queue.put(error) except Exception as e: error = MCPTransportError( f"Error reading from Docker container: {e}", details={"error": str(e)}, ) await self._read_queue.put(error)