Skip to content

Propagate contextvars.Context through anyio streams without modifying SessionMessage#2298

Open
Kludex wants to merge 5 commits intomainfrom
try-no-context-field
Open

Propagate contextvars.Context through anyio streams without modifying SessionMessage#2298
Kludex wants to merge 5 commits intomainfrom
try-no-context-field

Conversation

@Kludex
Copy link
Member

@Kludex Kludex commented Mar 16, 2026

Summary

Alternative approach to #1996 that propagates contextvars.Context through anyio memory streams without adding any field to SessionMessage, addressing Kludex's review comment.

  • Introduces ContextSendStream / ContextReceiveStream wrappers that capture the sender's context at send() time and expose it via last_context on the receive side
  • Replaces concrete MemoryObjectSendStream / MemoryObjectReceiveStream type annotations with Protocol-based ReadStream / WriteStream, so both raw and context-aware streams are accepted
  • All transport stream creation sites use create_context_streams[T](n) (mirrors anyio's bracket syntax)
  • BaseSession._receive_loop and client post_writer functions restore sender context via ctx.run(tg.start_soon, handler, message)
  • RequestResponder carries context for the session-to-server handler boundary
  • Zero test files changed for stream creation (tests use raw streams via create_client_server_memory_streams which now returns context-aware streams)

Verified with OpenTelemetry: trace IDs propagate correctly from client spans through to server tool handlers, each call carries its own context, and no context leaks between calls.

Fixes #1969
Part of #421

Test plan

  • All 1139 existing tests pass (0 modified for stream creation)
  • pyright src/ passes with 0 errors
  • OTel trace propagation verified: client span trace_id reaches server handler
  • Context isolation verified: distinct spans produce distinct traces, no leaking

Kludex added 2 commits March 16, 2026 08:49
… SessionMessage

Introduce context-aware stream wrappers (ContextSendStream / ContextReceiveStream)
that capture the sender's contextvars.Context at send() time and expose it on the
receive side via last_context. This enables OpenTelemetry trace propagation,
per-request auth via ContextVars, and other context-dependent use cases across the
anyio memory stream boundary - without adding any field to SessionMessage.

Key changes:
- New _context_streams module with ContextSendStream, ContextReceiveStream, and
  create_context_streams factory (mirrors anyio's bracket syntax API)
- Protocol-based ReadStream/WriteStream in _transport.py, replacing concrete
  MemoryObjectReceiveStream/MemoryObjectSendStream in all parameter types
- All transport stream creation sites use create_context_streams
- BaseSession._receive_loop and client post_writers restore sender context
  via ctx.run(tg.start_soon, handler, message)
- RequestResponder carries context for the session-to-server handler boundary

Github-Issue:#1996
Comment on lines +116 to +129
class _CreateContextStreams:
"""Callable that supports ``create_context_streams[T](n)`` bracket syntax.

Matches anyio's ``create_memory_object_stream`` API style.
"""

def __getitem__(self, _item: Any) -> _CreateContextStreams:
return self

def __call__(self, max_buffer_size: float = 0) -> tuple[ContextSendStream[Any], ContextReceiveStream[Any]]:
return _create_context_streams(max_buffer_size)


create_context_streams = _CreateContextStreams()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Nit: T_Item TypeVar at line 23 is defined but never used (only T is used throughout). Additionally, _CreateContextStreams.__getitem__ returns unparameterized self and __call__ returns tuple[ContextSendStream[Any], ContextReceiveStream[Any]], so create_context_streams[SessionMessage](0) silently discards the type parameter — a static type safety regression from the original anyio.create_memory_object_stream[T]() which properly propagates T to type checkers.

Extended reasoning...

Unused TypeVar

T_Item = TypeVar("T_Item") is defined at line 23 of _context_streams.py but is never referenced anywhere in the file. Only T is used in Generic[T] for ContextSendStream, ContextReceiveStream, and _Envelope. This is leftover dead code from an earlier iteration that should be removed.

Type parameter silently discarded

The _CreateContextStreams class (lines 116-129) implements __getitem__ and __call__ to mimic anyio's create_memory_object_stream[T](n) bracket syntax. However, the type parameter is completely ignored:

def __getitem__(self, _item: Any) -> _CreateContextStreams:
    return self  # type parameter discarded

def __call__(self, ...) -> tuple[ContextSendStream[Any], ContextReceiveStream[Any]]:
    return _create_context_streams(max_buffer_size)  # always returns Any

__getitem__ returns the unparameterized self (not a generic alias), and __call__ always returns Any-typed streams. This means create_context_streams[SessionMessage](0) and create_context_streams[int](0) return identical types to static type checkers.

Concrete proof

Consider the call site in sse.py:

read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0)

Step-by-step:

  1. create_context_streams is an instance of _CreateContextStreams
  2. create_context_streams[SessionMessage | Exception] calls __getitem__(SessionMessage | Exception) → returns self (same _CreateContextStreams instance, no type info retained)
  3. (0) calls __call__(0) → returns tuple[ContextSendStream[Any], ContextReceiveStream[Any]]

The SessionMessage | Exception type parameter is silently lost. In contrast, anyio's create_memory_object_stream is a class inheriting from tuple[MemoryObjectSendStream[T_Item], MemoryObjectReceiveStream[T_Item]], so create_memory_object_stream[SessionMessage](0) correctly returns tuple[MemoryObjectSendStream[SessionMessage], MemoryObjectReceiveStream[SessionMessage]] to type checkers.

Impact

This is purely a static typing concern — there is zero runtime impact, and pyright passes with 0 errors because Any is compatible with everything. The bracket syntax creates a false impression of type safety. Call sites happen to work because variables are assigned to concretely typed fields, providing sufficient type context.

Fix

Remove the unused T_Item TypeVar. For the type safety regression, either use @overload declarations or make _CreateContextStreams properly generic (e.g., using __class_getitem__ to return a properly parameterized callable, similar to anyio's approach of inheriting from tuple).

Kludex added 2 commits March 16, 2026 09:13
… widening

- Replace task group + ctx.run(tg.start_soon, ...) with direct
  await sender_ctx.run(handler, msg) to avoid ExceptionGroup wrapping
  that would prevent ClosedResourceError from being caught
- Move ReadStream/WriteStream protocols to mcp.shared._stream_protocols
  so shared/server modules don't depend on client internals
- Restore write stream type narrowing in MessageStream (SessionMessage
  only, not SessionMessage | Exception)
- Remove unused T_Item TypeVar
ctx.run(async_fn) only sets context during coroutine creation, not
execution. For session._receive_loop, pass sender_ctx explicitly to
the handler which stores it on RequestResponder.context. For client
post_writers (sse/streamable_http), restore ctx.run(tg.start_soon, ...)
so the httpx calls actually run in the sender's context.
Copy link

@claude claude bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All my previous bug reports have been addressed by commits 32565b3 and b321dd2 — nice work. However, this is a large PR (18 files) introducing new core abstractions for context propagation that warrants human review of the overall design before merging.

Extended reasoning...

Overview

This PR adds contextvars.Context propagation through anyio memory streams to support OpenTelemetry trace propagation from client to server handlers. It introduces two new modules (_context_streams.py and _stream_protocols.py), modifies BaseSession._receive_loop and RequestResponder to carry context, updates all transport stream creation sites (stdio, SSE, WebSocket, StreamableHTTP), and changes type annotations from concrete MemoryObjectSendStream/MemoryObjectReceiveStream to protocol-based ReadStream/WriteStream.

Security risks

No security concerns identified. The changes are purely about context propagation for observability — no auth, crypto, or permission logic is touched.

Level of scrutiny

This PR touches core infrastructure code in shared/session.py (the _receive_loop and RequestResponder), all transport implementations, and introduces new abstractions that every transport now depends on. The design decisions — using envelope-based context capture in streams, protocol-based stream typing, explicit context parameter passing — are significant architectural choices that should be reviewed by a maintainer familiar with the codebase. A human should verify that the OTel propagation chain actually works end-to-end and that the new stream protocols are the right abstraction for the long term.

Other factors

All previously identified bugs have been fixed in follow-up commits. The author addressed feedback about ExceptionGroup wrapping, Context.run() with async functions, reverse dependencies, type widening, and unused TypeVars. Tests pass and pyright reports 0 errors. The code quality is solid, but the scope and architectural impact warrant human oversight.

Comment on lines +19 to +51
class ReadStream(Protocol[T_co]): # pragma: no branch
"""Protocol for reading items from a stream.

Consumers that need the sender's context should use
``getattr(stream, 'last_context', None)``.
"""

async def receive(self) -> T_co: ... # pragma: no branch
async def aclose(self) -> None: ... # pragma: no branch
def __aiter__(self) -> ReadStream[T_co]: ... # pragma: no branch
async def __anext__(self) -> T_co: ... # pragma: no branch
async def __aenter__(self) -> Self: ... # pragma: no branch
async def __aexit__( # pragma: no branch
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None: ...


@runtime_checkable
class WriteStream(Protocol[T_contra]): # pragma: no branch
"""Protocol for writing items to a stream."""

async def send(self, item: T_contra, /) -> None: ... # pragma: no branch
async def aclose(self) -> None: ... # pragma: no branch
async def __aenter__(self) -> Self: ... # pragma: no branch
async def __aexit__( # pragma: no branch
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool | None: ...
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are there a bunch of no branch pragmas here

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Propagate ContextVars to Transport Layer in MCP Clients

2 participants