193 lines
7.1 KiB
Python
193 lines
7.1 KiB
Python
"""
|
|
Base MCP server implementation for Python.
|
|
|
|
Provides a foundation for building MCP servers with standard
|
|
JSON-RPC handling and tool management.
|
|
"""
|
|
|
|
import sys
|
|
import json
|
|
import asyncio
|
|
from typing import Dict, Any, List, Optional, Callable
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
|
class BaseMCPServer(ABC):
    """Base class for MCP servers.

    Speaks newline-delimited JSON-RPC 2.0: :meth:`run` reads one JSON message
    per line from stdin and writes one JSON response per line to stdout.
    Diagnostics go to stderr so they never mix with protocol traffic.

    Subclasses must implement :meth:`handle_tool_call`; individual tools may
    instead supply their own handler through :meth:`register_tool`.
    """

    # Protocol revision advertised in the ``initialize`` response.
    _PROTOCOL_VERSION = "2024-11-05"

    # JSON-RPC 2.0 pre-defined error codes.
    _PARSE_ERROR = -32700
    _INVALID_REQUEST = -32600
    _METHOD_NOT_FOUND = -32601
    _INVALID_PARAMS = -32602
    _INTERNAL_ERROR = -32603

    def __init__(self, name: str, version: str = "1.0.0"):
        """Create a server.

        Args:
            name: Server name reported in ``serverInfo`` and used as the
                prefix of every stderr log line.
            version: Server version reported in ``serverInfo``.
        """
        self.name = name
        self.version = version
        # Tool name -> {"name", "description", "inputSchema", "handler"}.
        self.tools: Dict[str, Dict[str, Any]] = {}
        self._running = False

    @abstractmethod
    async def handle_tool_call(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a tool call. Must be implemented by subclasses.

        Called for registered tools that have no dedicated handler.

        Args:
            tool_name: Name of the registered tool being invoked.
            arguments: The ``arguments`` object from the request (``{}`` when
                the request did not supply one).

        Returns:
            The value placed in the ``result`` field of the response.
        """
        pass

    def register_tool(self, name: str, description: str, input_schema: Dict[str, Any],
                      handler: Optional[Callable] = None):
        """Register a tool with the server.

        Registering a name that already exists replaces the earlier entry.

        Args:
            name: Tool name clients use in ``tools/call``.
            description: Description returned by ``tools/list``.
            input_schema: Schema returned by ``tools/list`` as ``inputSchema``.
            handler: Optional callable invoked as ``handler(arguments)``. It
                may be a coroutine function or a plain function. When omitted,
                calls are routed to :meth:`handle_tool_call`.
        """
        self.tools[name] = {
            "name": name,
            "description": description,
            "inputSchema": input_schema,
            "handler": handler,
        }

    @staticmethod
    def _result_response(request_id: Any, result: Any) -> Dict[str, Any]:
        """Build a JSON-RPC success envelope."""
        return {"jsonrpc": "2.0", "id": request_id, "result": result}

    @staticmethod
    def _error_response(request_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC error envelope."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    def _public_tools(self) -> List[Dict[str, Any]]:
        """Return the tool descriptions without the internal ``handler`` entry.

        The handler is a Python callable: it is not JSON-serializable, so
        leaving it in would make the ``tools/list`` response impossible to
        encode.
        """
        return [
            {key: value for key, value in tool.items() if key != "handler"}
            for tool in self.tools.values()
        ]

    async def _call_tool(self, request_id: Any, params: Any) -> Dict[str, Any]:
        """Dispatch a ``tools/call`` request and wrap its outcome in a response."""
        if not isinstance(params, dict):
            return self._error_response(request_id, self._INVALID_PARAMS, "Invalid params")

        tool_name = params.get("name")
        arguments = params.get("arguments")
        if arguments is None:
            arguments = {}

        tool_info = self.tools.get(tool_name)
        if tool_info is None:
            return self._error_response(
                request_id, self._INVALID_PARAMS, f"Tool '{tool_name}' not found"
            )

        # Use registered handler if available, otherwise use abstract method
        handler = tool_info.get("handler")
        if handler:
            result = handler(arguments)
            # Accept both coroutine functions and plain callables.
            if hasattr(result, "__await__"):
                result = await result
        else:
            result = await self.handle_tool_call(tool_name, arguments)

        return self._result_response(request_id, result)

    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle a JSON-RPC request.

        Supports ``initialize``, ``tools/list`` and ``tools/call``.

        Args:
            request: The decoded JSON-RPC message.

        Returns:
            A JSON-RPC response dict carrying either ``result`` or ``error``.
            Failures are never raised to the caller; they are reported as:
            ``-32600`` when the message is not a JSON object, ``-32601`` for
            an unknown method, ``-32602`` for malformed params or an unknown
            tool, and ``-32603`` for any exception raised while handling.
        """
        if not isinstance(request, dict):
            return self._error_response(None, self._INVALID_REQUEST, "Invalid Request")

        method = request.get("method")
        request_id = request.get("id")
        # Treat an explicit ``"params": null`` the same as omitted params.
        params = request.get("params")
        if params is None:
            params = {}

        try:
            if method == "initialize":
                return self._result_response(request_id, {
                    "protocolVersion": self._PROTOCOL_VERSION,
                    "capabilities": {
                        "tools": {}
                    },
                    "serverInfo": {
                        "name": self.name,
                        "version": self.version
                    }
                })

            if method == "tools/list":
                return self._result_response(request_id, {"tools": self._public_tools()})

            if method == "tools/call":
                return await self._call_tool(request_id, params)

            return self._error_response(
                request_id, self._METHOD_NOT_FOUND, f"Method '{method}' not found"
            )
        except Exception as e:
            # Boundary handler: a failing tool must not take the server down.
            return self._error_response(request_id, self._INTERNAL_ERROR, str(e))

    async def _process_line(self, line_str: str) -> Optional[str]:
        """Turn one raw input line into the serialized response to send.

        Returns ``None`` when nothing must be written back, which is the case
        for notifications (messages that carry no ``id`` member).
        """
        try:
            request = json.loads(line_str)
        except json.JSONDecodeError as e:
            print(f"ERROR: {self.name} JSON decode error: {e}", file=sys.stderr, flush=True)
            return json.dumps(self._error_response(None, self._PARSE_ERROR, "Parse error"))

        is_object = isinstance(request, dict)
        request_id = request.get("id") if is_object else None

        try:
            response = await self.handle_request(request)
            # Serialization can fail if a tool returned a non-JSON value.
            response_str = json.dumps(response)
        except Exception as e:
            print(f"ERROR: {self.name} request handling error: {e}", file=sys.stderr, flush=True)
            response_str = json.dumps(
                self._error_response(request_id, self._INTERNAL_ERROR, str(e))
            )

        # JSON-RPC 2.0: a notification must never be answered, even on error.
        if is_object and "id" not in request:
            return None
        return response_str

    async def run(self):
        """Run the MCP server, reading from stdin and writing to stdout.

        Processes one message per line until stdin reaches EOF, the task is
        cancelled, or a ``KeyboardInterrupt`` is caught. Returns early if
        stdin cannot be attached to the event loop. Unexpected errors inside
        the loop are logged and the loop keeps going.
        """
        self._running = True

        # Log server startup
        print(f"DEBUG: {self.name} server starting", file=sys.stderr, flush=True)

        # Use asyncio for stdin reading
        loop = asyncio.get_event_loop()
        stdin_reader = asyncio.StreamReader()
        stdin_protocol = asyncio.StreamReaderProtocol(stdin_reader)

        try:
            # Connect stdin to async reader
            await loop.connect_read_pipe(lambda: stdin_protocol, sys.stdin)
        except Exception as e:
            print(f"ERROR: {self.name} failed to setup async stdin: {e}", file=sys.stderr, flush=True)
            self._running = False
            return

        # Main server loop
        while self._running:
            try:
                # Wait indefinitely for the next command; no timeout is wanted.
                line = await stdin_reader.readline()

                if not line:
                    # Empty bytes means EOF
                    print(f"DEBUG: {self.name} detected EOF on stdin", file=sys.stderr, flush=True)
                    break

                # Decode and process line
                line_str = line.decode('utf-8').strip()
                if not line_str:
                    continue

                print(f"DEBUG: {self.name} received: {line_str}", file=sys.stderr, flush=True)

                response_str = await self._process_line(line_str)
                if response_str is not None:
                    print(f"DEBUG: {self.name} sending: {response_str}", file=sys.stderr, flush=True)
                    print(response_str, flush=True)

            except asyncio.CancelledError:
                print(f"DEBUG: {self.name} cancelled", file=sys.stderr, flush=True)
                break
            except KeyboardInterrupt:
                print(f"DEBUG: {self.name} interrupted", file=sys.stderr, flush=True)
                break
            except Exception as e:
                print(f"ERROR: {self.name} unexpected error: {e}", file=sys.stderr, flush=True)
                # Continue running despite errors; the short pause avoids a
                # tight loop if the same failure repeats.
                await asyncio.sleep(0.1)

        self._running = False
        print(f"DEBUG: {self.name} server exiting", file=sys.stderr, flush=True)

    def content_text(self, text: str) -> Dict[str, Any]:
        """Helper to create text content response.

        Args:
            text: The text to wrap.

        Returns:
            ``{"content": [{"type": "text", "text": text}]}``.
        """
        return {
            "content": [{
                "type": "text",
                "text": text
            }]
        }