
fix: free bulkT on dispatch abort to prevent memory leak under load#6747

Open
ycombinator wants to merge 8 commits into elastic:main from ycombinator:fix/free-blk-on-dispatch-abort

Conversation

Contributor

@ycombinator ycombinator commented Apr 3, 2026

What is the problem this PR solves?

During a 30k Serverless scale test, 22 of 39 fleet-server pods were OOMKilled. Analysis of the captured pod logs showed:

  • An upgrade + policy reassignment storm caused checkin durations to spike from normal (~ms) to 5-45 seconds average.
  • This created ~479 concurrent checkins per pod, each blocked in dispatch() waiting to enqueue onto the bulk engine's channel (capacity 32).
  • Elasticsearch was not the bottleneck — ES had ~88% free heap after GC with sub-30ms pause times.
  • The memory growth came from bulkT objects allocated via sync.Pool in dispatch(). On the success path, callers (waitBulkAction, Search, ReadRaw) return the blk to the pool via freeBlk. But when dispatch() aborted due to context cancellation (agent disconnect), neither dispatch nor the callers freed the blk, causing it to leak.
  • Under sustained load, thousands of blk objects leaked per minute until pods hit their memory limit (~154 Mi) and were killed.
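To make the leak mechanics concrete, here is a minimal sketch of the sync.Pool pattern involved. The names (bulkT, newBlk, freeBlk, dispatchOnce) are simplified stand-ins for fleet-server's internals, not the real code:

```go
package main

import (
	"fmt"
	"sync"
)

// bulkT is a simplified stand-in for fleet-server's pooled request object.
type bulkT struct{ buf []byte }

var allocs int // counts how often the pool had to allocate a fresh object

var blkPool = sync.Pool{
	New: func() any {
		allocs++
		return &bulkT{}
	},
}

func newBlk() *bulkT { return blkPool.Get().(*bulkT) }

func freeBlk(b *bulkT) {
	b.buf = b.buf[:0] // reset before reuse
	blkPool.Put(b)
}

// dispatchOnce models one dispatch() call. If the abort path forgets to
// call freeBlk, the object is never returned, so the pool stays empty and
// every subsequent Get allocates again: that is the leak.
func dispatchOnce(abort, freeOnAbort bool) {
	blk := newBlk()
	if abort {
		if freeOnAbort {
			freeBlk(blk)
		}
		return // when freeOnAbort is false, blk is simply dropped
	}
	freeBlk(blk) // success path always frees
}

func main() {
	allocs = 0
	for i := 0; i < 1000; i++ {
		dispatchOnce(true, false) // abort without freeing
	}
	fmt.Println("allocations when the abort path leaks:", allocs) // 1000

	allocs = 0
	for i := 0; i < 1000; i++ {
		dispatchOnce(true, true) // abort path returns blk to the pool
	}
	fmt.Println("allocations when the abort path frees:", allocs)
}
```

In this toy model the dropped objects are eventually garbage-collected, so it demonstrates allocation churn rather than a true leak; in the real server the blks were still referenced by in-flight state, so memory grew until OOM.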

How does this PR solve the problem?

Previously, when dispatch() aborted due to context cancellation, the bulkT object was never returned to the sync.Pool; under high concurrency (30k+ agents), thousands of blk objects leaked per minute during checkin storms, causing memory growth until OOM. This PR frees the blk on both abort paths.

The abort can occur in two places:

  • When the abort occurs before the blk is enqueued, free it immediately since the Run loop never received it:

        case <-ctx.Done():
            zerolog.Ctx(ctx).Error().
                Err(ctx.Err()).
                Str("mod", kModBulk).
                Str("action", blk.action.String()).
                Bool("refresh", blk.flags.Has(flagRefresh)).
                Dur("rtt", time.Since(start)).
                Msg("Dispatch abort queue")
            b.freeBlk(blk)
            return respT{err: ctx.Err()}
  • When the abort occurs after the blk is enqueued, spawn a small drain goroutine that waits for the flush response, then frees the blk:

        case <-ctx.Done():
            zerolog.Ctx(ctx).Error().
                Err(ctx.Err()).
                Str("mod", kModBulk).
                Str("action", blk.action.String()).
                Bool("refresh", blk.flags.Has(flagRefresh)).
                Dur("rtt", time.Since(start)).
                Msg("Dispatch abort response")
            // blk is in the Run loop's queue; drain the response and free asynchronously.
            // Use a timeout based on the flush interval so the goroutine doesn't block
            // forever if the Run loop never processes this blk (e.g. during shutdown).
            // Three flush intervals should give enough time for the blk to be picked up,
            // sent to ES, and responded to, even under heavy load.
            go func() {
                select {
                case <-blk.ch:
                    b.freeBlk(blk)
                case <-time.After(3 * defaultFlushInterval):
                }
            }()

How to test this PR locally

go test -race ./internal/pkg/bulk/ -run TestDispatch -v
go test -bench=BenchmarkDispatch -benchmem -count=5 ./internal/pkg/bulk/

Benchmark results

$ go test -bench=BenchmarkDispatch -benchmem -count=5 ./internal/pkg/bulk/

Before

  BenchmarkDispatchAbortQueue-12         2834217               435.3 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortQueue-12         2806820               423.8 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortQueue-12         2851818               424.0 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortQueue-12         2849653               420.2 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortQueue-12         2857449               423.2 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortResponse-12      2526525               459.9 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortResponse-12      2493390               489.2 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortResponse-12      2451482               496.2 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortResponse-12      2455975               500.9 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchAbortResponse-12      2489379               508.6 ns/op           488 B/op          7 allocs/op
  BenchmarkDispatchSuccess-12            1294516               931.3 ns/op           264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1297554               932.0 ns/op           264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1243687               941.3 ns/op           264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1288376               936.3 ns/op           264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1289594               928.8 ns/op           264 B/op          4 allocs/op

After

  BenchmarkDispatchAbortQueue-12         3785550               313.1 ns/op           248 B/op          3 allocs/op
  BenchmarkDispatchAbortQueue-12         3879040               313.5 ns/op           248 B/op          3 allocs/op
  BenchmarkDispatchAbortQueue-12         3791301               322.9 ns/op           248 B/op          3 allocs/op
  BenchmarkDispatchAbortQueue-12         3825862               313.4 ns/op           248 B/op          3 allocs/op
  BenchmarkDispatchAbortQueue-12         3864537               315.0 ns/op           248 B/op          3 allocs/op
  BenchmarkDispatchAbortResponse-12      1000000              1981 ns/op             886 B/op          8 allocs/op
  BenchmarkDispatchAbortResponse-12      1000000              1395 ns/op             891 B/op          8 allocs/op
  BenchmarkDispatchAbortResponse-12      1776688               780.4 ns/op           889 B/op          8 allocs/op
  BenchmarkDispatchAbortResponse-12      1560524               743.1 ns/op           891 B/op          8 allocs/op
  BenchmarkDispatchAbortResponse-12      1950848               679.8 ns/op           674 B/op          7 allocs/op
  BenchmarkDispatchSuccess-12            1000000              1061 ns/op             264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1000000              1005 ns/op             264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1249441               996.3 ns/op           264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1359787               911.4 ns/op           264 B/op          4 allocs/op
  BenchmarkDispatchSuccess-12            1312446               891.9 ns/op           264 B/op          4 allocs/op

The AbortResponse path shows higher B/op because it now includes the drain goroutine's overhead, but in exchange the blk is reclaimed instead of leaking.

Design Checklist

  • I have ensured my design is stateless and will work when multiple fleet-server instances are behind a load balancer.
  • I have or intend to scale test my changes, ensuring it will work reliably with 100K+ agents connected.
  • I have included fail safe mechanisms to limit the load on fleet-server: rate limiting, circuit breakers, caching, load shedding, etc.

Checklist

  • I have commented my code, particularly in hard-to-understand areas
  • I have added tests that prove my fix is effective or that my feature works

Related issues

🤖 Generated with Claude Code

@mergify
Contributor

mergify bot commented Apr 3, 2026

This pull request does not have a backport label. Could you fix it @ycombinator? 🙏
To fix up this pull request, you need to add the backport labels for the needed
branches, such as:

  • backport-8.\d is the label to automatically backport to the 8.\d branch, where \d is the digit.
  • backport-active-all is the label that automatically backports to all active branches.
  • backport-active-8 is the label that automatically backports to all active minor branches for the 8 major.
  • backport-active-9 is the label that automatically backports to all active minor branches for the 9 major.

@ycombinator ycombinator marked this pull request as ready for review April 3, 2026 20:04
@ycombinator ycombinator requested a review from a team as a code owner April 3, 2026 20:04
@pierrehilbert pierrehilbert added the Team:Elastic-Agent-Control-Plane Label for the Agent Control Plane team label Apr 4, 2026
Comment on lines +644 to +649
go func() {
    select {
    case <-blk.ch:
        b.freeBlk(blk)
    case <-time.After(3 * defaultFlushInterval):
    }
Contributor

@michel-laterman michel-laterman Apr 6, 2026


Instead of using a time based approach, is it possible to close blk.ch when the context is cancelled and detect/drain it here?

Contributor Author


I'm not sure closing blk.ch when the context is cancelled in this function is a good idea. Senders on this channel (e.g. the flushBulk method in opBulk.go) would keep sending, and a send on a closed channel panics.
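The concern is well founded: in Go, a send on a closed channel always panics, so closing blk.ch while flushBulk might still send would crash the process. A minimal demonstration:

```go
package main

import "fmt"

// sendOnClosed closes a channel and then attempts a send, recovering from
// the resulting panic so the program can report it instead of crashing.
func sendOnClosed() (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch := make(chan int)
	close(ch)
	ch <- 1 // always panics: "send on closed channel"
	return false
}

func main() {
	fmt.Println("send on closed channel panicked:", sendOnClosed()) // prints true
}
```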

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Contributor


Please use the newer b.Loop() mechanism

Contributor Author


Fixed in 00949c2.

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Contributor


same here, b.Loop() can be used

Contributor Author


Fixed in 00949c2.
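For reference, b.Loop() (added in Go 1.24) replaces the classic counter loop and handles timer management itself, while also keeping the loop body's results alive so the compiler can't optimize them away. A minimal sketch, driven through testing.Benchmark so it runs outside go test; the benchmarked body is a placeholder, not the real dispatch:

```go
package main

import (
	"fmt"
	"testing"
)

func main() {
	res := testing.Benchmark(func(b *testing.B) {
		b.ReportAllocs()
		for b.Loop() { // replaces: for i := 0; i < b.N; i++ {
			buf := make([]byte, 64) // placeholder work
			_ = buf
		}
	})
	fmt.Println("ran iterations:", res.N > 0)
}
```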

Contributor

@michel-laterman michel-laterman left a comment


The new tests are not using the require package like all the other tests. I'm also concerned about the sleep times in the tests, which can easily lead to flaky builds in BK.

Comment on lines +28 to +30
if resp.err == nil {
    t.Fatal("expected error from cancelled context")
}
Contributor


require.NoError?

Contributor Author


Fixed in 4bb5dde.

Comment on lines +35 to +40
if reused.buf.Len() != 0 {
    t.Fatal("expected reused blk to have reset buf")
}
if reused.action != 0 {
    t.Fatal("expected reused blk to have reset action")
}
Contributor


require.NotZero?

Contributor Author


Fixed in 4bb5dde.


// Let dispatch enqueue, then cancel context to trigger Phase 2 abort.
go func() {
    time.Sleep(20 * time.Millisecond)
Contributor


why was 20ms chosen? timeouts like this can be really problematic in buildkite.
Is there another way to detect the dispatch?

Contributor Author


Rewrote the test to not rely on timeouts in fc690ea.

blk.ch <- respT{}

// Give the drain goroutine time to complete.
time.Sleep(50 * time.Millisecond)
Contributor


is 50ms the dispatch time? Why was this used?

Contributor Author


Rewrote the test to not rely on timeouts in fc690ea.

@ycombinator ycombinator force-pushed the fix/free-blk-on-dispatch-abort branch 2 times, most recently from 129fc50 to 1f40cf4 Compare April 8, 2026 17:44
ycombinator and others added 8 commits April 8, 2026 21:31
When dispatch() aborts due to context cancellation, the bulkT object was
never returned to the sync.Pool. Under high concurrency (30k+ agents),
thousands of blk objects leaked per minute during checkin storms, causing
memory growth until OOM.

When the abort occurs before the blk is enqueued, free it immediately
since the Run loop never received it:
https://github.com/elastic/fleet-server/blob/0fec36cc/internal/pkg/bulk/engine.go#L607-L615

When the abort occurs after the blk is enqueued, spawn a small drain
goroutine that waits for the flush response, then frees the blk:
https://github.com/elastic/fleet-server/blob/0fec36cc/internal/pkg/bulk/engine.go#L630-L638

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use 3x the flush interval (15s) instead of the flush context timeout
(1m) for the drain goroutine's safety timeout. This keeps abandoned
goroutines short-lived while still giving enough time for the Run
loop to process the blk under heavy load.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Use Go 1.24's b.Loop() mechanism in the dispatch benchmarks instead of
the legacy `for i := 0; i < b.N; i++` loop.

Because b.Loop() makes b.N unknown up-front, BenchmarkDispatchAbortResponse
can no longer pre-size the block queue; instead, a drain goroutine empties
bulker.ch so each iteration reliably enters Phase 2.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Replace hand-rolled error/value checks in the dispatch tests with
require.Error / require.NoError / require.Zero, matching the convention
used elsewhere in the repo.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
The previous Phase 2 abort test used time.Sleep to guess when dispatch
had reached its second select and how long the drain goroutine needed
to run, making it prone to flakiness on Buildkite.

Extract the inline drain goroutine in dispatch() into a method
drainAndFreeAbortedBlk so it can be invoked synchronously from tests.
Also promote 3 * defaultFlushInterval to a named constant
dispatchAbortDrainTimeout.

Split the old test into two:

- TestDispatchAbortResponseReachesPhase2 verifies that dispatch() returns
  ctx.Err() after the Run loop has taken the blk, using channel-based
  synchronization (no sleeps) to deterministically time the cancel.

- TestDrainAndFreeAbortedBlkResponse invokes drainAndFreeAbortedBlk
  synchronously, so the test goroutine is the sole owner of blk and
  can inspect its reset fields race-free (blk.reset is only called
  from freeBlk, so the reset observation proves the free happened).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
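The channel-based synchronization this commit describes can be sketched as follows. The names here (run, queue, taken) are simplified stand-ins for the real test code: instead of sleeping and hoping dispatch has reached its second select, the fake Run loop signals the moment it receives the item, and only then is the context cancelled:

```go
package main

import (
	"context"
	"fmt"
)

// run models the sleep-free synchronization: a fake Run loop closes
// `taken` as soon as it receives the item, so the cancel deterministically
// lands after the enqueue and exercises the post-enqueue abort path.
func run() error {
	queue := make(chan int)       // stands in for the bulk engine's channel
	taken := make(chan struct{})  // closed once the Run loop has the item
	resp := make(chan struct{})   // flush response (never sent in this sketch)
	result := make(chan error, 1) // dispatch's return value

	ctx, cancel := context.WithCancel(context.Background())

	// Fake Run loop: receive one item, then signal that it was taken.
	go func() {
		<-queue
		close(taken)
	}()

	// Fake dispatch: enqueue, then wait for a response or cancellation.
	go func() {
		queue <- 1
		select {
		case <-resp:
			result <- nil
		case <-ctx.Done():
			result <- ctx.Err()
		}
	}()

	<-taken  // deterministic: the item has been dequeued...
	cancel() // ...so this cancellation hits the second select
	return <-result
}

func main() {
	fmt.Println("dispatch aborted with:", run()) // prints "dispatch aborted with: context canceled"
}
```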
Replace "Phase 1" / "Phase 2" terminology in dispatch-related comments
and test names with references to dispatch's first and second select()
blocks, which is more self-explanatory for readers unfamiliar with the
prior review discussion.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Explain why ctx is cancelled before dispatch is invoked: to make the
first select() deterministically take the abort path so blk is never
enqueued.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add a comment on bulkT.reset noting that only freeBlk should call it,
since TestDrainAndFreeAbortedBlkResponse relies on that invariant to
use reset as an observable proxy for freeBlk having run.

Also split the two concerns in the test comment — synchronous execution
(for race-free reads) and the reset-as-proxy assertion — onto separate
lines so they're not conflated.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@ycombinator ycombinator force-pushed the fix/free-blk-on-dispatch-abort branch from 1f40cf4 to a4f4564 Compare April 9, 2026 04:33

Labels

Team:Elastic-Agent-Control-Plane Label for the Agent Control Plane team
