DO NOT MERGE fix: per-invocation Listen channels to eliminate dial token loss on reconnect#564
DO NOT MERGE fix: per-invocation Listen channels to eliminate dial token loss on reconnect#564raballew wants to merge 22 commits intojumpstarter-dev:mainfrom
Conversation
Replace the missing queue cleanup in Listen() with a single defer s.listenQueues.CompareAndDelete(leaseName, queue) call. This fixes issue jumpstarter-dev#414 where a race between Listen() cleanup and Dial() token delivery causes intermittent "Connection to exporter lost" errors in E2E tests. CompareAndDelete only removes the queue if it is still the same channel instance that this invocation created, so a reconnecting exporter's new queue is never accidentally deleted by an old invocation's deferred cleanup. Compared to the timer-based approach in PR jumpstarter-dev#417, this solution: - Eliminates the known race at timer expiry - Requires no additional struct fields (listenTimers) or goroutines - Has no timing-dependent test behavior Generated-By: Forge/20260415_224144_3227186_20142bee Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
When a reconnecting Listen() inherits the same channel via LoadOrStore, the old Listen()'s deferred CompareAndDelete would succeed because both hold the same channel reference, incorrectly deleting the map entry that the reconnected Listen() depends on. By wrapping the channel in a unique listenQueue struct per Listen() call and using CompareAndSwap on reconnect, the old Listen()'s CompareAndDelete becomes a no-op since the pointer identity no longer matches. Generated-By: Forge/20260415_224144_3227186_20142bee
When two Listen() goroutines execute concurrently for the same lease, both attempt CompareAndSwap with the same stale reference. The loser's CAS fails, leaving its wrapper unstored. If the winner exits first, its CompareAndDelete removes the entry while the loser still reads from it. Add a retry path that re-loads the current map value and attempts CAS again, ensuring the surviving goroutine always owns the map entry. Generated-By: Forge/20260415_224144_3227186_20142bee
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds per-lease Changes
Sequence Diagram(s)sequenceDiagram
participant Exporter as Exporter (Listen)
participant Controller as ControllerService
participant Client as Client (Dial)
participant Map as listenQueues
Exporter->>Controller: Listen(lease)
Controller->>Map: swapListenQueue(lease, newWrapper)
Map-->>Controller: store newWrapper, signal oldWrapper.done
Note right of Controller: Listen loop reads from newWrapper.ch\nand exits when newWrapper.done closed
Client->>Controller: Dial(lease, token)
Controller->>Controller: acquireLeaseLock(lease)
Controller->>Map: load wrapper for lease
alt wrapper exists and not done
Controller->>newWrapper.ch: enqueue token (non-blocking)
newWrapper.ch-->>Exporter: token delivered
else no active wrapper or done closed
Controller-->>Client: return Unavailable or ResourceExhausted
end
Controller->>Controller: releaseLeaseLock(lease)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
…ader race Replace the shared-channel approach (LoadOrStore + CompareAndSwap) with per-invocation channels and done-signaling. Each Listen() call creates a fresh listenQueue with its own ch and done channels, atomically swaps it into the sync.Map, and closes the previous entry's done channel to signal the old goroutine to stop reading. Dial() now uses Load (not LoadOrStore) to send tokens to the current listener only. This eliminates the logical race where a stale goroutine with a broken gRPC stream could consume and discard a dial token meant for the reconnected goroutine. Generated-By: Forge/20260415_235731_3329604_06ed4455
…check Add a deterministic pre-check of q.done before the send select in Dial, preventing tokens from being silently lost when sent to a queue whose listener has been superseded. Also add a fallback case <-q.done in the main select for races that occur between the pre-check and the send. Generated-By: Forge/20260415_235731_3329604_06ed4455 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done channel was only closed when a listener was superseded by a new Listen call. On normal exit (context cancellation or stream error), done was left open, making it an unreliable signal for Dial's done pre-check. Use sync.Once to close done in a deferred call, ensuring it is always closed exactly once regardless of exit path. Generated-By: Forge/20260415_235731_3329604_06ed4455 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The defer statements in Listen() were ordered such that CompareAndDelete ran before closeDone (LIFO). This created a TOCTOU window where a concurrent Dial could load a queue reference, pass the done check (done still open), and send to a dead queue. Swapping the defer order ensures closeDone() runs first, so any concurrent Dial that loaded the queue reference will see the closed done channel and reject the send before the map entry is removed. Generated-By: Forge/20260415_235731_3329604_06ed4455
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@controller/internal/service/controller_service_test.go`:
- Around line 523-556: Capture the pre-swap listenQueue reference and use it in
a racing send to verify a stale Dial cannot deliver to a superseded queue:
before calling svc.listenQueues.Swap(leaseName, g2Queue) load the existing entry
(v, ok := svc.listenQueues.Load(leaseName); oldQueue := v.(*listenQueue)), then
perform Swap, start a goroutine that tries to send the token to oldQueue.ch
while concurrently invoking the supersession path that closes oldQueue.done (or
calling the helper closeDone()), and assert that the send does not reach
g1Queue.ch (stale) but does reach g2Queue.ch; repeat the same pattern for the
second block to ensure coverage of the Dial/Listen handshake using the same
unique symbols listenQueues.Swap, listenQueues.Load, listenQueue.ch,
listenQueue.done, closeDone(), Dial(), and Listen().
In `@controller/internal/service/controller_service.go`:
- Around line 456-459: Introduce a per-lease serialization (mutex or handoff) on
the listenQueue to prevent a race between superseding a queue and delivering a
token: add a lock field (e.g., mu sync.Mutex) to listenQueue and acquire it in
both the swap/close path (s.listenQueues.Swap + prev.closeDone) and the send
path (places in Dial where you do q.ch <- response and in Listen's loop) so that
the swap/close routine takes the lock before marking/closing the queue and the
sender acquires the same lock before checking that q is still the current queue
(reload s.listenQueues.Load(leaseName) or compare a generation ID) and
performing the send; release the lock after the check/send. This ensures the
swap and the token send are serialized and prevents stale sends from succeeding.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 235a6179-b75c-469c-be4f-da746cd684e0
📒 Files selected for processing (2)
controller/internal/service/controller_service.gocontroller/internal/service/controller_service_test.go
Address review findings F001, F002, F005, F006, F007, F008: - Add tests exercising listenQueue integration with actual struct behavior - Rename misleading TestListenQueueConcurrentReadersAreNonDeterministic - Add test for Dial returning Unavailable with no listener - Add concurrent Dial-during-reconnection test - Add context cancellation test - Run tests with -race flag Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The done-channel approach has a TOCTOU race: if Dial loads a queue reference before Listen reconnects, then both <-q.done and q.ch <- response are ready in the select (buffered channel), and Go may non-deterministically pick the send, delivering the token to a superseded queue. Add a per-lease mutex (leaseLocks sync.Map) that serializes swapListenQueue (Swap + closeDone) with sendToListener (Load + check + send). This guarantees that the queue loaded in Dial cannot be superseded during the send. Also replace custom contains/searchSubstring helpers with strings.Contains and add tests covering the stale-Dial scenario with pre-swap queue references. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…der backpressure When the listenQueue channel buffer is full, sendToListener blocks on the channel send while holding the per-lease mutex. This prevents a reconnecting Listen from acquiring the mutex to swap the queue, creating a deadlock chain. Adding the Dial caller's context to the select allows the blocked send to be cancelled when the Dial client disconnects, releasing the mutex for the reconnecting Listen to proceed. Generated-By: Forge/20260416_070332_3699740_7b2bda71 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Extract channel draining logic from TestListenQueueConcurrentDialDuringReconnection into a reusable drainChannel function, replacing the goto drained/g2drained pattern with idiomatic Go. Generated-By: Forge/20260416_073038_3739633_70e0127f
The leaseLocks sync.Map grew unboundedly because per-lease mutex entries were never deleted. Now when Listen exits and CompareAndDelete successfully removes the queue (meaning no new listener took over), the corresponding leaseLocks entry is also deleted. Generated-By: Forge/20260416_073038_3739633_70e0127f
Exercises the full deadlock-avoidance chain: buffer full, sendToListener blocks holding the per-lease mutex, swapListenQueue blocks on the mutex, context cancellation unblocks sendToListener, and swapListenQueue proceeds. Generated-By: Forge/20260416_073038_3739633_70e0127f
Add -race flag to the go test invocation in the Makefile test target, which is used by the controller-tests CI workflow. This ensures data races are detected in CI. Generated-By: Forge/20260416_073038_3739633_70e0127f
Extract repeated string literal "tok" into testRouterToken constant and remove trailing blank line to satisfy golangci-lint checks. Generated-By: Forge/20260416_075157_11117_6c67a5ce Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace the manual Load+select TOCTOU pattern in TestListenQueueConcurrentDialDuringReconnection with calls to svc.sendToListener() and svc.swapListenQueue(), exercising the actual production code path that serializes Dial with reconnecting Listen via the per-lease mutex. Generated-By: Forge/20260416_075157_11117_6c67a5ce
…p ops Refactor tests to use swapListenQueue() and sendToListener() instead of directly calling listenQueues.Store(), listenQueues.Swap(), and close(done). This ensures tests exercise the actual production code paths including per-lease mutex serialization, rather than testing a different (pre-fix) code path that bypasses the TOCTOU protection. Generated-By: Forge/20260416_075157_11117_6c67a5ce
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@controller/internal/service/controller_service_test.go`:
- Around line 588-592: The current select between g1Queue.done and g1Queue.ch
can mask stale deliveries because both may be ready and the select is
randomized; change the test to first wait/receive from g1Queue.done
deterministically, then immediately perform a non-blocking probe of g1Queue.ch
(e.g., a select with case <-g1Queue.ch: staleWins++ and a default: no-op) so
stale tokens are detected reliably; update the logic around g1Queue.done,
g1Queue.ch and staleWins accordingly in controller_service_test.go.
In `@controller/internal/service/controller_service.go`:
- Around line 96-99: getLeaseLock currently returns raw *sync.Mutex from
s.leaseLocks which allows concurrent cleanup to remove the map entry and break
serialization; replace the raw mutex with a small per-lease struct (e.g. type
leaseLock struct { mu sync.Mutex; refs int32 }) stored in s.leaseLocks, change
getLeaseLock to atomically load-or-store that struct and increment refs, and add
a releaseLeaseLock(leaseName string) that decrements refs and only removes the
map entry when refs reaches zero while holding the leaseLock.mu; update call
sites (Listen, Dial, swapListenQueue, sendToListener and the cleanup path that
closes wrapper.done) to call getLeaseLock and defer releaseLeaseLock so the map
entry (and closing wrapper.done) cannot race with in-flight
sendToListener/swapListenQueue.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: c365e2c1-3530-4b7f-b660-8cec55925fbf
📒 Files selected for processing (3)
controller/Makefilecontroller/internal/service/controller_service.gocontroller/internal/service/controller_service_test.go
|
the flakiness I suspect is not related to this https://github.com/jumpstarter-dev/jumpstarter/pull/564/changes |
|
Based on conversation with @mangelajo this might not be the root cause for the 10% flakiness we currently see but another issue we saw in roughly 0.1% of CI runs where TLS connections were terminated unexpectedly - still worth to investigate/fix. |
…ks entries The Listen cleanup path called closeDone() outside the per-lease mutex, which allowed an in-flight sendToListener to see a partially-torn-down queue. Worse, leaseLocks.Delete could remove the mutex while a concurrent Listen or Dial still references it, breaking serialization guarantees. Fix by acquiring the lease mutex before calling closeDone() in the cleanup defer, and by never deleting leaseLocks entries (they are tiny and bounded by the number of distinct lease names seen). Also fix the stale-reader detection test to check done and ch deterministically instead of relying on random select ordering. Generated-By: Forge/20260416_105202_199878_b08a2035
Replace the blocking channel send in sendToListener with a non-blocking send that returns ResourceExhausted when the listener buffer is full. Previously, sendToListener held the per-lease mutex while blocking on a full channel, which prevented swapListenQueue (reconnecting listeners) and other Dial attempts from proceeding until the RPC context timed out. Generated-By: Forge/20260416_105202_199878_b08a2035 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
| func (s *ControllerService) sendToListener(_ context.Context, leaseName string, response *pb.ListenResponse) error { | ||
| mu := s.getLeaseLock(leaseName) | ||
| mu.Lock() | ||
| defer mu.Unlock() | ||
| v, ok := s.listenQueues.Load(leaseName) | ||
| if !ok { | ||
| return status.Errorf(codes.Unavailable, "exporter is not listening on lease %s", leaseName) | ||
| } | ||
| q := v.(*listenQueue) | ||
| select { | ||
| case <-q.done: | ||
| return status.Errorf(codes.Unavailable, "exporter is not listening on lease %s", leaseName) | ||
| default: | ||
| } | ||
| select { | ||
| case q.ch <- response: | ||
| return nil | ||
| default: | ||
| return status.Errorf(codes.ResourceExhausted, "listener buffer full on lease %s", leaseName) | ||
| } | ||
| } |
There was a problem hiding this comment.
[MEDIUM] The old Dial() implementation would block on a buffered channel (with select on ctx.Done()), so a caller whose buffer was temporarily full would eventually succeed once the exporter drained it. The new sendToListener uses a non-blocking send and returns ResourceExhausted immediately when the 8-slot buffer is full.
This means a Dial() that previously would have succeeded after a short wait now fails outright. It also means that under context cancellation with a full buffer, clients receive ResourceExhausted instead of the context error they used to get.
In practice this only fires under significant backpressure (8+ concurrent Dial calls to the same lease faster than the exporter processes them), and the change is necessary to avoid mutex starvation in the new per-lease lock design. Still worth noting:
- Consider documenting this behavioral change in the PR description so reviewers and downstream consumers are aware.
- Verify that gRPC clients already retry on
ResourceExhausted. If they do not, adding client-side retry handling would prevent user-visible regressions.
AI-generated, human reviewed
…e cleanup The previous getLeaseLock returned raw *sync.Mutex pointers from a sync.Map that was never cleaned up, leaking memory. Replace with acquireLeaseLock / releaseLeaseLock using atomic ref counting so the map entry is removed when the last listener releases. Ensure closeDone in the Listen defer runs under the lease mutex for proper serialization. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ener test Remove the racy counter increment from TestLeaseLockRefCountConcurrentAcquireRelease since goroutines that acquire-release quickly may get different mutex instances. Add TestLeaseLockRefCountConcurrentOverlappingListeners that uses a barrier to ensure all goroutines hold a reference before using the mutex, matching the real Listen lifecycle pattern. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Runs the E2E suite 100 times in parallel (max 20 concurrent) with fail-fast enabled. Builds images once, then each matrix entry sets up its own Kind cluster and runs the full test. Triggered manually via workflow_dispatch only. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
🧹 Nitpick comments (2)
.github/workflows/e2e-stress.yaml (1)
84-88:fail-fast: truemay conflict with flakiness detection goals.When
fail-fast: true, a single failure cancels all remaining iterations. For flakiness detection, you typically want all 100 runs to complete to determine the actual failure rate (e.g., "3 out of 100 failed = 3% flaky"). With fail-fast enabled, the first failure stops everything.Consider setting
fail-fast: falseif the objective is to measure the flakiness percentage.🔧 Suggested change
strategy: - fail-fast: true + fail-fast: false max-parallel: 20 matrix: run: [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100]🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In @.github/workflows/e2e-stress.yaml around lines 84 - 88, The workflow currently sets strategy.fail-fast: true which will cancel remaining matrix runs on the first failure and prevents completing all 100 iterations for flakiness measurement; update the GitHub Actions job configuration to set strategy.fail-fast to false (while keeping strategy.max-parallel and matrix.run as-is) so all runs execute and you can compute an accurate failure rate across the matrix.controller/internal/service/controller_service_test.go (1)
601-633: Test logic is correct but unnecessarily complex.Since
q.doneis closed on line 606, the firstselectwill always immediately read from<-q.doneand setrejected = true. The subsequentif !rejectedblock never executes, making it dead code.The test still validates the intended behavior (detecting a superseded queue via closed
donechannel), but could be simplified.🧹 Simplified version
func TestDialRejectsSupersededQueue(t *testing.T) { q := &listenQueue{ ch: make(chan *pb.ListenResponse, 8), done: make(chan struct{}), } close(q.done) response := &pb.ListenResponse{RouterEndpoint: "ep", RouterToken: testRouterToken} - rejected := false select { case <-q.done: - rejected = true - default: - } - if !rejected { - select { - case <-q.done: - rejected = true - case q.ch <- response: - } - } - - if !rejected { - t.Fatal("dial must reject send to a queue whose done channel is closed") + // Superseded queue detected - this is the expected path + case q.ch <- response: + t.Fatal("dial must reject send to a queue whose done channel is closed") } select { case <-q.ch: t.Fatal("token should not have been buffered in a superseded queue") default: } }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@controller/internal/service/controller_service_test.go` around lines 601 - 633, The test TestDialRejectsSupersededQueue is correct but has dead code because q.done is closed before the first select; simplify by removing the redundant rejected flag and the second if block: after creating listenQueue and closing q.done, build the response and use a single non-blocking select that either reads from <-q.done (marking rejection) or tries to send to q.ch; assert that <-q.done was selected (rejected) and then assert q.ch is empty by trying a non-blocking receive from q.ch and failing if a value is returned; refer to listenQueue, q.done, q.ch, and TestDialRejectsSupersededQueue when making the change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In @.github/workflows/e2e-stress.yaml:
- Around line 84-88: The workflow currently sets strategy.fail-fast: true which
will cancel remaining matrix runs on the first failure and prevents completing
all 100 iterations for flakiness measurement; update the GitHub Actions job
configuration to set strategy.fail-fast to false (while keeping
strategy.max-parallel and matrix.run as-is) so all runs execute and you can
compute an accurate failure rate across the matrix.
In `@controller/internal/service/controller_service_test.go`:
- Around line 601-633: The test TestDialRejectsSupersededQueue is correct but
has dead code because q.done is closed before the first select; simplify by
removing the redundant rejected flag and the second if block: after creating
listenQueue and closing q.done, build the response and use a single non-blocking
select that either reads from <-q.done (marking rejection) or tries to send to
q.ch; assert that <-q.done was selected (rejected) and then assert q.ch is empty
by trying a non-blocking receive from q.ch and failing if a value is returned;
refer to listenQueue, q.done, q.ch, and TestDialRejectsSupersededQueue when
making the change.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 9f942f45-acca-4593-84df-81792525da7f
📒 Files selected for processing (3)
.github/workflows/e2e-stress.yamlcontroller/internal/service/controller_service.gocontroller/internal/service/controller_service_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- controller/internal/service/controller_service.go
|
I will start on a clean slate again. This does not fix the 10% flakes. |
DO NOT MERGE YET - I NEED CI TO CHECK FOR FLAKINESS
Summary
Fixes #414
Root cause
The flaky
can lease and connect to exportersE2E test fails because two concurrentListen()goroutines read from the same channel, causing Dial tokens to be consumed by the stale goroutine and silently discarded.The sequence:
Listen(lease)-- goroutine G1 blocks on<-chin the select loopselect, reading fromchListen(lease)again --LoadOrStorereturns the same channel -- goroutine G2 also reads fromchDial(lease)-- sends router token intochstream.Send(token)which fails because the stream is broken. Token is consumed and discarded.Connection to exporter lostThis is a logical race, not a data race -- Go's race detector cannot catch it because channel operations are synchronized.
Fix
Replace the shared-channel design with per-invocation channels and done-channel supersession:
LoadOrStorereturns the same shared channel to allListen()goroutinesSwapgives eachListen()its own channeldonechannel is closed, signaling it to exitDial()usesLoadOrStore(creates queue if missing)Dial()usesLoad+ done-check (fails fast if no active listener)Key details:
Listen()call creates a freshlistenQueue{ch, done, closeOnce}and usessync.Map.Swap()to atomically replace the previous entrydonechannel is closed, signaling the old goroutine to exit viacase <-wrapper.done: return nilsync.Once-wrappedcloseDone()prevents double-close panics across all exit pathsdefer CompareAndDeleteensures cleanup only removes the entry if this goroutine is still the current ownerDial()pre-checks thedonechannel before sending, avoiding writes to a superseded queueTest plan
TestListenQueueStaleReaderConsumesDialTokenverifies stale goroutines cannot consume tokens;TestListenQueueConcurrentReadersAreNonDeterministicconfirms 0/100 stale winsTestListenQueueSupersessionSignaling,TestListenQueueConcurrentSwapSupersedes(3 goroutines),TestListenQueueDialTokenDeliveredToNewListenerTestListenQueueCompareAndDeleteOnStreamError,TestListenQueueCompareAndDeleteOnCleanShutdown,TestListenQueueReconnectPreventsStaleCleanup,TestListenQueueReconnectCreatesNewChannelcan lease and connect to exportersshould no longer flake (~10% failure rate before this change)