# mcp-browser/mcp_browser/http_gateway.py
#
# 167 lines
# 5.0 KiB
# Python

"""
HTTP gateway for exposing MCP Browser via the Streamable HTTP transport.
This module provides a lightweight HTTP server that accepts JSON-RPC requests
over HTTP POST (and optionally Server-Sent Events in the future) and forwards
them to an underlying MCPBrowser instance.
"""
from __future__ import annotations
import asyncio
import json
from typing import Optional, Dict, Any
from aiohttp import web
from .proxy import MCPBrowser
from .logging_config import get_logger
class StreamableHTTPGateway:
    """Expose an MCPBrowser instance via HTTP.

    ``POST <path>`` requests carrying a JSON object are forwarded to
    ``browser.call`` and the result is returned as JSON. ``GET <path>`` is
    answered with 405 because server-initiated streams are not implemented.

    Args:
        browser: The browser instance that requests are forwarded to.
        host: Interface to bind the listening socket to.
        port: TCP port to bind to.
        path: URL path of the endpoint. A trailing slash is stripped; an
            empty result falls back to ``"/mcp"``.
        allow_origin: When set, sent as ``Access-Control-Allow-Origin`` on
            every response produced by this gateway.
    """

    def __init__(
        self,
        browser: MCPBrowser,
        *,
        host: str = "127.0.0.1",
        port: int = 0,
        path: str = "/mcp",
        allow_origin: Optional[str] = None,
    ) -> None:
        self.browser = browser
        self.host = host
        self.port = port
        # Routes are registered both with and without the trailing slash, so
        # the canonical form is stored without it.
        self.path = path.rstrip("/") or "/mcp"
        self.allow_origin = allow_origin
        self.logger = get_logger(__name__)
        self._app: Optional[web.Application] = None
        self._runner: Optional[web.AppRunner] = None
        self._site: Optional[web.TCPSite] = None
        # Set by close(); serve_forever() blocks on it.
        self._shutdown_event = asyncio.Event()

    @staticmethod
    def _jsonrpc_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC 2.0 error response object."""
        return {
            "jsonrpc": "2.0",
            "id": request_id,
            "error": {"code": code, "message": message},
        }

    @staticmethod
    def _format_listen_url(host: str, port: int, path: str) -> str:
        """Return the HTTP URL for a bound address, bracketing IPv6 hosts."""
        if ":" in host:
            host = f"[{host}]"
        return f"http://{host}:{port}{path}"

    def _cors_headers(self) -> Dict[str, str]:
        """Return the CORS headers to attach to every response."""
        if self.allow_origin:
            return {"Access-Control-Allow-Origin": self.allow_origin}
        return {}

    async def start(self) -> None:
        """Start the HTTP server.

        Initializes the browser, binds the listening socket and logs the
        resulting address(es). Does nothing if the server is already running.

        Raises:
            Whatever ``browser.initialize()`` or the aiohttp runner/site
            raise (for example ``OSError`` when the port cannot be bound).
        """
        if self._runner is not None:
            return

        await self.browser.initialize()

        app = web.Application()
        app.add_routes(
            [
                web.post(self.path, self._handle_rpc_request),
                web.post(f"{self.path}/", self._handle_rpc_request),
                web.get(self.path, self._handle_get_not_supported),
                web.get(f"{self.path}/", self._handle_get_not_supported),
            ]
        )
        runner = web.AppRunner(app)
        try:
            await runner.setup()
            site = web.TCPSite(runner, self.host, self.port)
            await site.start()
        except BaseException:
            # Do not leave a half-started runner behind: otherwise a later
            # start() would return early as if the server were listening.
            await runner.cleanup()
            raise

        # Publish state only once the server is actually listening.
        self._app, self._runner, self._site = app, runner, site

        addresses = [
            self._format_listen_url(addr[0], addr[1], self.path)
            for addr in (runner.addresses or [])
            if isinstance(addr, tuple)
        ]
        self.logger.info(
            "Streamable HTTP gateway listening on %s",
            ", ".join(addresses) if addresses else f"{self.host}:{self.port}",
        )

    async def close(self) -> None:
        """Shutdown HTTP server and underlying browser.

        The browser is closed and the shutdown event is set even if stopping
        the HTTP server raises; the first exception still propagates.
        """
        runner = self._runner
        self._site = None
        self._runner = None
        self._app = None
        try:
            if runner is not None:
                # AppRunner.cleanup() stops every registered site and runs
                # the application's shutdown/cleanup itself, so neither
                # needs to be invoked separately here.
                await runner.cleanup()
        finally:
            try:
                await self.browser.close()
            finally:
                self._shutdown_event.set()

    async def serve_forever(self) -> None:
        """Start the server and block until :meth:`close` is called."""
        await self.start()
        await self._shutdown_event.wait()

    async def _handle_rpc_request(self, request: web.Request) -> web.StreamResponse:
        """Handle POST requests on the endpoint path.

        Returns a 400 JSON-RPC error for bodies that are not valid JSON
        (-32700) or not a JSON object (-32600). Otherwise the payload is
        passed to ``browser.call``; an exception raised there is reported
        as a -32603 error carrying the request's ``id``.
        """
        headers = self._cors_headers()

        try:
            payload = await request.json(loads=json.loads)
        except Exception as exc:
            self.logger.warning("Invalid JSON payload: %s", exc)
            return web.json_response(
                self._jsonrpc_error(None, -32700, "Invalid JSON payload"),
                status=400,
                headers=headers,
            )

        if not isinstance(payload, dict):
            # The body parsed fine, so this is an invalid request rather
            # than a parse error.
            self.logger.warning(
                "Invalid JSON-RPC request: payload must be a JSON object"
            )
            return web.json_response(
                self._jsonrpc_error(
                    None, -32600, "JSON-RPC payload must be a JSON object"
                ),
                status=400,
                headers=headers,
            )

        try:
            response = await self.browser.call(payload)
        except Exception as exc:
            self.logger.error("Error processing request: %s", exc)
            response = self._jsonrpc_error(payload.get("id"), -32603, str(exc))

        return web.json_response(response, headers=headers)

    async def _handle_get_not_supported(self, request: web.Request) -> web.StreamResponse:
        """Placeholder handler for SSE subscription attempts (always 405)."""
        # A 405 response should tell the client which methods are allowed.
        headers = {"Allow": "POST", **self._cors_headers()}
        return web.json_response(
            {
                "error": {
                    "code": -32601,
                    "message": "Server-initiated streams are not yet supported. Use POST requests.",
                }
            },
            status=405,
            headers=headers,
        )
async def run_streamable_http_gateway(
    browser: MCPBrowser,
    *,
    host: str,
    port: int,
    path: str,
    allow_origin: Optional[str] = None,
) -> None:
    """Serve ``browser`` over HTTP until the task is cancelled or closed.

    Builds a StreamableHTTPGateway from the given settings, runs
    ``serve_forever()`` on it and always calls ``close()`` afterwards, whether
    serving ended normally, failed, or was cancelled. Exceptions (including
    cancellation) propagate to the caller.
    """
    settings = {
        "host": host,
        "port": port,
        "path": path,
        "allow_origin": allow_origin,
    }
    server = StreamableHTTPGateway(browser, **settings)
    try:
        await server.serve_forever()
    finally:
        # Runs on every exit path, so cancellation still tears the server down.
        await server.close()