Skip to content

Commit 9bb4ae6

Browse files
committed
test(tracing): cover Agentex per-loop cache and speed up cap test
Addresses both Greptile P3 findings on PR #362: - AgentexAsyncTracingProcessor implemented the same per-loop client cache pattern as SGPAsyncTracingProcessor but had no dedicated test file. Added test_agentex_tracing_processor.py mirroring the SGP coverage: single-build-per-loop, keepalive-enabled regression guard, and WeakKeyDictionary eviction after GC. Skipped cleanly with pytest.importorskip when pydantic_ai isn't installed (the SDK dev venv state), since agentex_tracing_processor pulls in agentex.lib.adk which requires it. - test_linger_respects_batch_size_cap used linger_ms=500, forcing the tail singleton batch to wait out the full 500ms timeout — the test only asserts no batch exceeds the cap, so dropping to linger_ms=50 keeps correctness while cutting wall time by ~10x.
1 parent 2991a10 commit 9bb4ae6

2 files changed

Lines changed: 135 additions & 3 deletions

File tree

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import weakref
5+
from unittest.mock import MagicMock, patch
6+
7+
import pytest
8+
9+
# AgentexAsyncTracingProcessor pulls in agentex.lib.adk via
10+
# create_async_agentex_client, which in turn imports pydantic_ai at package
11+
# init. Skip these tests cleanly when pydantic_ai isn't installed (the SDK
12+
# dev venv state) so collection doesn't error out.
13+
pytest.importorskip(
14+
"pydantic_ai",
15+
reason="agentex.lib.adk import chain requires pydantic_ai",
16+
)
17+
18+
# Import the processor module up front so unittest.mock.patch() can resolve
19+
# attributes by string path. The tracing_processor_manager only loads this
20+
# module lazily, so without this explicit import the patches below would fail
21+
# with AttributeError at __enter__ time.
22+
import agentex.lib.core.tracing.processors.agentex_tracing_processor # noqa: E402, F401
23+
24+
MODULE = "agentex.lib.core.tracing.processors.agentex_tracing_processor"
25+
26+
27+
def _make_config() -> MagicMock:
28+
"""Empty config — AgentexTracingProcessorConfig is unused by __init__."""
29+
return MagicMock()
30+
31+
32+
class TestAgentexAsyncTracingProcessor:
33+
"""Coverage for the per-event-loop client cache. The SGP processor has
34+
matching tests; mirror them here so a regression in the Agentex side
35+
(e.g. an accidental refactor that switches back to a plain dict, or
36+
drops the lazy lookup) does not slip through unnoticed.
37+
"""
38+
39+
async def test_client_caches_per_event_loop(self):
40+
"""First access builds the client; subsequent accesses in the same
41+
running loop must return the cached instance.
42+
"""
43+
with patch(f"{MODULE}.create_async_agentex_client") as mock_factory:
44+
mock_factory.side_effect = lambda **kwargs: MagicMock()
45+
46+
from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
47+
AgentexAsyncTracingProcessor,
48+
)
49+
50+
processor = AgentexAsyncTracingProcessor(_make_config())
51+
52+
# Construction must not eagerly build the client (no running loop
53+
# guarantee at module import time).
54+
assert mock_factory.call_count == 0
55+
56+
c1 = processor.client
57+
c2 = processor.client
58+
c3 = processor.client
59+
60+
assert mock_factory.call_count == 1, (
61+
f"Expected client to be built once per loop, but "
62+
f"create_async_agentex_client was called {mock_factory.call_count} times"
63+
)
64+
assert c1 is c2 is c3
65+
66+
async def test_client_keepalive_is_enabled(self):
67+
"""Regression guard: the per-loop client must use keepalive — the
68+
whole reason for the per-loop cache. Verify max_keepalive_connections > 0.
69+
"""
70+
import httpx as _httpx
71+
72+
captured_limits: list[_httpx.Limits] = []
73+
original_async_client = _httpx.AsyncClient
74+
75+
def capture_limits(*args, **kwargs):
76+
limits = kwargs.get("limits")
77+
if limits is not None:
78+
captured_limits.append(limits)
79+
return original_async_client(*args, **kwargs)
80+
81+
with patch(f"{MODULE}.create_async_agentex_client") as mock_factory, patch(
82+
"httpx.AsyncClient", side_effect=capture_limits
83+
):
84+
mock_factory.side_effect = lambda **kwargs: MagicMock()
85+
86+
from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
87+
AgentexAsyncTracingProcessor,
88+
)
89+
90+
processor = AgentexAsyncTracingProcessor(_make_config())
91+
_ = processor.client
92+
93+
assert len(captured_limits) == 1
94+
max_keepalive = captured_limits[0].max_keepalive_connections
95+
assert max_keepalive is not None and max_keepalive > 0, (
96+
f"Agentex async client should have keepalive enabled, got "
97+
f"max_keepalive_connections={max_keepalive}"
98+
)
99+
100+
def test_cache_is_weakkeydict_and_evicts_dead_loops(self):
101+
"""Regression guard for the id()-reuse bug: the per-loop cache must
102+
be a WeakKeyDictionary so a GC'd loop's entry is evicted. Otherwise
103+
a new loop landing at the same memory address would reuse the dead
104+
loop's client, reintroducing the "bound to a different event loop"
105+
error the per-loop cache was built to prevent.
106+
"""
107+
import gc
108+
109+
with patch(f"{MODULE}.create_async_agentex_client"):
110+
from agentex.lib.core.tracing.processors.agentex_tracing_processor import (
111+
AgentexAsyncTracingProcessor,
112+
)
113+
114+
processor = AgentexAsyncTracingProcessor(_make_config())
115+
116+
# Storage type itself: WeakKeyDictionary, not plain dict.
117+
assert isinstance(processor._clients_by_loop, weakref.WeakKeyDictionary)
118+
119+
# End-to-end check: insert under a loop, drop the loop, the entry
120+
# must vanish after GC.
121+
loop = asyncio.new_event_loop()
122+
try:
123+
processor._clients_by_loop[loop] = MagicMock()
124+
assert len(processor._clients_by_loop) == 1
125+
finally:
126+
loop.close()
127+
del loop
128+
gc.collect()
129+
assert len(processor._clients_by_loop) == 0, (
130+
"WeakKeyDictionary should have evicted the dead loop's entry; "
131+
"remaining keys would cause stale-client reuse on id() recycling."
132+
)

tests/lib/core/tracing/test_span_queue.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,9 @@ async def capture_starts(spans: list[Span]) -> None:
389389
proc.on_spans_start = AsyncMock(side_effect=capture_starts)
390390
proc.on_spans_end = AsyncMock()
391391

392-
# Tight batch cap, generous linger. When the cap fills, the drain
393-
# should fire immediately rather than waiting out the linger.
394-
queue = AsyncSpanQueue(batch_size=3, linger_ms=500)
392+
# Tight batch cap, linger wide enough to coalesce but not so large
393+
# that the tail singleton stalls the test for hundreds of ms.
394+
queue = AsyncSpanQueue(batch_size=3, linger_ms=50)
395395

396396
ids = [f"span-{i}" for i in range(7)]
397397
for i in ids:

0 commit comments

Comments
 (0)