Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
*/
package org.apache.jackrabbit.oak.plugins.index;

import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.jackrabbit.oak.api.jmx.IndexStatsMBean.STATUS_DONE;
import static org.apache.jackrabbit.oak.commons.PathUtils.elements;
import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.ASYNC_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.index.IndexConstants.REINDEX_PROPERTY_NAME;
import static org.apache.jackrabbit.oak.plugins.memory.EmptyNodeState.MISSING_NODE;
Expand All @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
Expand All @@ -50,7 +51,6 @@
import javax.management.openmbean.SimpleType;
import javax.management.openmbean.TabularData;

import com.codahale.metrics.MetricRegistry;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.jackrabbit.api.stats.TimeSeries;
import org.apache.jackrabbit.oak.api.CommitFailedException;
Expand All @@ -65,10 +65,13 @@
import org.apache.jackrabbit.oak.plugins.commit.ConflictValidatorProvider;
import org.apache.jackrabbit.oak.plugins.index.IndexUpdate.MissingIndexProviderStrategy;
import org.apache.jackrabbit.oak.plugins.index.TrackingCorruptIndexHandler.CorruptIndexInfo;
import org.apache.jackrabbit.oak.plugins.index.optimizer.DiffIndexUpdater;
import org.apache.jackrabbit.oak.plugins.index.optimizer.IndexDefinitionGenerator;
import org.apache.jackrabbit.oak.plugins.index.progress.MetricRateEstimator;
import org.apache.jackrabbit.oak.plugins.index.progress.NodeCounterMBeanEstimator;
import org.apache.jackrabbit.oak.plugins.memory.PropertyStates;
import org.apache.jackrabbit.oak.plugins.metric.MetricStatisticsProvider;
import org.apache.jackrabbit.oak.query.stats.QueryStatsMBean;
import org.apache.jackrabbit.oak.spi.commit.CommitContext;
import org.apache.jackrabbit.oak.spi.commit.CommitHook;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
Expand All @@ -77,6 +80,7 @@
import org.apache.jackrabbit.oak.spi.commit.EditorDiff;
import org.apache.jackrabbit.oak.spi.commit.EditorHook;
import org.apache.jackrabbit.oak.spi.commit.EditorProvider;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.ResetCommitAttributeHook;
import org.apache.jackrabbit.oak.spi.commit.SimpleCommitContext;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
Expand All @@ -86,6 +90,7 @@
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.Tracker;
import org.apache.jackrabbit.oak.stats.CounterStats;
import org.apache.jackrabbit.oak.stats.Counting;
import org.apache.jackrabbit.oak.stats.HistogramStats;
Expand All @@ -100,6 +105,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.MetricRegistry;

public class AsyncIndexUpdate implements Runnable, Closeable {
/**
* Name of service property which determines the name of Async task
Expand Down Expand Up @@ -214,13 +221,22 @@ public class AsyncIndexUpdate implements Runnable, Closeable {

private final StatisticsProvider statisticsProvider;

private final Tracker<QueryStatsMBean> statsTracker;

public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
@NotNull IndexEditorProvider provider, boolean switchOnSync) {
this(name, store, provider, StatisticsProvider.NOOP, switchOnSync);
this(name, store, provider, StatisticsProvider.NOOP, switchOnSync, null);
}

public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
@NotNull IndexEditorProvider provider, StatisticsProvider statsProvider, boolean switchOnSync) {
@NotNull IndexEditorProvider provider, StatisticsProvider statsProvider,
boolean switchOnSync) {
this(name, store, provider, statsProvider, switchOnSync, null);
}

public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
@NotNull IndexEditorProvider provider, StatisticsProvider statsProvider,
boolean switchOnSync, @Nullable Tracker<QueryStatsMBean> statsTracker) {
this.name = checkValidName(name);
this.lastIndexedTo = lastIndexedTo(name);
this.store = requireNonNull(store);
Expand All @@ -230,6 +246,7 @@ public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
this.statisticsProvider = statsProvider;
this.indexStats = new AsyncIndexStats(name, statsProvider);
this.corruptIndexHandler.setMeterStats(statsProvider.getMeter(TrackingCorruptIndexHandler.CORRUPT_INDEX_METER_NAME, StatsOptions.METRICS_ONLY));
this.statsTracker = statsTracker;
}

public AsyncIndexUpdate(@NotNull String name, @NotNull NodeStore store,
Expand Down Expand Up @@ -515,6 +532,10 @@ private void runWhenPermitted() {
}
}

if (name.equals("async")) {
improveIndexes(store);
}

// start collecting runtime statistics
preAsyncRunStatsStats(indexStats);

Expand Down Expand Up @@ -632,6 +653,80 @@ private void runWhenPermitted() {
}
}

private void improveIndexes(NodeStore store) {
NodeState rootState = store.getRoot();
NodeBuilder builder = rootState.builder();
if (statsTracker == null) {
return;
}
if (!rootState.hasChildNode("oak:index")) {
return;
}
if (!rootState.getChildNode("oak:index").hasChildNode("diff.index")) {
return;
}
List<QueryStatsMBean> list = statsTracker.getServices();
if (list.isEmpty()) {
return;
}
QueryStatsMBean stats = list.get(0);
if (stats == null) {
return;
}
TabularData slow = stats.getSlowQueries();

@SuppressWarnings("unchecked")
Collection<CompositeData> coll = new ArrayList<>((Collection<CompositeData>) slow.values());

// Find inefficient queries and add to collection for index diff generation
coll.addAll(findInefficientQueries(stats));

if (coll.isEmpty()) {
return;
}
boolean changed = false;
for (CompositeData cd : coll) {
String language = (String) cd.get("language");
String statement = (String) cd.get("statement");
if (statement.startsWith("explain") || statement.indexOf("/* oak-internal */") >= 0) {
continue;
}
log.info("language {} statement {}", language, statement);
String indexDef = IndexDefinitionGenerator.generateIndexDefinition(language, statement);
changed |= DiffIndexUpdater.applyIndexDefinition(store, rootState, builder, indexDef, statement);
}
if (changed) {
try {
store.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
} catch (CommitFailedException e) {
log.warn("Can not store indexes", e);
}
}
}

@SuppressWarnings("unchecked")
private List<CompositeData> findInefficientQueries(final QueryStatsMBean stats) {
final TabularData popularQueries = stats.getPopularQueries();
final List<CompositeData> inefficientQueries = new ArrayList<>();

for (CompositeData queryData : (Collection<? extends CompositeData>) popularQueries.values()) {
final Long rowsRead = (Long) queryData.get("rowsRead");
final Long rowsScanned = (Long) queryData.get("rowsScanned");

int readEfficiency = 100;

if (rowsScanned > 0) {
readEfficiency = (int) ((rowsRead * 100f) / rowsScanned);
}

if (readEfficiency <= stats.getIndexOptimizerLimit()) {
inefficientQueries.add(queryData);
}
}

return inefficientQueries;
}

private void clearLease() throws CommitFailedException {
NodeState root = store.getRoot();
NodeState async = root.getChildNode(ASYNC);
Expand Down Expand Up @@ -807,7 +902,7 @@ protected boolean updateIndex(NodeState before, String beforeCheckpoint,
CommitInfo info = new CommitInfo(CommitInfo.OAK_UNKNOWN, CommitInfo.OAK_UNKNOWN,
Map.of(IndexConstants.CHECKPOINT_CREATION_TIME, afterTime));
indexUpdate =
new IndexUpdate(provider, name, after, builder, callback, callback, info, corruptIndexHandler)
new IndexUpdate(provider, name, after, builder, callback, callback, info, corruptIndexHandler, store)
.withMissingProviderStrategy(missingStrategy);
configureRateEstimator(indexUpdate);
CommitFailedException exception =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.jackrabbit.oak.plugins.index;

import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -31,11 +34,13 @@
import org.apache.jackrabbit.oak.plugins.index.property.jmx.PropertyIndexAsyncReindex;
import org.apache.jackrabbit.oak.plugins.index.property.jmx.PropertyIndexAsyncReindexMBean;
import org.apache.jackrabbit.oak.plugins.observation.ChangeCollectorProvider;
import org.apache.jackrabbit.oak.query.stats.QueryStatsMBean;
import org.apache.jackrabbit.oak.spi.commit.ValidatorProvider;
import org.apache.jackrabbit.oak.spi.state.Clusterable;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Tracker;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
Expand All @@ -52,9 +57,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.jackrabbit.oak.commons.conditions.Validate.checkArgument;
import static org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.registerMBean;

@Component(
configurationPolicy = ConfigurationPolicy.REQUIRE,
service = {})
Expand Down Expand Up @@ -116,6 +118,8 @@ public class AsyncIndexerService {

private WhiteboardExecutor executor;

private Tracker<QueryStatsMBean> statsTracker;

@Activate
public void activate(BundleContext bundleContext, Configuration config) {
List<AsyncConfig> asyncIndexerConfig = getAsyncConfig(config.asyncConfigs());
Expand All @@ -124,12 +128,13 @@ public void activate(BundleContext bundleContext, Configuration config) {
indexEditorProvider.start(whiteboard);
executor = new WhiteboardExecutor();
executor.start(whiteboard);
statsTracker = whiteboard.track(QueryStatsMBean.class);

TrackingCorruptIndexHandler corruptIndexHandler = createCorruptIndexHandler(config);

for (AsyncConfig c : asyncIndexerConfig) {
AsyncIndexUpdate task = new AsyncIndexUpdate(c.name, nodeStore, indexEditorProvider,
statisticsProvider, false);
statisticsProvider, false, statsTracker);
task.setCorruptIndexHandler(corruptIndexHandler);
task.setValidatorProviders(Collections.singletonList(validatorProvider));

Expand Down Expand Up @@ -158,7 +163,7 @@ public void activate(BundleContext bundleContext, Configuration config) {
private void registerAsyncReindexSupport(Whiteboard whiteboard) {
// async reindex
String name = IndexConstants.ASYNC_REINDEX_VALUE;
AsyncIndexUpdate task = new AsyncIndexUpdate(name, nodeStore, indexEditorProvider, statisticsProvider, true);
AsyncIndexUpdate task = new AsyncIndexUpdate(name, nodeStore, indexEditorProvider, statisticsProvider, true, null);
PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(task, executor);

final Registration reg = new CompositeRegistration(
Expand All @@ -177,6 +182,9 @@ public void deactivate() throws IOException {
executor.stop();
executor = null;
}
if (statsTracker != null) {
statsTracker.stop();
}

//Close the task *after* unregistering the jobs
closer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public Iterable<IndexInfo> getAllIndexInfo() {
if (indexPathService.getMountInfoProvider().hasNonDefaultMounts()) {
activeIndexes.addAll(IndexName.filterReplacedIndexes(allIndexes, nodeStore.getRoot(), true));
} else {
activeIndexes.addAll(allIndexes);
activeIndexes.addAll(IndexName.filterNewestIndexes(allIndexes, nodeStore.getRoot()));
}
return IterableUtils.filter(IterableUtils.transform(indexPathService.getIndexPaths(), indexPath -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,23 @@ public static Collection<String> filterReplacedIndexes(Collection<String> indexP
return result;
}

public static Collection<String> filterNewestIndexes(Collection<String> indexPaths, NodeState rootState) {
HashMap<String, IndexName> latestVersions = new HashMap<>();
for (String p : indexPaths) {
IndexName indexName = IndexName.parse(p);
IndexName stored = latestVersions.get(indexName.baseName);
if (stored == null || stored.compareTo(indexName) < 0) {
// no old version, or old version is smaller: use
latestVersions.put(indexName.baseName, indexName);
}
}
ArrayList<String> result = new ArrayList<>(latestVersions.size());
for (IndexName n : latestVersions.values()) {
result.add(n.nodeName);
}
return result;
}

public String nextCustomizedName() {
return baseName + "-" + productVersion + "-custom-" + (customerVersion + 1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.apache.jackrabbit.oak.commons.collections.SetUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback.IndexProgress;
import org.apache.jackrabbit.oak.plugins.index.NodeTraversalCallback.PathSource;
import org.apache.jackrabbit.oak.plugins.index.diff.DiffIndex;
import org.apache.jackrabbit.oak.plugins.index.diff.DiffIndexMerger;
import org.apache.jackrabbit.oak.plugins.index.progress.IndexingProgressReporter;
import org.apache.jackrabbit.oak.plugins.index.progress.NodeCountEstimator;
import org.apache.jackrabbit.oak.plugins.index.progress.TraversalRateEstimator;
Expand All @@ -60,6 +62,7 @@
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStateUtils;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
import org.apache.jackrabbit.oak.spi.state.ReadOnlyBuilder;
import org.apache.jackrabbit.util.ISO8601;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -108,6 +111,8 @@ public class IndexUpdate implements Editor, PathSource {
}
}

private final NodeStore store;

private final IndexUpdateRootState rootState;

private final NodeBuilder builder;
Expand Down Expand Up @@ -150,6 +155,16 @@ public IndexUpdate(
NodeState root, NodeBuilder builder,
IndexUpdateCallback updateCallback, NodeTraversalCallback traversalCallback,
CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler) {
this(provider, async, root, builder, updateCallback, traversalCallback, commitInfo, corruptIndexHandler, null);
}

public IndexUpdate(
IndexEditorProvider provider, String async,
NodeState root, NodeBuilder builder,
IndexUpdateCallback updateCallback, NodeTraversalCallback traversalCallback,
CommitInfo commitInfo, CorruptIndexHandler corruptIndexHandler,
@Nullable NodeStore store) {
this.store = store;
this.parent = null;
this.name = null;
this.path = "/";
Expand All @@ -158,6 +173,7 @@ public IndexUpdate(
}

private IndexUpdate(IndexUpdate parent, String name) {
this.store = parent.store;
this.parent = requireNonNull(parent);
this.name = name;
this.rootState = parent.rootState;
Expand Down Expand Up @@ -279,6 +295,12 @@ private static boolean hasAnyHiddenNodes(NodeBuilder builder) {
}

private void collectIndexEditors(NodeBuilder definitions, NodeState before) throws CommitFailedException {
if (definitions.hasChildNode(DiffIndexMerger.DIFF_INDEX)
&& "disabled".equals(definitions.child(DiffIndexMerger.DIFF_INDEX).getString("type"))) {
if (rootState.async == null || rootState.async.equals("async")) {
DiffIndex.createNewIndexesIfNeeded(store, definitions);
}
}
for (String name : definitions.getChildNodeNames()) {
NodeBuilder definition = definitions.getChildNode(name);
if (isIncluded(rootState.async, definition)) {
Expand Down
Loading
Loading