Skip to content

Commit 2991a10

Browse files
committed
fix(tracing): use WeakKeyDictionary for per-loop client cache
Addresses Greptile review feedback on PR #362. The original `dict[int, AsyncSGPClient]` cache used `id(asyncio.get_running_loop())` as the key. In CPython, `id()` returns the object's memory address, and once a loop is garbage-collected its address can be assigned to a new loop — a fresh loop whose `id()` collides with a stale entry would receive a client whose httpx.AsyncClient was bound to the dead loop, reintroducing the "bound to a different event loop" error this PR was built to prevent. Switching the cache to `weakref.WeakKeyDictionary` keyed on the loop object itself fixes the bug: the entry is evicted automatically when the loop is collected, so id() recycling can't cause stale-client reuse. The multi-loop caching benefit is preserved (better than the single-slot pattern in TracingModule for agents that bounce between loops). The same fix is applied to AgentexAsyncTracingProcessor. Added a regression test verifying that the cache evicts a closed/dropped loop's entry after gc.collect().
1 parent 958320a commit 2991a10

3 files changed

Lines changed: 61 additions & 12 deletions

File tree

src/agentex/lib/core/tracing/processors/agentex_tracing_processor.py

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import weakref
23
from typing import TYPE_CHECKING, Any, Dict, override
34

45
from agentex import Agentex
@@ -74,8 +75,13 @@ def __init__(self, config: AgentexTracingProcessorConfig): # noqa: ARG002
7475
# Per-event-loop client cache. httpx.AsyncClient is bound to the
7576
# loop that created it, so in sync-ACP / streaming contexts (where
7677
# the active loop can change between requests) we keep one client
77-
# per loop instead of disabling keepalive entirely.
78-
self._clients_by_loop_id: dict[int, "AsyncAgentex"] = {}
78+
# per loop instead of disabling keepalive entirely. The cache is a
79+
# WeakKeyDictionary so a GC'd loop and its client are evicted
80+
# automatically — using id() as a key would reuse entries when
81+
# CPython recycles a freed loop's memory address.
82+
self._clients_by_loop: weakref.WeakKeyDictionary[
83+
asyncio.AbstractEventLoop, "AsyncAgentex"
84+
] = weakref.WeakKeyDictionary()
7985

8086
def _build_client(self) -> "AsyncAgentex":
8187
import httpx
@@ -91,13 +97,13 @@ def _build_client(self) -> "AsyncAgentex":
9197
@property
9298
def client(self) -> "AsyncAgentex":
9399
try:
94-
loop_id = id(asyncio.get_running_loop())
100+
loop = asyncio.get_running_loop()
95101
except RuntimeError:
96102
return self._build_client()
97-
client = self._clients_by_loop_id.get(loop_id)
103+
client = self._clients_by_loop.get(loop)
98104
if client is None:
99105
client = self._build_client()
100-
self._clients_by_loop_id[loop_id] = client
106+
self._clients_by_loop[loop] = client
101107
return client
102108

103109
# TODO(AGX1-199): Add batch create/update endpoints to Agentex API and use

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import asyncio
4+
import weakref
45
from typing import cast, override
56

67
import scale_gp_beta.lib.tracing as tracing
@@ -96,11 +97,14 @@ def __init__(self, config: SGPTracingProcessorConfig):
9697
self._config = config
9798
# Per-event-loop client cache. httpx.AsyncClient ties its connection
9899
# pool to the loop it was created on; in sync-ACP / streaming contexts
99-
# the active loop can change between requests. Keying by loop id lets
100+
# the active loop can change between requests. Caching per loop lets
100101
# us keep keepalive on within each loop while staying safe across
101-
# loops. The construction can also happen at module import time when
102-
# no loop is running, so we have to defer it until the first call.
103-
self._clients_by_loop_id: dict[int, AsyncSGPClient] = {}
102+
# loops. The cache is a WeakKeyDictionary so a GC'd loop and its
103+
# client are evicted automatically — using id() as a key would reuse
104+
# entries when CPython recycles a freed loop's memory address.
105+
self._clients_by_loop: weakref.WeakKeyDictionary[
106+
asyncio.AbstractEventLoop, AsyncSGPClient
107+
] = weakref.WeakKeyDictionary()
104108
self.env_vars = EnvironmentVariables.refresh()
105109

106110
def _build_client(self) -> AsyncSGPClient:
@@ -124,15 +128,15 @@ def _get_client(self) -> AsyncSGPClient | None:
124128
if self.disabled:
125129
return None
126130
try:
127-
loop_id = id(asyncio.get_running_loop())
131+
loop = asyncio.get_running_loop()
128132
except RuntimeError:
129133
# Called from outside an event loop — should not happen on the
130134
# hot path, but build a one-off client rather than crashing.
131135
return self._build_client()
132-
client = self._clients_by_loop_id.get(loop_id)
136+
client = self._clients_by_loop.get(loop)
133137
if client is None:
134138
client = self._build_client()
135-
self._clients_by_loop_id[loop_id] = client
139+
self._clients_by_loop[loop] = client
136140
return client
137141

138142
@override

tests/lib/core/tracing/processors/test_sgp_tracing_processor.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import uuid
4+
import asyncio
45
from datetime import UTC, datetime
56
from unittest.mock import AsyncMock, MagicMock, patch
67

@@ -292,6 +293,44 @@ def capture_limits(*args, **kwargs):
292293
f"max_keepalive_connections={max_keepalive}"
293294
)
294295

296+
def test_cache_is_weakkeydict_and_evicts_dead_loops(self):
297+
"""Regression guard for the id()-reuse bug: the per-loop cache must
298+
be a WeakKeyDictionary so a GC'd loop's entry is evicted. Otherwise
299+
a new loop landing at the same memory address would reuse the dead
300+
loop's client, reintroducing the "bound to a different event loop"
301+
error the per-loop cache was built to prevent.
302+
"""
303+
import gc
304+
import weakref
305+
306+
mock_env = MagicMock()
307+
mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
308+
309+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), patch(f"{MODULE}.AsyncSGPClient"):
310+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
311+
SGPAsyncTracingProcessor,
312+
)
313+
314+
processor = SGPAsyncTracingProcessor(_make_config())
315+
316+
# Storage type itself: WeakKeyDictionary, not plain dict.
317+
assert isinstance(processor._clients_by_loop, weakref.WeakKeyDictionary)
318+
319+
# End-to-end check: insert under a loop, drop the loop, the entry
320+
# must vanish after GC.
321+
loop = asyncio.new_event_loop()
322+
try:
323+
processor._clients_by_loop[loop] = MagicMock()
324+
assert len(processor._clients_by_loop) == 1
325+
finally:
326+
loop.close()
327+
del loop
328+
gc.collect()
329+
assert len(processor._clients_by_loop) == 0, (
330+
"WeakKeyDictionary should have evicted the dead loop's entry; "
331+
"remaining keys would cause stale-client reuse on id() recycling."
332+
)
333+
295334
async def test_disabled_processor_returns_none_client(self):
296335
"""When config is missing api_key/account_id, _get_client must return
297336
None and no HTTP client must be constructed."""

0 commit comments

Comments
 (0)