Skip to content

Commit b321dd2

Browse files
committed
Restore correct context propagation for async handlers
ctx.run(async_fn) only sets context during coroutine creation, not execution. For session._receive_loop, pass sender_ctx explicitly to the handler which stores it on RequestResponder.context. For client post_writers (sse/streamable_http), restore ctx.run(tg.start_soon, ...) so the httpx calls actually run in the sender's context.
1 parent 32565b3 commit b321dd2

File tree

3 files changed

+10
-9
lines changed

3 files changed

+10
-9
lines changed

src/mcp/client/sse.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ async def _send_message(session_message: SessionMessage) -> None:
143143
async for session_message in write_stream_reader:
144144
sender_ctx = write_stream_reader.last_context
145145
if sender_ctx is not None:
146-
await sender_ctx.run(_send_message, session_message)
146+
async with anyio.create_task_group() as tg:
147+
sender_ctx.run(tg.start_soon, _send_message, session_message)
147148
else:
148149
await _send_message(session_message) # pragma: no cover
149150
except Exception: # pragma: lax no cover

src/mcp/client/streamable_http.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,8 @@ async def handle_request_async():
482482
async for session_message in write_stream_reader:
483483
sender_ctx = write_stream_reader.last_context
484484
if sender_ctx is not None:
485-
await sender_ctx.run(_handle_message, session_message)
485+
async with anyio.create_task_group() as tg_local:
486+
sender_ctx.run(tg_local.start_soon, _handle_message, session_message)
486487
else:
487488
await _handle_message(session_message) # pragma: no cover
488489

src/mcp/shared/session.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,10 @@ async def _receive_loop(self) -> None:
338338
async with self._read_stream, self._write_stream:
339339
try:
340340

341-
async def _handle_session_message(message: SessionMessage) -> None:
341+
async def _handle_session_message(
342+
message: SessionMessage,
343+
sender_context: contextvars.Context | None = None,
344+
) -> None:
342345
if isinstance(message.message, JSONRPCRequest):
343346
try:
344347
validated_request = self._receive_request_adapter.validate_python(
@@ -352,7 +355,7 @@ async def _handle_session_message(message: SessionMessage) -> None:
352355
session=self,
353356
on_complete=lambda r: self._in_flight.pop(r.request_id, None),
354357
message_metadata=message.metadata,
355-
context=contextvars.copy_context(),
358+
context=sender_context,
356359
)
357360
self._in_flight[responder.request_id] = responder
358361
await self._received_request(responder)
@@ -416,11 +419,7 @@ async def _handle_session_message(message: SessionMessage) -> None:
416419
continue
417420

418421
sender_ctx: contextvars.Context | None = getattr(self._read_stream, "last_context", None)
419-
if sender_ctx is not None:
420-
coro = sender_ctx.run(_handle_session_message, message)
421-
await coro
422-
else:
423-
await _handle_session_message(message)
422+
await _handle_session_message(message, sender_context=sender_ctx)
424423

425424
except anyio.ClosedResourceError:
426425
# This is expected when the client disconnects abruptly.

0 commit comments

Comments
 (0)