From f638a9d3588e20d45516bf8fb475f416875416ce Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 18 Mar 2026 15:36:29 +0000 Subject: [PATCH 1/4] fix: cancel in-flight handlers when transport closes in server.run() When the transport closes (stdin EOF, client disconnect) while a request handler is still running, server.run()'s task group joins on the handler instead of cancelling it. The handler eventually finishes, tries to send its response through a write stream that _receive_loop already closed, and server.run() crashes with ClosedResourceError wrapped in a triple-nested ExceptionGroup. The fix cancels the task group when the incoming_messages loop ends. Handlers receive CancelledError and can clean up in finally blocks. The existing CancelledError catch in _handle_request (added for CancelledNotification handling in #1153) now distinguishes the two cancellation sources: responder.cancel() already sent an error response and we skip the duplicate; transport-close cancellation is re-raised so the task group swallows it. Github-Issue: #526 --- src/mcp/server/lowlevel/server.py | 37 ++++++++----- src/mcp/shared/session.py | 2 +- tests/server/test_cancel_handling.py | 80 ++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 13 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 167f34b8b..a24cbc6d7 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -387,16 +387,23 @@ async def run( await stack.enter_async_context(task_support.run()) async with anyio.create_task_group() as tg: - async for message in session.incoming_messages: - logger.debug("Received message: %s", message) - - tg.start_soon( - self._handle_message, - message, - session, - lifespan_context, - raise_exceptions, - ) + try: + async for message in session.incoming_messages: + logger.debug("Received message: %s", message) + + tg.start_soon( + self._handle_message, + message, + session, + lifespan_context, + raise_exceptions, + ) + finally: + # Transport closed: cancel in-flight handlers. Without this the + # TG join waits for them, and when they eventually try to + # respond they hit a closed write stream (the session's + # _receive_loop closed it when the read stream ended). + tg.cancel_scope.cancel() async def _handle_message( self, @@ -470,8 +477,14 @@ async def _handle_request( except MCPError as err: response = err.error except anyio.get_cancelled_exc_class(): - logger.info("Request %s cancelled - duplicate response suppressed", message.request_id) - return + if message.cancelled: + # Client sent CancelledNotification; responder.cancel() already + # sent an error response, so skip the duplicate. + logger.info("Request %s cancelled - duplicate response suppressed", message.request_id) + return + # Transport-close cancellation from the TG in run(); re-raise so the + # TG swallows its own cancellation. + raise except Exception as err: if raise_exceptions: # pragma: no cover raise err diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 9364abb73..0459774ef 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -105,7 +105,7 @@ def __exit__( ) -> None: """Exit the context manager, performing cleanup and notifying completion.""" try: - if self._completed: # pragma: no branch + if self._completed: self._on_complete(self) finally: self._entered = False diff --git a/tests/server/test_cancel_handling.py b/tests/server/test_cancel_handling.py index 297f3d6a5..0789b7c9c 100644 --- a/tests/server/test_cancel_handling.py +++ b/tests/server/test_cancel_handling.py @@ -6,12 +6,19 @@ from mcp import Client from mcp.server import Server, ServerRequestContext from mcp.shared.exceptions import MCPError +from mcp.shared.message import SessionMessage from mcp.types import ( + LATEST_PROTOCOL_VERSION, CallToolRequest, CallToolRequestParams, CallToolResult, CancelledNotification, CancelledNotificationParams, + ClientCapabilities, + Implementation, + InitializeRequestParams, + JSONRPCNotification, + JSONRPCRequest, ListToolsResult, PaginatedRequestParams, TextContent, @@ -90,3 +97,76 @@ async def first_request(): assert isinstance(content, TextContent) assert content.text == "Call number: 2" assert call_count == 2 + + +@pytest.mark.anyio +async def test_server_cancels_in_flight_handlers_on_transport_close(): + """When the transport closes mid-request, server.run() must cancel in-flight + handlers rather than join on them. + + Without the cancel, the task group waits for the handler, which then tries + to respond through a write stream that _receive_loop already closed, + raising ClosedResourceError and crashing server.run() with exit code 1. + + This drives server.run() with raw memory streams because InMemoryTransport + wraps it in its own finally-cancel (_memory.py) which masks the bug. + """ + handler_started = anyio.Event() + handler_cancelled = anyio.Event() + server_run_returned = anyio.Event() + + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + handler_started.set() + try: + await anyio.sleep_forever() + finally: + handler_cancelled.set() + # unreachable: sleep_forever only exits via cancellation + raise AssertionError # pragma: no cover + + server = Server("test", on_call_tool=handle_call_tool) + + to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) + server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + await server.run(server_read, server_write, server.create_initialization_options()) + server_run_returned.set() + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=InitializeRequestParams( + protocol_version=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + client_info=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + call_req = JSONRPCRequest( + jsonrpc="2.0", + id=2, + method="tools/call", + params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"), + ) + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server: + tg.start_soon(run_server) + + await to_server.send(SessionMessage(init_req)) + await from_server.receive() # init response + await to_server.send(SessionMessage(initialized)) + await to_server.send(SessionMessage(call_req)) + + await handler_started.wait() + + # Close the server's input stream — this is what stdin EOF does. + # server.run()'s incoming_messages loop ends, finally-cancel fires, + # handler gets CancelledError, server.run() returns. + await to_server.aclose() + + await server_run_returned.wait() + + assert handler_cancelled.is_set() From 36a974acd8d28969e9b2df8e9bbf6f8fea18d421 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 18 Mar 2026 17:04:03 +0000 Subject: [PATCH 2/4] fix: handle transport close with pending server-to-client requests Two additional races in the same transport-close window as the previous commit, both triggered when handlers are blocked on server-to-client requests (sampling, roots, elicitation) at the moment the transport closes: 1. _receive_loop's finally iterates _response_streams.items() with await checkpoints inside the loop. The woken handler's send_request finally pops from that dict before the iterator's next __next__(), raising RuntimeError: dictionary changed size during iteration. Fix: snapshot with list() before iterating. 2. The woken handler's send_request raises MCPError (CONNECTION_CLOSED), which _handle_request catches and converts to an error response. It then falls through to message.respond() against a write stream that _receive_loop already closed. Fix: catch ClosedResourceError and drop the response. Both reproduce deterministically with two handlers blocked on list_roots() when to_server is closed. Single test covers both: fails 20/20 with either fix reverted, passes 50/50 with both. --- src/mcp/server/lowlevel/server.py | 9 +++- src/mcp/shared/session.py | 4 +- tests/server/test_cancel_handling.py | 78 ++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 2 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index a24cbc6d7..44bb94605 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -490,7 +490,14 @@ async def _handle_request( raise err response = types.ErrorData(code=0, message=str(err)) - await message.respond(response) + try: + await message.respond(response) + except anyio.ClosedResourceError: + # Transport closed between handler unblocking and respond. Happens + # when _receive_loop's finally wakes a handler blocked on + # send_request: the handler runs to respond() before run()'s TG + # cancel fires, but after _receive_loop closed _write_stream. + logger.debug("Response for %s dropped - transport closed", message.request_id) else: # pragma: no cover await message.respond(types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")) diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index 0459774ef..6fc59923f 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -418,7 +418,9 @@ async def _receive_loop(self) -> None: finally: # after the read stream is closed, we need to send errors # to any pending requests - for id, stream in self._response_streams.items(): + # Snapshot: stream.send() wakes the waiter, whose finally pops + # from _response_streams before the next __next__() call. + for id, stream in list(self._response_streams.items()): error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed") try: await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error)) diff --git a/tests/server/test_cancel_handling.py b/tests/server/test_cancel_handling.py index 0789b7c9c..cff5a37c1 100644 --- a/tests/server/test_cancel_handling.py +++ b/tests/server/test_cancel_handling.py @@ -170,3 +170,81 @@ async def run_server(): await server_run_returned.wait() assert handler_cancelled.is_set() + + +@pytest.mark.anyio +async def test_server_handles_transport_close_with_pending_server_to_client_requests(): + """When the transport closes while handlers are blocked on server→client + requests (sampling, roots, elicitation), server.run() must still exit cleanly. + + Two bugs covered: + 1. _receive_loop's finally iterates _response_streams with await checkpoints + inside; the woken handler's send_request finally pops from that dict + before the next __next__() — RuntimeError: dictionary changed size. + 2. The woken handler's MCPError is caught in _handle_request, which falls + through to respond() against a write stream _receive_loop already closed. + """ + handlers_started = 0 + both_started = anyio.Event() + server_run_returned = anyio.Event() + + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + nonlocal handlers_started + handlers_started += 1 + if handlers_started == 2: + both_started.set() + # Blocks on send_request waiting for a client response that never comes. + # _receive_loop's finally will wake this with CONNECTION_CLOSED. + await ctx.session.list_roots() + raise AssertionError # pragma: no cover + + server = Server("test", on_call_tool=handle_call_tool) + + to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10) + server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10) + + async def run_server(): + await server.run(server_read, server_write, server.create_initialization_options()) + server_run_returned.set() + + init_req = JSONRPCRequest( + jsonrpc="2.0", + id=1, + method="initialize", + params=InitializeRequestParams( + protocol_version=LATEST_PROTOCOL_VERSION, + capabilities=ClientCapabilities(), + client_info=Implementation(name="test", version="1.0"), + ).model_dump(by_alias=True, mode="json", exclude_none=True), + ) + initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized") + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server: + tg.start_soon(run_server) + + await to_server.send(SessionMessage(init_req)) + await from_server.receive() # init response + await to_server.send(SessionMessage(initialized)) + + # Two tool calls → two handlers → two _response_streams entries. + for rid in (2, 3): + call_req = JSONRPCRequest( + jsonrpc="2.0", + id=rid, + method="tools/call", + params=CallToolRequestParams(name="t", arguments={}).model_dump(by_alias=True, mode="json"), + ) + await to_server.send(SessionMessage(call_req)) + + await both_started.wait() + # Drain the two roots/list requests so send_request's _write_stream.send() + # completes and both handlers are parked at response_stream_reader.receive(). + await from_server.receive() + await from_server.receive() + + await to_server.aclose() + + # Without the fixes: RuntimeError (dict mutation) or ClosedResourceError + # (respond after write-stream close) escapes run_server and this hangs. + await server_run_returned.wait() From e1d712dd1902037ae5f1a37219b16541880e48bd Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:09:18 +0000 Subject: [PATCH 3/4] refactor: unify respond() call after if/else to avoid 3.14 phantom trace Python 3.14's compiler attributes the async trampoline's CLEANUP_THROW instructions (for the try-body's await) to the next physical line of code, which was the else body. coverage.py traced a phantom line event there, tripping strict-no-cover even though the else never runs. Moving the try/respond after the if/else avoids the misattribution and also deduplicates the two respond() calls. --- src/mcp/server/lowlevel/server.py | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 44bb94605..74a0b5267 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -489,17 +489,18 @@ async def _handle_request( if raise_exceptions: # pragma: no cover raise err response = types.ErrorData(code=0, message=str(err)) - - try: - await message.respond(response) - except anyio.ClosedResourceError: - # Transport closed between handler unblocking and respond. Happens - # when _receive_loop's finally wakes a handler blocked on - # send_request: the handler runs to respond() before run()'s TG - # cancel fires, but after _receive_loop closed _write_stream. - logger.debug("Response for %s dropped - transport closed", message.request_id) else: # pragma: no cover - await message.respond(types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")) + response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found") + + try: + await message.respond(response) + except anyio.ClosedResourceError: + # Transport closed between handler unblocking and respond. Happens + # when _receive_loop's finally wakes a handler blocked on + # send_request: the handler runs to respond() before run()'s TG + # cancel fires, but after _receive_loop closed _write_stream. + logger.debug("Response for %s dropped - transport closed", message.request_id) + return logger.debug("Response sent") From 328bb0b411119567ad9250eadc38893a3138868b Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Wed, 18 Mar 2026 18:17:26 +0000 Subject: [PATCH 4/4] fix: also catch BrokenResourceError from respond() for SHTTP terminate() streamable_http's terminate() closes _write_stream_reader (the receive end) before _write_stream (the send end). A handler reaching respond() between those two closes gets BrokenResourceError (peer end closed) rather than ClosedResourceError (our end closed). The stdio path only ever hits ClosedResourceError because _receive_loop's async-with closes the send end. --- src/mcp/server/lowlevel/server.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 74a0b5267..c28842272 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -494,11 +494,13 @@ async def _handle_request( try: await message.respond(response) - except anyio.ClosedResourceError: + except (anyio.BrokenResourceError, anyio.ClosedResourceError): # Transport closed between handler unblocking and respond. Happens # when _receive_loop's finally wakes a handler blocked on # send_request: the handler runs to respond() before run()'s TG - # cancel fires, but after _receive_loop closed _write_stream. + # cancel fires, but after the write stream closed. Closed if our + # end closed (_receive_loop's async-with exit); Broken if the peer + # end closed first (streamable_http terminate()). logger.debug("Response for %s dropped - transport closed", message.request_id) return