Changes from all commits

File: org/apache/jackrabbit/oak/segment/SegmentCache.java

@@ -20,12 +20,23 @@
package org.apache.jackrabbit.oak.segment;

import static java.util.Objects.requireNonNull;
import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
import static org.apache.jackrabbit.oak.segment.CacheWeights.segmentWeight;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.jackrabbit.guava.common.cache.Cache;
@@ -35,6 +46,9 @@
import org.apache.jackrabbit.oak.cache.AbstractCacheStats;
import org.apache.jackrabbit.oak.segment.CacheWeights.SegmentCacheWeigher;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A cache for {@link SegmentId#isDataSegmentId() data} {@link Segment}
@@ -46,7 +60,7 @@
* SegmentId#segment}. Every time a segment is evicted from this cache the
* memoised segment is discarded (see {@code SegmentId#onAccess}).
*/
public abstract class SegmentCache {
public abstract class SegmentCache implements Closeable {

/**
* Default maximum weight of this cache in MB
@@ -59,17 +73,71 @@ public abstract class SegmentCache {
* Create a new segment cache of the given size. Returns an always empty
* cache for {@code cacheSizeMB <= 0}.
*
* @param cacheSizeMB size of the cache in megabytes.
*/
@NotNull
public static SegmentCache newSegmentCache(int cacheSizeMB) {
return new SegmentCache.Config()
.withCacheSizeMB(cacheSizeMB)
.build(null, null);
}

private static SegmentCache newSegmentCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @Nullable Function<UUID, SegmentId> uuidToSegmentId, @Nullable Function<SegmentId, Segment> segmentLoader) {
if (cacheSizeMB > 0 && prefetchThreads > 0 && prefetchDepth > 0 && uuidToSegmentId != null && segmentLoader != null) {
return new PrefetchCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader);
} else if (cacheSizeMB > 0) {
return new NonEmptyCache(cacheSizeMB);
} else {
return new EmptyCache();
}
}

public static final class Config {

private int cacheSizeMB = DEFAULT_SEGMENT_CACHE_MB;

private int prefetchThreads = 0;

private int prefetchDepth = 1;

public Config withCacheSizeMB(int cacheSizeMB) {
this.cacheSizeMB = cacheSizeMB;
return this;
}

public Config withPrefetchThreads(int prefetchThreads) {
this.prefetchThreads = prefetchThreads;
return this;
}

public Config withPrefetchDepth(int prefetchDepth) {
this.prefetchDepth = prefetchDepth;
return this;
}

@NotNull
public SegmentCache build(@Nullable Function<UUID, SegmentId> uuidToSegmentId, @Nullable Function<SegmentId, Segment> segmentLoader) {
if (prefetchThreads > 0 && prefetchDepth > 0) {
checkArgument(uuidToSegmentId != null, "uuidToSegmentId must be provided when prefetching is enabled");
checkArgument(segmentLoader != null, "segmentLoader must be provided when prefetching is enabled");
}
return SegmentCache.newSegmentCache(cacheSizeMB, prefetchThreads, prefetchDepth, uuidToSegmentId, segmentLoader);
}

public int getSegmentCacheSize() {
return cacheSizeMB;
}

@Override
public String toString() {
return "Config{" +
"cacheSizeMB=" + cacheSizeMB +
", prefetchThreads=" + prefetchThreads +
", prefetchDepth=" + prefetchDepth +
'}';
}
}

/**
* Retrieve a segment from the cache or load it and cache it if not yet in
* the cache.
@@ -109,6 +177,11 @@ public abstract Segment getSegment(@NotNull SegmentId id, @NotNull Callable<Segment>
*/
public abstract void recordHit();

@Override
public void close() {
// nothing to do
}

private static class NonEmptyCache extends SegmentCache {

/**
@@ -129,8 +202,8 @@ private static class NonEmptyCache extends SegmentCache {
*
* @param cacheSizeMB size of the cache in megabytes.
*/
private NonEmptyCache(int cacheSizeMB) {
long maximumWeight = cacheSizeMB * 1024 * 1024L;
this.cache = CacheBuilder.newBuilder()
.concurrencyLevel(16)
.maximumWeight(maximumWeight)
@@ -215,6 +288,151 @@ public void recordHit() {
}
}


private static class PrefetchCache extends NonEmptyCache {

private static final Logger LOG = LoggerFactory.getLogger(PrefetchCache.class);

private final ThreadPoolExecutor prefetchPool;

private final int prefetchDepth;

private final Function<UUID, SegmentId> uuidToSegmentId;

private final Function<SegmentId, Segment> segmentLoader;

/**
* Create a new cache of the given size.
*
* @param cacheSizeMB size of the cache in megabytes.
* @param prefetchThreads the number of threads to use for prefetching
* @param prefetchDepth the depth to prefetch
* @param uuidToSegmentId the function to get the SegmentId by its UUID
* @param segmentLoader the function to load a Segment by its SegmentId
*/
private PrefetchCache(int cacheSizeMB, int prefetchThreads, int prefetchDepth, @NotNull Function<UUID, SegmentId> uuidToSegmentId, @NotNull Function<SegmentId, Segment> segmentLoader) {
super(cacheSizeMB);
this.prefetchPool = new ThreadPoolExecutor(prefetchThreads, prefetchThreads,
10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(prefetchThreads * 16),
r -> {
String threadName = String.format("segment-prefetch-%s", Long.toHexString(System.nanoTime() & 0xFFFFF));
return new Thread(r, threadName) {
{
setUncaughtExceptionHandler((t, e) -> {
if (!(e instanceof InterruptedException)) {
LOG.warn("Uncaught exception in thread {}", t.getName(), e);
}
});
}
};
},
new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown() && r instanceof PrefetchRunnable && ((PrefetchRunnable) r).isExpedite()) {
PrefetchRunnable prefetchRunnable = (PrefetchRunnable) r;
// make space for the redispatch of the expedited prefetch task
executor.getQueue().poll();
// re-dispatch as an un-expedited prefetch task; running it from a pool
// thread takes the dispatching load off the critical path
executor.execute(new PrefetchRunnable(prefetchRunnable.segment, prefetchRunnable.depth, false));
}
}
});
this.prefetchPool.allowCoreThreadTimeOut(true);
this.prefetchDepth = prefetchDepth;
this.segmentLoader = segmentLoader;
this.uuidToSegmentId = uuidToSegmentId;
}

@Override
public @NotNull Segment getSegment(@NotNull SegmentId id, @NotNull Callable<Segment> loader) throws ExecutionException {
return super.getSegment(id, () -> {
Segment s = loader.call();
if (s != null && id.isDataSegmentId()) {
long start = System.nanoTime();
prefetchPool.execute(new PrefetchRunnable(s, prefetchDepth, true));
long micros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - start);
if (micros > 10_000) { // log at INFO only if enqueueing took more than 10 ms
LOG.info("Reference prefetch for segment {} enqueued in {}µs", id, micros);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Reference prefetch for segment {} enqueued in {}µs", id, micros);
}
}
return s;
});
}

@Override
public void close() {
super.close();
try {
prefetchPool.shutdown();
if (!prefetchPool.awaitTermination(10, TimeUnit.SECONDS)) {
prefetchPool.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
prefetchPool.shutdownNow();
}
}

private class PrefetchRunnable implements Runnable {

private final Segment segment;

private final int depth;

private final boolean expedite;

PrefetchRunnable(Segment segment, int depth, boolean expedite) {
this.segment = segment;
this.depth = depth;
this.expedite = expedite;
}

@Override
public void run() {
SegmentId segmentId = segment.getSegmentId();
if (SegmentId.isDataSegmentId(segmentId.getLeastSignificantBits())) {
for (int i = 0; i < segment.getReferencedSegmentIdCount(); i++) {
UUID referencedSegmentId = segment.getReferencedSegmentId(i);
SegmentId id = uuidToSegmentId.apply(referencedSegmentId);
prefetchPool.execute(() -> {
try {
PrefetchCache.super.getSegment(id, () -> {
Segment s = null;
try {
LOG.debug("Prefetching segment {}, referenced by segment {}", id, segmentId);
return s = segmentLoader.apply(id); // assign so the finally block can chain deeper prefetches
} catch (SegmentNotFoundException e) {
LOG.warn("SegmentNotFoundException during prefetch of segment {}, referenced by segment {}", id, segmentId);
throw e;
} finally {
if (s != null && depth > 0) {
prefetchPool.execute(new PrefetchRunnable(s, depth - 1, false));
}
}
});
} catch (ExecutionException e) {
LOG.warn("Error prefetching segment {}", id, e);
}
});
}
}
}

@Override
public String toString() {
return "PrefetchRunnable{segment=" + segment.getSegmentId() + '}';
}

public boolean isExpedite() {
return expedite;
}
}
}

/** An always empty cache */
private static class EmptyCache extends SegmentCache {
private final Stats stats = new Stats(NAME, 0, () -> 0L);
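For illustration, a minimal sketch of how the new builder-style Config is used. The resolver and loader passed to build(...) are stand-in parameters here; in this change the real ones are supplied by AbstractFileStore (next file) from its tracker and TAR files:

import java.util.UUID;
import java.util.function.Function;

import org.apache.jackrabbit.oak.segment.Segment;
import org.apache.jackrabbit.oak.segment.SegmentCache;
import org.apache.jackrabbit.oak.segment.SegmentId;

class SegmentCacheConfigExample {

    // 256 MB cache, 4 prefetch threads, following segment references 2 levels deep.
    static SegmentCache newPrefetchingCache(Function<UUID, SegmentId> uuidToSegmentId,
                                            Function<SegmentId, Segment> segmentLoader) {
        return new SegmentCache.Config()
                .withCacheSizeMB(256)
                .withPrefetchThreads(4)
                .withPrefetchDepth(2)
                .build(uuidToSegmentId, segmentLoader);
    }

    // With prefetchThreads left at its default of 0, build(null, null) falls back to
    // the plain NonEmptyCache (or EmptyCache for a non-positive cache size).
    static SegmentCache newPlainCache(int cacheSizeMB) {
        return new SegmentCache.Config()
                .withCacheSizeMB(cacheSizeMB)
                .build(null, null);
    }
}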
File: org/apache/jackrabbit/oak/segment/file/AbstractFileStore.java

@@ -18,7 +18,6 @@
*/
package org.apache.jackrabbit.oak.segment.file;

import static org.apache.jackrabbit.oak.segment.data.SegmentData.newSegmentData;

import java.io.Closeable;
@@ -130,7 +129,7 @@ public void recoverEntry(UUID uuid, byte[] data, EntryRecovery entryRecovery) th
protected final IOMonitor ioMonitor;

protected final RemoteStoreMonitor remoteStoreMonitor;

protected final int binariesInlineThreshold;

AbstractFileStore(final FileStoreBuilder builder) {
Expand All @@ -142,7 +141,10 @@ public SegmentId newSegmentId(long msb, long lsb) {
}
});
this.blobStore = builder.getBlobStore();
this.segmentCache = builder.getSegmentCacheConfig().build(
uuid -> tracker.newSegmentId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()),
segmentId -> readSegmentUncached(getTarFiles(), segmentId)
);
this.segmentReader = new CachingSegmentReader(
this::getWriter,
blobStore,
@@ -192,7 +194,7 @@ public SegmentReader getReader() {
public SegmentIdProvider getSegmentIdProvider() {
return tracker;
}

public int getBinariesInlineThreshold() {
return binariesInlineThreshold;
}
@@ -202,6 +204,13 @@ public int getBinariesInlineThreshold() {
*/
public abstract Revisions getRevisions();

/**
* Access to the tar files managed by subclasses.
*
* @return the tar files
*/
protected abstract @NotNull TarFiles getTarFiles();

/**
* Convenience method for accessing the root node for the current head.
* This is equivalent to
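The two lambdas above supply what the prefetching cache needs: the tracker maps a referenced UUID back to its SegmentId, and readSegmentUncached loads a segment from the TAR files without re-entering the cache. To make the resulting behaviour concrete, here is a self-contained toy model of the depth-bounded background traversal that PrefetchCache performs on a cache miss — invented names, not Oak API:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

class PrefetchSketch {

    // Stand-in for a segment: an id plus the ids of the segments it references.
    record Node(String id, List<String> refs) {}

    private final Map<String, Node> cache = new ConcurrentHashMap<>();
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Function<String, Node> loader;

    PrefetchSketch(Function<String, Node> loader) {
        this.loader = loader;
    }

    Node get(String id, int depth) {
        Node cached = cache.get(id);
        if (cached != null) {
            return cached; // cache hit: no prefetch, as in PrefetchCache
        }
        Node loaded = cache.computeIfAbsent(id, loader); // miss: load and cache
        if (depth > 0) {
            // follow references in the background, one level shallower per hop
            for (String ref : loaded.refs()) {
                pool.execute(() -> get(ref, depth - 1));
            }
        }
        return loaded;
    }

    void close() {
        pool.shutdown(); // the real close() additionally awaits termination
    }
}

What the toy model leaves out is the back-pressure handling: PrefetchCache bounds its work queue and uses a RejectedExecutionHandler that, when the queue is full, drops one queued task to make room for an expedited task coming from the critical read path.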
File: org/apache/jackrabbit/oak/segment/file/FileStore.java

@@ -470,6 +470,11 @@ public TarRevisions getRevisions() {
}
}

@Override
protected @NotNull TarFiles getTarFiles() {
return tarFiles;
}

@Override
public void close() {
try (ShutDownCloser ignored = shutDown.shutDown()) {
@@ -487,6 +492,7 @@ public void close() {
closer.register(repositoryLock::unlock);
closer.register(tarFiles);
closer.register(revisions);
closer.register(segmentCache);

closeAndLogOnFail(closer);
}
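A note on the one-line change above: the Closer closes registered resources in reverse registration order, so registering segmentCache last means the prefetch pool is shut down first, before revisions and tarFiles are closed underneath it. A stand-alone demonstration of that LIFO behaviour, using the shaded Guava Closer that Oak already imports from elsewhere:

import java.io.IOException;

import org.apache.jackrabbit.guava.common.io.Closer;

class CloserOrderDemo {

    public static void main(String[] args) throws IOException {
        Closer closer = Closer.create();
        closer.register(() -> System.out.println("registered first, closed last"));
        closer.register(() -> System.out.println("registered last, closed first"));
        // Closer closes in LIFO order, so this prints:
        //   registered last, closed first
        //   registered first, closed last
        closer.close();
    }
}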