6 changes: 6 additions & 0 deletions exec/java-exec/pom.xml
@@ -697,6 +697,12 @@
      <artifactId>swagger-jaxrs2-servlet-initializer-v2-jakarta</artifactId>
      <version>${swagger.version}</version>
    </dependency>

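    <!-- Note (not in the original diff): Caffeine 2.x is the last release line that runs on Java 8; Caffeine 3.x requires Java 11 -->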
    <dependency>
      <groupId>com.github.ben-manes.caffeine</groupId>
      <artifactId>caffeine</artifactId>
      <version>2.9.3</version>
    </dependency>
  </dependencies>

  <profiles>
152 changes: 152 additions & 0 deletions exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java
@@ -0,0 +1,152 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.cache;

import java.util.concurrent.TimeUnit;

import org.apache.calcite.rel.RelNode;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.CacheKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
 * Manages caches for query plans and transform results to improve query planning performance.
 * Uses Caffeine caches with configurable TTL and maximum size.
 */
public class CustomCacheManager {
  private static final Logger logger = LoggerFactory.getLogger(CustomCacheManager.class);

  private static volatile Cache<String, PhysicalPlan> queryCache;
  private static volatile Cache<CacheKey, RelNode> transformCache;
  private static volatile boolean initialized = false;

  private static final int DEFAULT_MAX_ENTRIES = 100;
  private static final int DEFAULT_TTL_MINUTES = 300;

  private CustomCacheManager() {
    // Utility class
  }

  /**
   * Lazily initializes the caches if not already initialized.
   * Uses double-checked locking for thread safety.
   */
  private static void ensureInitialized() {
    if (!initialized) {
      synchronized (CustomCacheManager.class) {
        if (!initialized) {
          initializeCaches();
          initialized = true;
        }
      }
    }
  }

  private static void initializeCaches() {
    DrillConfig config = DrillConfig.create();

    int queryMaxEntries = getConfigInt(config, "planner.query.cache.max_entries_amount", DEFAULT_MAX_ENTRIES);
    int queryTtlMinutes = getConfigInt(config, "planner.query.cache.plan_cache_ttl_minutes", DEFAULT_TTL_MINUTES);
    int transformMaxEntries = getConfigInt(config, "planner.transform.cache.max_entries_amount", DEFAULT_MAX_ENTRIES);
    int transformTtlMinutes = getConfigInt(config, "planner.transform.cache.plan_cache_ttl_minutes", DEFAULT_TTL_MINUTES);

    queryCache = Caffeine.newBuilder()
        .maximumSize(queryMaxEntries)
        .expireAfterWrite(queryTtlMinutes, TimeUnit.MINUTES)
        .recordStats()
        .build();

    transformCache = Caffeine.newBuilder()
        .maximumSize(transformMaxEntries)
        .expireAfterWrite(transformTtlMinutes, TimeUnit.MINUTES)
        .recordStats()
        .build();

    logger.debug("Query plan cache initialized with maxEntries={}, ttlMinutes={}", queryMaxEntries, queryTtlMinutes);
    logger.debug("Transform cache initialized with maxEntries={}, ttlMinutes={}", transformMaxEntries, transformTtlMinutes);
  }
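  // A hypothetical drill-override.conf snippet supplying the keys read above
  // (illustrative values, not part of this diff; any missing key falls back to
  // the defaults declared at the top of the class):
  //
  //   planner.query.cache.max_entries_amount: 500
  //   planner.query.cache.plan_cache_ttl_minutes: 60
  //   planner.transform.cache.max_entries_amount: 500
  //   planner.transform.cache.plan_cache_ttl_minutes: 60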

  private static int getConfigInt(DrillConfig config, String path, int defaultValue) {
    if (config.hasPath(path)) {
      int value = config.getInt(path);
      logger.debug("Config {}: {}", path, value);
      return value;
    }
    logger.debug("Config {} not found, using default: {}", path, defaultValue);
    return defaultValue;
  }

  public static PhysicalPlan getQueryPlan(String sql) {
    ensureInitialized();
    return queryCache.getIfPresent(sql);
  }

  public static void putQueryPlan(String sql, PhysicalPlan plan) {
    ensureInitialized();
    queryCache.put(sql, plan);
  }

  public static RelNode getTransformedPlan(CacheKey key) {
    ensureInitialized();
    return transformCache.getIfPresent(key);
  }

  public static void putTransformedPlan(CacheKey key, RelNode plan) {
    ensureInitialized();
    transformCache.put(key, plan);
  }

  public static void logCacheStats() {
    ensureInitialized();
Contributor comment: These stats would be really useful to expose in the UI. However, I'd suggest adding that in another PR. Let's get this merged first!

    if (logger.isDebugEnabled()) {
      logger.debug("Query Cache Stats: {}", queryCache.stats());
      logger.debug("Query Cache Size: {}", queryCache.estimatedSize());
      logger.debug("Transform Cache Stats: {}", transformCache.stats());
      logger.debug("Transform Cache Size: {}", transformCache.estimatedSize());
    }
  }

  /**
   * Clears both caches. Useful for testing.
   */
  public static void clearCaches() {
    if (initialized) {
      queryCache.invalidateAll();
      transformCache.invalidateAll();
    }
  }

  /**
   * Resets the cache manager, forcing reinitialization on next use.
   * Useful for testing with different configurations.
   */
  public static synchronized void reset() {
    if (initialized) {
      queryCache.invalidateAll();
      transformCache.invalidateAll();
      queryCache = null;
      transformCache = null;
      initialized = false;
    }
  }
}
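For orientation, a minimal sketch of the read-through pattern this class enables (planFromScratch is a hypothetical stand-in for Drill's full parse/validate/plan path; the real integration is in the getQueryPlan changes further down this page):

  PhysicalPlan plan = CustomCacheManager.getQueryPlan(sql);
  if (plan == null) {
    plan = planFromScratch(sql);                 // hypothetical: expensive planning work
    CustomCacheManager.putQueryPlan(sql, plan);  // reuse on the next identical SQL string
  }

Note that the cache key is the raw SQL text, so only byte-identical statements will hit the cache.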
@@ -132,6 +132,20 @@ public class PlannerSettings implements Context{
  public static final String UNIONALL_DISTRIBUTE_KEY = "planner.enable_unionall_distribute";
  public static final BooleanValidator UNIONALL_DISTRIBUTE = new BooleanValidator(UNIONALL_DISTRIBUTE_KEY, null);

  public static final BooleanValidator PLAN_CACHE = new BooleanValidator("planner.cache.enable",
      new OptionDescription("Enables caching of generated query plans in memory, so repeated queries can bypass the planning phase and execute faster.")
  );
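  // Usage note (an assumption based on Drill's standard option framework, not
  // shown in this diff): as a BooleanValidator-backed option this can be toggled
  // at runtime, e.g. ALTER SYSTEM SET `planner.cache.enable` = true;
  // or per session via ALTER SESSION.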

  // Only settable in config, due to pub-sub requirements for recreating the cache on value change
  // public static final RangeLongValidator PLAN_CACHE_TTL = new RangeLongValidator("planner.cache.ttl_minutes",
  //     0, Long.MAX_VALUE,
  //     new OptionDescription("Time-to-live for cached query plans in minutes. Plans older than this are evicted. Default is 0 (disabled)")
  // );
  // public static final RangeLongValidator MAX_CACHE_ENTRIES = new RangeLongValidator("planner.cache.max_entries",
  //     1, Long.MAX_VALUE,
  //     new OptionDescription("Maximum total number of entries for cached query plans. When exceeded, least recently used plans are evicted.")
  // );

  // ------------------------------------------- Index planning related options BEGIN --------------------------------------------------------------
  public static final String USE_SIMPLE_OPTIMIZER_KEY = "planner.use_simple_optimizer";
  public static final BooleanValidator USE_SIMPLE_OPTIMIZER = new BooleanValidator(USE_SIMPLE_OPTIMIZER_KEY,
@@ -416,6 +430,10 @@ public boolean isUnionAllDistributeEnabled() {
    return options.getOption(UNIONALL_DISTRIBUTE);
  }

  public boolean isPlanCacheEnabled() {
    return options.getOption(PLAN_CACHE);
  }

  public boolean isParquetRowGroupFilterPushdownPlanningEnabled() {
    return options.getOption(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING);
  }
@@ -31,10 +31,13 @@
import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.CustomCacheManager;
import org.apache.drill.exec.exception.MetadataException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.ops.QueryContext.SqlStatementType;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
@@ -52,7 +55,6 @@
import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable;
import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption;
import org.apache.drill.exec.planner.sql.parser.SqlSchema;
import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
@@ -210,6 +212,7 @@ private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Po

  /**
   * Converts a SQL query string into a query physical plan.
   * If plan caching is enabled, attempts to retrieve a cached plan first.
   *
   * @param context query context
   * @param sql sql query
@@ -219,6 +222,17 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Po
  private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan)
      throws ForemanSetupException, RelConversionException, IOException, ValidationException {

    final boolean planCacheEnabled = context.getOptions().getOption(PlannerSettings.PLAN_CACHE);

    // Check cache first if enabled
    if (planCacheEnabled) {
      PhysicalPlan cachedPlan = CustomCacheManager.getQueryPlan(sql);
      if (cachedPlan != null) {
        logger.debug("Using cached query plan for SQL: {}", sql.substring(0, Math.min(sql.length(), 100)));
        return cachedPlan;
      }
    }

    final SqlConverter parser = new SqlConverter(context);
    injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
    final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql);
@@ -295,7 +309,25 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
      context.getOptions().setLocalOption(ExecConstants.RETURN_RESULT_SET_FOR_DDL, true);
    }

    return handler.getPlan(sqlNode);
    PhysicalPlan physicalPlan = handler.getPlan(sqlNode);

    // Cache the plan if caching is enabled and this is a cacheable query type
    if (planCacheEnabled && isCacheableQuery(sqlNode)) {
      CustomCacheManager.putQueryPlan(sql, physicalPlan);
      logger.debug("Cached query plan for SQL: {}", sql.substring(0, Math.min(sql.length(), 100)));
    }

    return physicalPlan;
  }

  /**
   * Determines whether a query's plan should be cached.
   * Only read-only query shapes (SELECT, including top-level ORDER BY and UNION)
   * are cached; DDL and DML statements are not.
   */
  private static boolean isCacheableQuery(SqlNode sqlNode) {
    SqlKind kind = sqlNode.getKind();
    // Only cache SELECT-shaped queries, not DDL or DML
    return kind == SqlKind.SELECT || kind == SqlKind.ORDER_BY || kind == SqlKind.UNION;
  }
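  // Illustrative examples (assumed Calcite behavior, not part of this diff):
  // "SELECT * FROM t" parses to SqlKind.SELECT; "SELECT c FROM t ORDER BY c"
  // parses to a top-level SqlOrderBy node of kind ORDER_BY, which is why
  // ORDER_BY must be accepted here; "CREATE TABLE ..." and "INSERT ..."
  // fall through and are never cached.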

  private static boolean isAutoLimitShouldBeApplied(SqlNode sqlNode, int queryMaxRows) {