diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java index 41238c04b4d..2906fd4c283 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentCache.java @@ -20,12 +20,23 @@ package org.apache.jackrabbit.oak.segment; import static java.util.Objects.requireNonNull; +import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument; import static org.apache.jackrabbit.oak.segment.CacheWeights.segmentWeight; +import java.io.Closeable; +import java.io.IOException; +import java.time.Instant; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.jackrabbit.guava.common.cache.Cache; @@ -35,6 +46,9 @@ import org.apache.jackrabbit.oak.cache.AbstractCacheStats; import org.apache.jackrabbit.oak.segment.CacheWeights.SegmentCacheWeigher; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A cache for {@link SegmentId#isDataSegmentId() data} {@link Segment} @@ -46,7 +60,7 @@ * SegmentId#segment}. Every time an segment is evicted from this cache the * memoised segment is discarded (see {@code SegmentId#onAccess}. 
*/ -public abstract class SegmentCache { +public abstract class SegmentCache implements Closeable { /** * Default maximum weight of this cache in MB @@ -59,17 +73,71 @@ public abstract class SegmentCache { * Create a new segment cache of the given size. Returns an always empty * cache for {@code cacheSizeMB <= 0}. * - * @param cacheSizeMB size of the cache in megabytes. + * @param cacheSizeMB size of the cache in megabytes. */ @NotNull - public static SegmentCache newSegmentCache(long cacheSizeMB) { - if (cacheSizeMB > 0) { + public static SegmentCache newSegmentCache(int cacheSizeMB) { + return new SegmentCache.Config() + .withCacheSizeMB(cacheSizeMB) + .build(null, null); + } + + private static SegmentCache newSegmentCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @Nullable Function uuidToSegmentId, @Nullable Function segmentLoader) { + if (cacheSizeMB > 0 && prefetchThreads > 0 && prefetchDepth > 0 && uuidToSegmentId != null && segmentLoader != null) { + return new PrefetchCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader); + } else if (cacheSizeMB > 0) { return new NonEmptyCache(cacheSizeMB); } else { return new EmptyCache(); } } + public static final class Config { + + private int cacheSizeMB = DEFAULT_SEGMENT_CACHE_MB; + + private int prefetchThreads = 0; + + private int prefetchDepth = 1; + + public Config withCacheSizeMB(int cacheSizeMB) { + this.cacheSizeMB = cacheSizeMB; + return this; + } + + public Config withPrefetchThreads(int prefetchThreads) { + this.prefetchThreads = prefetchThreads; + return this; + } + + public Config withPrefetchDepth(int prefetchDepth) { + this.prefetchDepth = prefetchDepth; + return this; + } + + @NotNull + public SegmentCache build(@Nullable Function uuidToSegmentId, @Nullable Function segmentLoader) { + if (prefetchThreads > 0 && prefetchDepth > 0) { + checkArgument(uuidToSegmentId != null, "uuidToSegmentId must be provided when prefetching is enabled"); + 
checkArgument(segmentLoader != null, "segmentLoader must be provided when prefetching is enabled"); + } + return SegmentCache.newSegmentCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader); + } + + public int getSegmentCacheSize() { + return cacheSizeMB; + } + + @Override + public String toString() { + return "Config{" + + "cacheSizeMB=" + cacheSizeMB + + ", prefetchThreads=" + prefetchThreads + + ", prefetchDepth=" + prefetchDepth + + '}'; + } + } + /** * Retrieve an segment from the cache or load it and cache it if not yet in * the cache. @@ -109,6 +177,11 @@ public abstract Segment getSegment(@NotNull SegmentId id, @NotNull Callable uuidToSegmentId; + + private final Function segmentLoader; + + /** + * Create a new cache of the given size. + * + * @param cacheSizeMB size of the cache in megabytes. + * @param prefetchThreads the number of threads to use for prefetching + * @param prefetchDepth the depth to prefetch + * @param uuidToSegmentId the function to get the SegmentId by its UUID + */ + private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @NotNull Function uuidToSegmentId, @NotNull Function segmentLoader) { + super(cacheSizeMB); + this.prefetchPool = new ThreadPoolExecutor(prefetchThreads, prefetchThreads, + 10, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(prefetchThreads * 16), + r -> { + String threadName = String.format("segment-prefetch-%s", Long.toHexString(System.nanoTime() & 0xFFFFF)); + return new Thread(r, threadName) { + { + setUncaughtExceptionHandler((t, e) -> { + if (!(e instanceof InterruptedException)) { + LOG.warn("Uncaught exception in thread {}", t.getName(), e); + } + }); + } + }; + }, + new RejectedExecutionHandler() { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + if (!executor.isShutdown() && r instanceof PrefetchRunnable && ((PrefetchRunnable) r).isExpedite()) { + PrefetchRunnable prefetchRunnable = (PrefetchRunnable) r; + // make space 
for the redispatch of the expedited prefetch task + executor.getQueue().poll(); + // re-dispatch from a pool thread as un-expedited prefetch tasks, + // takes the dispatching load off the critical path + executor.execute(new PrefetchRunnable(prefetchRunnable.segment, prefetchRunnable.depth, false)); + } + } + }); + this.prefetchPool.allowCoreThreadTimeOut(true); + this.prefetchDepth = prefetchDepth; + this.segmentLoader = segmentLoader; + this.uuidToSegmentId = uuidToSegmentId; + } + + @Override + public @NotNull Segment getSegment(@NotNull SegmentId id, @NotNull Callable loader) throws ExecutionException { + return super.getSegment(id, () -> { + Segment s = loader.call(); + if (s != null && id.isDataSegmentId()) { + long start = System.nanoTime(); + prefetchPool.execute(new PrefetchRunnable(s, prefetchDepth, true)); + long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start); + if (micros > 10_000) { // log only if it took more than 10ms + LOG.info("Reference prefetch for segment {} enqueued in {}µs", id, micros); + } else if (LOG.isDebugEnabled()) { + LOG.debug("Reference prefetch for segment {} enqueued in {}µs", id, micros); + } + } + return s; + }); + } + + @Override + public void close() { + super.close(); + try { + prefetchPool.shutdown(); + if (!prefetchPool.awaitTermination(10, TimeUnit.SECONDS)) { + prefetchPool.shutdownNow(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + prefetchPool.shutdownNow(); + } + } + + private class PrefetchRunnable implements Runnable { + + private final Segment segment; + + private final int depth; + + private final boolean expedite; + + PrefetchRunnable(Segment segment, int depth, boolean expedite) { + this.segment = segment; + this.depth = depth; + this.expedite = expedite; + } + + @Override + public void run() { + SegmentId segmentId = segment.getSegmentId(); + if (SegmentId.isDataSegmentId(segmentId.getLeastSignificantBits())) { + for (int i = 0; i < 
segment.getReferencedSegmentIdCount(); i++) { + UUID referencedSegmentId = segment.getReferencedSegmentId(i); + SegmentId id = uuidToSegmentId.apply(referencedSegmentId); + prefetchPool.execute(() -> { + try { + PrefetchCache.super.getSegment(id, () -> { + Segment s = null; + try { + LOG.debug("Prefetching segment {}, referenced by segment {}", id, segmentId); + return s = segmentLoader.apply(id); + } catch (SegmentNotFoundException e) { + LOG.warn("SegmentNotFoundException during prefetch of segment {}, referenced by segment {}", id, segmentId); + throw e; + } finally { + if (s != null && depth > 0) { + prefetchPool.execute(new PrefetchRunnable(s, depth - 1, false)); + } + } + }); + } catch (ExecutionException e) { + LOG.warn("Error prefetching segment {}", id, e); + } + }); + } + } + } + + @Override + public String toString() { + return "PrefetchRunnable{segment=" + segment.getSegmentId() + '}'; + } + + public boolean isExpedite() { + return expedite; + } + } + } + /** An always empty cache */ private static class EmptyCache extends SegmentCache { private final Stats stats = new Stats(NAME, 0, () -> 0L); diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java index 33a23b0e465..2ace12b3e2e 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java @@ -18,7 +18,6 @@ */ package org.apache.jackrabbit.oak.segment.file; -import static org.apache.jackrabbit.oak.segment.SegmentCache.newSegmentCache; import static org.apache.jackrabbit.oak.segment.data.SegmentData.newSegmentData; import java.io.Closeable; @@ -130,7 +129,7 @@ public void recoverEntry(UUID uuid, byte[] data, EntryRecovery entryRecovery) th protected final IOMonitor ioMonitor; protected final RemoteStoreMonitor 
remoteStoreMonitor; - + protected final int binariesInlineThreshold; AbstractFileStore(final FileStoreBuilder builder) { @@ -142,7 +141,10 @@ public SegmentId newSegmentId(long msb, long lsb) { } }); this.blobStore = builder.getBlobStore(); - this.segmentCache = newSegmentCache(builder.getSegmentCacheSize()); + this.segmentCache = builder.getSegmentCacheConfig().build( + uuid -> tracker.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()), + segmentId -> readSegmentUncached(getTarFiles(), segmentId) + ); this.segmentReader = new CachingSegmentReader( this::getWriter, blobStore, @@ -192,7 +194,7 @@ public SegmentReader getReader() { public SegmentIdProvider getSegmentIdProvider() { return tracker; } - + public int getBinariesInlineThreshold() { return binariesInlineThreshold; } @@ -202,6 +204,13 @@ public int getBinariesInlineThreshold() { */ public abstract Revisions getRevisions(); + /** + * Access to the tar files managed by subclasses. + * + * @return the tar files + */ + protected abstract @NotNull TarFiles getTarFiles(); + /** * Convenience method for accessing the root node for the current head. 
* This is equivalent to diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java index e933b5611e8..27744b5643f 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java @@ -470,6 +470,11 @@ public TarRevisions getRevisions() { } } + @Override + protected @NotNull TarFiles getTarFiles() { + return tarFiles; + } + @Override public void close() { try (ShutDownCloser ignored = shutDown.shutDown()) { @@ -487,6 +492,7 @@ public void close() { closer.register(repositoryLock::unlock); closer.register(tarFiles) ; closer.register(revisions); + closer.register(segmentCache); closeAndLogOnFail(closer); } diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java index fd390022321..bcba75e429d 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStoreBuilder.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.util.HashSet; import java.util.Set; +import java.util.function.Consumer; import org.apache.jackrabbit.oak.commons.conditions.Validate; import org.apache.jackrabbit.oak.segment.CacheWeights.NodeCacheWeigher; @@ -40,6 +41,7 @@ import org.apache.jackrabbit.oak.segment.CacheWeights.TemplateCacheWeigher; import org.apache.jackrabbit.oak.segment.RecordCache; import org.apache.jackrabbit.oak.segment.Segment; +import org.apache.jackrabbit.oak.segment.SegmentCache; import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener; import org.apache.jackrabbit.oak.segment.WriterCacheManager; import 
org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions; @@ -79,7 +81,8 @@ public class FileStoreBuilder { private int maxFileSize = DEFAULT_MAX_FILE_SIZE; - private int segmentCacheSize = DEFAULT_SEGMENT_CACHE_MB; + private final SegmentCache.Config segmentCacheConfig = new SegmentCache.Config() + .withCacheSizeMB(DEFAULT_SEGMENT_CACHE_MB); private int stringCacheSize = DEFAULT_STRING_CACHE_MB; @@ -94,7 +97,7 @@ public class FileStoreBuilder { private boolean memoryMapping = MEMORY_MAPPING_DEFAULT; private boolean offHeapAccess = getBoolean("access.off.heap"); - + private int binariesInlineThreshold = Segment.MEDIUM_LIMIT; private SegmentNodeStorePersistence persistence; @@ -192,7 +195,18 @@ public FileStoreBuilder withMaxFileSize(int maxFileSize) { */ @NotNull public FileStoreBuilder withSegmentCacheSize(int segmentCacheSize) { - this.segmentCacheSize = segmentCacheSize; + this.segmentCacheConfig.withCacheSizeMB(segmentCacheSize); + return this; + } + /** + * Configure the segment cache. + * + * @param segmentCacheConfigurer Callback to configure segment cache + * @return this instance + */ + @NotNull + public FileStoreBuilder withSegmentCache(Consumer segmentCacheConfigurer) { + segmentCacheConfigurer.accept(this.segmentCacheConfig); return this; } @@ -397,7 +411,7 @@ public FileStoreBuilder withEagerSegmentCaching(boolean eagerSegmentCaching) { this.eagerSegmentCaching = eagerSegmentCaching; return this; } - + /** * Sets the threshold under which binaries are inlined in data segments. 
* @param binariesInlineThreshold the threshold @@ -505,8 +519,9 @@ public int getMaxFileSize() { return maxFileSize; } - int getSegmentCacheSize() { - return segmentCacheSize; + @NotNull + SegmentCache.Config getSegmentCacheConfig() { + return segmentCacheConfig; } int getStringCacheSize() { @@ -585,7 +600,7 @@ boolean getStrictVersionCheck() { boolean getEagerSegmentCaching() { return eagerSegmentCaching; } - + int getBinariesInlineThreshold() { return binariesInlineThreshold; } @@ -598,7 +613,7 @@ public String toString() { ", blobStore=" + blobStore + ", binariesInlineThreshold=" + binariesInlineThreshold + ", maxFileSize=" + maxFileSize + - ", segmentCacheSize=" + segmentCacheSize + + ", segmentCacheConfig=" + segmentCacheConfig + ", stringCacheSize=" + stringCacheSize + ", templateCacheSize=" + templateCacheSize + ", stringDeduplicationCacheSize=" + stringDeduplicationCacheSize + diff --git a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java index 5037f172ffe..c85015203dc 100644 --- a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java +++ b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/ReadOnlyFileStore.java @@ -136,6 +136,7 @@ public void close() { Closer closer = Closer.create(); closer.register(tarFiles); closer.register(revisions); + closer.register(segmentCache); closeAndLogOnFail(closer); System.gc(); // for any memory-mappings that are no longer used log.info("TarMK closed: {}", directory); @@ -170,6 +171,11 @@ public ReadOnlyRevisions getRevisions() { return revisions; } + @Override + protected @NotNull TarFiles getTarFiles() { + return tarFiles; + } + public Set getReferencedSegmentIds() { return tracker.getReferencedSegmentIds(); } diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java index 4b965ce07d0..754d8dd130a 100644 --- a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentCacheTest.java @@ -23,13 +23,18 @@ import static org.apache.jackrabbit.oak.segment.SegmentStore.EMPTY_STORE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Stream; import org.apache.jackrabbit.oak.cache.AbstractCacheStats; import org.junit.Test; @@ -240,6 +245,45 @@ public void emptyCacheStatsTest() throws Exception { assertEquals(0, stats.getEvictionCount()); } + @Test + public void prefetchCache() throws Exception { + // let segment1 reference segment2 and segment3 + when(segment1.getReferencedSegmentIdCount()).thenReturn(2); + when(segment1.getReferencedSegmentId(eq(0))).thenReturn(id2.asUUID()); + when(segment1.getReferencedSegmentId(eq(1))).thenReturn(id3.asUUID()); + + when(segment1.estimateMemoryUsage()).thenReturn(128 * 1024); + when(segment2.estimateMemoryUsage()).thenReturn(128 * 1024); + when(segment3.estimateMemoryUsage()).thenReturn(128 * 1024); + + try (SegmentCache segmentCache = new SegmentCache.Config() + .withCacheSizeMB(10) + .withPrefetchThreads(1) + .build( + uuid -> Stream.of(id1, id2, id3).filter(id -> id.asUUID().equals(uuid)).findFirst().orElse(null), + Map.of( + id1, segment1, + id2, segment2, + id3, segment3 + )::get + )) { + + segmentCache.getSegment(id1, () -> segment1); + long
timeout = System.currentTimeMillis() + 5000; + while (segmentCache.getCacheStats().getLoadCount() < 3) { + // wait for prefetch to complete + TimeUnit.MILLISECONDS.sleep(1); + if (System.currentTimeMillis() > timeout) { + fail("Timeout waiting for prefetch to complete"); + } + } + + Segment shouldNeverBeLoaded = mock(Segment.class); + assertSame(segment2, segmentCache.getSegment(id2, () -> shouldNeverBeLoaded)); + assertSame(segment3, segmentCache.getSegment(id3, () -> shouldNeverBeLoaded)); + } + } + private static void expect(Class exceptionType, Callable thunk) { try { thunk.call();