""" MCP server process management. Handles spawning, lifecycle management, and communication with MCP servers. Supports both interactive and non-interactive modes. """ import os import json import asyncio import subprocess from typing import Optional, Dict, Any, Callable, List, Union from .buffer import JsonRpcBuffer from .config import MCPServerConfig from .logging_config import get_logger, TRACE from .streamable_http import StreamableHTTPClient, StreamableHTTPError import logging class MCPServer: """Manages a single MCP server process.""" def __init__(self, config: MCPServerConfig, logger: Optional[logging.Logger] = None): self.config = config self.logger = logger or get_logger(__name__) self.process: Optional[subprocess.Popen] = None self.buffer = JsonRpcBuffer() self._running = False self._message_handlers: List[Callable[[dict], None]] = [] self._next_id = 1 self._pending_requests: Dict[Union[str, int], asyncio.Future] = {} self._last_error_time: Optional[float] = None self._offline_since: Optional[float] = None self._http_client: Optional[StreamableHTTPClient] = None async def start(self): """Start the MCP server process.""" if self.config.transport.type == "streamable-http": if self._http_client: return if not self.config.transport.url: raise ValueError("Streamable HTTP transport requires 'url' option") self.logger.info(f"Connecting to streamable HTTP endpoint: {self.config.transport.url}") self._http_client = StreamableHTTPClient( self.config.transport.url, headers=self.config.transport.headers, timeout=self.config.transport.timeout, sse_timeout=self.config.transport.sse_timeout, oauth_config=self.config.transport.oauth, logger=self.logger, ) await self._http_client.start() self._running = True self._offline_since = None return if self.process: return # Check if server is marked as offline import time if self._offline_since: offline_duration = time.time() - self._offline_since if offline_duration < 1800: # 30 minutes self.logger.warning(f"Server has been offline for {offline_duration:.0f}s, skipping start") raise RuntimeError(f"Server marked as offline since {offline_duration:.0f}s ago") if not self.config.command: raise ValueError("Server command not configured for stdio transport") # Prepare environment env = os.environ.copy() env.update({ "NODE_NO_READLINE": "1", "PYTHONUNBUFFERED": "1", **self.config.env }) # Build command cmd = self.config.command + self.config.args self.logger.info(f"Starting MCP server: {' '.join(cmd)}") try: # Start process self.process = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, text=True, bufsize=0 # Unbuffered ) self._running = True self._offline_since = None # Clear offline state # Start reading outputs asyncio.create_task(self._read_stdout()) asyncio.create_task(self._read_stderr()) except Exception as e: self.logger.error(f"Failed to start server: {e}") self._mark_offline() raise async def stop(self): """Stop the MCP server process.""" if self.config.transport.type == "streamable-http": self._running = False if self._http_client: await self._http_client.stop() self._http_client = None return self._running = False if self.process: self.process.terminate() try: await asyncio.wait_for( asyncio.to_thread(self.process.wait), timeout=5.0 ) except asyncio.TimeoutError: self.process.kill() self.process = None def _mark_offline(self): """Mark server as offline.""" import time self._offline_since = time.time() self.logger.warning(f"Server marked as offline") async def send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Send a JSON-RPC request and wait for response. Args: method: JSON-RPC method name params: Optional parameters Returns: Response result or raises exception on error """ if self.config.transport.type == "streamable-http": return await self._send_request_http(method, params or {}) if not self.process: raise RuntimeError("MCP server not started") request_id = self._next_id self._next_id += 1 request = { "jsonrpc": "2.0", "id": request_id, "method": method, "params": params or {} } # Create future for response future = asyncio.Future() self._pending_requests[request_id] = future # Send request request_str = json.dumps(request) + "\n" self.process.stdin.write(request_str) self.process.stdin.flush() self.logger.log(TRACE, f">>> {request_str.strip()}") # Wait for response with appropriate timeout timeout = 3.0 if method == "initialize" or method == "tools/list" else 30.0 try: response = await asyncio.wait_for(future, timeout=timeout) return response except asyncio.TimeoutError: del self._pending_requests[request_id] self.logger.error(f"Timeout waiting for response to {method} (timeout={timeout}s)") self._mark_offline() raise TimeoutError(f"No response for request {request_id}") async def _send_request_http(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: """Send request via streamable HTTP transport.""" if not self._http_client: raise RuntimeError("Streamable HTTP client not started") request_id = self._next_id self._next_id += 1 request = { "jsonrpc": "2.0", "id": request_id, "method": method, "params": params } loop = asyncio.get_running_loop() future: asyncio.Future = loop.create_future() self._pending_requests[request_id] = future timeout = self.config.transport.timeout or 30.0 try: await self._http_client.send(request, self._handle_message) result = await asyncio.wait_for(future, timeout=timeout) return result except asyncio.TimeoutError: raise TimeoutError(f"No response for request {request_id}") except StreamableHTTPError: raise except Exception as exc: self.logger.error(f"Streamable HTTP request failed: {exc}") raise finally: self._pending_requests.pop(request_id, None) def send_raw(self, message: str): """Send raw message to MCP server (for pass-through).""" if not self.process: raise RuntimeError("MCP server not started") if not message.endswith('\n'): message += '\n' self.logger.log(TRACE, f">>> {message.strip()}") self.process.stdin.write(message) self.process.stdin.flush() def add_message_handler(self, handler: Callable[[dict], None]): """Add a handler for incoming messages.""" self._message_handlers.append(handler) async def _read_stdout(self): """Read and process stdout from MCP server.""" while self._running and self.process: try: line = await asyncio.to_thread(self.process.stdout.readline) if not line: break messages = self.buffer.append(line) for msg in messages: await self._handle_message(msg) except Exception as e: self.logger.error(f"Error reading stdout: {e}") self._mark_offline() break async def _read_stderr(self): """Read and log stderr from MCP server.""" while self._running and self.process: try: line = await asyncio.to_thread(self.process.stderr.readline) if not line: break if line.strip(): self.logger.warning(f"stderr: {line.strip()}") except Exception: break async def _handle_message(self, message: dict): """Handle an incoming JSON-RPC message.""" self.logger.log(TRACE, f"<<< {json.dumps(message)}") # Check if it's a response to a pending request msg_id = message.get("id") if msg_id in self._pending_requests: future = self._pending_requests.pop(msg_id) if "error" in message: future.set_exception(Exception(message["error"].get("message", "Unknown error"))) else: future.set_result(message.get("result")) # Call registered handlers for handler in self._message_handlers: try: handler(message) except Exception as e: self.logger.error(f"Handler error: {e}")