mcp-browser/mcp_browser/server.py

291 lines
10 KiB
Python

"""
MCP server process management.
Handles spawning, lifecycle management, and communication with MCP servers.
Supports both interactive and non-interactive modes.
"""
import os
import json
import asyncio
import subprocess
from typing import Optional, Dict, Any, Callable, List, Union
from .buffer import JsonRpcBuffer
from .config import MCPServerConfig
from .logging_config import get_logger, TRACE
from .streamable_http import StreamableHTTPClient, StreamableHTTPError
import logging
class MCPServer:
"""Manages a single MCP server process."""
def __init__(self, config: MCPServerConfig, logger: Optional[logging.Logger] = None):
self.config = config
self.logger = logger or get_logger(__name__)
self.process: Optional[subprocess.Popen] = None
self.buffer = JsonRpcBuffer()
self._running = False
self._message_handlers: List[Callable[[dict], None]] = []
self._next_id = 1
self._pending_requests: Dict[Union[str, int], asyncio.Future] = {}
self._last_error_time: Optional[float] = None
self._offline_since: Optional[float] = None
self._http_client: Optional[StreamableHTTPClient] = None
async def start(self):
"""Start the MCP server process."""
if self.config.transport.type == "streamable-http":
if self._http_client:
return
if not self.config.transport.url:
raise ValueError("Streamable HTTP transport requires 'url' option")
self.logger.info(f"Connecting to streamable HTTP endpoint: {self.config.transport.url}")
self._http_client = StreamableHTTPClient(
self.config.transport.url,
headers=self.config.transport.headers,
timeout=self.config.transport.timeout,
sse_timeout=self.config.transport.sse_timeout,
oauth_config=self.config.transport.oauth,
logger=self.logger,
)
await self._http_client.start()
self._running = True
self._offline_since = None
return
if self.process:
return
# Check if server is marked as offline
import time
if self._offline_since:
offline_duration = time.time() - self._offline_since
if offline_duration < 1800: # 30 minutes
self.logger.warning(f"Server has been offline for {offline_duration:.0f}s, skipping start")
raise RuntimeError(f"Server marked as offline since {offline_duration:.0f}s ago")
if not self.config.command:
raise ValueError("Server command not configured for stdio transport")
# Prepare environment
env = os.environ.copy()
env.update({
"NODE_NO_READLINE": "1",
"PYTHONUNBUFFERED": "1",
**self.config.env
})
# Build command
cmd = self.config.command + self.config.args
self.logger.info(f"Starting MCP server: {' '.join(cmd)}")
try:
# Start process
self.process = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
text=True,
bufsize=0 # Unbuffered
)
self._running = True
self._offline_since = None # Clear offline state
# Start reading outputs
asyncio.create_task(self._read_stdout())
asyncio.create_task(self._read_stderr())
except Exception as e:
self.logger.error(f"Failed to start server: {e}")
self._mark_offline()
raise
async def stop(self):
"""Stop the MCP server process."""
if self.config.transport.type == "streamable-http":
self._running = False
if self._http_client:
await self._http_client.stop()
self._http_client = None
return
self._running = False
if self.process:
self.process.terminate()
try:
await asyncio.wait_for(
asyncio.to_thread(self.process.wait),
timeout=5.0
)
except asyncio.TimeoutError:
self.process.kill()
self.process = None
def _mark_offline(self):
"""Mark server as offline."""
import time
self._offline_since = time.time()
self.logger.warning(f"Server marked as offline")
async def send_request(self, method: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
Send a JSON-RPC request and wait for response.
Args:
method: JSON-RPC method name
params: Optional parameters
Returns:
Response result or raises exception on error
"""
if self.config.transport.type == "streamable-http":
return await self._send_request_http(method, params or {})
if not self.process:
raise RuntimeError("MCP server not started")
request_id = self._next_id
self._next_id += 1
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": method,
"params": params or {}
}
# Create future for response
future = asyncio.Future()
self._pending_requests[request_id] = future
# Send request
request_str = json.dumps(request) + "\n"
self.process.stdin.write(request_str)
self.process.stdin.flush()
self.logger.log(TRACE, f">>> {request_str.strip()}")
# Wait for response with appropriate timeout
timeout = 3.0 if method == "initialize" or method == "tools/list" else 30.0
try:
response = await asyncio.wait_for(future, timeout=timeout)
return response
except asyncio.TimeoutError:
del self._pending_requests[request_id]
self.logger.error(f"Timeout waiting for response to {method} (timeout={timeout}s)")
self._mark_offline()
raise TimeoutError(f"No response for request {request_id}")
async def _send_request_http(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Send request via streamable HTTP transport."""
if not self._http_client:
raise RuntimeError("Streamable HTTP client not started")
request_id = self._next_id
self._next_id += 1
request = {
"jsonrpc": "2.0",
"id": request_id,
"method": method,
"params": params
}
loop = asyncio.get_running_loop()
future: asyncio.Future = loop.create_future()
self._pending_requests[request_id] = future
timeout = self.config.transport.timeout or 30.0
try:
await self._http_client.send(request, self._handle_message)
result = await asyncio.wait_for(future, timeout=timeout)
return result
except asyncio.TimeoutError:
raise TimeoutError(f"No response for request {request_id}")
except StreamableHTTPError:
raise
except Exception as exc:
self.logger.error(f"Streamable HTTP request failed: {exc}")
raise
finally:
self._pending_requests.pop(request_id, None)
def send_raw(self, message: str):
"""Send raw message to MCP server (for pass-through)."""
if not self.process:
raise RuntimeError("MCP server not started")
if not message.endswith('\n'):
message += '\n'
self.logger.log(TRACE, f">>> {message.strip()}")
self.process.stdin.write(message)
self.process.stdin.flush()
def add_message_handler(self, handler: Callable[[dict], None]):
"""Add a handler for incoming messages."""
self._message_handlers.append(handler)
async def _read_stdout(self):
"""Read and process stdout from MCP server."""
while self._running and self.process:
try:
line = await asyncio.to_thread(self.process.stdout.readline)
if not line:
break
messages = self.buffer.append(line)
for msg in messages:
await self._handle_message(msg)
except Exception as e:
self.logger.error(f"Error reading stdout: {e}")
self._mark_offline()
break
async def _read_stderr(self):
"""Read and log stderr from MCP server."""
while self._running and self.process:
try:
line = await asyncio.to_thread(self.process.stderr.readline)
if not line:
break
if line.strip():
self.logger.warning(f"stderr: {line.strip()}")
except Exception:
break
async def _handle_message(self, message: dict):
"""Handle an incoming JSON-RPC message."""
self.logger.log(TRACE, f"<<< {json.dumps(message)}")
# Check if it's a response to a pending request
msg_id = message.get("id")
if msg_id in self._pending_requests:
future = self._pending_requests.pop(msg_id)
if "error" in message:
future.set_exception(Exception(message["error"].get("message", "Unknown error")))
else:
future.set_result(message.get("result"))
# Call registered handlers
for handler in self._message_handlers:
try:
handler(message)
except Exception as e:
self.logger.error(f"Handler error: {e}")