From ca00c3b2724e6adc7b545a522f9f3093b0a7d604 Mon Sep 17 00:00:00 2001 From: mikemolinet Date: Thu, 21 May 2026 19:30:57 -0700 Subject: [PATCH] hotfix(events): long-poll releases pool slot via commit between iterations (P0) P0 hotfix port: a downstream consumer (cue.dock.svc) experienced pool exhaustion in prod due to `_run_long_poll_wait` holding an idle-in-transaction connection for the full ~30s wait window. At ~15 concurrent long-pollers (SQLAlchemy default pool_size=5 + max_overflow=10 = 15) the entire app-side pool gets saturated; new requests time out waiting for a free connection. Root cause: the while loop runs `await asyncio.sleep` + `await pull_events(db, ...)` with NO commit between iterations. Each pull_events opens an implicit txn that the session holds for the full wait window. Fix (+2 LOC): `await db.commit()` at function entry (releases caller's initial-pull implicit txn before first sleep window) + after each empty iteration (releases per-iteration txn before next sleep window). Empirically verified (2026-05-22) that AsyncSession.commit() returns the pool slot AND ends the Postgres txn; session re-acquires from pool transparently on next operation. Tests: - 2 new regression guards added to tests/test_events_long_poll.py pinning the commit invariant (commit_count assertions) - 13/13 long-poll tests pass locally Cross-port from cueapi/cueapi#925 (mergeCommit 359961b). Co-Authored-By: Claude Opus 4.7 (1M context) --- app/routers/events.py | 28 ++++++++ tests/test_events_long_poll.py | 117 +++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+) diff --git a/app/routers/events.py b/app/routers/events.py index c7a7114..598f000 100644 --- a/app/routers/events.py +++ b/app/routers/events.py @@ -434,6 +434,28 @@ async def _run_long_poll_wait( so tests can monkeypatch shorter values without function signature churn. 
""" + # P0 fix (2026-05-22): release any implicit transaction held by + # the caller's initial pull_events() SELECT BEFORE entering the + # sleep loop. Without this, the SQLAlchemy session keeps the + # Postgres connection in `idle in transaction` state for the full + # ~30s wait window. At ~15 concurrent long-pollers (SQLAlchemy + # default pool_size=5 + max_overflow=10 = 15) the entire app-side + # pool is held by waiting listeners + new requests time out + # waiting for a free connection. Empirical evidence: a downstream + # consumer (cue.dock.svc on Govind's org) tipped over at ~15 + # concurrent listeners; pg_stat_activity showed 15 connections + # holding `SELECT events.id ...` for 14-20s each, matching the + # default pool ceiling. + # + # AsyncSession.commit() empirically verified to BOTH end the + # Postgres txn AND return the pool slot. The session re-acquires + # from the pool transparently on next operation. So this +2-line + # fix addresses both the Postgres idle-in-transaction symptom AND + # the SQLAlchemy app-pool exhaustion. The caller's post-helper + # _advance_ack_after_pull(db, ...) keeps working because the + # session itself remains usable across commits. + await db.commit() + deadline = asyncio.get_event_loop().time() + LONG_POLL_MAX_SECONDS events: list = [] next_cursor: Optional[int] = None @@ -457,6 +479,12 @@ async def _run_long_poll_wait( ) if events: break + # P0 fix continued: release the implicit transaction opened by + # the just-completed empty pull_events() SELECT so the next + # asyncio.sleep() window doesn't sit `idle in transaction`. + # Next iteration's pull_events() re-acquires from the pool + # transparently. 
+ await db.commit() return events, next_cursor, has_more diff --git a/tests/test_events_long_poll.py b/tests/test_events_long_poll.py index d8de124..c064337 100644 --- a/tests/test_events_long_poll.py +++ b/tests/test_events_long_poll.py @@ -422,3 +422,120 @@ async def test_helper_respects_since_cursor( elapsed = asyncio.get_event_loop().time() - started assert events == [] assert 0.8 < elapsed < 2.0 + + +# ─────────────────────────────────────────────────────────────────────── +# P0 fix (2026-05-22) — pool exhaustion regression guard. The long-poll +# helper MUST commit/release its transaction at entry AND between every +# empty iteration so it doesn't hold an `idle in transaction` connection +# for the full wait window. A downstream consumer (cue.dock.svc) tipped +# over at ~15 concurrent listeners with pg_stat_activity showing 15 +# connections holding SELECT for 14-20s each (matching SQLAlchemy +# default pool_size=5 + max_overflow=10). Pin the invariant here as a +# regression guard. +# ─────────────────────────────────────────────────────────────────────── + + +async def test_helper_commits_at_entry_and_between_empty_iterations( + db_session: AsyncSession, + lp_agent: Agent, + fast_long_poll, +): + """Pin the P0 invariant: ``_run_long_poll_wait`` must call + ``await db.commit()`` at entry + after every empty + ``pull_events()`` so the long-poll session doesn't sit + `idle in transaction` for the full window. + + Counts commit calls via a thin spy wrapper. With LONG_POLL_MAX_SECONDS + = 1.0s and LONG_POLL_INTERNAL_INTERVAL_SECONDS = 0.2s (set by + fast_long_poll), the loop fires ~5 sleep+pull iterations. Expect + at least 1 commit at entry + 1 per empty iteration. 
+ """ + commit_count = 0 + original_commit = db_session.commit + + async def counting_commit(): + nonlocal commit_count + commit_count += 1 + await original_commit() + + db_session.commit = counting_commit # type: ignore[assignment] + + events, cursor, has_more = await _run_long_poll_wait( + db_session, + recipient_agent_id=lp_agent.id, + since=0, + limit=100, + event_type=None, + ) + + assert events == [] + assert cursor is None + assert has_more is False + # At least 2 commits expected: one at entry + at least one + # between empty iterations. In practice with the 1.0s window + # and 0.2s interval it's ~6 (1 at entry + ~5, one per empty iteration). + assert commit_count >= 2, ( + f"Expected ≥2 commits (1 at entry + ≥1 between empty iterations) " + f"to release pool slot during long-poll wait — got {commit_count}. " + f"This is the P0 pool-exhaustion regression guard (2026-05-22)." + ) + + +async def test_helper_commits_at_entry_even_when_event_arrives_immediately( + db_session: AsyncSession, + lp_agent: Agent, + fast_long_poll, +): + """Regression guard: even when an event arrives on the first + sleep+poll iteration (before any empty iterations fire), the + entry-commit MUST have run to release the caller's initial-pull + implicit txn. Counts commit calls; expects ≥1 (the entry commit) + regardless of how many iterations the loop completes. + """ + async def insert_after_delay(): + await asyncio.sleep(0.1) + # Use a fresh session — concurrent use of the helper's + # db_session would race with the helper's own commits. 
+ from app.database import async_session + async with async_session() as fresh: + fresh.add( + Event( + event_type="message.delivered", + recipient_agent_id=lp_agent.id, + payload={"fast": True}, + emitted_at=datetime.now(timezone.utc), + ) + ) + await fresh.commit() + + commit_count = 0 + original_commit = db_session.commit + + async def counting_commit(): + nonlocal commit_count + commit_count += 1 + await original_commit() + + db_session.commit = counting_commit # type: ignore[assignment] + + # Race: helper sleeps 0.2s; inserter writes at 0.1s. Helper picks up + # the event on its first poll iteration. + inserter = asyncio.create_task(insert_after_delay()) + events, cursor, has_more = await _run_long_poll_wait( + db_session, + recipient_agent_id=lp_agent.id, + since=0, + limit=100, + event_type=None, + ) + await inserter + + assert len(events) >= 1 + # Entry-commit MUST have fired at least once even though the loop + # exited on the first iteration via `if events: break`. + assert commit_count >= 1, ( + f"Expected ≥1 commit (entry-commit to release caller's initial-" + f"pull implicit txn) even when event arrives on first iteration. " + f"Got {commit_count}. P0 pool-exhaustion regression guard." + )