Skip to content

Commit 33a96b8

Browse files
committed
fix: pragma no cover on finally cleanup lines to fix 100% coverage gate
1 parent 2787185 commit 33a96b8

File tree

1 file changed

+149
-149
lines changed

1 file changed

+149
-149
lines changed

tests/server/test_stdio.py

Lines changed: 149 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -1,149 +1,149 @@
1-
import io
2-
import os
3-
import sys
4-
from unittest.mock import MagicMock, patch
5-
6-
import anyio
7-
import pytest
8-
9-
from mcp.server.stdio import _create_stdin_eof_monitor, stdio_server
10-
from mcp.shared.message import SessionMessage
11-
from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter
12-
13-
14-
@pytest.mark.anyio
15-
async def test_stdio_server():
16-
stdin = io.StringIO()
17-
stdout = io.StringIO()
18-
19-
messages = [
20-
JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"),
21-
JSONRPCResponse(jsonrpc="2.0", id=2, result={}),
22-
]
23-
24-
for message in messages:
25-
stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n")
26-
stdin.seek(0)
27-
28-
async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as (
29-
read_stream,
30-
write_stream,
31-
):
32-
received_messages: list[JSONRPCMessage] = []
33-
async with read_stream:
34-
async for message in read_stream:
35-
if isinstance(message, Exception): # pragma: no cover
36-
raise message
37-
received_messages.append(message.message)
38-
if len(received_messages) == 2:
39-
break
40-
41-
# Verify received messages
42-
assert len(received_messages) == 2
43-
assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping")
44-
assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={})
45-
46-
# Test sending responses from the server
47-
responses = [
48-
JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"),
49-
JSONRPCResponse(jsonrpc="2.0", id=4, result={}),
50-
]
51-
52-
async with write_stream:
53-
for response in responses:
54-
session_message = SessionMessage(response)
55-
await write_stream.send(session_message)
56-
57-
stdout.seek(0)
58-
output_lines = stdout.readlines()
59-
assert len(output_lines) == 2
60-
61-
received_responses = [jsonrpc_message_adapter.validate_json(line.strip()) for line in output_lines]
62-
assert len(received_responses) == 2
63-
assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping")
64-
assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={})
65-
66-
67-
def test_create_stdin_eof_monitor_returns_none_on_win32():
68-
"""On Windows, the EOF monitor is not supported."""
69-
tg = MagicMock()
70-
with patch.object(sys, "platform", "win32"):
71-
result = _create_stdin_eof_monitor(tg)
72-
assert result is None
73-
74-
75-
def test_create_stdin_eof_monitor_returns_none_when_fileno_fails():
76-
"""When stdin.buffer.fileno() raises, the monitor returns None."""
77-
tg = MagicMock()
78-
mock_buffer = MagicMock()
79-
mock_buffer.fileno.side_effect = io.UnsupportedOperation("redirected stdin")
80-
with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)):
81-
result = _create_stdin_eof_monitor(tg)
82-
assert result is None
83-
84-
85-
@pytest.mark.anyio
86-
@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows")
87-
async def test_stdin_eof_monitor_detects_hangup():
88-
"""The EOF monitor cancels the task group when stdin pipe closes."""
89-
read_fd, write_fd = os.pipe()
90-
try:
91-
mock_buffer = MagicMock()
92-
mock_buffer.fileno.return_value = read_fd
93-
94-
with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)):
95-
async with anyio.create_task_group() as tg:
96-
monitor = _create_stdin_eof_monitor(tg)
97-
assert monitor is not None
98-
tg.start_soon(monitor)
99-
100-
# Close the write end to trigger POLLHUP on read end
101-
os.close(write_fd)
102-
write_fd = -1
103-
104-
# Wait for the monitor to cancel the task-group scope.
105-
with anyio.fail_after(5):
106-
while not tg.cancel_scope.cancel_called:
107-
await anyio.sleep(0.05)
108-
finally:
109-
os.close(read_fd)
110-
if write_fd != -1: # pragma: no cover
111-
os.close(write_fd)
112-
113-
114-
@pytest.mark.anyio
115-
@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows")
116-
async def test_stdin_eof_monitor_ignores_pollin_events():
117-
"""The monitor ignores POLLIN events (data available) and only reacts to hangup/error."""
118-
read_fd, write_fd = os.pipe()
119-
try:
120-
mock_buffer = MagicMock()
121-
mock_buffer.fileno.return_value = read_fd
122-
123-
with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)):
124-
async with anyio.create_task_group() as tg:
125-
monitor = _create_stdin_eof_monitor(tg)
126-
assert monitor is not None
127-
tg.start_soon(monitor)
128-
129-
# Write data to trigger POLLIN (not POLLHUP)
130-
os.write(write_fd, b"hello\n")
131-
132-
# Give the monitor time to process the POLLIN event
133-
await anyio.sleep(0.3)
134-
135-
# Monitor should NOT have cancelled since POLLIN alone isn't a hangup
136-
assert not tg.cancel_scope.cancel_called
137-
138-
# Now close write end to trigger POLLHUP
139-
os.close(write_fd)
140-
write_fd = -1
141-
142-
# Wait for the monitor to detect POLLHUP and cancel.
143-
with anyio.fail_after(5):
144-
while not tg.cancel_scope.cancel_called: # pragma: no branch
145-
await anyio.sleep(0.05)
146-
finally:
147-
os.close(read_fd)
148-
if write_fd != -1: # pragma: no cover
149-
os.close(write_fd)
1+
import io
2+
import os
3+
import sys
4+
from unittest.mock import MagicMock, patch
5+
6+
import anyio
7+
import pytest
8+
9+
from mcp.server.stdio import _create_stdin_eof_monitor, stdio_server
10+
from mcp.shared.message import SessionMessage
11+
from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter
12+
13+
14+
@pytest.mark.anyio
15+
async def test_stdio_server():
16+
stdin = io.StringIO()
17+
stdout = io.StringIO()
18+
19+
messages = [
20+
JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"),
21+
JSONRPCResponse(jsonrpc="2.0", id=2, result={}),
22+
]
23+
24+
for message in messages:
25+
stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n")
26+
stdin.seek(0)
27+
28+
async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as (
29+
read_stream,
30+
write_stream,
31+
):
32+
received_messages: list[JSONRPCMessage] = []
33+
async with read_stream:
34+
async for message in read_stream:
35+
if isinstance(message, Exception): # pragma: no cover
36+
raise message
37+
received_messages.append(message.message)
38+
if len(received_messages) == 2:
39+
break
40+
41+
# Verify received messages
42+
assert len(received_messages) == 2
43+
assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping")
44+
assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={})
45+
46+
# Test sending responses from the server
47+
responses = [
48+
JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"),
49+
JSONRPCResponse(jsonrpc="2.0", id=4, result={}),
50+
]
51+
52+
async with write_stream:
53+
for response in responses:
54+
session_message = SessionMessage(response)
55+
await write_stream.send(session_message)
56+
57+
stdout.seek(0)
58+
output_lines = stdout.readlines()
59+
assert len(output_lines) == 2
60+
61+
received_responses = [jsonrpc_message_adapter.validate_json(line.strip()) for line in output_lines]
62+
assert len(received_responses) == 2
63+
assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping")
64+
assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={})
65+
66+
67+
def test_create_stdin_eof_monitor_returns_none_on_win32():
68+
"""On Windows, the EOF monitor is not supported."""
69+
tg = MagicMock()
70+
with patch.object(sys, "platform", "win32"):
71+
result = _create_stdin_eof_monitor(tg)
72+
assert result is None
73+
74+
75+
def test_create_stdin_eof_monitor_returns_none_when_fileno_fails():
76+
"""When stdin.buffer.fileno() raises, the monitor returns None."""
77+
tg = MagicMock()
78+
mock_buffer = MagicMock()
79+
mock_buffer.fileno.side_effect = io.UnsupportedOperation("redirected stdin")
80+
with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)):
81+
result = _create_stdin_eof_monitor(tg)
82+
assert result is None
83+
84+
85+
@pytest.mark.anyio
86+
@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows")
87+
async def test_stdin_eof_monitor_detects_hangup():
88+
"""The EOF monitor cancels the task group when stdin pipe closes."""
89+
read_fd, write_fd = os.pipe()
90+
try:
91+
mock_buffer = MagicMock()
92+
mock_buffer.fileno.return_value = read_fd
93+
94+
with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)):
95+
async with anyio.create_task_group() as tg:
96+
monitor = _create_stdin_eof_monitor(tg)
97+
assert monitor is not None
98+
tg.start_soon(monitor)
99+
100+
# Close the write end to trigger POLLHUP on read end
101+
os.close(write_fd)
102+
write_fd = -1
103+
104+
# Wait for the monitor to cancel the task-group scope.
105+
with anyio.fail_after(5):
106+
while not tg.cancel_scope.cancel_called:
107+
await anyio.sleep(0.05)
108+
finally:
109+
os.close(read_fd) # pragma: no cover
110+
if write_fd != -1: # pragma: no cover
111+
os.close(write_fd)
112+
113+
114+
@pytest.mark.anyio
115+
@pytest.mark.skipif(sys.platform == "win32", reason="select.poll not available on Windows")
116+
async def test_stdin_eof_monitor_ignores_pollin_events():
117+
"""The monitor ignores POLLIN events (data available) and only reacts to hangup/error."""
118+
read_fd, write_fd = os.pipe()
119+
try:
120+
mock_buffer = MagicMock()
121+
mock_buffer.fileno.return_value = read_fd
122+
123+
with patch.object(sys, "platform", "linux"), patch.object(sys, "stdin", MagicMock(buffer=mock_buffer)):
124+
async with anyio.create_task_group() as tg:
125+
monitor = _create_stdin_eof_monitor(tg)
126+
assert monitor is not None
127+
tg.start_soon(monitor)
128+
129+
# Write data to trigger POLLIN (not POLLHUP)
130+
os.write(write_fd, b"hello\n")
131+
132+
# Give the monitor time to process the POLLIN event
133+
await anyio.sleep(0.3)
134+
135+
# Monitor should NOT have cancelled since POLLIN alone isn't a hangup
136+
assert not tg.cancel_scope.cancel_called
137+
138+
# Now close write end to trigger POLLHUP
139+
os.close(write_fd)
140+
write_fd = -1
141+
142+
# Wait for the monitor to detect POLLHUP and cancel.
143+
with anyio.fail_after(5):
144+
while not tg.cancel_scope.cancel_called: # pragma: no branch
145+
await anyio.sleep(0.05)
146+
finally:
147+
os.close(read_fd) # pragma: no cover
148+
if write_fd != -1: # pragma: no cover
149+
os.close(write_fd)

0 commit comments

Comments
 (0)