From 8eb86f8941b0ab9ac5143529765c095f99378139 Mon Sep 17 00:00:00 2001 From: Miroslav Smiljanic Date: Tue, 16 Sep 2025 12:04:32 +0200 Subject: [PATCH 1/4] OAK-11932 - add segment prefetching capability for CachingSegmentReader --- .../CachingSegmentArchiveReader.java | 99 +++- .../CachingPersistenceTest.java | 523 +++++++++++++++++- 2 files changed, 614 insertions(+), 8 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java index df5aadb3ea8..bbf01e557aa 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java @@ -19,34 +19,130 @@ package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collections; import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class CachingSegmentArchiveReader implements SegmentArchiveReader { + private static final Logger LOG = LoggerFactory.getLogger(CachingSegmentArchiveReader.class); + + @NotNull private final PersistentCache persistentCache; @NotNull private final SegmentArchiveReader delegate; + private final ExecutorService prefetchExecutor; + private final Set<UUID> inFlightPrefetch = + Collections.newSetFromMap( + new ConcurrentHashMap<>()); + private final boolean prefetchEnabled; + private final int prefetchMaxRefs; + + + public CachingSegmentArchiveReader( @NotNull PersistentCache persistentCache, @NotNull SegmentArchiveReader delegate) { this.persistentCache = persistentCache; this.delegate = delegate; + int threads = Integer.getInteger("oak.segment.cache.threads", 10); + this.prefetchEnabled = Boolean.getBoolean("oak.segment.cache.prefetch.enabled"); + this.prefetchMaxRefs = Integer.getInteger("oak.segment.cache.prefetch.maxRefs", 20); + this.prefetchExecutor = Executors.newFixedThreadPool(threads); } @Override @Nullable public Buffer readSegment(long msb, long lsb) throws IOException { - return persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); + Buffer buf = persistentCache.readSegment(msb, lsb, () -> delegate.readSegment(msb, lsb)); + if (buf != null && prefetchEnabled) { + schedulePrefetch(msb, lsb, buf); + } + return buf; + } + + /** + * Extract direct references from a segment buffer. Subclasses may override for testing.
+ */ + private java.util.List<UUID> extractReferences(Buffer buffer) { + var data = org.apache.jackrabbit.oak.segment.data.SegmentData.newSegmentData(buffer); + int refs = data.getSegmentReferencesCount(); + java.util.ArrayList<UUID> out = new java.util.ArrayList<>(refs); + for (int i = 0; i < refs; i++) { + out.add(new java.util.UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i))); + } + return out; + } + + private void schedulePrefetch(long msb, long lsb, Buffer buffer) { + try { + java.util.List<UUID> refs = extractReferences(buffer); + int limit = Math.min(refs.size(), prefetchMaxRefs); + for (int i = 0; i < limit; i++) { + final java.util.UUID ref = refs.get(i); + final long rMsb = ref.getMostSignificantBits(); + final long rLsb = ref.getLeastSignificantBits(); + + // Skip if already present in cache + if (persistentCache.containsSegment(rMsb, rLsb)) { + continue; + } + + // Drop prefetch if already in progress for this segment + boolean registered = inFlightPrefetch.add(ref); + if (!registered) { + continue; + } + + try { + prefetchExecutor.execute(() -> { + try { + Buffer b = delegate.readSegment(rMsb, rLsb); + if (b != null) { + // Double-check cache before write to avoid redundant writes + if (!persistentCache.containsSegment(rMsb, rLsb)) { + persistentCache.writeSegment(rMsb, rLsb, b); + } + } + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch failed for segment {}", new java.util.UUID(rMsb, rLsb), e); + } + } finally { + inFlightPrefetch.remove(ref); + } + }); + } catch (Throwable t) { + // If task submission failed (e.g., executor shutting down), undo the registration + inFlightPrefetch.remove(ref); + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch submission failed for segment {}", new java.util.UUID(rMsb, rLsb), t); + } + } + } + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Prefetch scheduling failed for segment {}", new java.util.UUID(msb, lsb), t); + } + } } @Override @@ -88,6 +184,7 @@ public String getName() { @Override public void close() throws IOException { delegate.close(); + new ExecutorCloser(prefetchExecutor).close(); } @Override diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java index 94e09729577..e815e3e0c5a 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java @@ -18,16 +18,33 @@ package org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache; import org.apache.jackrabbit.oak.commons.Buffer; +import org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState; +import org.apache.jackrabbit.oak.segment.CachingSegmentReader; +import org.apache.jackrabbit.oak.segment.DefaultSegmentWriterBuilder; +import org.apache.jackrabbit.oak.segment.RecordId; import org.apache.jackrabbit.oak.segment.Segment; import org.apache.jackrabbit.oak.segment.SegmentId; import org.apache.jackrabbit.oak.segment.SegmentNotFoundException; +import org.apache.jackrabbit.oak.segment.SegmentReader; +import org.apache.jackrabbit.oak.segment.SegmentWriter; import org.apache.jackrabbit.oak.segment.file.FileStore; import org.apache.jackrabbit.oak.segment.file.FileStoreBuilder; import 
org.apache.jackrabbit.oak.segment.file.InvalidFileStoreVersionException; +import org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; import org.apache.jackrabbit.oak.segment.file.tar.TarPersistence; import org.apache.jackrabbit.oak.segment.spi.RepositoryNotReachableException; +import org.apache.jackrabbit.oak.segment.spi.monitor.FileStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.IOMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.monitor.RemoteStoreMonitorAdapter; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveManager; +import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentNodeStorePersistence; +import org.apache.jackrabbit.oak.spi.state.NodeBuilder; +import org.apache.jackrabbit.oak.stats.NoopStats; import org.jetbrains.annotations.NotNull; +import org.junit.AfterClass; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -36,13 +53,16 @@ import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.jackrabbit.oak.segment.file.FileStoreBuilder.fileStoreBuilder; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; public class CachingPersistenceTest { @@ -53,6 +73,20 @@ private File getFileStoreFolder() { return folder.getRoot(); } + @Before + public void setup() { + System.setProperty("oak.segment.cache.prefetch.enabled", "true"); + System.setProperty("oak.segment.cache.prefetch.maxRefs", "8"); + System.setProperty("oak.segment.cache.threads", "2"); + } + + @AfterClass + public static void disablePrefetch() { + System.clearProperty("oak.segment.cache.prefetch.enabled"); + System.clearProperty("oak.segment.cache.prefetch.maxRefs"); + System.clearProperty("oak.segment.cache.threads"); + } + @Test(expected = RepositoryNotReachableException.class) public void testRepositoryNotReachableWithCachingPersistence() throws IOException, InvalidFileStoreVersionException { FileStoreBuilder fileStoreBuilder; @@ -95,10 +129,455 @@ public void testRepositoryNotReachableWithCachingPersistence() throws IOExceptio } } - /** - * @param repoNotReachable - if set to true, {@code RepositoryNotReachableException} will be thrown when calling {@code SegmentArchiveReader}#readSegment - * @return - */ + @Test + public void prefetchOnCacheHitLoadsReferences() throws Exception { + + // Create a tar-backed FileStore and build real segments with references + File dir = getFileStoreFolder(); + FileStore fs = fileStoreBuilder(dir).build(); + SegmentArchiveReader archiveReader = null; + try { + SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); + // Reader for NodeState from RecordId within this FileStore + SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); + + // Two referenced nodes in separate segments + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + RecordId a2 = writer.writeNode(EmptyNodeState.EMPTY_NODE); 
writer.flush(); + + // Root node referencing both + NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); + builder.setChildNode("ref1", sr.readNode(a1)); + builder.setChildNode("ref2", sr.readNode(a2)); + RecordId rootRec = writer.writeNode(builder.getNodeState()); + writer.flush(); + + UUID r1 = a1.getSegmentId().asUUID(); + UUID r2 = a2.getSegmentId().asUUID(); + UUID root = rootRec.getSegmentId().asUUID(); + + // Close the FileStore to ensure tar index is written and readable + fs.close(); + + fs = null; + + // Open the tar archive and fetch exact on-disk buffers + TarPersistence tar = new TarPersistence(dir); + SegmentArchiveManager am = tar.createArchiveManager(false, false, + new IOMonitorAdapter(), + new FileStoreMonitorAdapter(), + new RemoteStoreMonitorAdapter()); + List<String> archives = am.listArchives(); + assertFalse("No archives found", archives.isEmpty()); + archiveReader = am.open(archives.get(0)); + assertNotNull(archiveReader); + + Buffer rootBuffer = archiveReader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + final Buffer buf1 = archiveReader.readSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits()); + final Buffer buf2 = archiveReader.readSegment(r2.getMostSignificantBits(), r2.getLeastSignificantBits()); + assertNotNull(rootBuffer); + assertNotNull(buf1); + assertNotNull(buf2); + + // Instrumented cache: pre-seed root and observe writes for r1, r2 + CountDownLatch latch = new CountDownLatch(2); + PersistentCache cache = new MemoryPersistentCache(false) { + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + super.writeSegment(msb, lsb, buffer); + if ((msb == r1.getMostSignificantBits() && lsb == r1.getLeastSignificantBits()) || + (msb == r2.getMostSignificantBits() && lsb == r2.getLeastSignificantBits())) { + latch.countDown(); + } + } + }; + // seed root + cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), rootBuffer); + + // Reader under test, delegate reads from real tar archive + CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, archiveReader); + + // Act: read root. Since it's a cache hit, prefetch should be scheduled for r1 and r2.
+ Buffer got = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + assertNotNull(got); + + boolean completed = latch.await(5, TimeUnit.SECONDS); + assertTrue("Prefetch did not complete in time", completed); + + assertTrue(cache.containsSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits())); + assertTrue(cache.containsSegment(r2.getMostSignificantBits(), r2.getLeastSignificantBits())); + + reader.close(); + } finally { + if (archiveReader != null) archiveReader.close(); + if (fs != null) fs.close(); + } + } + + + + @Test + public void skipAlreadyCachedReferencesAreNotPrefetched() throws Exception { + + // Create a tar-backed FileStore and build segments with references + File dir = getFileStoreFolder(); + FileStore fs = fileStoreBuilder(dir).build(); + try { + SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); + SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); + + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); // will be preseeded in cache + writer.flush(); + RecordId a2 = writer.writeNode(EmptyNodeState.EMPTY_NODE); // will be prefetched + writer.flush(); + + NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); + builder.setChildNode("ref1", sr.readNode(a1)); + builder.setChildNode("ref2", sr.readNode(a2)); + RecordId rootRec = writer.writeNode(builder.getNodeState()); + writer.flush(); + + UUID r1 = a1.getSegmentId().asUUID(); + UUID r2 = a2.getSegmentId().asUUID(); + UUID root = rootRec.getSegmentId().asUUID(); + + + // Close the FileStore to ensure tar index is written and readable + fs.close(); + fs = null; + + + // Open the tar archive and fetch buffers + TarPersistence tar = new TarPersistence(dir); + SegmentArchiveManager am = tar.createArchiveManager(false, false, + new IOMonitorAdapter(), + new FileStoreMonitorAdapter(), + new RemoteStoreMonitorAdapter()); + List<String> archives = am.listArchives(); + assertFalse("No archives found", archives.isEmpty()); + SegmentArchiveReader archiveReader = am.open(archives.get(0)); + assertNotNull(archiveReader); + + Buffer rootBuffer = archiveReader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + final Buffer buf1 = archiveReader.readSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits()); + final Buffer buf2 = archiveReader.readSegment(r2.getMostSignificantBits(), r2.getLeastSignificantBits()); + assertNotNull(rootBuffer); + assertNotNull(buf1); + assertNotNull(buf2); + + // Instrumented cache: count writes for r1 and r2 + AtomicInteger r1Writes = new AtomicInteger(); + AtomicInteger r2Writes = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); // expect only r2 to be written + PersistentCache cache = new MemoryPersistentCache(false) { + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + boolean isR1 = (msb == r1.getMostSignificantBits() && lsb == r1.getLeastSignificantBits()); + boolean isR2 = (msb == r2.getMostSignificantBits() && lsb == r2.getLeastSignificantBits()); + super.writeSegment(msb, lsb, buffer); + if (isR1) { + r1Writes.incrementAndGet(); + } + if (isR2) { + if (r2Writes.incrementAndGet() == 1) { + latch.countDown(); + } + } + } + }; + + // Seed root and r1 into cache (r1 already cached => should be skipped by prefetch) + cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), rootBuffer); + cache.writeSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits(), buf1); + + 
// Reader under test, delegate reads from real tar archive + CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, archiveReader); + + // Act + Buffer got = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + assertNotNull(got); + + boolean completed = latch.await(5, TimeUnit.SECONDS); + assertTrue("Prefetch for non-cached ref did not complete in time", completed); + + // Assert r1 was not written again (still exactly the initial seed), r2 was written once by prefetch + assertTrue(cache.containsSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits())); + assertTrue(cache.containsSegment(r2.getMostSignificantBits(), r2.getLeastSignificantBits())); + // r1 write count includes the pre-seed; ensure no additional write happened via prefetch. + // Pre-seed performed exactly 1 write for r1, so count should remain 1. + assertEquals(1, r1Writes.get()); + assertEquals(1, r2Writes.get()); + + reader.close(); + } finally { + if (fs != null) fs.close(); + } + } + + @Test + public void prefetchDisabledDoesNotScheduleOrWrite() throws Exception { + System.setProperty("oak.segment.cache.prefetch.enabled", "false"); + + File dir = getFileStoreFolder(); + FileStore fs = fileStoreBuilder(dir).build(); + SegmentArchiveReader archiveReader = null; + try { + SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); + SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); + + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + RecordId a2 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + + NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); + builder.setChildNode("ref1", sr.readNode(a1)); + builder.setChildNode("ref2", sr.readNode(a2)); + RecordId rootRec = writer.writeNode(builder.getNodeState()); + writer.flush(); + + UUID r1 = a1.getSegmentId().asUUID(); + UUID r2 = a2.getSegmentId().asUUID(); + UUID root = rootRec.getSegmentId().asUUID(); + + fs.close(); + fs = null; + + // Open the tar archive and fetch exact on-disk buffers + TarPersistence tar = new TarPersistence(dir); + SegmentArchiveManager am = tar.createArchiveManager(false, false, + new IOMonitorAdapter(), + new FileStoreMonitorAdapter(), + new RemoteStoreMonitorAdapter()); + List<String> archives = am.listArchives(); + assertFalse("No archives found", archives.isEmpty()); + archiveReader = am.open(archives.get(0)); + assertNotNull(archiveReader); + + Buffer rootBuffer = archiveReader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + final Buffer buf1 = archiveReader.readSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits()); + final Buffer buf2 = archiveReader.readSegment(r2.getMostSignificantBits(), r2.getLeastSignificantBits()); + assertNotNull(rootBuffer); + assertNotNull(buf1); + assertNotNull(buf2); + + AtomicInteger r1Writes = new AtomicInteger(); + AtomicInteger r2Writes = new AtomicInteger(); + CountDownLatch latch = new CountDownLatch(1); + PersistentCache cache = new MemoryPersistentCache(false) { + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + boolean isR1 = (msb == r1.getMostSignificantBits() && lsb == r1.getLeastSignificantBits()); + boolean isR2 = (msb == r2.getMostSignificantBits() && lsb == r2.getLeastSignificantBits()); + super.writeSegment(msb, lsb, buffer); + if (isR1) { + r1Writes.incrementAndGet(); + } + if (isR2) { + if (r2Writes.incrementAndGet() == 1) { + latch.countDown(); + } + } 
+ } + } + }; + + // Seed root; with prefetch disabled, reading root should not write r1 or r2 + cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), rootBuffer); + + CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, archiveReader); + + Buffer got = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + assertNotNull(got); + + boolean anyWrite = latch.await(2, TimeUnit.SECONDS); + assertFalse("Prefetch disabled should not schedule writes", anyWrite); + assertFalse(cache.containsSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits())); + assertFalse(cache.containsSegment(r2.getMostSignificantBits(), r2.getLeastSignificantBits())); + assertEquals(0, r1Writes.get()); + assertEquals(0, r2Writes.get()); + + reader.close(); + } finally { + if (archiveReader != null) archiveReader.close(); + if (fs != null) fs.close(); + } + } + + @Test + public void concurrentCacheHitsDeduplicatePrefetchTasks() throws Exception { + // Build segments: root references a1 only, to focus on a single target UUID + File dir = getFileStoreFolder(); + FileStore fs = fileStoreBuilder(dir).build(); + SegmentArchiveReader archiveReader = null; + try { + SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); + SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); + + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + + NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); + builder.setChildNode("ref1", sr.readNode(a1)); + RecordId rootRec = writer.writeNode(builder.getNodeState()); + writer.flush(); + + UUID r1 = a1.getSegmentId().asUUID(); + UUID root = rootRec.getSegmentId().asUUID(); + + fs.close(); + fs = null; + + // Open the tar archive and fetch exact on-disk buffers + TarPersistence tar = new TarPersistence(dir); + SegmentArchiveManager am = tar.createArchiveManager(false, false, + new IOMonitorAdapter(), + new FileStoreMonitorAdapter(), + new RemoteStoreMonitorAdapter()); + List<String> archives = am.listArchives(); + assertFalse("No archives found", archives.isEmpty()); + archiveReader = am.open(archives.get(0)); + assertNotNull(archiveReader); + + Buffer rootBuffer = archiveReader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + final Buffer buf1 = archiveReader.readSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits()); + assertNotNull(rootBuffer); + assertNotNull(buf1); + + AtomicInteger r1Writes = new AtomicInteger(); + CountDownLatch firstWrite = new CountDownLatch(1); + PersistentCache cache = new MemoryPersistentCache(false) { + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + boolean isR1 = (msb == r1.getMostSignificantBits() && lsb == r1.getLeastSignificantBits()); + super.writeSegment(msb, lsb, buffer); + if (isR1) { + if (r1Writes.incrementAndGet() == 1) { + firstWrite.countDown(); + } + } + } + }; + + // Seed root in cache so reads are cache hits, triggering prefetch for r1 + cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), rootBuffer); + + CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, archiveReader); + + // Fire multiple concurrent reads of the same root buffer against the same reader instance + int threads = 12; + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threads); + for (int i = 0; i < threads; i++) { + new Thread(() -> { + try { + 
start.await(); + Buffer got = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + assertNotNull(got); + } catch (Exception e) { + // propagate as assertion failure + fail("Concurrent read failed: " + e); + } finally { + done.countDown(); + } + }, "t-" + i).start(); + } + start.countDown(); + + assertTrue("Concurrent reads did not finish in time", done.await(5, TimeUnit.SECONDS)); + assertTrue("Prefetch did not write target in time", firstWrite.await(5, TimeUnit.SECONDS)); + + Thread.sleep(200); + + // Assert exactly one write for r1 + assertEquals("Prefetch should be deduplicated across concurrent cache hits", 1, r1Writes.get()); + assertTrue(cache.containsSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits())); + + reader.close(); + } finally { + if (archiveReader != null) archiveReader.close(); + if (fs != null) fs.close(); + } + } + + @Test + public void inFlightDedupIsReleasedAfterDelegateFailure() throws Exception { + // Build segments: root references a1 only + File dir = getFileStoreFolder(); + FileStore fs = fileStoreBuilder(dir).build(); + SegmentArchiveReader archiveReader = null; + try { + SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); + SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); + + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + + NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); + builder.setChildNode("ref1", sr.readNode(a1)); + RecordId rootRec = writer.writeNode(builder.getNodeState()); + writer.flush(); + + UUID r1 = a1.getSegmentId().asUUID(); + UUID root = rootRec.getSegmentId().asUUID(); + + // Close the FileStore to ensure tar index is written and readable + fs.close(); + fs = null; + + // Open the tar archive and fetch exact on-disk buffers + TarPersistence tar = new TarPersistence(dir); + SegmentArchiveManager am = tar.createArchiveManager(false, false, + new IOMonitorAdapter(), + new FileStoreMonitorAdapter(), + new RemoteStoreMonitorAdapter()); + List<String> archives = am.listArchives(); + assertFalse("No archives found", archives.isEmpty()); + archiveReader = am.open(archives.get(0)); + assertNotNull(archiveReader); + + Buffer rootBuffer = archiveReader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + final Buffer buf1 = archiveReader.readSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits()); + assertNotNull(rootBuffer); + assertNotNull(buf1); + + AtomicInteger r1Writes = new AtomicInteger(); + CountDownLatch writeLatch = new CountDownLatch(1); + PersistentCache cache = new MemoryPersistentCache(false) { + @Override + public void writeSegment(long msb, long lsb, Buffer buffer) { + boolean isR1 = (msb == r1.getMostSignificantBits() && lsb == r1.getLeastSignificantBits()); + super.writeSegment(msb, lsb, buffer); + if (isR1) { + if (r1Writes.incrementAndGet() == 1) { + writeLatch.countDown(); + } + } + } + }; + + // Seed only root in cache + cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), rootBuffer); + + CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, new FailingOnceReader(archiveReader, r1)); + + // First read: schedules prefetch, delegate fails once, no write occurs, in-flight must be cleared + Buffer got1 = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + assertNotNull(got1); + // Wait a bit for async attempt to complete + Thread.sleep(200); + 
assertEquals(0, r1Writes.get()); + assertFalse(cache.containsSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits())); + + // Second read: should schedule again and now succeed + Buffer got2 = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); + assertNotNull(got2); + assertTrue("Prefetch after failure did not write target in time", writeLatch.await(5, TimeUnit.SECONDS)); + assertEquals(1, r1Writes.get()); + assertTrue(cache.containsSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits())); + + reader.close(); + } finally { + if (archiveReader != null) archiveReader.close(); + if (fs != null) fs.close(); + } + } + @NotNull private FileStoreBuilder getFileStoreBuilderWithCachingPersistence(boolean repoNotReachable) { FileStoreBuilder fileStoreBuilder; @@ -156,4 +635,34 @@ public void cleanUp() { } } + + class FailingOnceReader implements SegmentArchiveReader { + final SegmentArchiveReader segmentArchiveReader; + final java.util.concurrent.atomic.AtomicBoolean failed = new java.util.concurrent.atomic.AtomicBoolean(false); + final UUID segmentId; + + FailingOnceReader(SegmentArchiveReader d, UUID r1) { + this.segmentArchiveReader = d; + this.segmentId = r1; + } + + @Override public Buffer readSegment(long msb, long lsb) throws IOException { + if (msb == segmentId.getMostSignificantBits() && lsb == segmentId.getLeastSignificantBits()) { + if (failed.compareAndSet(false, true)) { + throw new IOException("fail once"); + } + } + return segmentArchiveReader.readSegment(msb, lsb); + } + + @Override public boolean containsSegment(long msb, long lsb) { return segmentArchiveReader.containsSegment(msb, lsb); } + @Override public List<SegmentArchiveEntry> listSegments() { return segmentArchiveReader.listSegments(); } + @Override public SegmentGraph getGraph() throws IOException { return segmentArchiveReader.getGraph(); } + @Override public Buffer getBinaryReferences() throws IOException { return segmentArchiveReader.getBinaryReferences(); } + @Override public long length() { return segmentArchiveReader.length(); } + @Override public String getName() { return segmentArchiveReader.getName(); } + @Override public void close() throws IOException { segmentArchiveReader.close(); } + @Override public int getEntrySize(int size) { return segmentArchiveReader.getEntrySize(size); } + @Override public boolean isRemote() { return segmentArchiveReader.isRemote(); } + } } From ceab496ea573144df26481185bba21c4812a7739 Mon Sep 17 00:00:00 2001 From: Miroslav Smiljanic Date: Tue, 16 Sep 2025 17:47:33 +0200 Subject: [PATCH 2/4] OAK-11932 format --- .../CachingSegmentArchiveReader.java | 31 +++++-------- .../CachingPersistenceTest.java | 46 +++++++------------ 2 files changed, 29 insertions(+), 48 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java index bbf01e557aa..9e2ac39c164 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingSegmentArchiveReader.java @@ -20,6 +20,7 @@ import org.apache.jackrabbit.oak.commons.Buffer; import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; +import org.apache.jackrabbit.oak.segment.data.SegmentData; import 
org.apache.jackrabbit.oak.segment.file.tar.SegmentGraph; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveEntry; import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader; @@ -29,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; @@ -36,7 +38,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; public class CachingSegmentArchiveReader implements SegmentArchiveReader { @@ -79,25 +80,22 @@ public Buffer readSegment(long msb, long lsb) throws IOException { return buf; } - /** - * Extract direct references from a segment buffer. Subclasses may override for testing. - */ - private java.util.List<UUID> extractReferences(Buffer buffer) { - var data = org.apache.jackrabbit.oak.segment.data.SegmentData.newSegmentData(buffer); + private List<UUID> extractReferences(Buffer buffer) { + var data = SegmentData.newSegmentData(buffer); int refs = data.getSegmentReferencesCount(); - java.util.ArrayList<UUID> out = new java.util.ArrayList<>(refs); + ArrayList<UUID> out = new ArrayList<>(refs); for (int i = 0; i < refs; i++) { - out.add(new java.util.UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i))); + out.add(new UUID(data.getSegmentReferenceMsb(i), data.getSegmentReferenceLsb(i))); } return out; } private void schedulePrefetch(long msb, long lsb, Buffer buffer) { try { - java.util.List<UUID> refs = extractReferences(buffer); + List<UUID> refs = extractReferences(buffer); int limit = Math.min(refs.size(), prefetchMaxRefs); for (int i = 0; i < limit; i++) { - final java.util.UUID ref = refs.get(i); + final UUID ref = refs.get(i); final long rMsb = ref.getMostSignificantBits(); final long rLsb = ref.getLeastSignificantBits(); @@ -123,9 +121,7 @@ private void schedulePrefetch(long msb, long lsb, Buffer buffer) { } } } catch (Exception e) { - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch failed for segment {}", new java.util.UUID(rMsb, rLsb), e); - } + LOG.debug("Prefetch failed for segment {}", new java.util.UUID(rMsb, rLsb), e); } finally { inFlightPrefetch.remove(ref); } @@ -133,15 +129,12 @@ private void schedulePrefetch(long msb, long lsb, Buffer buffer) { } catch (Throwable t) { // If task submission failed (e.g., executor shutting down), undo the registration inFlightPrefetch.remove(ref); - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch submission failed for segment {}", new java.util.UUID(rMsb, rLsb), t); - } + LOG.debug("Prefetch submission failed for segment {}", new java.util.UUID(rMsb, rLsb), t); + } } } catch (Throwable t) { - if (LOG.isDebugEnabled()) { - LOG.debug("Prefetch scheduling failed for segment {}", new java.util.UUID(msb, lsb), t); - } + LOG.debug("Prefetch scheduling failed for segment {}", new java.util.UUID(msb, lsb), t); } } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java index e815e3e0c5a..91532eef503 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java @@ -131,8 +131,6 @@ public void 
testRepositoryNotReachableWithCachingPersistence() throws IOExceptio @Test public void prefetchOnCacheHitLoadsReferences() throws Exception { - - // Create a tar-backed FileStore and build real segments with references File dir = getFileStoreFolder(); FileStore fs = fileStoreBuilder(dir).build(); SegmentArchiveReader archiveReader = null; @@ -142,8 +140,10 @@ public void prefetchOnCacheHitLoadsReferences() throws Exception { SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); // Two referenced nodes in separate segments - RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); - RecordId a2 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); + writer.flush(); + RecordId a2 = writer.writeNode(EmptyNodeState.EMPTY_NODE); + writer.flush(); // Root node referencing both NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); @@ -194,10 +194,9 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { // seed root cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), rootBuffer); - // Reader under test, delegate reads from real tar archive CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, archiveReader); - // Act: read root. Since it's a cache hit, prefetch should be scheduled for r1 and r2. + // Read root. Since it's a cache hit, prefetch should be scheduled for r1 and r2. Buffer got = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); assertNotNull(got); @@ -217,9 +216,7 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { @Test - public void skipAlreadyCachedReferencesAreNotPrefetched() throws Exception { - - // Create a tar-backed FileStore and build segments with references + public void alreadyCachedReferencesAreNotPrefetched() throws Exception { File dir = getFileStoreFolder(); FileStore fs = fileStoreBuilder(dir).build(); try { @@ -241,13 +238,10 @@ public void skipAlreadyCachedReferencesAreNotPrefetched() throws Exception { UUID r2 = a2.getSegmentId().asUUID(); UUID root = rootRec.getSegmentId().asUUID(); - - // Close the FileStore to ensure tar index is written and readable fs.close(); fs = null; - // Open the tar archive and fetch buffers TarPersistence tar = new TarPersistence(dir); SegmentArchiveManager am = tar.createArchiveManager(false, false, new IOMonitorAdapter(), @@ -290,21 +284,17 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), rootBuffer); cache.writeSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits(), buf1); - // Reader under test, delegate reads from real tar archive CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, archiveReader); - // Act Buffer got = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); assertNotNull(got); boolean completed = latch.await(5, TimeUnit.SECONDS); assertTrue("Prefetch for non-cached ref did not complete in time", completed); - // Assert r1 was not written again (still exactly the initial seed), r2 was written once by prefetch assertTrue(cache.containsSegment(r1.getMostSignificantBits(), r1.getLeastSignificantBits())); assertTrue(cache.containsSegment(r2.getMostSignificantBits(), r2.getLeastSignificantBits())); - // r1 write count includes the pre-seed; ensure no additional write happened via prefetch. 
- // Pre-seed performed exactly 1 write for r1, so count should remain 1. + assertEquals(1, r1Writes.get()); assertEquals(1, r2Writes.get()); @@ -325,8 +315,10 @@ public void prefetchDisabledDoesNotScheduleOrWrite() throws Exception { SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); - RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); - RecordId a2 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); + writer.flush(); + RecordId a2 = writer.writeNode(EmptyNodeState.EMPTY_NODE); + writer.flush(); NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); builder.setChildNode("ref1", sr.readNode(a1)); @@ -368,13 +360,8 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { boolean isR1 = (msb == r1.getMostSignificantBits() && lsb == r1.getLeastSignificantBits()); boolean isR2 = (msb == r2.getMostSignificantBits() && lsb == r2.getLeastSignificantBits()); super.writeSegment(msb, lsb, buffer); - if (isR1) { - r1Writes.incrementAndGet(); - } - if (isR2) { - if (r2Writes.incrementAndGet() == 1) { - latch.countDown(); - } + if ((isR1 || isR2) && latch.getCount() > 0) { + latch.countDown(); } } }; @@ -403,7 +390,6 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { @Test public void concurrentCacheHitsDeduplicatePrefetchTasks() throws Exception { - // Build segments: root references a1 only, to focus on a single target UUID File dir = getFileStoreFolder(); FileStore fs = fileStoreBuilder(dir).build(); SegmentArchiveReader archiveReader = null; @@ -411,7 +397,8 @@ public void concurrentCacheHitsDeduplicatePrefetchTasks() throws Exception { SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); - RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); + writer.flush(); NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); builder.setChildNode("ref1", sr.readNode(a1)); @@ -506,7 +493,8 @@ public void inFlightDedupIsReleasedAfterDelegateFailure() throws Exception { SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); SegmentReader sr = new CachingSegmentReader(() -> writer, null, 16, 2, NoopStats.INSTANCE); - RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); writer.flush(); + RecordId a1 = writer.writeNode(EmptyNodeState.EMPTY_NODE); + writer.flush(); NodeBuilder builder = EmptyNodeState.EMPTY_NODE.builder(); builder.setChildNode("ref1", sr.readNode(a1)); From 24d2970ce8b2d90ed978de890a6ea4b9775b1c62 Mon Sep 17 00:00:00 2001 From: Miroslav Smiljanic Date: Tue, 16 Sep 2025 18:27:11 +0200 Subject: [PATCH 3/4] OAK-11932 fix the test --- .../persistentcache/CachingPersistenceTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java index 91532eef503..7ad701e5795 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java +++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java @@ -131,7 +131,7 @@ public void testRepositoryNotReachableWithCachingPersistence() throws IOExceptio @Test public void prefetchOnCacheHitLoadsReferences() throws Exception { - File dir = getFileStoreFolder(); + File dir = folder.newFolder(); FileStore fs = fileStoreBuilder(dir).build(); SegmentArchiveReader archiveReader = null; try { @@ -217,7 +217,7 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { @Test public void alreadyCachedReferencesAreNotPrefetched() throws Exception { - File dir = getFileStoreFolder(); + File dir = folder.newFolder(); FileStore fs = fileStoreBuilder(dir).build(); try { SegmentWriter writer = DefaultSegmentWriterBuilder.defaultSegmentWriterBuilder("t").build(fs); @@ -308,7 +308,7 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { public void prefetchDisabledDoesNotScheduleOrWrite() throws Exception { System.setProperty("oak.segment.cache.prefetch.enabled", "false"); - File dir = getFileStoreFolder(); + File dir = folder.newFolder(); FileStore fs = fileStoreBuilder(dir).build(); SegmentArchiveReader archiveReader = null; try { @@ -390,7 +390,7 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { @Test public void concurrentCacheHitsDeduplicatePrefetchTasks() throws Exception { - File dir = getFileStoreFolder(); + File dir = folder.newFolder(); FileStore fs = fileStoreBuilder(dir).build(); SegmentArchiveReader archiveReader = null; try { @@ -486,7 +486,7 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { @Test public void inFlightDedupIsReleasedAfterDelegateFailure() throws Exception { // Build segments: root references a1 only - File dir = getFileStoreFolder(); + File dir = folder.newFolder(); FileStore fs = fileStoreBuilder(dir).build(); SegmentArchiveReader archiveReader = null; try { From e7d4c084fb235ed88c7cdc986b736bce4a14f584 Mon Sep 17 00:00:00 2001 From: Miroslav Smiljanic Date: Tue, 16 Sep 2025 18:34:08 +0200 Subject: [PATCH 4/4] OAK-11932 fix the test --- .../persistentcache/CachingPersistenceTest.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java index 7ad701e5795..8c03f06da6d 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/spi/persistence/persistentcache/CachingPersistenceTest.java @@ -45,6 +45,7 @@ import org.jetbrains.annotations.NotNull; import org.junit.AfterClass; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -306,7 +307,6 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { @Test public void prefetchDisabledDoesNotScheduleOrWrite() throws Exception { - System.setProperty("oak.segment.cache.prefetch.enabled", "false"); File dir = folder.newFolder(); FileStore fs = fileStoreBuilder(dir).build(); @@ -369,7 +369,14 @@ public void writeSegment(long msb, long lsb, Buffer buffer) { // Seed root; with prefetch disabled, reading root should not write r1 or r2 cache.writeSegment(root.getMostSignificantBits(), root.getLeastSignificantBits(), 
rootBuffer); + String prev = System.getProperty("oak.segment.cache.prefetch.enabled"); + System.setProperty("oak.segment.cache.prefetch.enabled", "false"); CachingSegmentArchiveReader reader = new CachingSegmentArchiveReader(cache, archiveReader); + if (prev != null) { + System.setProperty("oak.segment.cache.prefetch.enabled", prev); + } else { + System.clearProperty("oak.segment.cache.prefetch.enabled"); + } Buffer got = reader.readSegment(root.getMostSignificantBits(), root.getLeastSignificantBits()); assertNotNull(got);
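
Editor's note on usage (not part of the patch series): the sketch below shows how the prefetching reader introduced in PATCH 1/4 would be wired up, assuming the patched classes are on the classpath. The PrefetchWiring class and its method names are illustrative only; the system property names and their defaults (prefetch disabled, maxRefs 20, 10 threads) are the ones read in the CachingSegmentArchiveReader constructor.

    import org.apache.jackrabbit.oak.commons.Buffer;
    import org.apache.jackrabbit.oak.segment.spi.persistence.SegmentArchiveReader;
    import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.CachingSegmentArchiveReader;
    import org.apache.jackrabbit.oak.segment.spi.persistence.persistentcache.PersistentCache;

    import java.io.IOException;

    class PrefetchWiring { // hypothetical helper, for illustration only

        // Wraps an already-open archive reader with the caching/prefetching decorator.
        // The constructor reads the system properties once, so set them beforehand.
        static SegmentArchiveReader wrap(PersistentCache cache, SegmentArchiveReader delegate) {
            System.setProperty("oak.segment.cache.prefetch.enabled", "true"); // opt-in, default false
            System.setProperty("oak.segment.cache.prefetch.maxRefs", "20");   // per-segment prefetch cap
            System.setProperty("oak.segment.cache.threads", "10");            // prefetch pool size
            return new CachingSegmentArchiveReader(cache, delegate);
        }

        // After a successful read, up to maxRefs segments directly referenced by the
        // returned segment are read asynchronously from the delegate and written to
        // the cache, skipping segments already cached or already being prefetched.
        static Buffer read(SegmentArchiveReader reader, long msb, long lsb) throws IOException {
            return reader.readSegment(msb, lsb);
        }
    }

Note that closing the wrapping reader also shuts down the prefetch executor (via ExecutorCloser), so close() must be called to avoid leaking the pool's threads.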