diff --git a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py index 5fac56be5..6ef2f0b11 100644 --- a/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py +++ b/examples/clients/simple-auth-client/mcp_simple_auth_client/main.py @@ -18,7 +18,7 @@ from urllib.parse import parse_qs, urlparse import httpx -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from mcp.client._transport import ReadStream, WriteStream from mcp.client.auth import OAuthClientProvider, TokenStorage from mcp.client.session import ClientSession from mcp.client.sse import sse_client @@ -241,8 +241,8 @@ async def _default_redirect_handler(authorization_url: str) -> None: async def _run_session( self, - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], - write_stream: MemoryObjectSendStream[SessionMessage], + read_stream: ReadStream[SessionMessage | Exception], + write_stream: WriteStream[SessionMessage], ): """Run the MCP session with the given streams.""" print("🤝 Initializing MCP session...") diff --git a/src/mcp/client/__main__.py b/src/mcp/client/__main__.py index f3db17906..b9ec34422 100644 --- a/src/mcp/client/__main__.py +++ b/src/mcp/client/__main__.py @@ -6,9 +6,9 @@ from urllib.parse import urlparse import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp import types +from mcp.client._transport import ReadStream, WriteStream from mcp.client.session import ClientSession from mcp.client.sse import sse_client from mcp.client.stdio import StdioServerParameters, stdio_client @@ -33,8 +33,8 @@ async def message_handler( async def run_session( - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], - write_stream: MemoryObjectSendStream[SessionMessage], + read_stream: ReadStream[SessionMessage | Exception], + write_stream: WriteStream[SessionMessage], client_info: types.Implementation | None = None, ): async with ClientSession( diff --git a/src/mcp/client/_transport.py b/src/mcp/client/_transport.py index a86362900..0163fef95 100644 --- a/src/mcp/client/_transport.py +++ b/src/mcp/client/_transport.py @@ -5,11 +5,12 @@ from contextlib import AbstractAsyncContextManager from typing import Protocol -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream - +from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.message import SessionMessage -TransportStreams = tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]] +__all__ = ["ReadStream", "WriteStream", "Transport", "TransportStreams"] + +TransportStreams = tuple[ReadStream[SessionMessage | Exception], WriteStream[SessionMessage]] class Transport(AbstractAsyncContextManager[TransportStreams], Protocol): diff --git a/src/mcp/client/session.py b/src/mcp/client/session.py index a0ca751bd..47dadd77b 100644 --- a/src/mcp/client/session.py +++ b/src/mcp/client/session.py @@ -4,10 +4,10 @@ from typing import Any, Protocol import anyio.lowlevel -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from pydantic import TypeAdapter from mcp import types +from mcp.client._transport import ReadStream, WriteStream from mcp.client.experimental import ExperimentalClientFeatures from mcp.client.experimental.task_handlers import ExperimentalTaskHandlers from mcp.shared._context import RequestContext @@ -109,8 +109,8 @@ class ClientSession( ): def __init__( self, - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], - write_stream: MemoryObjectSendStream[SessionMessage], + read_stream: ReadStream[SessionMessage | Exception], + write_stream: WriteStream[SessionMessage], read_timeout_seconds: float | None = None, sampling_callback: SamplingFnT | None = None, elicitation_callback: ElicitationFnT | None = None, diff --git a/src/mcp/client/sse.py b/src/mcp/client/sse.py index 972efce58..e1e886cc1 100644 --- a/src/mcp/client/sse.py +++ b/src/mcp/client/sse.py @@ -7,11 +7,11 @@ import anyio import httpx from anyio.abc import TaskStatus -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import aconnect_sse from httpx_sse._exceptions import SSEError from mcp import types +from mcp.shared._context_streams import create_context_streams from mcp.shared._httpx_utils import McpHttpClientFactory, create_mcp_http_client from mcp.shared.message import SessionMessage @@ -51,14 +51,8 @@ async def sse_client( auth: Optional HTTPX authentication handler. on_session_created: Optional callback invoked with the session ID when received. """ - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] - read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - - write_stream: MemoryObjectSendStream[SessionMessage] - write_stream_reader: MemoryObjectReceiveStream[SessionMessage] - - read_stream_writer, read_stream = anyio.create_memory_object_stream(0) - write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + write_stream, write_stream_reader = create_context_streams[SessionMessage](0) async with anyio.create_task_group() as tg: try: @@ -132,7 +126,8 @@ async def sse_reader(task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED): async def post_writer(endpoint_url: str): try: async with write_stream_reader: - async for session_message in write_stream_reader: + + async def _send_message(session_message: SessionMessage) -> None: logger.debug(f"Sending client message: {session_message}") response = await client.post( endpoint_url, @@ -144,6 +139,14 @@ async def post_writer(endpoint_url: str): ) response.raise_for_status() logger.debug(f"Client message sent successfully: {response.status_code}") + + async for session_message in write_stream_reader: + sender_ctx = write_stream_reader.last_context + if sender_ctx is not None: + async with anyio.create_task_group() as tg: + sender_ctx.run(tg.start_soon, _send_message, session_message) + else: + await _send_message(session_message) # pragma: no cover except Exception: # pragma: lax no cover logger.exception("Error in post_writer") finally: diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 3416bbc81..d178953f6 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -11,11 +11,11 @@ import anyio import httpx from anyio.abc import TaskGroup -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from httpx_sse import EventSource, ServerSentEvent, aconnect_sse from pydantic import ValidationError from mcp.client._transport import TransportStreams +from mcp.shared._context_streams import ContextReceiveStream, ContextSendStream, create_context_streams from mcp.shared._httpx_utils import create_mcp_http_client from mcp.shared.message import ClientMessageMetadata, SessionMessage from mcp.types import ( @@ -38,8 +38,8 @@ # TODO(Marcelo): Put the TransportStreams in a module under shared, so we can import here. SessionMessageOrError = SessionMessage | Exception -StreamWriter = MemoryObjectSendStream[SessionMessageOrError] -StreamReader = MemoryObjectReceiveStream[SessionMessage] +StreamWriter = ContextSendStream[SessionMessageOrError] +StreamReader = ContextReceiveStream[SessionMessage] MCP_SESSION_ID = "mcp-session-id" MCP_PROTOCOL_VERSION = "mcp-protocol-version" @@ -434,14 +434,15 @@ async def post_writer( client: httpx.AsyncClient, write_stream_reader: StreamReader, read_stream_writer: StreamWriter, - write_stream: MemoryObjectSendStream[SessionMessage], + write_stream: ContextSendStream[SessionMessage], start_get_stream: Callable[[], None], tg: TaskGroup, ) -> None: """Handle writing requests to the server.""" try: async with write_stream_reader: - async for session_message in write_stream_reader: + + async def _handle_message(session_message: SessionMessage) -> None: message = session_message.message metadata = ( session_message.metadata @@ -478,6 +479,14 @@ async def handle_request_async(): else: await handle_request_async() + async for session_message in write_stream_reader: + sender_ctx = write_stream_reader.last_context + if sender_ctx is not None: + async with anyio.create_task_group() as tg_local: + sender_ctx.run(tg_local.start_soon, _handle_message, session_message) + else: + await _handle_message(session_message) # pragma: no cover + except Exception: # pragma: lax no cover logger.exception("Error in post_writer") finally: @@ -533,8 +542,8 @@ async def streamable_http_client( Example: See examples/snippets/clients/ for usage patterns. """ - read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0) - write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0) + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + write_stream, write_stream_reader = create_context_streams[SessionMessage](0) # Determine if we need to create and manage the client client_provided = http_client is not None diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 167f34b8b..b6dcc398d 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -36,6 +36,7 @@ async def main(): from __future__ import annotations +import contextvars import logging import warnings from collections.abc import AsyncIterator, Awaitable, Callable @@ -44,7 +45,6 @@ async def main(): from typing import Any, Generic import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from starlette.applications import Starlette from starlette.middleware import Middleware from starlette.middleware.authentication import AuthenticationMiddleware @@ -65,6 +65,7 @@ async def main(): from mcp.server.streamable_http import EventStore from mcp.server.streamable_http_manager import StreamableHTTPASGIApp, StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings +from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import ServerMessageMetadata, SessionMessage from mcp.shared.session import RequestResponder @@ -355,8 +356,8 @@ def session_manager(self) -> StreamableHTTPSessionManager: async def run( self, - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], - write_stream: MemoryObjectSendStream[SessionMessage], + read_stream: ReadStream[SessionMessage | Exception], + write_stream: WriteStream[SessionMessage], initialization_options: InitializationOptions, # When False, exceptions are returned as messages to the client. # When True, exceptions are raised, which will cause the server to shut down @@ -390,7 +391,13 @@ async def run( async for message in session.incoming_messages: logger.debug("Received message: %s", message) - tg.start_soon( + if isinstance(message, RequestResponder) and message.context is not None: + context = message.context + else: + context = contextvars.copy_context() + + context.run( + tg.start_soon, self._handle_message, message, session, diff --git a/src/mcp/server/session.py b/src/mcp/server/session.py index 759d2131a..2d75e9b9b 100644 --- a/src/mcp/server/session.py +++ b/src/mcp/server/session.py @@ -33,13 +33,14 @@ async def handle_list_prompts(ctx: RequestContext, params) -> ListPromptsResult: import anyio import anyio.lowlevel -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from anyio.streams.memory import MemoryObjectReceiveStream from pydantic import AnyUrl, TypeAdapter from mcp import types from mcp.server.experimental.session_features import ExperimentalServerSessionFeatures from mcp.server.models import InitializationOptions from mcp.server.validation import validate_sampling_tools, validate_tool_use_result_messages +from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import StatelessModeNotSupported from mcp.shared.experimental.tasks.capabilities import check_tasks_capability from mcp.shared.experimental.tasks.helpers import RELATED_TASK_METADATA_KEY @@ -79,8 +80,8 @@ class ServerSession( def __init__( self, - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], - write_stream: MemoryObjectSendStream[SessionMessage], + read_stream: ReadStream[SessionMessage | Exception], + write_stream: WriteStream[SessionMessage], init_options: InitializationOptions, stateless: bool = False, ) -> None: diff --git a/src/mcp/server/sse.py b/src/mcp/server/sse.py index 9dcee67f7..48192ff61 100644 --- a/src/mcp/server/sse.py +++ b/src/mcp/server/sse.py @@ -43,7 +43,6 @@ async def handle_sse(request): from uuid import UUID, uuid4 import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from pydantic import ValidationError from sse_starlette import EventSourceResponse from starlette.requests import Request @@ -55,6 +54,7 @@ async def handle_sse(request): TransportSecurityMiddleware, TransportSecuritySettings, ) +from mcp.shared._context_streams import ContextSendStream, create_context_streams from mcp.shared.message import ServerMessageMetadata, SessionMessage logger = logging.getLogger(__name__) @@ -72,7 +72,7 @@ class SseServerTransport: """ _endpoint: str - _read_stream_writers: dict[UUID, MemoryObjectSendStream[SessionMessage | Exception]] + _read_stream_writers: dict[UUID, ContextSendStream[SessionMessage | Exception]] _security: TransportSecurityMiddleware def __init__(self, endpoint: str, security_settings: TransportSecuritySettings | None = None) -> None: @@ -129,14 +129,9 @@ async def connect_sse(self, scope: Scope, receive: Receive, send: Send): # prag raise ValueError("Request validation failed") logger.debug("Setting up SSE connection") - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] - read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - write_stream: MemoryObjectSendStream[SessionMessage] - write_stream_reader: MemoryObjectReceiveStream[SessionMessage] - - read_stream_writer, read_stream = anyio.create_memory_object_stream(0) - write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + write_stream, write_stream_reader = create_context_streams[SessionMessage](0) session_id = uuid4() self._read_stream_writers[session_id] = read_stream_writer diff --git a/src/mcp/server/stdio.py b/src/mcp/server/stdio.py index e526bab56..119676b18 100644 --- a/src/mcp/server/stdio.py +++ b/src/mcp/server/stdio.py @@ -23,9 +23,9 @@ async def run_server(): import anyio import anyio.lowlevel -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from mcp import types +from mcp.shared._context_streams import create_context_streams from mcp.shared.message import SessionMessage @@ -43,14 +43,8 @@ async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio. if not stdout: stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8")) - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] - read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - - write_stream: MemoryObjectSendStream[SessionMessage] - write_stream_reader: MemoryObjectReceiveStream[SessionMessage] - - read_stream_writer, read_stream = anyio.create_memory_object_stream(0) - write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + write_stream, write_stream_reader = create_context_streams[SessionMessage](0) async def stdin_reader(): try: diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index aa99e7c88..f14201857 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -25,6 +25,8 @@ from starlette.types import Receive, Scope, Send from mcp.server.transport_security import TransportSecurityMiddleware, TransportSecuritySettings +from mcp.shared._context_streams import ContextReceiveStream, ContextSendStream, create_context_streams +from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.message import ServerMessageMetadata, SessionMessage from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS from mcp.types import ( @@ -119,10 +121,10 @@ class StreamableHTTPServerTransport: """ # Server notification streams for POST requests as well as standalone SSE stream - _read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] | None = None - _read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] | None = None - _write_stream: MemoryObjectSendStream[SessionMessage] | None = None - _write_stream_reader: MemoryObjectReceiveStream[SessionMessage] | None = None + _read_stream_writer: ContextSendStream[SessionMessage | Exception] | None = None + _read_stream: ContextReceiveStream[SessionMessage | Exception] | None = None + _write_stream: ContextSendStream[SessionMessage] | None = None + _write_stream_reader: ContextReceiveStream[SessionMessage] | None = None _security: TransportSecurityMiddleware def __init__( @@ -954,8 +956,8 @@ async def connect( self, ) -> AsyncGenerator[ tuple[ - MemoryObjectReceiveStream[SessionMessage | Exception], - MemoryObjectSendStream[SessionMessage], + ReadStream[SessionMessage | Exception], + WriteStream[SessionMessage], ], None, ]: @@ -967,8 +969,8 @@ async def connect( # Create the memory streams for this connection - read_stream_writer, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](0) - write_stream, write_stream_reader = anyio.create_memory_object_stream[SessionMessage](0) + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + write_stream, write_stream_reader = create_context_streams[SessionMessage](0) # Store the streams self._read_stream_writer = read_stream_writer diff --git a/src/mcp/server/websocket.py b/src/mcp/server/websocket.py index 3e675da5f..a1fe64d40 100644 --- a/src/mcp/server/websocket.py +++ b/src/mcp/server/websocket.py @@ -1,12 +1,12 @@ from contextlib import asynccontextmanager import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from pydantic_core import ValidationError from starlette.types import Receive, Scope, Send from starlette.websockets import WebSocket from mcp import types +from mcp.shared._context_streams import create_context_streams from mcp.shared.message import SessionMessage @@ -19,14 +19,8 @@ async def websocket_server(scope: Scope, receive: Receive, send: Send): websocket = WebSocket(scope, receive, send) await websocket.accept(subprotocol="mcp") - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] - read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] - - write_stream: MemoryObjectSendStream[SessionMessage] - write_stream_reader: MemoryObjectReceiveStream[SessionMessage] - - read_stream_writer, read_stream = anyio.create_memory_object_stream(0) - write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) + write_stream, write_stream_reader = create_context_streams[SessionMessage](0) async def ws_reader(): try: diff --git a/src/mcp/shared/_context_streams.py b/src/mcp/shared/_context_streams.py new file mode 100644 index 000000000..8c3e98df4 --- /dev/null +++ b/src/mcp/shared/_context_streams.py @@ -0,0 +1,128 @@ +"""Context-aware memory stream wrappers. + +anyio memory streams do not propagate ``contextvars.Context`` across task +boundaries. These thin wrappers capture the sender's context at ``send()`` +time and expose it on the receive side via ``last_context``, so consumers +can restore it with ``ctx.run(handler, item)``. + +The iteration interface is unchanged (yields ``T``, not tuples), keeping +these wrappers duck-type compatible with plain ``MemoryObjectSendStream`` +and ``MemoryObjectReceiveStream``. +""" + +from __future__ import annotations + +import contextvars +from types import TracebackType +from typing import Any, Generic, TypeVar + +import anyio +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream + +T = TypeVar("T") + +# Internal payload carried through the underlying raw stream. +_Envelope = tuple[contextvars.Context, T] + + +class ContextSendStream(Generic[T]): + """Send-side wrapper that snapshots ``contextvars.copy_context()`` on every ``send()``.""" + + __slots__ = ("_inner",) + + def __init__(self, inner: MemoryObjectSendStream[_Envelope[T]]) -> None: + self._inner = inner + + async def send(self, item: T) -> None: + await self._inner.send((contextvars.copy_context(), item)) + + def close(self) -> None: + self._inner.close() + + async def aclose(self) -> None: + await self._inner.aclose() + + def clone(self) -> ContextSendStream[T]: # pragma: no cover + return ContextSendStream(self._inner.clone()) + + async def __aenter__(self) -> ContextSendStream[T]: + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: + await self.aclose() + return None + + +class ContextReceiveStream(Generic[T]): + """Receive-side wrapper that yields ``T`` and stores the sender's context in ``last_context``.""" + + __slots__ = ("_inner", "last_context") + + def __init__(self, inner: MemoryObjectReceiveStream[_Envelope[T]]) -> None: + self._inner = inner + self.last_context: contextvars.Context | None = None + + async def receive(self) -> T: + ctx, item = await self._inner.receive() + self.last_context = ctx + return item + + def close(self) -> None: + self._inner.close() + + async def aclose(self) -> None: + await self._inner.aclose() + + def clone(self) -> ContextReceiveStream[T]: # pragma: no cover + return ContextReceiveStream(self._inner.clone()) + + def __aiter__(self) -> ContextReceiveStream[T]: + return self + + async def __anext__(self) -> T: + try: + return await self.receive() + except anyio.EndOfStream: + raise StopAsyncIteration + + async def __aenter__(self) -> ContextReceiveStream[T]: + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: + await self.aclose() + return None + + +def _create_context_streams( + max_buffer_size: float = 0, +) -> tuple[ContextSendStream[Any], ContextReceiveStream[Any]]: + raw_send: MemoryObjectSendStream[Any] + raw_receive: MemoryObjectReceiveStream[Any] + raw_send, raw_receive = anyio.create_memory_object_stream(max_buffer_size) + return ContextSendStream(raw_send), ContextReceiveStream(raw_receive) + + +class _CreateContextStreams: + """Callable that supports ``create_context_streams[T](n)`` bracket syntax. + + Matches anyio's ``create_memory_object_stream`` API style. + """ + + def __getitem__(self, _item: Any) -> _CreateContextStreams: + return self + + def __call__(self, max_buffer_size: float = 0) -> tuple[ContextSendStream[Any], ContextReceiveStream[Any]]: + return _create_context_streams(max_buffer_size) + + +create_context_streams = _CreateContextStreams() diff --git a/src/mcp/shared/_stream_protocols.py b/src/mcp/shared/_stream_protocols.py new file mode 100644 index 000000000..f7da4fc20 --- /dev/null +++ b/src/mcp/shared/_stream_protocols.py @@ -0,0 +1,51 @@ +"""Stream protocols for MCP transports. + +These are general-purpose protocols satisfied by both ``MemoryObjectSendStream``/ +``MemoryObjectReceiveStream`` and the context-aware wrappers in ``_context_streams``. +""" + +from __future__ import annotations + +from types import TracebackType +from typing import Protocol, TypeVar, runtime_checkable + +from typing_extensions import Self + +T_co = TypeVar("T_co", covariant=True) +T_contra = TypeVar("T_contra", contravariant=True) + + +@runtime_checkable +class ReadStream(Protocol[T_co]): # pragma: no branch + """Protocol for reading items from a stream. + + Consumers that need the sender's context should use + ``getattr(stream, 'last_context', None)``. + """ + + async def receive(self) -> T_co: ... # pragma: no branch + async def aclose(self) -> None: ... # pragma: no branch + def __aiter__(self) -> ReadStream[T_co]: ... # pragma: no branch + async def __anext__(self) -> T_co: ... # pragma: no branch + async def __aenter__(self) -> Self: ... # pragma: no branch + async def __aexit__( # pragma: no branch + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: ... + + +@runtime_checkable +class WriteStream(Protocol[T_contra]): # pragma: no branch + """Protocol for writing items to a stream.""" + + async def send(self, item: T_contra, /) -> None: ... # pragma: no branch + async def aclose(self) -> None: ... # pragma: no branch + async def __aenter__(self) -> Self: ... # pragma: no branch + async def __aexit__( # pragma: no branch + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: ... diff --git a/src/mcp/shared/memory.py b/src/mcp/shared/memory.py index f2d5e2b9a..1f2e05cc3 100644 --- a/src/mcp/shared/memory.py +++ b/src/mcp/shared/memory.py @@ -5,12 +5,10 @@ from collections.abc import AsyncGenerator from contextlib import asynccontextmanager -import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream - +from mcp.shared._context_streams import ContextReceiveStream, ContextSendStream, create_context_streams from mcp.shared.message import SessionMessage -MessageStream = tuple[MemoryObjectReceiveStream[SessionMessage | Exception], MemoryObjectSendStream[SessionMessage]] +MessageStream = tuple[ContextReceiveStream[SessionMessage | Exception], ContextSendStream[SessionMessage]] @asynccontextmanager @@ -22,8 +20,8 @@ async def create_client_server_memory_streams() -> AsyncGenerator[tuple[MessageS (read_stream, write_stream) """ # Create streams for both directions - server_to_client_send, server_to_client_receive = anyio.create_memory_object_stream[SessionMessage | Exception](1) - client_to_server_send, client_to_server_receive = anyio.create_memory_object_stream[SessionMessage | Exception](1) + server_to_client_send, server_to_client_receive = create_context_streams[SessionMessage | Exception](1) + client_to_server_send, client_to_server_receive = create_context_streams[SessionMessage | Exception](1) client_streams = (server_to_client_receive, client_to_server_send) server_streams = (client_to_server_receive, server_to_client_send) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 9364abb73..22a604c82 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextvars import logging from collections.abc import Callable from contextlib import AsyncExitStack @@ -7,10 +8,11 @@ from typing import Any, Generic, Protocol, TypeVar import anyio -from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream +from anyio.streams.memory import MemoryObjectSendStream from pydantic import BaseModel, TypeAdapter from typing_extensions import Self +from mcp.shared._stream_protocols import ReadStream, WriteStream from mcp.shared.exceptions import MCPError from mcp.shared.message import MessageMetadata, ServerMessageMetadata, SessionMessage from mcp.shared.response_router import ResponseRouter @@ -79,11 +81,13 @@ def __init__( session: BaseSession[SendRequestT, SendNotificationT, SendResultT, ReceiveRequestT, ReceiveNotificationT], on_complete: Callable[[RequestResponder[ReceiveRequestT, SendResultT]], Any], message_metadata: MessageMetadata = None, + context: contextvars.Context | None = None, ) -> None: self.request_id = request_id self.request_meta = request_meta self.request = request self.message_metadata = message_metadata + self.context = context self._session = session self._completed = False self._cancel_scope = anyio.CancelScope() @@ -181,8 +185,8 @@ class BaseSession( def __init__( self, - read_stream: MemoryObjectReceiveStream[SessionMessage | Exception], - write_stream: MemoryObjectSendStream[SessionMessage], + read_stream: ReadStream[SessionMessage | Exception], + write_stream: WriteStream[SessionMessage], # If none, reading will never time out read_timeout_seconds: float | None = None, ) -> None: @@ -333,10 +337,12 @@ def _receive_notification_adapter(self) -> TypeAdapter[ReceiveNotificationT]: async def _receive_loop(self) -> None: async with self._read_stream, self._write_stream: try: - async for message in self._read_stream: - if isinstance(message, Exception): - await self._handle_incoming(message) - elif isinstance(message.message, JSONRPCRequest): + + async def _handle_session_message( + message: SessionMessage, + sender_context: contextvars.Context | None = None, + ) -> None: + if isinstance(message.message, JSONRPCRequest): try: validated_request = self._receive_request_adapter.validate_python( message.message.model_dump(by_alias=True, mode="json", exclude_none=True), @@ -349,6 +355,7 @@ async def _receive_loop(self) -> None: session=self, on_complete=lambda r: self._in_flight.pop(r.request_id, None), message_metadata=message.metadata, + context=sender_context, ) self._in_flight[responder.request_id] = responder await self._received_request(responder) @@ -406,6 +413,14 @@ async def _receive_loop(self) -> None: else: # Response or error await self._handle_response(message) + async for message in self._read_stream: + if isinstance(message, Exception): + await self._handle_incoming(message) + continue + + sender_ctx: contextvars.Context | None = getattr(self._read_stream, "last_context", None) + await _handle_session_message(message, sender_context=sender_ctx) + except anyio.ClosedResourceError: # This is expected when the client disconnects abruptly. # Without this handler, the exception would propagate up and diff --git a/tests/client/conftest.py b/tests/client/conftest.py index 2e39f1363..081e1d68e 100644 --- a/tests/client/conftest.py +++ b/tests/client/conftest.py @@ -4,15 +4,15 @@ from unittest.mock import patch import pytest -from anyio.streams.memory import MemoryObjectSendStream import mcp.shared.memory +from mcp.client._transport import WriteStream from mcp.shared.message import SessionMessage from mcp.types import JSONRPCNotification, JSONRPCRequest class SpyMemoryObjectSendStream: - def __init__(self, original_stream: MemoryObjectSendStream[SessionMessage]): + def __init__(self, original_stream: WriteStream[SessionMessage]): self.original_stream = original_stream self.sent_messages: list[SessionMessage] = [] diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index f8ca30441..3d5770fb6 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -45,6 +45,7 @@ from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings from mcp.shared._context import RequestContext +from mcp.shared._context_streams import create_context_streams from mcp.shared._httpx_utils import ( MCP_DEFAULT_SSE_READ_TIMEOUT, MCP_DEFAULT_TIMEOUT, @@ -1783,8 +1784,8 @@ async def test_handle_sse_event_skips_empty_data(): # Create a mock SSE event with empty data (keep-alive ping) mock_sse = ServerSentEvent(event="message", data="", id=None, retry=None) - # Create a mock stream writer - write_stream, read_stream = anyio.create_memory_object_stream[SessionMessage | Exception](1) + # Create a context-aware stream writer (matches StreamWriter type alias) + write_stream, read_stream = create_context_streams[SessionMessage | Exception](1) try: # Call _handle_sse_event with empty data - should return False and not raise @@ -1794,8 +1795,9 @@ async def test_handle_sse_event_skips_empty_data(): assert result is False # Nothing should have been written to the stream - # Check buffer is empty (statistics().current_buffer_used returns buffer size) - assert write_stream.statistics().current_buffer_used == 0 + with pytest.raises(TimeoutError): + with anyio.fail_after(0): + await read_stream.receive() finally: await write_stream.aclose() await read_stream.aclose()