From 3cb10d306124757ea043973f2157a899a9265bb3 Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Wed, 17 Sep 2025 23:21:21 +0200 Subject: [PATCH 1/3] OAK-11934 - segment prefetching for segmentstore cache --- .../jackrabbit/oak/segment/SegmentCache.java | 200 +++++++++++++++++- .../oak/segment/file/AbstractFileStore.java | 8 +- .../oak/segment/file/FileStore.java | 1 + .../oak/segment/file/FileStoreBuilder.java | 31 ++- .../oak/segment/file/ReadOnlyFileStore.java | 1 + .../oak/segment/SegmentCacheTest.java | 40 ++++ 6 files changed, 264 insertions(+), 17 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java index 41238c04b4d..7ea7b8b7923 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java @@ -20,12 +20,20 @@ package org.apache.jackrabbit.oak.segment; import static java.util.Objects.requireNonNull; +import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument; import static org.apache.jackrabbit.oak.segment.CacheWeights.segmentWeight; +import java.io.Closeable; +import java.io.IOException; +import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.jackrabbit.guava.common.cache.Cache; @@ -35,6 +43,9 @@ import org.apache.jackrabbit.oak.cache.AbstractCacheStats; import org.apache.jackrabbit.oak.segment.CacheWeights.SegmentCacheWeigher; import org.jetbrains.annotations.NotNull; +import 
org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A cache for {@link SegmentId#isDataSegmentId() data} {@link Segment} @@ -46,7 +57,7 @@ * SegmentId#segment}. Every time an segment is evicted from this cache the * memoised segment is discarded (see {@code SegmentId#onAccess}. */ -public abstract class SegmentCache { +public abstract class SegmentCache implements Closeable { /** * Default maximum weight of this cache in MB @@ -59,17 +70,71 @@ public abstract class SegmentCache { * Create a new segment cache of the given size. Returns an always empty * cache for {@code cacheSizeMB <= 0}. * - * @param cacheSizeMB size of the cache in megabytes. + * @param cacheSizeMB size of the cache in megabytes. */ @NotNull - public static SegmentCache newSegmentCache(long cacheSizeMB) { - if (cacheSizeMB > 0) { + public static SegmentCache newSegmentCache(int cacheSizeMB) { + return new SegmentCache.Config() + .withCacheSizeMB(cacheSizeMB) + .build(null, null); + } + + private static SegmentCache newSegmentCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @Nullable Function uuidToSegmentId, @Nullable Function segmentLoader) { + if (cacheSizeMB > 0 && prefetchThreads > 0 && prefetchDepth > 0 && uuidToSegmentId != null && segmentLoader != null) { + return new PrefetchCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader); + } else if (cacheSizeMB > 0) { return new NonEmptyCache(cacheSizeMB); } else { return new EmptyCache(); } } + public static final class Config { + + private int cacheSizeMB = DEFAULT_SEGMENT_CACHE_MB; + + private int prefetchThreads = 0; + + private int prefetchDepth = 1; + + public Config withCacheSizeMB(int cacheSizeMB) { + this.cacheSizeMB = cacheSizeMB; + return this; + } + + public Config withPrefetchThreads(int prefetchThreads) { + this.prefetchThreads = prefetchThreads; + return this; + } + + public Config withPrefetchDepth(int prefetchDepth) { + this.prefetchDepth = 
prefetchDepth; + return this; + } + + @NotNull + public SegmentCache build(@Nullable Function uuidToSegmentId, @Nullable Function segmentLoader) { + if (prefetchThreads > 0 && prefetchDepth > 0) { + checkArgument(uuidToSegmentId != null, "uuidToSegmentId must be provided when prefetching is enabled"); + checkArgument(segmentLoader != null, "segmentLoader must be provided when prefetching is enabled"); + } + return SegmentCache.newSegmentCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader); + } + + public int getSegmentCacheSize() { + return cacheSizeMB; + } + + @Override + public String toString() { + return "Config{" + + "cacheSizeMB=" + cacheSizeMB + + ", prefetchThreads=" + prefetchThreads + + ", prefetchDepth=" + prefetchDepth + + '}'; + } + } + /** * Retrieve an segment from the cache or load it and cache it if not yet in * the cache. @@ -109,6 +174,11 @@ public abstract Segment getSegment(@NotNull SegmentId id, @NotNull Callable uuidToSegmentId; + + private final Function segmentLoader; + + /** + * Create a new cache of the given size. + * + * @param cacheSizeMB size of the cache in megabytes. 
+ * @param prefetchThreads the number of threads to use for prefetching + * @param prefetchDepth the depth to prefetch + * @param uuidToSegmentId the function to get the SegmentId by its UUID + */ + private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @NotNull Function uuidToSegmentId, @NotNull Function segmentLoader) { + super(cacheSizeMB); + this.prefetchPool = new ThreadPoolExecutor(prefetchThreads, prefetchThreads, + 30, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(prefetchThreads * 8), + r -> { + String threadName = String.format("segment-prefetch-%s", + Long.toHexString(System.nanoTime() & 0xFFFFF)); + return new Thread(r, threadName) { + { + setUncaughtExceptionHandler((t, e) -> { + if (!(e instanceof InterruptedException)) { + LOG.warn("Uncaught exception in thread {}", t.getName(), e); + } + }); + } + }; + }, + new ThreadPoolExecutor.DiscardPolicy()); + this.prefetchPool.allowCoreThreadTimeOut(true); + this.prefetchDepth = prefetchDepth; + this.segmentLoader = segmentLoader; + this.uuidToSegmentId = uuidToSegmentId; + } + + @Override + public @NotNull Segment getSegment(@NotNull SegmentId id, @NotNull Callable loader) throws ExecutionException { + return super.getSegment(id, () -> { + Segment s = loader.call(); + if (s != null && id.isDataSegmentId()) { + prefetchPool.execute(new PrefetchRunnable(s, prefetchDepth)); + } + return s; + }); + } + + @Override + public void close() { + super.close(); + try { + prefetchPool.shutdown(); + if (!prefetchPool.awaitTermination(10, TimeUnit.SECONDS)) { + prefetchPool.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + prefetchPool.shutdownNow(); + } + } + + private class PrefetchRunnable implements Runnable { + private final Segment segment; + private final int depth; + + PrefetchRunnable(Segment segment, int depth) { + this.segment = segment; + this.depth = depth; + } + + @Override + public void run() { + SegmentId segmentId = 
segment.getSegmentId(); + if (SegmentId.isDataSegmentId(segmentId.getLeastSignificantBits())) { + for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) { + UUID referencedSegmentId = segment.getReferencedSegmentId(i); + SegmentId id = uuidToSegmentId.apply(referencedSegmentId); + prefetchPool.execute(() -> { + try { + PrefetchCache.super.getSegment(id, () -> { + Segment s = null; + try { + LOG.debug("Prefetching segment {}, referenced by segment {}", id, segmentId); + return s = segmentLoader.apply(id); + } catch (SegmentNotFoundException e) { + LOG.warn("SegmentNotFoundException during prefetch of segment {}, referenced by segment {}", id, segmentId); + throw e; + } finally { + if (s != null && depth > 0) { + prefetchPool.execute(new PrefetchRunnable(s, depth - 1)); + } + } + }); + } catch (ExecutionException e) { + LOG.warn("Error prefetching segment {}", id, e); + } + }); + } + } + } + + @Override + public String toString() { + return "PrefetchRunnable{segment=" + segment.getSegmentId() + '}'; + } + } + } + /** An always empty cache */ private static class EmptyCache extends SegmentCache { private final Stats stats = new Stats(NAME, 0, () -> 0L); diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index 33a23b0e465..2db6bb5b645 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -18,7 +18,6 @@ */ package org.apache.jackrabbit.oak.segment.file; -import static org.apache.jackrabbit.oak.segment.SegmentCache.newSegmentCache; import static org.apache.jackrabbit.oak.segment.data.SegmentData.newSegmentData; import java.io.Closeable; @@ -130,7 +129,7 @@ public void recoverEntry(UUID uuid, byte[] data, EntryRecovery entryRecovery) th protected final IOMonitor 
ioMonitor; protected final RemoteStoreMonitor remoteStoreMonitor; - + protected final int binariesInlineThreshold; AbstractFileStore(final FileStoreBuilder builder) { @@ -142,7 +141,10 @@ public SegmentId newSegmentId(long msb, long lsb) { } }); this.blobStore = builder.getBlobStore(); - this.segmentCache = newSegmentCache(builder.getSegmentCacheSize()); + this.segmentCache = builder.getSegmentCacheConfig().build( + uuid -> tracker.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), + segmentId -> readSegmentUncached(getTarFiles(), segmentId) + ); this.segmentReader = new CachingSegmentReader( this::getWriter, blobStore, diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index e933b5611e8..9cccd9c8e10 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -487,6 +487,7 @@ public void close() { closer.register(repositoryLock::unlock); closer.register(tarFiles) ; closer.register(revisions); + closer.register(segmentCache); closeAndLogOnFail(closer); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java index fd390022321..bcba75e429d 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; +import java.util.function.Consumer; import org.apache.jackrabbit.oak.commons.conditions.Validate; import org.apache.jackrabbit.oak.segment.CacheWeights.NodeCacheWeigher; @@ -40,6 +41,7 @@ import 
org.apache.jackrabbit.oak.segment.CacheWeights.TemplateCacheWeigher; import org.apache.jackrabbit.oak.segment.RecordCache; import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentCache; import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener; import org.apache.jackrabbit.oak.segment.WriterCacheManager; import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; @@ -79,7 +81,8 @@ public class FileStoreBuilder { private int maxFileSize = DEFAULT_MAX_FILE_SIZE; - private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB; + private final SegmentCache.Config segmentCacheConfig = new SegmentCache.Config() + .withCacheSizeMB(DEFAULT_SEGMENT_CACHE_MB); private int stringCacheSize = DEFAULT_STRING_CACHE_MB; @@ -94,7 +97,7 @@ public class FileStoreBuilder { private boolean memoryMapping = MEMORY_MAPPING_DEFAULT; private boolean offHeapAccess = getBoolean("access.off.heap"); - + private int binariesInlineThreshold = Segment.MEDIUM_LIMIT; private SegmentNodeStorePersistence persistence; @@ -192,7 +195,18 @@ public FileStoreBuilder withMaxFileSize(int maxFileSize) { */ @NotNull public FileStoreBuilder withSegmentCacheSize(int segmentCacheSize) { - this.segmentCacheSize = segmentCacheSize; + this.segmentCacheConfig.withCacheSizeMB(segmentCacheSize); + return this; + } + /** + * Configure the segment cache. + * + * @param segmentCacheConfigurer Callback to configure segment cache + * @return this instance + */ + @NotNull + public FileStoreBuilder withSegmentCache(Consumer segmentCacheConfigurer) { + segmentCacheConfigurer.accept(this.segmentCacheConfig); return this; } @@ -397,7 +411,7 @@ public FileStoreBuilder withEagerSegmentCaching(boolean eagerSegmentCaching) { this.eagerSegmentCaching = eagerSegmentCaching; return this; } - + /** * Sets the threshold under which binaries are inlined in data segments. 
* @param binariesInlineThreshold the threshold @@ -505,8 +519,9 @@ public int getMaxFileSize() { return maxFileSize; } - int getSegmentCacheSize() { - return segmentCacheSize; + @NotNull + SegmentCache.Config getSegmentCacheConfig() { + return segmentCacheConfig; } int getStringCacheSize() { @@ -585,7 +600,7 @@ boolean getStrictVersionCheck() { boolean getEagerSegmentCaching() { return eagerSegmentCaching; } - + int getBinariesInlineThreshold() { return binariesInlineThreshold; } @@ -598,7 +613,7 @@ public String toString() { ", blobStore=" + blobStore + ", binariesInlineThreshold=" + binariesInlineThreshold + ", maxFileSize=" + maxFileSize + - ", segmentCacheSize=" + segmentCacheSize + + ", segmentCacheConfig=" + segmentCacheConfig + ", stringCacheSize=" + stringCacheSize + ", templateCacheSize=" + templateCacheSize + ", stringDeduplicationCacheSize=" + stringDeduplicationCacheSize + diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java index 5037f172ffe..050f61ad0c0 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java @@ -136,6 +136,7 @@ public void close() { Closer closer = Closer.create(); closer.register(tarFiles); closer.register(revisions); + closer.register(segmentCache); closeAndLogOnFail(closer); System.gc(); // for any memory-mappings that are no longer used log.info("TarMK closed: {}", directory); diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java index 4b965ce07d0..3e7bef90e3d 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java +++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java @@ -23,13 +23,18 @@ import static org.apache.jackrabbit.oak.segment.SegmentStore.EMPTY_STORE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; import org.apache.jackrabbit.oak.cache.AbstractCacheStats; import org.junit.Test; @@ -240,6 +245,41 @@ public void emptyCacheStatsTest() throws Exception { assertEquals(0, stats.getEvictionCount()); } + @Test + public void prefetchCache() throws Exception { + // let segment1 reference segment2 and segment3 + when(segment1.getReferencedSegmentIdCount()).thenReturn(2); + when(segment1.getReferencedSegmentId(eq(0))).thenReturn(id2.asUUID()); + when(segment1.getReferencedSegmentId(eq(1))).thenReturn(id3.asUUID()); + + try (SegmentCache segmentCache = new SegmentCache.Config() + .withCacheSizeMB(10) + .withPrefetchThreads(1) + .build( + uuid -> Stream.of(id1, id2, id3).filter(id -> id.asUUID().equals(uuid)).findFirst().orElse(null), + Map.of( + id1, segment1, + id2, segment2, + id3, segment3 + )::get + )) { + + segmentCache.getSegment(id1, () -> segment1); + long timeout = System.currentTimeMillis() + 5000; + while (segmentCache.getCacheStats().getLoadCount() < 3) { + // wait for prefetch to complete + TimeUnit.MILLISECONDS.sleep(1); + if (System.currentTimeMillis() > timeout) { + fail("Timeout waiting for prefetch to complete"); + } + } + + Segment shouldNeverBeLoaded = mock(Segment.class); + assertSame(segment2, segmentCache.getSegment(id2, () -> shouldNeverBeLoaded)); + 
assertSame(segment3, segmentCache.getSegment(id3, () -> shouldNeverBeLoaded)); + } + } + private static void expect(Class exceptionType, Callable thunk) { try { thunk.call(); From ecaf0b9dbd714816ef2f9646730ac194f9466c44 Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Thu, 18 Sep 2025 09:21:00 +0200 Subject: [PATCH 2/3] OAK-11934 - segment prefetching for segmentstore cache - add getTarFiles method missed in the previous commit --- .../jackrabbit/oak/segment/SegmentCache.java | 55 ++++++++++++++++--- .../oak/segment/file/AbstractFileStore.java | 9 ++- .../oak/segment/file/FileStore.java | 5 ++ .../oak/segment/file/ReadOnlyFileStore.java | 5 ++ 4 files changed, 65 insertions(+), 9 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java index 7ea7b8b7923..61054ddc71b 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java @@ -25,10 +25,13 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Instant; import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -308,12 +311,11 @@ private static class PrefetchCache extends NonEmptyCache { */ private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @NotNull Function uuidToSegmentId, @NotNull Function segmentLoader) { super(cacheSizeMB); - this.prefetchPool = new ThreadPoolExecutor(prefetchThreads, prefetchThreads, + this.prefetchPool = new ThreadPoolExecutor(Math.max(1, 
Math.round(prefetchThreads / 2f)), prefetchThreads, 30, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(prefetchThreads * 8), + new LinkedBlockingQueue<>(prefetchThreads * 2), // TODO - increase queue size r -> { - String threadName = String.format("segment-prefetch-%s", - Long.toHexString(System.nanoTime() & 0xFFFFF)); + String threadName = String.format("segment-prefetch-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); return new Thread(r, threadName) { { setUncaughtExceptionHandler((t, e) -> { @@ -324,7 +326,26 @@ private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @ } }; }, - new ThreadPoolExecutor.DiscardPolicy()); + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (!executor.isShutdown() && r instanceof PrefetchRunnable && ((PrefetchRunnable) r).isExpedite()) { + PrefetchRunnable prefetchRunnable = (PrefetchRunnable) r; + // make space for the redispatch of the expedited prefetch task + Runnable oldest = executor.getQueue().poll(); + // re-dispatch from a pool thread as un-expedited prefetch tasks, + // takes the dispatching load off the critical path + executor.execute(() -> { + if (oldest != null) { + executor.execute(oldest); + } + executor.execute(new PrefetchRunnable(prefetchRunnable.segment, prefetchRunnable.depth, false)); + }); + } else { + LOG.info("The prefetch queue is full, dropping prefetch task {}", r); + } + } + }); this.prefetchPool.allowCoreThreadTimeOut(true); this.prefetchDepth = prefetchDepth; this.segmentLoader = segmentLoader; @@ -334,9 +355,13 @@ private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @ @Override public @NotNull Segment getSegment(@NotNull SegmentId id, @NotNull Callable loader) throws ExecutionException { return super.getSegment(id, () -> { + Instant start = Instant.now(); Segment s = loader.call(); + LOG.info("Segment {} loaded on critical path ({}ms)", id, Instant.now().toEpochMilli() - 
start.toEpochMilli()); if (s != null && id.isDataSegmentId()) { - prefetchPool.execute(new PrefetchRunnable(s, prefetchDepth)); + start = Instant.now(); + prefetchPool.execute(new PrefetchRunnable(s, prefetchDepth, true)); + LOG.info("Reference prefetch for segment {} enqueued on critical path ({}ms)", id, Instant.now().toEpochMilli() - start.toEpochMilli()); } return s; }); @@ -357,12 +382,17 @@ public void close() { } private class PrefetchRunnable implements Runnable { + private final Segment segment; + private final int depth; - PrefetchRunnable(Segment segment, int depth) { + private final boolean expedite; + + PrefetchRunnable(Segment segment, int depth, boolean expedite) { this.segment = segment; this.depth = depth; + this.expedite = expedite; } @Override @@ -384,7 +414,12 @@ public void run() { throw e; } finally { if (s != null && depth > 0) { - prefetchPool.execute(new PrefetchRunnable(s, depth - 1)); + BlockingQueue queue = prefetchPool.getQueue(); + if (queue.remainingCapacity() < queue.size() / 4) { + // throttle the enqueuing of prefetch tasks + TimeUnit.SECONDS.sleep(5); + } + prefetchPool.execute(new PrefetchRunnable(s, depth - 1, false)); } } }); @@ -400,6 +435,10 @@ public void run() { public String toString() { return "PrefetchRunnable{segment=" + segment.getSegmentId() + '}'; } + + public boolean isExpedite() { + return expedite; + } } } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index 2db6bb5b645..2ace12b3e2e 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -194,7 +194,7 @@ public SegmentReader getReader() { public SegmentIdProvider getSegmentIdProvider() { return tracker; } - + public int getBinariesInlineThreshold() { return 
binariesInlineThreshold; } @@ -204,6 +204,13 @@ public int getBinariesInlineThreshold() { */ public abstract Revisions getRevisions(); + /** + * Access to the tar files managed by subclasses. + * + * @return the tar files + */ + protected abstract @NotNull TarFiles getTarFiles(); + /** * Convenience method for accessing the root node for the current head. * This is equivalent to diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index 9cccd9c8e10..27744b5643f 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -470,6 +470,11 @@ public TarRevisions getRevisions() { } } + @Override + protected @NotNull TarFiles getTarFiles() { + return tarFiles; + } + @Override public void close() { try (ShutDownCloser ignored = shutDown.shutDown()) { diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java index 050f61ad0c0..c85015203dc 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java @@ -171,6 +171,11 @@ public ReadOnlyRevisions getRevisions() { return revisions; } + @Override + protected @NotNull TarFiles getTarFiles() { + return tarFiles; + } + public Set getReferencedSegmentIds() { return tracker.getReferencedSegmentIds(); } From f616bcfcc818a88a432aae13e8bdf249365b8957 Mon Sep 17 00:00:00 2001 From: Julian Sedding Date: Thu, 18 Sep 2025 17:21:52 +0200 Subject: [PATCH 3/3] OAK-11934 - segment prefetching for segmentstore cache - pool tuning and unit-test improvement --- .../jackrabbit/oak/segment/SegmentCache.java | 33 
+++++++------------ .../oak/segment/SegmentCacheTest.java | 4 +++ 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java index 61054ddc71b..2906fd4c283 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java @@ -311,9 +311,9 @@ private static class PrefetchCache extends NonEmptyCache { */ private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @NotNull Function uuidToSegmentId, @NotNull Function segmentLoader) { super(cacheSizeMB); - this.prefetchPool = new ThreadPoolExecutor(Math.max(1, Math.round(prefetchThreads / 2f)), prefetchThreads, - 30, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(prefetchThreads * 2), // TODO - increase queue size + this.prefetchPool = new ThreadPoolExecutor(prefetchThreads, prefetchThreads, + 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(prefetchThreads * 16), r -> { String threadName = String.format("segment-prefetch-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); return new Thread(r, threadName) { @@ -332,17 +332,10 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (!executor.isShutdown() && r instanceof PrefetchRunnable && ((PrefetchRunnable) r).isExpedite()) { PrefetchRunnable prefetchRunnable = (PrefetchRunnable) r; // make space for the redispatch of the expedited prefetch task - Runnable oldest = executor.getQueue().poll(); + executor.getQueue().poll(); // re-dispatch from a pool thread as un-expedited prefetch tasks, // takes the dispatching load off the critical path - executor.execute(() -> { - if (oldest != null) { - executor.execute(oldest); - } - executor.execute(new PrefetchRunnable(prefetchRunnable.segment, prefetchRunnable.depth, false)); - }); - } else { - 
LOG.info("The prefetch queue is full, dropping prefetch task {}", r); + executor.execute(new PrefetchRunnable(prefetchRunnable.segment, prefetchRunnable.depth, false)); } } }); @@ -355,13 +348,16 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { @Override public @NotNull Segment getSegment(@NotNull SegmentId id, @NotNull Callable loader) throws ExecutionException { return super.getSegment(id, () -> { - Instant start = Instant.now(); Segment s = loader.call(); - LOG.info("Segment {} loaded on critical path ({}ms)", id, Instant.now().toEpochMilli() - start.toEpochMilli()); if (s != null && id.isDataSegmentId()) { - start = Instant.now(); + long start = System.nanoTime(); prefetchPool.execute(new PrefetchRunnable(s, prefetchDepth, true)); - LOG.info("Reference prefetch for segment {} enqueued on critical path ({}ms)", id, Instant.now().toEpochMilli() - start.toEpochMilli()); + long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start); + if (micros > 10_000) { // log only if it took more than 10ms + LOG.info("Reference prefetch for segment {} enqueued in {}µs", id, micros); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Reference prefetch for segment {} enqueued in {}µs", id, micros); + } } return s; }); @@ -414,11 +410,6 @@ public void run() { throw e; } finally { if (s != null && depth > 0) { - BlockingQueue queue = prefetchPool.getQueue(); - if (queue.remainingCapacity() < queue.size() / 4) { - // throttle the enqueuing of prefetch tasks - TimeUnit.SECONDS.sleep(5); - } prefetchPool.execute(new PrefetchRunnable(s, depth - 1, false)); } } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java index 3e7bef90e3d..754d8dd130a 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java +++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java
@@ -252,6 +252,10 @@ public void prefetchCache() throws Exception {
         when(segment1.getReferencedSegmentId(eq(0))).thenReturn(id2.asUUID());
         when(segment1.getReferencedSegmentId(eq(1))).thenReturn(id3.asUUID());
 
+        when(segment1.estimateMemoryUsage()).thenReturn(128 * 1024);
+        when(segment2.estimateMemoryUsage()).thenReturn(128 * 1024);
+        when(segment3.estimateMemoryUsage()).thenReturn(128 * 1024);
+
         try (SegmentCache segmentCache = new SegmentCache.Config()
                 .withCacheSizeMB(10)
                 .withPrefetchThreads(1)