Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,17 @@ final class CompactWriter implements AutoCloseable {
private final ProductQuantization pq;
private final int baseDegree;
private final int maxOrdinal;
private final int entryNode;
private final ThreadLocal<ByteBuffer> bufferPerThread;
private final ThreadLocal<ByteSequence<?>> zeroPQ;
private final boolean fusedPQEnabled;
private final Path outputPath;
private final List<CommonHeader.LayerInfo> configuredLayerInfo;
private final List<Integer> configuredLayerDegrees;
private final List<UpperLayerFeatureRecord> level1FeatureRecords;
// PQ code for the entry node, required when hierarchy is disabled (no level 1).
// Mirrors what AbstractGraphIndexWriter.writeSparseLevels writes in the no-hierarchy branch.
private ByteSequence<?> entryNodePqCode;

CompactWriter(Path outputPath,
int maxOrdinal,
Expand All @@ -91,7 +95,9 @@ final class CompactWriter implements AutoCloseable {
this.baseDegree = layerDegrees.get(0);
this.pq = pq;
this.maxOrdinal = maxOrdinal;
this.entryNode = entryNode;
this.level1FeatureRecords = new ArrayList<>();
this.entryNodePqCode = null;

Map<FeatureId, Feature> featureMap = new LinkedHashMap<>();
InlineVectors inlineVectorFeature = new InlineVectors(dimension);
Expand Down Expand Up @@ -134,10 +140,21 @@ public void writeHeader() throws IOException {
}

void writeFooter() throws IOException {
if (fusedPQEnabled && version == 6 && !level1FeatureRecords.isEmpty()) {
for (UpperLayerFeatureRecord record : level1FeatureRecords) {
writer.writeInt(record.ordinal);
vectorTypeSupport.writeByteSequence(writer, record.pqCode);
if (fusedPQEnabled && version == 6) {
if (!level1FeatureRecords.isEmpty()) {
// Hierarchy is enabled: write PQ source feature for every level-1 node.
// Mirrors AbstractGraphIndexWriter.writeSparseLevels (getMaxLevel >= 1 branch).
for (UpperLayerFeatureRecord record : level1FeatureRecords) {
writer.writeInt(record.ordinal);
vectorTypeSupport.writeByteSequence(writer, record.pqCode);
}
} else if (entryNodePqCode != null) {
// No hierarchy: write the entry node's own PQ code so that
// OnDiskGraphIndex.loadInMemoryFeatures can populate hierarchyCachedFeatures
// and GraphSearcher.initializeInternal can score the entry point.
// Mirrors AbstractGraphIndexWriter.writeSparseLevels (getMaxLevel == 0 branch).
writer.writeInt(entryNode);
vectorTypeSupport.writeByteSequence(writer, entryNodePqCode);
}
}
long headerOffset = writer.position();
Expand All @@ -157,6 +174,10 @@ public Path getOutputPath() {
return outputPath;
}

/**
 * Records the entry node's PQ code so {@code writeFooter} can emit it in the
 * fused-PQ, no-hierarchy case (no level-1 feature records); when unset (null),
 * that branch writes nothing.
 *
 * @param code the entry node's PQ-encoded vector; may be null to leave unset
 */
public void setEntryNodePqCode(ByteSequence<?> code) {
this.entryNodePqCode = code;
}

public void writeUpperLayerNode(int level, int ordinal, int[] neighbors, ByteSequence<?> level1PqCode) throws IOException {
writer.writeInt(ordinal);
writer.writeInt(neighbors.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,13 +223,31 @@ public void compact(Path outputPath) throws FileNotFoundException {
}

List<CommonHeader.LayerInfo> layerInfo = computeLayerInfoFromSources();
int entryNode = resolveEntryNode();
int[] entryNodeSource = resolveEntryNodeSource(); // {sourceIdx, originalOrdinal}
int entryNode = remappers.get(entryNodeSource[0]).oldToNew(entryNodeSource[1]);

log.info("Writing compacted graph : {} total nodes, maxOrdinal={}, dimension={}, degree={}",
numTotalNodes, maxOrdinal, dimension, maxDegrees.get(0));
try (CompactWriter writer = new CompactWriter(outputPath, maxOrdinal, numTotalNodes, 0, layerInfo, entryNode, dimension, maxDegrees, pq, pqLength, fusedPQEnabled)) {
writer.writeHeader();
compactLevels(writer, similarityFunction, fusedPQEnabled, compressedPrecision, pq);

// When FusedPQ is enabled and there is no hierarchy (only L0), the reader expects
// to find the entry node's own PQ code written after the L0 block, just as
// AbstractGraphIndexWriter.writeSparseLevels does in its getMaxLevel == 0 branch.
// Without it, loadInMemoryFeatures reads garbage and hierarchyCachedFeatures is
// missing the entry node, causing "Node X is not in the hierarchy" on first search.
if (fusedPQEnabled && maxDegrees.size() == 1) {
try (var entryView = sources.get(entryNodeSource[0]).getView()) {
var entryVec = vectorTypeSupport.createFloatVector(dimension);
entryView.getVectorInto(entryNodeSource[1], entryVec, 0);
var entryPqCode = vectorTypeSupport.createByteSequence(pq.getSubspaceCount());
entryPqCode.zero();
pq.encodeTo(entryVec, entryPqCode);
writer.setEntryNodePqCode(entryPqCode);
}
}

writer.writeFooter();
log.info("Compaction complete: {}", outputPath);
} catch (IOException | ExecutionException | InterruptedException e) {
Expand All @@ -240,12 +258,13 @@ public void compact(Path outputPath) throws FileNotFoundException {
}

/**
* Resolves the entry node for the compacted graph. The chosen node must exist at maxLevel
* (since the on-disk format sets entryNode.level = maxLevel). Prefers the designated entry
* node of any source whose maxLevel equals the global maxLevel; if all such entry nodes
* are deleted, falls back to the first live node at maxLevel across all sources.
* Returns {sourceIdx, originalOrdinal} for the entry node of the compacted graph.
* The chosen node must exist at maxLevel (since the on-disk format sets entryNode.level =
* maxLevel). Prefers the designated entry node of any source whose maxLevel equals the global
* maxLevel; if all such entry nodes are deleted, falls back to the first live node at maxLevel
* across all sources.
*/
private int resolveEntryNode() {
private int[] resolveEntryNodeSource() {
int maxLevel = sources.stream().mapToInt(OnDiskGraphIndex::getMaxLevel).max().orElse(0);

// The on-disk format sets entryNode.level = layerInfo.size() - 1 (i.e. maxLevel).
Expand All @@ -256,7 +275,7 @@ private int resolveEntryNode() {
if (sources.get(s).getMaxLevel() == maxLevel) {
int originalEntry = sources.get(s).getView().entryNode().node;
if (liveNodes.get(s).get(originalEntry)) {
return remappers.get(s).oldToNew(originalEntry);
return new int[]{s, originalEntry};
}
}
}
Expand All @@ -268,7 +287,7 @@ private int resolveEntryNode() {
while (it.hasNext()) {
int node = it.next();
if (liveNodes.get(s).get(node)) {
return remappers.get(s).oldToNew(node);
return new int[]{s, node};
}
}
}
Expand Down
Loading