Skip to content

Commit 22a482e

Browse files
fix: add event-loop affinity detection and thread-safety documentation
DqliteConnection now captures the event loop at connect() time and verifies it in _check_in_use(). Using a connection from a different event loop (e.g., from another OS thread running its own loop) raises InterfaceError with a clear message instead of silently corrupting the protocol stream. Also adds thread-safety documentation to DqliteConnection, ConnectionPool, and the module docstring, clearly stating that these classes are not thread-safe and must be used within a single event loop. Fixes #094 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8fbec2d commit 22a482e

File tree

4 files changed

+97
-4
lines changed

4 files changed

+97
-4
lines changed

src/dqliteclient/__init__.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
"""Async Python client for dqlite."""
1+
"""Async Python client for dqlite.
2+
3+
Thread safety: connections and pools are NOT thread-safe. All operations
4+
must be performed within a single asyncio event loop. To submit work from
5+
other threads, use ``asyncio.run_coroutine_threadsafe()``. Free-threaded
6+
Python (no-GIL) is not supported.
7+
"""
28

39
from dqliteclient.cluster import ClusterClient
410
from dqliteclient.connection import DqliteConnection

src/dqliteclient/connection.py

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,15 @@ def _parse_address(address: str) -> tuple[str, int]:
4747

4848

4949
class DqliteConnection:
50-
"""High-level async connection to a dqlite database."""
50+
"""High-level async connection to a dqlite database.
51+
52+
Thread safety: this class is NOT thread-safe. All operations must be
53+
performed within a single asyncio event loop. Do not share instances
54+
across OS threads or event loops. To submit work from other threads,
55+
use ``asyncio.run_coroutine_threadsafe()`` — the coroutines execute
56+
safely in the event loop thread. Free-threaded Python (no-GIL) is
57+
not supported.
58+
"""
5159

5260
def __init__(
5361
self,
@@ -72,6 +80,7 @@ def __init__(
7280
self._db_id: int | None = None
7381
self._in_transaction = False
7482
self._in_use = False
83+
self._bound_loop: asyncio.AbstractEventLoop | None = None
7584

7685
@property
7786
def address(self) -> str:
@@ -89,6 +98,7 @@ async def connect(self) -> None:
8998
if self._protocol is not None:
9099
return
91100

101+
self._bound_loop = asyncio.get_running_loop()
92102
self._in_use = True
93103
try:
94104
host, port = _parse_address(self._address)
@@ -138,7 +148,19 @@ def _ensure_connected(self) -> tuple[DqliteProtocol, int]:
138148
return self._protocol, self._db_id
139149

140150
def _check_in_use(self) -> None:
141-
"""Raise if another coroutine is using this connection."""
151+
"""Raise on misuse: wrong event loop or concurrent coroutine access."""
152+
if self._bound_loop is not None:
153+
try:
154+
current_loop = asyncio.get_running_loop()
155+
except RuntimeError:
156+
raise InterfaceError(
157+
"DqliteConnection must be used from within an async context."
158+
) from None
159+
if current_loop is not self._bound_loop:
160+
raise InterfaceError(
161+
"DqliteConnection is bound to a different event loop. "
162+
"Do not share connections across event loops or OS threads."
163+
)
142164
if self._in_use:
143165
raise InterfaceError(
144166
"Cannot perform operation: another operation is in progress on this "

src/dqliteclient/pool.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,15 @@
1212

1313

1414
class ConnectionPool:
15-
"""Connection pool with automatic leader detection."""
15+
"""Connection pool with automatic leader detection.
16+
17+
Thread safety: this class is NOT thread-safe. All operations must be
18+
performed within a single asyncio event loop. Do not share pool
19+
instances across OS threads or event loops. To submit work from other
20+
threads, use ``asyncio.run_coroutine_threadsafe()`` — the coroutines
21+
execute safely in the event loop thread. Free-threaded Python (no-GIL)
22+
is not supported.
23+
"""
1624

1725
def __init__(
1826
self,

tests/test_connection.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -842,3 +842,60 @@ async def tx_b():
842842
f"got {type(errors[0]).__name__}: {errors[0]}"
843843
)
844844
assert "Nested" in str(errors[0]) or "nested" in str(errors[0])
845+
846+
async def test_cross_event_loop_raises_interface_error(self) -> None:
847+
"""Using a connection from a different event loop must raise InterfaceError."""
848+
import asyncio
849+
import threading
850+
851+
from dqliteclient.exceptions import InterfaceError
852+
853+
conn = DqliteConnection("localhost:9001")
854+
855+
mock_reader = AsyncMock()
856+
mock_writer = MagicMock()
857+
mock_writer.drain = AsyncMock()
858+
mock_writer.close = MagicMock()
859+
mock_writer.wait_closed = AsyncMock()
860+
861+
from dqlitewire.messages import DbResponse, ResultResponse, WelcomeResponse
862+
863+
responses = [
864+
WelcomeResponse(heartbeat_timeout=15000).encode(),
865+
DbResponse(db_id=1).encode(),
866+
]
867+
mock_reader.read.side_effect = responses
868+
869+
with patch("asyncio.open_connection", return_value=(mock_reader, mock_writer)):
870+
await conn.connect()
871+
872+
# Now try to use the connection from a different event loop in another thread
873+
error_from_thread: Exception | None = None
874+
875+
def run_in_other_loop():
876+
nonlocal error_from_thread
877+
878+
async def use_conn():
879+
# Provide a fresh response so the operation could succeed
880+
# if the guard doesn't catch it
881+
mock_reader.read.side_effect = [
882+
ResultResponse(last_insert_id=0, rows_affected=0).encode(),
883+
]
884+
await conn.execute("SELECT 1")
885+
886+
try:
887+
asyncio.run(use_conn())
888+
except InterfaceError as e:
889+
error_from_thread = e
890+
except Exception as e:
891+
error_from_thread = e
892+
893+
thread = threading.Thread(target=run_in_other_loop)
894+
thread.start()
895+
thread.join(timeout=5)
896+
897+
assert error_from_thread is not None, "Expected InterfaceError from cross-loop access"
898+
assert isinstance(error_from_thread, InterfaceError), (
899+
f"Expected InterfaceError, got {type(error_from_thread).__name__}: {error_from_thread}"
900+
)
901+
assert "event loop" in str(error_from_thread).lower()

0 commit comments

Comments
 (0)