Skip to content

Commit e1fd62e

Browse files
authored
fix: close all memory stream ends in client transport cleanup (#2266)
1 parent 2c73a2a commit e1fd62e

File tree

7 files changed

+117
-12
lines changed

7 files changed

+117
-12
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ classifiers = [
2525
"Programming Language :: Python :: 3.14",
2626
]
2727
dependencies = [
28-
"anyio>=4.5",
28+
"anyio>=4.9",
2929
"httpx>=0.27.1",
3030
"httpx-sse>=0.4",
3131
"pydantic>=2.12.0",

src/mcp/client/sse.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,5 @@ async def post_writer(endpoint_url: str):
160160
finally:
161161
await read_stream_writer.aclose()
162162
await write_stream.aclose()
163+
await read_stream.aclose()
164+
await write_stream_reader.aclose()

src/mcp/client/streamable_http.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,3 +577,5 @@ def start_get_stream() -> None:
577577
finally:
578578
await read_stream_writer.aclose()
579579
await write_stream.aclose()
580+
await read_stream.aclose()
581+
await write_stream_reader.aclose()

src/mcp/client/websocket.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,10 @@ async def websocket_client(
3838
write_stream: MemoryObjectSendStream[SessionMessage]
3939
write_stream_reader: MemoryObjectReceiveStream[SessionMessage]
4040

41-
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
42-
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
43-
4441
# Connect using websockets, requesting the "mcp" subprotocol
4542
async with ws_connect(url, subprotocols=[Subprotocol("mcp")]) as ws:
43+
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
44+
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
4645

4746
async def ws_reader():
4847
"""Reads text messages from the WebSocket, parses them as JSON-RPC messages,
@@ -68,7 +67,13 @@ async def ws_writer():
6867
msg_dict = session_message.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
6968
await ws.send(json.dumps(msg_dict))
7069

71-
async with anyio.create_task_group() as tg:
70+
async with (
71+
read_stream_writer,
72+
read_stream,
73+
write_stream,
74+
write_stream_reader,
75+
anyio.create_task_group() as tg,
76+
):
7277
# Start reader and writer tasks
7378
tg.start_soon(ws_reader)
7479
tg.start_soon(ws_writer)
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
"""Regression tests for memory stream leaks in client transports.
2+
3+
When a connection error occurs (404, 403, ConnectError), transport context
4+
managers must close ALL 4 memory stream ends they created. anyio memory streams
5+
are paired but independent — closing the writer does NOT close the reader.
6+
Unclosed stream ends emit ResourceWarning on GC, which pytest promotes to a
7+
test failure in whatever test happens to be running when GC triggers.
8+
9+
These tests force GC after the transport context exits, so any leaked stream
10+
triggers a ResourceWarning immediately and deterministically here, rather than
11+
nondeterministically in an unrelated later test.
12+
"""
13+
14+
import gc
15+
import sys
16+
from collections.abc import Iterator
17+
from contextlib import contextmanager
18+
19+
import httpx
20+
import pytest
21+
22+
from mcp.client.sse import sse_client
23+
from mcp.client.streamable_http import streamable_http_client
24+
from mcp.client.websocket import websocket_client
25+
26+
27+
@contextmanager
28+
def _assert_no_memory_stream_leak() -> Iterator[None]:
29+
"""Fail if any anyio MemoryObject stream emits ResourceWarning during the block.
30+
31+
Uses a custom sys.unraisablehook to capture ONLY MemoryObject stream leaks,
32+
ignoring unrelated resources (e.g. PipeHandle from flaky stdio tests on the
33+
same xdist worker). gc.collect() is forced after the block to make leaks
34+
deterministic.
35+
"""
36+
leaked: list[str] = []
37+
old_hook = sys.unraisablehook
38+
39+
def hook(args: "sys.UnraisableHookArgs") -> None: # pragma: no cover
40+
# Only executes if a leak occurs (i.e. the bug is present).
41+
# args.object is the __del__ function (not the stream instance) when
42+
# unraisablehook fires from a finalizer, so check exc_value — the
43+
# actual ResourceWarning("Unclosed <MemoryObjectSendStream at ...>").
44+
# Non-MemoryObject unraisables (e.g. PipeHandle leaked by an earlier
45+
# flaky test on the same xdist worker) are deliberately ignored —
46+
# this test should not fail for another test's resource leak.
47+
if "MemoryObject" in str(args.exc_value):
48+
leaked.append(str(args.exc_value))
49+
50+
sys.unraisablehook = hook
51+
try:
52+
yield
53+
gc.collect()
54+
assert not leaked, f"Memory streams leaked: {leaked}"
55+
finally:
56+
sys.unraisablehook = old_hook
57+
58+
59+
@pytest.mark.anyio
60+
async def test_sse_client_closes_all_streams_on_connection_error(free_tcp_port: int) -> None:
61+
"""sse_client must close all 4 stream ends when the connection fails.
62+
63+
Before the fix, only read_stream_writer and write_stream were closed in
64+
the finally block. read_stream and write_stream_reader were leaked.
65+
"""
66+
with _assert_no_memory_stream_leak():
67+
# sse_client enters a task group BEFORE connecting, so anyio wraps the
68+
# ConnectError from aconnect_sse in an ExceptionGroup.
69+
with pytest.raises(Exception) as exc_info: # noqa: B017
70+
async with sse_client(f"http://127.0.0.1:{free_tcp_port}/sse"):
71+
pytest.fail("should not reach here") # pragma: no cover
72+
73+
assert exc_info.group_contains(httpx.ConnectError)
74+
# exc_info holds the traceback → holds frame locals → keeps leaked
75+
# streams alive. Must drop it before gc.collect() can detect a leak.
76+
del exc_info
77+
78+
79+
@pytest.mark.anyio
80+
async def test_streamable_http_client_closes_all_streams_on_exit() -> None:
81+
"""streamable_http_client must close all 4 stream ends on exit.
82+
83+
Before the fix, read_stream was never closed — not even on the happy path.
84+
This test enters and exits the context without sending any messages, so no
85+
network connection is ever attempted (streamable_http connects lazily).
86+
"""
87+
with _assert_no_memory_stream_leak():
88+
async with streamable_http_client("http://127.0.0.1:1/mcp"):
89+
pass
90+
91+
92+
@pytest.mark.anyio
93+
async def test_websocket_client_closes_all_streams_on_connection_error(free_tcp_port: int) -> None:
94+
"""websocket_client must close all 4 stream ends when ws_connect fails.
95+
96+
Before the fix, there was no try/finally at all — if ws_connect raised,
97+
all 4 streams were leaked.
98+
"""
99+
with _assert_no_memory_stream_leak():
100+
with pytest.raises(OSError):
101+
async with websocket_client(f"ws://127.0.0.1:{free_tcp_port}/ws"):
102+
pytest.fail("should not reach here") # pragma: no cover

tests/shared/test_sse.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -544,12 +544,6 @@ def test_sse_server_transport_endpoint_validation(endpoint: str, expected_result
544544
assert sse._endpoint.startswith("/")
545545

546546

547-
# ResourceWarning filter: When mocking aconnect_sse, the sse_client's internal task
548-
# group doesn't receive proper cancellation signals, so the sse_reader task's finally
549-
# block (which closes read_stream_writer) doesn't execute. This is a test artifact -
550-
# the actual code path (`if not sse.data: continue`) IS exercised and works correctly.
551-
# Production code with real SSE connections cleans up properly.
552-
@pytest.mark.filterwarnings("ignore::ResourceWarning")
553547
@pytest.mark.anyio
554548
async def test_sse_client_handles_empty_keepalive_pings() -> None:
555549
"""Test that SSE client properly handles empty data lines (keep-alive pings).

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)