167 lines
5.0 KiB
Python
167 lines
5.0 KiB
Python
"""
|
|
HTTP gateway for exposing MCP Browser via the Streamable HTTP transport.
|
|
|
|
This module provides a lightweight HTTP server that accepts JSON-RPC requests
|
|
over HTTP POST (and optionally Server-Sent Events in the future) and forwards
|
|
them to an underlying MCPBrowser instance.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from typing import Optional, Dict, Any
|
|
|
|
from aiohttp import web
|
|
|
|
from .proxy import MCPBrowser
|
|
from .logging_config import get_logger
|
|
|
|
|
|
class StreamableHTTPGateway:
    """Expose an MCPBrowser instance via HTTP.

    JSON-RPC requests POSTed to ``path`` (with or without a trailing slash)
    are forwarded to ``browser.call`` and the result is returned as JSON.
    GET requests on the same path are answered with 405 because
    server-initiated streams are not implemented.

    Args:
        browser: The MCPBrowser that receives the forwarded requests.
        host: Interface to bind to.
        port: TCP port to bind to.
        path: URL path of the endpoint. Trailing slashes are dropped, a
            missing leading slash is added, and an empty result falls back
            to ``"/mcp"``.
        allow_origin: When set, sent as ``Access-Control-Allow-Origin`` on
            every response produced by this gateway.
    """

    def __init__(
        self,
        browser: MCPBrowser,
        *,
        host: str = "127.0.0.1",
        port: int = 0,
        path: str = "/mcp",
        allow_origin: Optional[str] = None,
    ) -> None:
        self.browser = browser
        self.host = host
        self.port = port
        self.path = self._normalize_path(path)
        self.allow_origin = allow_origin
        self.logger = get_logger(__name__)

        # Server state; all three are set together once start() has succeeded.
        self._app: Optional[web.Application] = None
        self._runner: Optional[web.AppRunner] = None
        self._site: Optional[web.TCPSite] = None
        self._shutdown_event = asyncio.Event()

    @staticmethod
    def _normalize_path(path: str) -> str:
        """Return ``path`` without trailing slashes and with a leading slash."""
        trimmed = path.rstrip("/")
        if not trimmed:
            return "/mcp"
        # aiohttp's router rejects non-empty paths that do not start with "/".
        return trimmed if trimmed.startswith("/") else f"/{trimmed}"

    def _cors_headers(self) -> Dict[str, str]:
        """Return the CORS headers to attach to a response (possibly empty)."""
        if self.allow_origin:
            return {"Access-Control-Allow-Origin": self.allow_origin}
        return {}

    def _format_address(self, addr: tuple) -> str:
        """Render a bound socket address tuple as the endpoint URL."""
        host, port = addr[0], addr[1]
        if ":" in str(host):
            # IPv6 literals must be bracketed inside a URL.
            host = f"[{host}]"
        return f"http://{host}:{port}{self.path}"

    def _error_response(
        self, request_id: Any, code: int, message: str, *, status: int
    ) -> web.Response:
        """Build a JSON-RPC error response carrying the CORS headers."""
        body = {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }
        return web.json_response(body, status=status, headers=self._cors_headers())

    async def start(self) -> None:
        """Start the HTTP server.

        Does nothing if the server is already running. If setting up or
        binding the server fails, the partially created runner is cleaned up
        and the exception propagates, so a later call can retry.
        """
        if self._runner is not None:
            return

        await self.browser.initialize()

        app = web.Application()
        app.add_routes(
            [
                web.post(self.path, self._handle_rpc_request),
                web.post(f"{self.path}/", self._handle_rpc_request),
                web.get(self.path, self._handle_get_not_supported),
                web.get(f"{self.path}/", self._handle_get_not_supported),
            ]
        )

        runner = web.AppRunner(app)
        try:
            await runner.setup()
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except BaseException:
            # Includes cancellation: never leave a half-started runner behind.
            await runner.cleanup()
            raise

        # Publish state only after a successful bind so that the
        # "already running" guard above never sees a failed start.
        self._app, self._runner, self._site = app, runner, site
        # Re-arm serve_forever() in case the gateway was closed before.
        self._shutdown_event.clear()

        endpoints = [
            self._format_address(addr)
            for addr in runner.addresses
            if isinstance(addr, tuple)
        ]
        self.logger.info(
            "Streamable HTTP gateway listening on %s",
            ", ".join(endpoints) if endpoints else f"{self.host}:{self.port}",
        )

    async def close(self) -> None:
        """Shutdown HTTP server and underlying browser.

        The browser is closed and the shutdown event is set even when
        stopping the HTTP server raises.
        """
        runner = self._runner
        self._site = self._runner = self._app = None
        try:
            if runner is not None:
                # AppRunner.cleanup() stops every registered site and runs the
                # application's shutdown/cleanup itself; invoking those on the
                # application again would fire its signals a second time.
                await runner.cleanup()
        finally:
            try:
                await self.browser.close()
            finally:
                self._shutdown_event.set()

    async def serve_forever(self) -> None:
        """Start the server and block until ``close()`` is called."""
        await self.start()
        await self._shutdown_event.wait()

    async def _handle_rpc_request(self, request: web.Request) -> web.StreamResponse:
        """Handle POST requests on the gateway path.

        Returns HTTP 400 with a JSON-RPC error when the body is not valid
        JSON (-32700) or is not a JSON object (-32600). Otherwise the payload
        is passed to ``browser.call``; an exception raised there is reported
        as a JSON-RPC internal error (-32603) in an HTTP 200 response.
        """
        try:
            payload = await request.json(loads=json.loads)
        except web.HTTPException:
            # e.g. 413 for an oversized body: let aiohttp answer it properly.
            raise
        except Exception as exc:
            self.logger.warning("Invalid JSON payload: %s", exc)
            # The request id cannot be determined, hence id=None.
            return self._error_response(
                None, -32700, "Invalid JSON payload", status=400
            )

        if not isinstance(payload, dict):
            reason = "JSON-RPC payload must be a JSON object"
            self.logger.warning("Invalid JSON payload: %s", reason)
            # Well-formed JSON that is not a request object is an invalid
            # request rather than a parse error.
            return self._error_response(None, -32600, reason, status=400)

        try:
            response = await self.browser.call(payload)
        except Exception as exc:
            self.logger.error("Error processing request: %s", exc)
            response = {
                "jsonrpc": "2.0",
                "id": payload.get("id"),
                "error": {"code": -32603, "message": str(exc)},
            }

        return web.json_response(response, headers=self._cors_headers())

    async def _handle_get_not_supported(self, request: web.Request) -> web.StreamResponse:
        """Placeholder handler for SSE subscription attempts."""
        # A 405 response should advertise the methods that are allowed.
        headers = {"Allow": "POST", **self._cors_headers()}
        return web.json_response(
            {
                "error": {
                    "code": -32601,
                    "message": "Server-initiated streams are not yet supported. Use POST requests.",
                }
            },
            status=405,
            headers=headers,
        )
|
|
|
|
|
|
async def run_streamable_http_gateway(
    browser: MCPBrowser,
    *,
    host: str,
    port: int,
    path: str,
    allow_origin: Optional[str] = None,
) -> None:
    """Serve ``browser`` over HTTP until the gateway stops or the task is cancelled.

    A StreamableHTTPGateway is built from the given settings and run with
    ``serve_forever()``. The gateway is closed on every exit path, including
    cancellation, and any exception propagates to the caller.
    """
    settings = {
        "host": host,
        "port": port,
        "path": path,
        "allow_origin": allow_origin,
    }
    server = StreamableHTTPGateway(browser, **settings)

    try:
        await server.serve_forever()
    finally:
        # Runs for normal return, errors and CancelledError alike.
        await server.close()
|