From 721cf1e82b3fda63166de330020f93d00a505799 Mon Sep 17 00:00:00 2001 From: Tyler Rockwood Date: Thu, 29 May 2025 03:44:36 +0000 Subject: [PATCH] mcp: bump sse timeout 5 seconds is too low for slow services --- src/redpanda/agents/__init__.py | 2 ++ src/redpanda/agents/_mcp.py | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/src/redpanda/agents/__init__.py b/src/redpanda/agents/__init__.py index fe234d5..e7240ae 100644 --- a/src/redpanda/agents/__init__.py +++ b/src/redpanda/agents/__init__.py @@ -18,6 +18,7 @@ MCPEndpoint, SSEMCPEndpoint, StdioMCPEndpoint, + StreamableHTTPMCPEndpoint, WebsocketMCPEndpoint, ) from ._tools import Tool @@ -30,4 +31,5 @@ "StdioMCPEndpoint", "SSEMCPEndpoint", "WebsocketMCPEndpoint", + "StreamableHTTPMCPEndpoint", ] diff --git a/src/redpanda/agents/_mcp.py b/src/redpanda/agents/_mcp.py index 4f2f464..407014f 100644 --- a/src/redpanda/agents/_mcp.py +++ b/src/redpanda/agents/_mcp.py @@ -14,11 +14,13 @@ import json from contextlib import asynccontextmanager +from datetime import timedelta from typing import Any, override from mcp import ClientSession, Tool as MCPToolDef from mcp.client.sse import sse_client from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.client.streamable_http import streamablehttp_client from mcp.client.websocket import websocket_client from mcp.shared.exceptions import McpError from opentelemetry import trace @@ -100,6 +102,29 @@ def __init__(self, url: str, cache_enabled: bool = True): self.url = url +class StreamableHTTPMCPEndpoint(MCPEndpoint): + """ + A MCP endpoint that communicates with an MCP server over HTTP streaming. + """ + + url: str + + def __init__(self, url: str, cache_enabled: bool = True): + """ + Create a new StreamableHTTPMCPEndpoint instance. + + Args: + url: The URL of the HTTP server. + cache_enabled: Whether to cache the list of {tools,resources,prompts} from the server. + """ + super().__init__(cache_enabled) + self.url = url + + @property + def headers(self) -> dict[str, Any]: + return {} + + @asynccontextmanager async def mcp_client(server: MCPEndpoint): """ @@ -110,13 +135,19 @@ async def mcp_client(server: MCPEndpoint): async with ClientSession(read, write) as client: yield MCPClient(server, client) elif isinstance(server, SSEMCPEndpoint): - async with sse_client(server.url, server.headers) as (read, write): + async with sse_client(server.url, server.headers, timeout=30) as (read, write): async with ClientSession(read, write) as client: yield MCPClient(server, client) elif isinstance(server, WebsocketMCPEndpoint): async with websocket_client(server.url) as (read, write): async with ClientSession(read, write) as client: yield MCPClient(server, client) + elif isinstance(server, StreamableHTTPMCPEndpoint): + async with streamablehttp_client( + server.url, server.headers, timeout=timedelta(seconds=30) + ) as (read, write, _get_session_id): + async with ClientSession(read, write) as client: + yield MCPClient(server, client) else: raise NotImplementedError(f"Unknown server type: {server}")