""" HTTP gateway for exposing MCP Browser via the Streamable HTTP transport. This module provides a lightweight HTTP server that accepts JSON-RPC requests over HTTP POST (and optionally Server-Sent Events in the future) and forwards them to an underlying MCPBrowser instance. """ from __future__ import annotations import asyncio import json from typing import Optional, Dict, Any from aiohttp import web from .proxy import MCPBrowser from .logging_config import get_logger class StreamableHTTPGateway: """Expose an MCPBrowser instance via HTTP.""" def __init__( self, browser: MCPBrowser, *, host: str = "127.0.0.1", port: int = 0, path: str = "/mcp", allow_origin: Optional[str] = None, ) -> None: self.browser = browser self.host = host self.port = port self.path = path.rstrip("/") or "/mcp" self.allow_origin = allow_origin self.logger = get_logger(__name__) self._app: Optional[web.Application] = None self._runner: Optional[web.AppRunner] = None self._site: Optional[web.TCPSite] = None self._shutdown_event = asyncio.Event() async def start(self) -> None: """Start the HTTP server.""" if self._runner: return await self.browser.initialize() self._app = web.Application() self._app.add_routes( [ web.post(self.path, self._handle_rpc_request), web.post(f"{self.path}/", self._handle_rpc_request), web.get(self.path, self._handle_get_not_supported), web.get(f"{self.path}/", self._handle_get_not_supported), ] ) self._runner = web.AppRunner(self._app) await self._runner.setup() self._site = web.TCPSite(self._runner, self.host, self.port) await self._site.start() addresses = [] if self._runner.addresses: for addr in self._runner.addresses: if isinstance(addr, tuple): addresses.append(f"http://{addr[0]}:{addr[1]}{self.path}") self.logger.info( "Streamable HTTP gateway listening on %s", ", ".join(addresses) if addresses else f"{self.host}:{self.port}", ) async def close(self) -> None: """Shutdown HTTP server and underlying browser.""" if self._site: await self._site.stop() self._site = None if self._runner: await self._runner.cleanup() self._runner = None if self._app: await self._app.shutdown() await self._app.cleanup() self._app = None await self.browser.close() self._shutdown_event.set() async def serve_forever(self) -> None: """Run until cancelled.""" await self.start() await self._shutdown_event.wait() async def _handle_rpc_request(self, request: web.Request) -> web.StreamResponse: """Handle POST /mcp requests.""" try: payload = await request.json(loads=json.loads) if not isinstance(payload, dict): raise ValueError("JSON-RPC payload must be a JSON object") except Exception as exc: self.logger.warning("Invalid JSON payload: %s", exc) return web.json_response( { "jsonrpc": "2.0", "id": None, "error": {"code": -32700, "message": "Invalid JSON payload"}, }, status=400, ) try: response = await self.browser.call(payload) except Exception as exc: self.logger.error("Error processing request: %s", exc) response = { "jsonrpc": "2.0", "id": payload.get("id"), "error": {"code": -32603, "message": str(exc)}, } headers: Dict[str, str] = {} if self.allow_origin: headers["Access-Control-Allow-Origin"] = self.allow_origin return web.json_response(response, headers=headers) async def _handle_get_not_supported(self, request: web.Request) -> web.StreamResponse: """Placeholder handler for SSE subscription attempts.""" return web.json_response( { "error": { "code": -32601, "message": "Server-initiated streams are not yet supported. Use POST requests.", } }, status=405, ) async def run_streamable_http_gateway( browser: MCPBrowser, *, host: str, port: int, path: str, allow_origin: Optional[str] = None, ) -> None: """Utility to run the gateway until cancelled.""" gateway = StreamableHTTPGateway( browser, host=host, port=port, path=path, allow_origin=allow_origin, ) try: await gateway.serve_forever() except asyncio.CancelledError: raise finally: await gateway.close()