Skip to content

Commit ee46896

Browse files
committed
fix: close all memory stream ends in client transport cleanup
Client transports for SSE, WebSocket, and StreamableHTTP create 4 memory stream ends (2 paired streams) but only closed 2 in their finally blocks. anyio memory stream ends are independent — closing the writer does not close the reader. The unclosed ends leak and emit ResourceWarning when garbage collected. This caused flaky test failures in CI: a transport connection error in one test would leak streams, then GC in a later unrelated test would trigger ResourceWarning, which pytest promotes to a test failure. Fix follows the existing correct pattern in stdio.py: - sse.py: close all 4 stream ends in the existing finally block - streamable_http.py: close all 4 stream ends in the existing finally block (read_stream was previously never closed, even on happy path) - websocket.py: add try/finally wrapping the entire body, closing all 4 stream ends (previously had no cleanup at all — ws_connect failure leaked everything) Regression tests force gc.collect() after the transport context exits so leaked streams fail deterministically in the test that caused them.
1 parent 62eb08e commit ee46896

File tree

4 files changed

+124
-32
lines changed

4 files changed

+124
-32
lines changed

src/mcp/client/sse.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,5 @@ async def post_writer(endpoint_url: str):
160160
finally:
161161
await read_stream_writer.aclose()
162162
await write_stream.aclose()
163+
await read_stream.aclose()
164+
await write_stream_reader.aclose()

src/mcp/client/streamable_http.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,3 +577,5 @@ def start_get_stream() -> None:
577577
finally:
578578
await read_stream_writer.aclose()
579579
await write_stream.aclose()
580+
await read_stream.aclose()
581+
await write_stream_reader.aclose()

src/mcp/client/websocket.py

Lines changed: 38 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -41,40 +41,46 @@ async def websocket_client(
4141
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
4242
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
4343

44-
# Connect using websockets, requesting the "mcp" subprotocol
45-
async with ws_connect(url, subprotocols=[Subprotocol("mcp")]) as ws:
44+
try:
45+
# Connect using websockets, requesting the "mcp" subprotocol
46+
async with ws_connect(url, subprotocols=[Subprotocol("mcp")]) as ws:
4647

47-
async def ws_reader():
48-
"""Reads text messages from the WebSocket, parses them as JSON-RPC messages,
49-
and sends them into read_stream_writer.
50-
"""
51-
async with read_stream_writer:
52-
async for raw_text in ws:
53-
try:
54-
message = types.jsonrpc_message_adapter.validate_json(raw_text, by_name=False)
55-
session_message = SessionMessage(message)
56-
await read_stream_writer.send(session_message)
57-
except ValidationError as exc: # pragma: no cover
58-
# If JSON parse or model validation fails, send the exception
59-
await read_stream_writer.send(exc)
48+
async def ws_reader():
49+
"""Reads text messages from the WebSocket, parses them as JSON-RPC messages,
50+
and sends them into read_stream_writer.
51+
"""
52+
async with read_stream_writer:
53+
async for raw_text in ws:
54+
try:
55+
message = types.jsonrpc_message_adapter.validate_json(raw_text, by_name=False)
56+
session_message = SessionMessage(message)
57+
await read_stream_writer.send(session_message)
58+
except ValidationError as exc: # pragma: no cover
59+
# If JSON parse or model validation fails, send the exception
60+
await read_stream_writer.send(exc)
6061

61-
async def ws_writer():
62-
"""Reads JSON-RPC messages from write_stream_reader and
63-
sends them to the server.
64-
"""
65-
async with write_stream_reader:
66-
async for session_message in write_stream_reader:
67-
# Convert to a dict, then to JSON
68-
msg_dict = session_message.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
69-
await ws.send(json.dumps(msg_dict))
62+
async def ws_writer():
63+
"""Reads JSON-RPC messages from write_stream_reader and
64+
sends them to the server.
65+
"""
66+
async with write_stream_reader:
67+
async for session_message in write_stream_reader:
68+
# Convert to a dict, then to JSON
69+
msg_dict = session_message.message.model_dump(by_alias=True, mode="json", exclude_unset=True)
70+
await ws.send(json.dumps(msg_dict))
7071

71-
async with anyio.create_task_group() as tg:
72-
# Start reader and writer tasks
73-
tg.start_soon(ws_reader)
74-
tg.start_soon(ws_writer)
72+
async with anyio.create_task_group() as tg:
73+
# Start reader and writer tasks
74+
tg.start_soon(ws_reader)
75+
tg.start_soon(ws_writer)
7576

76-
# Yield the receive/send streams
77-
yield (read_stream, write_stream)
77+
# Yield the receive/send streams
78+
yield (read_stream, write_stream)
7879

79-
# Once the caller's 'async with' block exits, we shut down
80-
tg.cancel_scope.cancel()
80+
# Once the caller's 'async with' block exits, we shut down
81+
tg.cancel_scope.cancel()
82+
finally:
83+
await read_stream_writer.aclose()
84+
await write_stream.aclose()
85+
await read_stream.aclose()
86+
await write_stream_reader.aclose()
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Regression tests for memory stream leaks in client transports.
2+
3+
When a connection error occurs (404, 403, ConnectError), transport context
4+
managers must close ALL 4 memory stream ends they created. anyio memory streams
5+
are paired but independent — closing the writer does NOT close the reader.
6+
Unclosed stream ends emit ResourceWarning on GC, which pytest promotes to a
7+
test failure in whatever test happens to be running when GC triggers.
8+
9+
These tests force GC after the transport context exits, so any leaked stream
10+
triggers a ResourceWarning immediately and deterministically here, rather than
11+
nondeterministically in an unrelated later test.
12+
"""
13+
14+
import gc
15+
import socket
16+
17+
import httpx
18+
import pytest
19+
20+
from mcp.client.sse import sse_client
21+
from mcp.client.streamable_http import streamable_http_client
22+
from mcp.client.websocket import websocket_client
23+
24+
25+
def _unused_tcp_port() -> int:
26+
"""Return a port with no listener. Binding then closing leaves the port unbound."""
27+
with socket.socket() as s:
28+
s.bind(("127.0.0.1", 0))
29+
return s.getsockname()[1]
30+
31+
32+
@pytest.mark.anyio
33+
async def test_sse_client_closes_all_streams_on_connection_error() -> None:
34+
"""sse_client must close all 4 stream ends when the connection fails.
35+
36+
Before the fix, only read_stream_writer and write_stream were closed in
37+
the finally block. read_stream and write_stream_reader were leaked.
38+
"""
39+
port = _unused_tcp_port()
40+
41+
# sse_client enters a task group BEFORE connecting, so anyio wraps the
42+
# ConnectError from aconnect_sse in an ExceptionGroup. ExceptionGroup is
43+
# an Exception subclass, so we catch broadly and verify the sub-exception.
44+
with pytest.raises(Exception) as exc_info: # noqa: B017
45+
async with sse_client(f"http://127.0.0.1:{port}/sse"):
46+
pytest.fail("should not reach here") # pragma: no cover
47+
48+
assert exc_info.group_contains(httpx.ConnectError)
49+
50+
# If any stream leaked, gc.collect() triggers ResourceWarning in __del__,
51+
# which pytest's filterwarnings=["error"] promotes to a test failure.
52+
gc.collect()
53+
54+
55+
@pytest.mark.anyio
56+
async def test_streamable_http_client_closes_all_streams_on_exit() -> None:
57+
"""streamable_http_client must close all 4 stream ends on exit.
58+
59+
Before the fix, read_stream was never closed — not even on the happy path.
60+
This test enters and exits the context without sending any messages, so no
61+
network connection is ever attempted (streamable_http connects lazily).
62+
"""
63+
async with streamable_http_client("http://127.0.0.1:1/mcp"):
64+
pass
65+
66+
gc.collect()
67+
68+
69+
@pytest.mark.anyio
70+
async def test_websocket_client_closes_all_streams_on_connection_error() -> None:
71+
"""websocket_client must close all 4 stream ends when ws_connect fails.
72+
73+
Before the fix, there was no try/finally at all — if ws_connect raised,
74+
all 4 streams were leaked.
75+
"""
76+
port = _unused_tcp_port()
77+
78+
with pytest.raises(OSError):
79+
async with websocket_client(f"ws://127.0.0.1:{port}/ws"):
80+
pytest.fail("should not reach here") # pragma: no cover
81+
82+
gc.collect()

0 commit comments

Comments
 (0)