Async-IO pipeline on the FusedPQ search path #8
Merged
When the GraphSearcher iterates the candidate heap on level 0 with a FusedPQ
score function, each visited node requires a contiguous read of the per-node
block (PQ codes + degree + neighbor ids). With a remote-storage RandomAccessReader,
that read dominates wall time: profiling a workload running on herddb's
RemoteRandomAccessReader shows 53% of FJ-worker wall time blocked in
RemoteRandomAccessReader.readFully serving FusedPQ$PackedNeighbors.readInto.
The underlying network client supports concurrent reads but the jvector API
exposes only synchronous reads, so the IO is forced to serialize.
This change introduces a non-blocking range read on RandomAccessReader and
uses it from GraphSearcher to overlap the next visited node's IO with the
current node's similarity compute:
* RandomAccessReader.readRangeAsync(long, int): default sync fallback
(position-preserving) so local / mmap readers are unaffected;
network-backed readers can override to dispatch a true async read.
* OnDiskGraphIndex.View.readPackedNeighborsAsync(int): reads the contiguous
per-node block in one shot and parses it into a self-contained
PackedNeighborData (codes + neighbors[] + degree), so two reads can be
in flight without sharing scratch buffers.
* ScoreFunction.enableSimilarityToNeighbors(int, ByteSequence<?>) +
FusedPQDecoder override: consume already-loaded codes, bypassing the
implicit disk read inside the decoder.
* GraphSearcher.searchOneLayer dispatches to a new searchOneLayerAsync
when conditions match (level 0, FusedPQ view, score-fn supports
neighbor-batch) and the new setAsyncPipelineEnabled(true) is set.
The async loop is a 2-slot pipeline: while waiting on the current
node's read, it peeks the candidate heap and starts a speculative
read for the next likely-visited node. On peek hits, that future is
consumed for free; on peek misses, the bytes still land in the
reader's block cache (if any) for later reuse. Node visit order and
pop scoring are unchanged, so search results are bit-equivalent to
the sync path.
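A minimal sketch of what the position-preserving default fallback could look like, with a tiny in-memory reader to exercise it. The method names here (`getPosition`, `seek`, `readFully`) are assumptions for illustration, not necessarily jvector's exact `RandomAccessReader` signatures:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical reader interface; jvector's real API may differ in detail.
interface RandomAccessReader {
    long getPosition() throws IOException;
    void seek(long offset) throws IOException;
    void readFully(ByteBuffer buffer) throws IOException;

    // Default sync fallback: preserves the cursor so local/mmap readers
    // need no changes; network-backed readers override this to dispatch
    // a truly non-blocking read.
    default CompletableFuture<ByteBuffer> readRangeAsync(long offset, int length) {
        try {
            long saved = getPosition();
            ByteBuffer dst = ByteBuffer.allocate(length);
            seek(offset);
            readFully(dst);
            seek(saved);   // restore cursor: position-preserving
            dst.flip();
            return CompletableFuture.completedFuture(dst);
        } catch (IOException e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}

// In-memory reader used only to demonstrate the default method.
class ArrayReader implements RandomAccessReader {
    private final byte[] data;
    private int pos = 0;
    ArrayReader(byte[] data) { this.data = data; }
    public long getPosition() { return pos; }
    public void seek(long offset) { pos = (int) offset; }
    public void readFully(ByteBuffer buffer) {
        int n = buffer.remaining();
        buffer.put(data, pos, n);
        pos += n;
    }
}

public class ReadRangeAsyncSketch {
    public static void main(String[] args) throws Exception {
        byte[] data = new byte[64];
        for (int i = 0; i < data.length; i++) data[i] = (byte) i;
        ArrayReader r = new ArrayReader(data);
        r.seek(5);                                        // cursor at 5
        ByteBuffer bb = r.readRangeAsync(10, 4).join();   // read bytes 10..13
        System.out.println(bb.get(0) + "," + bb.get(3));  // prints 10,13
        System.out.println(r.getPosition());              // cursor unchanged: 5
    }
}
```

The key property the tests below check is visible here: the async read returns the requested range while leaving the reader's cursor exactly where it was.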
A new TestAsyncPipelineSearch checks (a) the default readRangeAsync returns
the same bytes as a sync seek+readFully and preserves the reader position,
and (b) sync vs pipeline search produce identical node ids and scores on a
deterministic FusedPQ graph.
Opt-in via GraphSearcher.setAsyncPipelineEnabled(true); intended for callers
that supply a RandomAccessReader with a non-blocking readRangeAsync override.
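As a toy illustration of the 2-slot pipeline described above, here is a runnable sketch over an in-memory graph. Every name (`readAsync`, `frontier`, the `Map`-backed view) is illustrative, not the real jvector search loop, and scoring is omitted:

```java
import java.util.*;
import java.util.concurrent.*;

public class TwoSlotPipelineSketch {
    // Toy "on-disk view": node id -> neighbor ids.
    static Map<Integer, int[]> graph = Map.of(
            0, new int[]{1, 2},
            1, new int[]{3},
            2, new int[]{3},
            3, new int[]{});
    static ExecutorService io = Executors.newFixedThreadPool(2);

    // Stand-in for readPackedNeighborsAsync: each read is a future.
    static CompletableFuture<int[]> readAsync(int node) {
        return CompletableFuture.supplyAsync(() -> graph.get(node), io);
    }

    public static void main(String[] args) {
        // Min-heap stands in for the candidate heap; visit order matches
        // a plain synchronous loop, so results are identical.
        PriorityQueue<Integer> frontier = new PriorityQueue<>(List.of(0));
        Set<Integer> visited = new HashSet<>();
        List<Integer> order = new ArrayList<>();

        Integer current = frontier.poll();
        CompletableFuture<int[]> inFlight = readAsync(current);
        while (true) {
            // Slot 2: speculative read for the next likely-visited node.
            Integer peek = frontier.peek();
            CompletableFuture<int[]> spec = (peek != null) ? readAsync(peek) : null;

            int[] neighbors = inFlight.join();   // wait for current's block
            visited.add(current);
            order.add(current);
            for (int n : neighbors)
                if (!visited.contains(n) && !frontier.contains(n)) frontier.add(n);

            current = frontier.poll();
            if (current == null) break;
            // Peek hit: the read is already in flight. On a miss the result
            // is dropped here; the real reader would keep it in a block cache.
            inFlight = (spec != null && current.equals(peek)) ? spec
                     : readAsync(current);
        }
        io.shutdown();
        System.out.println(order);   // visit order: [0, 1, 2, 3]
    }
}
```

The point of the sketch is the overlap: while the loop blocks in `inFlight.join()`, the speculative read for the heap's head is already running on the IO pool.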
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
eolivelli added a commit to eolivelli/herddb that referenced this pull request on May 13, 2026
The `parallel-fusedpq-io` PR branch was merged into main, so both CI workflows can go back to checking out `main`. Drops the transient `ref: parallel-fusedpq-io` pin (and the explanatory comment) introduced earlier in this PR.
eolivelli added a commit to eolivelli/herddb that referenced this pull request on May 13, 2026
…r parallel FusedPQ IO (#549)

Fixes #547.

## Changes

- **`.github/workflows/ci.yml` + `kubernetes-tests.yml`**: pin the jvector checkout to the `parallel-fusedpq-io` branch while [eolivelli/jvector#8](eolivelli/jvector#8) is unmerged. Both workflows had `ref: main`; they must track the PR branch to pick up the new `readRangeAsync` API. TODO: revert to `ref: main` once the PR merges.
- **`SegmentBlockCache`**: add an `AsyncBlockLoader` functional interface plus a `getBlockAsync(path, offset, length, AsyncBlockLoader)` method. Uses a `ConcurrentHashMap<BlockKey, CompletableFuture<ByteBuf>> inFlightAsync` for single-flight deduplication: concurrent misses for the same block share one loader call, each getting an independent retained slice. The hit path is an atomic `computeIfPresent` retain under the Caffeine map lock — the same pattern as `getBlock`. Cache insertion uses `compute()` to handle the rare race with a concurrent sync `getBlock` call. RefCnt ownership rules (refCnt=2 on insert: cache ref + caller ref) are identical to the sync path.
- **`RemoteRandomAccessReader`**: override `readRangeAsync(long, int)` from jvector's `RandomAccessReader`. Does NOT touch `position`, `blockBuffer`, or `bufferedBlockIndex` (async reads bypass the sliding-window cursor). The single-block fast path makes one `getBlockAsync` call; the multi-block path fires all covering blocks in parallel with `allOf`, assembling a heap `ByteBuffer` in the completion callback. All `ByteBuf` slices are released in a `finally` block (on both success and failure paths) so no off-heap memory escapes. A `fetchBlockFromRemoteAsync` helper mirrors `fetchBlockFromRemote`: it clamps length to avoid reading past EOF and updates the `rfs_client_read_*` counters and `VectorSearchRequestContext` accounting.
- **`PersistentVectorStore`**: add `static volatile boolean searchAsyncPipelineEnabled` with `setSearchAsyncPipelineEnabled()` / `isSearchAsyncPipelineEnabled()`. Mirrors the `setStreamingCompactionEnabled` pattern. Default `false` until a production profile confirms a net win.
- **`VectorSegment.search()`**: after getting or creating the `ThreadLocal`-cached `GraphSearcher`, call `searcher.setAsyncPipelineEnabled(PersistentVectorStore.searchAsyncPipelineEnabled)` on every search invocation. A cheap field write; enables a runtime toggle without discarding cached searcher instances.
- **`IndexingServerConfiguration`**: add `PROPERTY_VECTOR_SEARCH_ASYNC_PIPELINE_ENABLED` / `SYSPROP_VECTOR_SEARCH_ASYNC_PIPELINE_ENABLED` constants (default `false`).
- **`IndexingServiceEngine`**: read the new property at IS startup and call `PersistentVectorStore.setSearchAsyncPipelineEnabled()`; the config key takes precedence over the system property. Logs the resolved value.

## Tests

- **`RemoteRandomAccessReaderAsyncTest`** (11 cases, Netty PARANOID leak detector): `readRangeAsync` returns identical bytes to `seek` + `readFully` for single-block, cross-block, three-block, and end-of-file ranges; position is invariant after async reads; 16 concurrent async reads alongside a synchronous `readFully` loop on the same reader instance produce correct bytes with no refcount exceptions; cache warm-up via the sync path is visible to subsequent async reads; disabled-cache pass-through works.
- **`SegmentBlockCacheAsyncTest`** (7 cases, Netty PARANOID leak detector): pass-through on a disabled cache; a miss populates, then the next call is a hit; a sync miss populates, then an async call is a hit; 16 concurrent async misses share exactly one loader call; each concurrent caller receives an independent `ByteBuf` slice; loader failure propagates to all waiting callers; multi-threaded async loads are byte-correct.

🤖 Implemented by the `pr-worker` agent.
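The single-flight deduplication described for `SegmentBlockCache.getBlockAsync` can be sketched generically. This toy version uses plain futures and a hypothetical `SingleFlight` class, omitting the Netty `ByteBuf` retain/release accounting the real cache must do:

```java
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Single-flight: concurrent callers for the same key share one in-flight
// load; the map entry is removed once the load settles, so a later miss
// triggers a fresh load. (Illustrative; not herddb's actual class.)
class SingleFlight<K, V> {
    private final Map<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    CompletableFuture<V> get(K key, Supplier<CompletableFuture<V>> loader) {
        return inFlight.computeIfAbsent(key, k ->
                loader.get().whenComplete((v, t) -> inFlight.remove(k)));
    }
}

public class SingleFlightDemo {
    public static void main(String[] args) {
        SingleFlight<String, String> sf = new SingleFlight<>();
        AtomicInteger loads = new AtomicInteger();
        CompletableFuture<String> gate = new CompletableFuture<>();
        Supplier<CompletableFuture<String>> loader = () -> {
            loads.incrementAndGet();                 // count loader invocations
            return gate.thenApply(x -> "block-bytes");
        };
        // Two "concurrent" misses for the same block while the load is pending.
        CompletableFuture<String> a = sf.get("blk", loader);
        CompletableFuture<String> b = sf.get("blk", loader);
        gate.complete("go");                         // the one real load finishes
        System.out.println(loads.get());             // 1: loader ran once
        System.out.println(a.join().equals(b.join())); // true: same bytes
    }
}
```

The real implementation additionally hands each waiter an independent retained slice of the loaded `ByteBuf` and inserts the block into the cache with `compute()` to survive a race with a concurrent synchronous `getBlock`.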
Summary
Reduces `GraphSearcher` wall time on the layer-0 FusedPQ path for callers backed by a remote/async `RandomAccessReader` (e.g. herddb's `RemoteRandomAccessReader`). While waiting on the current frontier node's read, the search loop now kicks off a speculative read for the next likely-visited node — overlapping IO with similarity compute.

Profiling motivation (herddb indexing-service workload):

* `GraphSearcher.search` / `searchOneLayer`
* `OnDiskGraphIndex$View.processNeighbors` (FusedPQ branch)
* `RemoteRandomAccessReader.readFully` serving `FusedPQ$PackedNeighbors.readInto`

The herddb network client already supports concurrent reads via `readFileRangeAsByteBufAsync`, but the jvector `RandomAccessReader` API is purely synchronous, forcing every read to serialize on the calling thread.

Changes

* `RandomAccessReader.readRangeAsync(long, int)` — new default method returning `CompletableFuture<ByteBuffer>`. The default is a position-preserving sync fallback so mmap / local readers are unaffected; network-backed readers override it to dispatch a truly non-blocking read.
* `OnDiskGraphIndex.View.readPackedNeighborsAsync(int)` — reads one node's contiguous block (PQ codes + degree + neighbor ids) and parses it into a self-contained `PackedNeighborData` so two reads can be in flight without sharing scratch buffers.
* `ScoreFunction.enableSimilarityToNeighbors(int, ByteSequence<?>)` + `FusedPQDecoder` override — consume already-loaded codes, bypassing the implicit disk read.
* `GraphSearcher` — new `searchOneLayerAsync` implementing a 2-slot pipeline. Same node visit order and pop scoring as the sync path, so search results are bit-equivalent.
* Opt-in via `GraphSearcher.setAsyncPipelineEnabled(true)`; disabled by default. The pipeline branch only activates when `level == 0`, the view is an `OnDiskGraphIndex.View` with a `FUSED_PQ` feature, and the score function supports neighbor-batch similarity.

Tradeoffs vs a `void prefetch(offset, len)` hint design

A simpler "fire-and-forget prefetch" design was considered first. The async-read primitive was preferred because it (a) gives the search loop explicit ownership of the pipeline depth, (b) bounds per-node IO at 2× (vs `1 + fanout` for a hint with a tuning knob), and (c) produces a deterministic wall-time win that doesn't depend on the underlying block cache cooperating with the hint.

Test plan

* `TestAsyncPipelineSearch.testDefaultReadRangeAsyncMatchesSync` — the default sync fallback returns identical bytes to `seek` + `readFully` and preserves the reader's position.
* `TestAsyncPipelineSearch.testAsyncPipelineMatchesSyncOnFusedPQ` — over 25 random queries on a deterministic FusedPQ graph, sync and pipeline search return the same node ids in the same order with identical scores.
* `TestFusedGraphIndex`, `TestOnDiskGraphIndex`, `TestVectorGraph`, `TestOnDiskGraphIndexCompactor`, `Test2DThreshold` (42 tests).
* `readRangeAsync` routes to `RemoteFileServiceClient.readFileRangeAsByteBufAsync` (separate repo); re-profile and confirm the `RemoteRandomAccessReader.readFully` wall share drops.

🤖 Generated with Claude Code