fix: timer-based cleanup of listenQueues after transient exporter disconnect #417
ambient-code[bot] wants to merge 4 commits into main
Conversation
✅ Deploy Preview for jumpstarter-docs ready!
@ambient-code please rebase this
Force-pushed d55da90 to 65c93a4

Rebased onto latest main.

No conflicts during rebase. All CI checks were passing before the rebase.
```go
func TestListenQueueTimerCleanup(t *testing.T) {
	// Shorten the delay so the test completes quickly.
	original := listenQueueCleanupDelay
	listenQueueCleanupDelay = 50 * time.Millisecond
	t.Cleanup(func() { listenQueueCleanupDelay = original })

	svc := &ControllerService{}
	leaseName := "test-lease"

	// Seed the queue as Listen() would via LoadOrStore.
	ch := make(chan *pb.ListenResponse, 8)
	svc.listenQueues.Store(leaseName, ch)

	// Simulate the stream-error path: schedule deferred cleanup.
	t.Run("queue survives transient error", func(t *testing.T) {
		timer := time.AfterFunc(listenQueueCleanupDelay, func() {
			svc.listenQueues.Delete(leaseName)
			svc.listenTimers.Delete(leaseName)
		})
```
[medium] These tests directly replicate the production timer logic (time.AfterFunc + Delete callbacks) rather than exercising the actual Listen() code path. For example, the timer callback on lines 322-325 is a verbatim copy of the production callback. If the production logic changes, these tests would still pass.
This is understandable -- calling Listen() directly would require substantial mock infrastructure (gRPC stream, K8s client, authentication). But it does mean a refactoring bug in Listen() could go undetected by these tests.
Consider extracting the timer-scheduling and cleanup logic into a small, testable helper method on ControllerService that both Listen() and the tests can call. Alternatively, a follow-up integration test that exercises Listen() through a mock gRPC stream would close the gap.
AI-generated, human reviewed
Fair point about the tests replicating the timer callback verbatim. However, extracting the timer-scheduling logic into a separate helper would meaningfully increase the scope of this fix -- the current patch is intentionally minimal (a targeted fix for a production issue where orphaned queues accumulate after transient exporter disconnects).
A follow-up PR that refactors the timer logic into a testable helper and adds integration-style tests (with a mock gRPC stream) would be the right way to close this gap without bloating the fix itself. Happy to file an issue to track that.
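For reference, the extracted helper proposed above might look like the following. This is a minimal, self-contained sketch, not the PR's actual code: `controllerService`, `scheduleQueueCleanup`, and `cancelQueueCleanup` are hypothetical names, and `chan string` stands in for `chan *pb.ListenResponse`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var listenQueueCleanupDelay = 50 * time.Millisecond

// controllerService is a stand-in for the real ControllerService.
type controllerService struct {
	listenQueues sync.Map // leaseName -> chan string
	listenTimers sync.Map // leaseName -> *time.Timer
}

// scheduleQueueCleanup is the extracted helper both Listen() and the tests
// could share: it (re-)arms the deferred cleanup for a lease, stopping any
// previously armed timer so the latest one governs the window.
func (s *controllerService) scheduleQueueCleanup(leaseName string) {
	if old, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
		old.(*time.Timer).Stop()
	}
	t := time.AfterFunc(listenQueueCleanupDelay, func() {
		s.listenQueues.Delete(leaseName)
		s.listenTimers.Delete(leaseName)
	})
	s.listenTimers.Store(leaseName, t)
}

// cancelQueueCleanup is the reconnect-path counterpart.
func (s *controllerService) cancelQueueCleanup(leaseName string) {
	if raw, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
		raw.(*time.Timer).Stop()
	}
}

// queueSurvivesReconnect exercises the helpers the way a unit test would,
// without any gRPC or K8s mocking.
func queueSurvivesReconnect() bool {
	svc := &controllerService{}
	svc.listenQueues.Store("lease-a", make(chan string, 8))
	svc.scheduleQueueCleanup("lease-a")
	svc.cancelQueueCleanup("lease-a") // simulated reconnect
	time.Sleep(4 * listenQueueCleanupDelay)
	_, ok := svc.listenQueues.Load("lease-a")
	return ok
}

func main() {
	fmt.Println("queue survived reconnect:", queueSurvivesReconnect())
}
```

With this shape, `Listen()` would call the helpers on its error and reconnect paths, and the tests would call the same helpers instead of replicating their bodies.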
Pushed 57df90a — added inline documentation of the known race limitation at timer expiry, as promised in the reply to @raballew's review comment. The comment documents:
Build and all
…connect

When an exporter's Listen() gRPC stream fails with a transient error the
queue is no longer deleted immediately. Instead a cleanup timer (default
2 min) is scheduled. If the exporter reconnects before the timer fires,
Listen() cancels the timer and inherits the existing queue — ensuring that
any router token already buffered there by a concurrent Dial() call is
delivered to the reconnected exporter.

On clean shutdown (ctx.Done() — lease ended or server stopping) the timer
is cancelled and the queue is removed straight away, so there is no memory
leak for the normal lifecycle.

Fixes #414
Add inline comment documenting the narrow race window where Dial() could
obtain a queue reference just before the cleanup timer callback deletes it,
as flagged in review by @raballew.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed 57df90a to fecd007
Use a 10-second cleanup delay for the first two subtests in
TestListenQueueTimerCleanup so the timer cannot fire during the gap between
sequential subtests under CI load. Only the third subtest (which verifies
timer expiry) uses the short 50ms delay.

Addresses review feedback from @raballew.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pushed 0d372ec — fixes the test flake flagged by @raballew in the latest review. What changed:

CI note: The
Replace the missing queue cleanup in Listen() with a single
defer s.listenQueues.CompareAndDelete(leaseName, queue) call.

This fixes issue jumpstarter-dev#414 where a race between Listen() cleanup
and Dial() token delivery causes intermittent "Connection to exporter lost"
errors in E2E tests. CompareAndDelete only removes the queue if it is still
the same channel instance that this invocation created, so a reconnecting
exporter's new queue is never accidentally deleted by an old invocation's
deferred cleanup.

Compared to the timer-based approach in PR jumpstarter-dev#417, this solution:

- Eliminates the known race at timer expiry
- Requires no additional struct fields (listenTimers) or goroutines
- Has no timing-dependent test behavior

Generated-By: Forge/20260415_224144_3227186_20142bee
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
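The identity-compare idea behind that alternative can be illustrated with a minimal sketch. This is not the PR's code: `staleCleanupIsNoOp` is a hypothetical name, `chan string` stands in for `chan *pb.ListenResponse`, and `sync.Map.CompareAndDelete` requires Go 1.20+.

```go
package main

import (
	"fmt"
	"sync"
)

var listenQueues sync.Map

// staleCleanupIsNoOp shows why CompareAndDelete is race-free here: an old
// Listen() invocation's deferred cleanup compares by channel identity, so
// it cannot remove a queue that a reconnecting exporter has since replaced.
func staleCleanupIsNoOp(lease string) bool {
	oldQueue := make(chan string, 8)
	listenQueues.Store(lease, oldQueue) // first invocation's queue

	newQueue := make(chan string, 8)
	listenQueues.Store(lease, newQueue) // reconnect creates a fresh queue

	// The old invocation's "defer CompareAndDelete" runs last; the entry
	// now holds newQueue, so nothing is deleted.
	deleted := listenQueues.CompareAndDelete(lease, oldQueue)

	got, ok := listenQueues.Load(lease)
	return !deleted && ok && got.(chan string) == newQueue
}

func main() {
	fmt.Println("stale cleanup left new queue intact:", staleCleanupIsNoOp("lease-a"))
}
```

Because channels compare by identity in Go, the stale defer is a guaranteed no-op once a new queue is in place, with no timers or extra struct fields.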
```go
t := time.AfterFunc(listenQueueCleanupDelay, func() {
	s.listenQueues.Delete(leaseName)
	s.listenTimers.Delete(leaseName)
})
s.listenTimers.Store(leaseName, t)
```
[Medium] When multiple Send() errors fire in rapid succession, time.AfterFunc creates a new timer and Store overwrites the previous map entry without calling Stop() on the old one. The old timer still fires and deletes the queue, so the earliest timer wins rather than the latest. Double-delete on sync.Map is safe, but the cleanup window silently shrinks.
Suggested fix: load and stop the existing timer before creating the new one:

```go
if old, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
	old.(*time.Timer).Stop()
}
t := time.AfterFunc(listenQueueCleanupDelay, func() { ... })
s.listenTimers.Store(leaseName, t)
```

AI-generated, human reviewed
Good catch. Fixed in c239709 -- the error path now calls LoadAndDelete + Stop() on any existing timer before creating the new one. This ensures the latest timer always governs the cleanup window.
```go
func TestListenQueueTimerCleanup(t *testing.T) {
	original := listenQueueCleanupDelay
	t.Cleanup(func() { listenQueueCleanupDelay = original })

	svc := &ControllerService{}
	leaseName := "test-lease"

	// Seed the queue as Listen() would via LoadOrStore.
	ch := make(chan *pb.ListenResponse, 8)
	svc.listenQueues.Store(leaseName, ch)

	// Use a long delay for the first two subtests so the timer cannot fire
	// between sequential subtests under CI load (fixes flake when >50ms
	// elapses between subtest boundaries).
	listenQueueCleanupDelay = 10 * time.Second

	// Simulate the stream-error path: schedule deferred cleanup.
	t.Run("queue survives transient error", func(t *testing.T) {
		timer := time.AfterFunc(listenQueueCleanupDelay, func() {
			svc.listenQueues.Delete(leaseName)
			svc.listenTimers.Delete(leaseName)
		})
		svc.listenTimers.Store(leaseName, timer)

		// Queue must still be present immediately after the error.
		if _, ok := svc.listenQueues.Load(leaseName); !ok {
			t.Fatal("listen queue was removed immediately after stream error — Dial token would be lost")
		}
	})

	t.Run("reconnecting exporter cancels cleanup timer", func(t *testing.T) {
		// Simulate Listen() reconnect: cancel the timer and call LoadOrStore.
		if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
			raw.(*time.Timer).Stop()
		}
		got, _ := svc.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
		if got != ch {
			t.Fatal("reconnecting Listen() did not inherit the existing queue")
		}

		// Verify the queue is still present — the stopped timer must not
		// have fired.
		if _, ok := svc.listenQueues.Load(leaseName); !ok {
			t.Fatal("listen queue was removed even though cleanup timer was cancelled")
		}
	})

	t.Run("timer fires and removes queue when exporter does not reconnect", func(t *testing.T) {
		// Shorten the delay so this subtest completes quickly.
		listenQueueCleanupDelay = 50 * time.Millisecond

		// Re-arm the timer without cancelling it this time.
		timer := time.AfterFunc(listenQueueCleanupDelay, func() {
			svc.listenQueues.Delete(leaseName)
			svc.listenTimers.Delete(leaseName)
		})
		svc.listenTimers.Store(leaseName, timer)

		// Wait for the timer to fire.
		time.Sleep(listenQueueCleanupDelay * 4)
		if _, ok := svc.listenQueues.Load(leaseName); ok {
			t.Fatal("listen queue was not removed after cleanup timer fired")
		}
	})
}

// TestListenQueueCleanShutdown verifies that a clean context cancellation
// (lease end / server stop) removes the queue immediately without waiting for
// the cleanup timer.
func TestListenQueueCleanShutdown(t *testing.T) {
	original := listenQueueCleanupDelay
	listenQueueCleanupDelay = 2 * time.Minute // keep long — must NOT fire during test
	t.Cleanup(func() { listenQueueCleanupDelay = original })

	svc := &ControllerService{}
	leaseName := "test-lease-shutdown"

	ch := make(chan *pb.ListenResponse, 8)
	svc.listenQueues.Store(leaseName, ch)

	// Arm a timer that should be cancelled before it fires.
	timer := time.AfterFunc(listenQueueCleanupDelay, func() {
		svc.listenQueues.Delete(leaseName)
		svc.listenTimers.Delete(leaseName)
	})
	svc.listenTimers.Store(leaseName, timer)

	// Simulate the ctx.Done() path in Listen().
	if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
		raw.(*time.Timer).Stop()
	}
	svc.listenQueues.Delete(leaseName)

	if _, ok := svc.listenQueues.Load(leaseName); ok {
		t.Fatal("listen queue was not removed on clean shutdown")
	}
	if _, ok := svc.listenTimers.Load(leaseName); ok {
		t.Fatal("cleanup timer was not cancelled on clean shutdown")
	}
}
```
[Medium] Both TestListenQueueTimerCleanup and TestListenQueueCleanShutdown replicate the timer scheduling, cancellation, and map operations from Listen() inline rather than calling Listen() with a mock gRPC stream. This validates the pattern but not the implementation -- a refactoring bug in the timer path of Listen() would go undetected.
Consider adding at least one test that invokes Listen() directly (e.g., with a mock ControllerService_ListenServer that returns an error on the second Send()) to cover the production code path end-to-end.
AI-generated, human reviewed
This was discussed in the previous review round (see reply to comment r3075337964). The mock gRPC stream infrastructure required to call Listen() directly is substantial -- it needs a mock K8s client, authentication layer, lease objects, and a ControllerService_ListenServer implementation. That level of integration testing would be better suited to a follow-up PR to keep this fix minimal and focused on the production race condition.
The new "buffered token survives disconnect" subtest (added in c239709) does cover the primary user story end-to-end at the sync.Map level, which is where the actual bug lived.
```go
	if _, ok := svc.listenTimers.Load(leaseName); ok {
		t.Fatal("cleanup timer was not cancelled on clean shutdown")
	}
}
```
[Medium] The core scenario this fix targets is that a Dial() token buffered during a transient disconnect survives until the exporter reconnects. Subtest 2 verifies queue inheritance by channel identity, but no test actually writes a token into the queue and reads it back after reconnect.
Adding a subtest that writes a ListenResponse into the channel between the "transient error" and "reconnect" phases, then reads it back from the inherited queue, would directly cover the primary user story.
AI-generated, human reviewed
Great suggestion. Added in c239709 -- new subtest "buffered token survives disconnect and is readable after reconnect" writes a ListenResponse into the channel during the disconnect window, simulates the reconnect path, then reads the token back from the inherited queue and verifies identity.
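The flow of that subtest can be approximated in a self-contained sketch. This is not the PR's test code: `listenResponse` and `bufferedTokenSurvives` are stand-ins (the real test uses `*pb.ListenResponse` and the `ControllerService` fields).

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// listenResponse stands in for *pb.ListenResponse; the real queue buffers
// router tokens for the exporter.
type listenResponse struct{ RouterToken string }

var (
	listenQueues            sync.Map
	listenTimers            sync.Map
	listenQueueCleanupDelay = 10 * time.Second
)

// bufferedTokenSurvives models the subtest: buffer a token during the
// disconnect window, reconnect, and read it back from the inherited queue.
func bufferedTokenSurvives(lease string) (string, bool) {
	ch := make(chan *listenResponse, 8)
	listenQueues.Store(lease, ch)

	// Stream error: arm the deferred cleanup (long delay, must not fire).
	timer := time.AfterFunc(listenQueueCleanupDelay, func() {
		listenQueues.Delete(lease)
		listenTimers.Delete(lease)
	})
	listenTimers.Store(lease, timer)

	// A concurrent Dial() buffers a token while the exporter is away.
	ch <- &listenResponse{RouterToken: "token-123"}

	// Reconnect: cancel the timer and inherit the existing queue.
	if raw, ok := listenTimers.LoadAndDelete(lease); ok {
		raw.(*time.Timer).Stop()
	}
	got, _ := listenQueues.LoadOrStore(lease, make(chan *listenResponse, 8))
	inherited := got.(chan *listenResponse)

	select {
	case resp := <-inherited:
		return resp.RouterToken, true
	default:
		return "", false
	}
}

func main() {
	token, ok := bufferedTokenSurvives("lease-a")
	fmt.Println(token, ok)
}
```

The key assertion is that `LoadOrStore` returns the original channel, so the token written during the disconnect is still readable afterwards.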
```go
		// Queue must still be present immediately after the error.
		if _, ok := svc.listenQueues.Load(leaseName); !ok {
			t.Fatal("listen queue was removed immediately after stream error — Dial token would be lost")
		}
```
[Low] Em-dash character (U+2014) in this test message. Project guidelines prefer -- over non-ASCII punctuation.
AI-generated, human reviewed
Fixed in c239709 -- replaced all em-dashes with -- in both the test file and production code.
```go
		// have fired.
		if _, ok := svc.listenQueues.Load(leaseName); !ok {
			t.Fatal("listen queue was removed even though cleanup timer was cancelled")
		}
```
[Low] Same em-dash (U+2014) issue here -- swap to -- per project guidelines.
AI-generated, human reviewed
```go
t := time.AfterFunc(listenQueueCleanupDelay, func() {
	s.listenQueues.Delete(leaseName)
	s.listenTimers.Delete(leaseName)
})
```
[Low] When the timer fires, up to 8 buffered Dial tokens in the channel are silently dropped. Adding a log line that includes len(queue) when a non-empty queue is deleted would make it much easier to notice lost tokens in production logs.
AI-generated, human reviewed
Agreed -- implemented in c239709. The timer callback now uses LoadAndDelete and logs a logger.Info with the lease name and bufferedTokens count (len(queue)) when a non-empty queue is deleted. The logger from the enclosing Listen() scope is captured by the closure.
```go
		svc.listenTimers.Store(leaseName, timer)

		// Wait for the timer to fire.
		time.Sleep(listenQueueCleanupDelay * 4)
```
[Low] This subtest uses time.Sleep(200ms) to wait for a 50ms timer. A channel-based signal from the callback with a select/timeout would be more robust and avoid potential flakiness under CI load.
AI-generated, human reviewed
Good point. Fixed in c239709 -- the "timer fires" subtest now uses a done channel that the callback closes, with a select/time.After(5s) fallback timeout instead of time.Sleep.
…est, replace em-dashes

- Stop existing timer before creating a new one on rapid Send() failures,
  preventing the cleanup window from silently shrinking
- Add "buffered token survives disconnect" subtest that writes a token into
  the queue during disconnect and reads it back after reconnect
- Replace em-dash (U+2014) with -- per project guidelines
- Log lease name and buffered token count when cleanup timer fires
- Use channel-based signal instead of time.Sleep in timer-fires subtest

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pushed c239709 addressing @raballew's latest review (6 of 7 comments implemented, 1 politely declined). Implemented:
Declined (scope):
All
Summary

Fixes the race condition in `listenQueues` cleanup that caused intermittent `Error: Connection to exporter lost` in E2E tests (issue #414). This is the proper follow-up to the revert in #416.
Root Cause

When an exporter's `Listen()` gRPC stream exits with a transient error, the queue for that lease must not be deleted immediately — a concurrent `Dial()` call may have already loaded the same queue and be about to (or have already) written a router token into its buffer. If the queue is deleted before the reconnecting exporter calls `Listen()` again, the token is lost and the client times out after 20 s with "Connection to exporter lost".

Fix
Instead of cleaning up immediately on stream error, a `time.AfterFunc` timer is scheduled for `listenQueueCleanupDelay` (default 2 minutes). The reconnect path in `Listen()` cancels this timer via `listenTimers.LoadAndDelete` before calling `LoadOrStore`, so the reconnected exporter inherits the existing queue — and any buffered `Dial` token.

On clean shutdown (`ctx.Done()` — lease ended or server stopping) the timer is cancelled and the queue removed straight away, so there is no memory leak for the normal lifecycle.

Changes

`controller/internal/service/controller_service.go`

- Add `listenQueueCleanupDelay` (var, default 2 min — overridable in tests)
- Add `listenTimers sync.Map` field to `ControllerService`
- `Listen()`: cancel pending timer on reconnect; schedule timer on stream error; immediate cleanup on `ctx.Done()`

`controller/internal/service/controller_service_test.go`

- `TestListenQueueTimerCleanup`: queue survives transient error; reconnect cancels timer; timer fires when exporter never returns
- `TestListenQueueCleanShutdown`: clean `ctx.Done()` path removes queue immediately

Testing

Closes #414
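The clean-shutdown path described above can be sketched end-to-end. This is a minimal stand-in, not the PR's code: `cleanShutdown` and `cleanShutdownRemovesEverything` are hypothetical names, and `chan string` replaces the real response channel.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

var (
	listenQueues            sync.Map
	listenTimers            sync.Map
	listenQueueCleanupDelay = 2 * time.Minute
)

// cleanShutdown models the ctx.Done() branch: cancel any pending cleanup
// timer and remove the queue immediately, so normal lifecycles leak nothing.
func cleanShutdown(lease string) {
	if raw, ok := listenTimers.LoadAndDelete(lease); ok {
		raw.(*time.Timer).Stop()
	}
	listenQueues.Delete(lease)
}

// cleanShutdownRemovesEverything arms a long timer, cancels the context
// (lease ended / server stopping), runs the shutdown path, and verifies
// both maps are empty without waiting for the timer.
func cleanShutdownRemovesEverything(lease string) bool {
	listenQueues.Store(lease, make(chan string, 8))
	timer := time.AfterFunc(listenQueueCleanupDelay, func() {
		listenQueues.Delete(lease)
		listenTimers.Delete(lease)
	})
	listenTimers.Store(lease, timer)

	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	<-ctx.Done() // the branch Listen() takes when the lease ends
	cleanShutdown(lease)

	_, qLeft := listenQueues.Load(lease)
	_, tLeft := listenTimers.Load(lease)
	return !qLeft && !tLeft
}

func main() {
	fmt.Println("clean shutdown removed queue and timer:", cleanShutdownRemovesEverything("lease-a"))
}
```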
🤖 Generated with Claude Code