Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions src/mcp/server/lowlevel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,16 +387,23 @@ async def run(
await stack.enter_async_context(task_support.run())

async with anyio.create_task_group() as tg:
async for message in session.incoming_messages:
logger.debug("Received message: %s", message)

tg.start_soon(
self._handle_message,
message,
session,
lifespan_context,
raise_exceptions,
)
try:
async for message in session.incoming_messages:
logger.debug("Received message: %s", message)

tg.start_soon(
self._handle_message,
message,
session,
lifespan_context,
raise_exceptions,
)
finally:
# Transport closed: cancel in-flight handlers. Without this the
# TG join waits for them, and when they eventually try to
# respond they hit a closed write stream (the session's
# _receive_loop closed it when the read stream ended).
tg.cancel_scope.cancel()

async def _handle_message(
self,
Expand Down Expand Up @@ -470,16 +477,32 @@ async def _handle_request(
except MCPError as err:
response = err.error
except anyio.get_cancelled_exc_class():
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
return
if message.cancelled:
# Client sent CancelledNotification; responder.cancel() already
# sent an error response, so skip the duplicate.
logger.info("Request %s cancelled - duplicate response suppressed", message.request_id)
return
# Transport-close cancellation from the TG in run(); re-raise so the
# TG swallows its own cancellation.
raise
except Exception as err:
if raise_exceptions: # pragma: no cover
raise err
response = types.ErrorData(code=0, message=str(err))
else: # pragma: no cover
response = types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found")

try:
await message.respond(response)
else: # pragma: no cover
await message.respond(types.ErrorData(code=types.METHOD_NOT_FOUND, message="Method not found"))
except (anyio.BrokenResourceError, anyio.ClosedResourceError):
# Transport closed between handler unblocking and respond. Happens
# when _receive_loop's finally wakes a handler blocked on
# send_request: the handler runs to respond() before run()'s TG
# cancel fires, but after the write stream closed. Closed if our
# end closed (_receive_loop's async-with exit); Broken if the peer
# end closed first (streamable_http terminate()).
logger.debug("Response for %s dropped - transport closed", message.request_id)
return

logger.debug("Response sent")

Expand Down
6 changes: 4 additions & 2 deletions src/mcp/shared/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def __exit__(
) -> None:
"""Exit the context manager, performing cleanup and notifying completion."""
try:
if self._completed: # pragma: no branch
if self._completed:
self._on_complete(self)
finally:
self._entered = False
Expand Down Expand Up @@ -418,7 +418,9 @@ async def _receive_loop(self) -> None:
finally:
# after the read stream is closed, we need to send errors
# to any pending requests
for id, stream in self._response_streams.items():
# Snapshot: stream.send() wakes the waiter, whose finally pops
# from _response_streams before the next __next__() call.
for id, stream in list(self._response_streams.items()):
error = ErrorData(code=CONNECTION_CLOSED, message="Connection closed")
try:
await stream.send(JSONRPCError(jsonrpc="2.0", id=id, error=error))
Expand Down
158 changes: 158 additions & 0 deletions tests/server/test_cancel_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,19 @@
from mcp import Client
from mcp.server import Server, ServerRequestContext
from mcp.shared.exceptions import MCPError
from mcp.shared.message import SessionMessage
from mcp.types import (
LATEST_PROTOCOL_VERSION,
CallToolRequest,
CallToolRequestParams,
CallToolResult,
CancelledNotification,
CancelledNotificationParams,
ClientCapabilities,
Implementation,
InitializeRequestParams,
JSONRPCNotification,
JSONRPCRequest,
ListToolsResult,
PaginatedRequestParams,
TextContent,
Expand Down Expand Up @@ -90,3 +97,154 @@ async def first_request():
assert isinstance(content, TextContent)
assert content.text == "Call number: 2"
assert call_count == 2


@pytest.mark.anyio
async def test_server_cancels_in_flight_handlers_on_transport_close():
    """When the transport closes mid-request, server.run() must cancel in-flight
    handlers rather than join on them.

    Without the cancel, the task group waits for the handler, which then tries
    to respond through a write stream that _receive_loop already closed,
    raising ClosedResourceError and crashing server.run() with exit code 1.

    This drives server.run() with raw memory streams because InMemoryTransport
    wraps it in its own finally-cancel (_memory.py) which masks the bug.
    """
    # Lifecycle probes: set by the handler / server task so the test body can
    # sequence itself against them without sleeps.
    handler_started = anyio.Event()
    handler_cancelled = anyio.Event()
    server_run_returned = anyio.Event()

    async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
        # Park forever so the request is guaranteed to be in flight when the
        # transport closes; the finally records that cancellation reached us.
        handler_started.set()
        try:
            await anyio.sleep_forever()
        finally:
            handler_cancelled.set()
        # unreachable: sleep_forever only exits via cancellation
        raise AssertionError  # pragma: no cover

    server = Server("test", on_call_tool=handle_call_tool)

    # Raw memory streams stand in for the transport. Buffer size 10 keeps the
    # scripted sends below from ever blocking.
    to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
    server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)

    async def run_server():
        # Runs the server to completion; the event proves run() returned
        # instead of crashing or hanging on the in-flight handler.
        await server.run(server_read, server_write, server.create_initialization_options())
        server_run_returned.set()

    # Hand-built JSON-RPC messages for the initialize handshake and one
    # tool call that will be left pending when the transport closes.
    init_req = JSONRPCRequest(
        jsonrpc="2.0",
        id=1,
        method="initialize",
        params=InitializeRequestParams(
            protocol_version=LATEST_PROTOCOL_VERSION,
            capabilities=ClientCapabilities(),
            client_info=Implementation(name="test", version="1.0"),
        ).model_dump(by_alias=True, mode="json", exclude_none=True),
    )
    initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")
    call_req = JSONRPCRequest(
        jsonrpc="2.0",
        id=2,
        method="tools/call",
        params=CallToolRequestParams(name="slow", arguments={}).model_dump(by_alias=True, mode="json"),
    )

    # fail_after guards against the pre-fix behavior, where run() joins the
    # parked handler and this test would hang forever.
    with anyio.fail_after(5):
        async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
            tg.start_soon(run_server)

            await to_server.send(SessionMessage(init_req))
            await from_server.receive()  # init response
            await to_server.send(SessionMessage(initialized))
            await to_server.send(SessionMessage(call_req))

            await handler_started.wait()

            # Close the server's input stream — this is what stdin EOF does.
            # server.run()'s incoming_messages loop ends, finally-cancel fires,
            # handler gets CancelledError, server.run() returns.
            await to_server.aclose()

            await server_run_returned.wait()

    assert handler_cancelled.is_set()


@pytest.mark.anyio
async def test_server_handles_transport_close_with_pending_server_to_client_requests():
    """When the transport closes while handlers are blocked on server→client
    requests (sampling, roots, elicitation), server.run() must still exit cleanly.

    Two bugs covered:
    1. _receive_loop's finally iterates _response_streams with await checkpoints
       inside; the woken handler's send_request finally pops from that dict
       before the next __next__() — RuntimeError: dictionary changed size.
    2. The woken handler's MCPError is caught in _handle_request, which falls
       through to respond() against a write stream _receive_loop already closed.
    """
    # Two concurrent handlers are needed so _response_streams holds two
    # entries, which is what makes the dict-mutation bug (1) reproducible.
    handlers_started = 0
    both_started = anyio.Event()
    server_run_returned = anyio.Event()

    async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
        nonlocal handlers_started
        handlers_started += 1
        if handlers_started == 2:
            both_started.set()
        # Blocks on send_request waiting for a client response that never comes.
        # _receive_loop's finally will wake this with CONNECTION_CLOSED.
        await ctx.session.list_roots()
        raise AssertionError  # pragma: no cover

    server = Server("test", on_call_tool=handle_call_tool)

    # Raw memory streams stand in for the transport; buffer size 10 keeps
    # the scripted sends below from blocking.
    to_server, server_read = anyio.create_memory_object_stream[SessionMessage | Exception](10)
    server_write, from_server = anyio.create_memory_object_stream[SessionMessage](10)

    async def run_server():
        # The event proves run() returned cleanly rather than crashing with
        # RuntimeError / ClosedResourceError (the two bugs under test).
        await server.run(server_read, server_write, server.create_initialization_options())
        server_run_returned.set()

    # Hand-built JSON-RPC handshake messages.
    init_req = JSONRPCRequest(
        jsonrpc="2.0",
        id=1,
        method="initialize",
        params=InitializeRequestParams(
            protocol_version=LATEST_PROTOCOL_VERSION,
            capabilities=ClientCapabilities(),
            client_info=Implementation(name="test", version="1.0"),
        ).model_dump(by_alias=True, mode="json", exclude_none=True),
    )
    initialized = JSONRPCNotification(jsonrpc="2.0", method="notifications/initialized")

    # fail_after guards against the pre-fix failure mode where run_server
    # never returns and the final wait would hang forever.
    with anyio.fail_after(5):
        async with anyio.create_task_group() as tg, to_server, server_read, server_write, from_server:
            tg.start_soon(run_server)

            await to_server.send(SessionMessage(init_req))
            await from_server.receive()  # init response
            await to_server.send(SessionMessage(initialized))

            # Two tool calls → two handlers → two _response_streams entries.
            for rid in (2, 3):
                call_req = JSONRPCRequest(
                    jsonrpc="2.0",
                    id=rid,
                    method="tools/call",
                    params=CallToolRequestParams(name="t", arguments={}).model_dump(by_alias=True, mode="json"),
                )
                await to_server.send(SessionMessage(call_req))

            await both_started.wait()
            # Drain the two roots/list requests so send_request's _write_stream.send()
            # completes and both handlers are parked at response_stream_reader.receive().
            await from_server.receive()
            await from_server.receive()

            await to_server.aclose()

            # Without the fixes: RuntimeError (dict mutation) or ClosedResourceError
            # (respond after write-stream close) escapes run_server and this hangs.
            await server_run_returned.wait()
Loading