Skip to content

Commit ccf1061

Browse files
fix: close transport in _invalidate() to prevent socket leaks
_invalidate() set _protocol to None without closing the underlying TCP transport, leaking a socket on every connection error or leader change. Now calls protocol.close() (synchronous) before nulling, wrapped in try/except since the connection may already be broken. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b5f2088 commit ccf1061

File tree

2 files changed

+34
-0
lines changed

2 files changed

+34
-0
lines changed

src/dqliteclient/connection.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ def _ensure_connected(self) -> tuple[DqliteProtocol, int]:
129129

130130
def _invalidate(self) -> None:
131131
"""Mark the connection as broken after an unrecoverable error."""
132+
if self._protocol is not None:
133+
# Connection may already be broken; suppress close errors
134+
with contextlib.suppress(Exception):
135+
self._protocol.close()
132136
self._protocol = None
133137
self._db_id = None
134138

tests/test_connection.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,36 @@ async def test_connection_invalidated_after_protocol_error(self) -> None:
258258
# Connection should be invalidated
259259
assert not conn.is_connected
260260

261+
async def test_invalidate_closes_transport(self) -> None:
262+
"""_invalidate() should close the underlying transport to avoid socket leaks."""
263+
conn = DqliteConnection("localhost:9001")
264+
265+
mock_reader = AsyncMock()
266+
mock_writer = MagicMock()
267+
mock_writer.drain = AsyncMock()
268+
mock_writer.close = MagicMock()
269+
mock_writer.wait_closed = AsyncMock()
270+
271+
from dqlitewire.messages import DbResponse, WelcomeResponse
272+
273+
responses = [
274+
WelcomeResponse(heartbeat_timeout=15000).encode(),
275+
DbResponse(db_id=1).encode(),
276+
]
277+
mock_reader.read.side_effect = responses
278+
279+
with patch("asyncio.open_connection", return_value=(mock_reader, mock_writer)):
280+
await conn.connect()
281+
282+
# Trigger invalidation via a connection error
283+
mock_reader.read.side_effect = [b""]
284+
285+
with pytest.raises(DqliteConnectionError):
286+
await conn.execute("SELECT 1")
287+
288+
# The writer should have been closed to release the socket
289+
mock_writer.close.assert_called()
290+
261291
async def test_fetchone_returns_first_row(self) -> None:
262292
conn = DqliteConnection("localhost:9001")
263293

0 commit comments

Comments
 (0)