From 2347c16ebd97a84c69cce24c3d1a82bb56651ee3 Mon Sep 17 00:00:00 2001 From: alisihab Date: Mon, 30 Jun 2025 11:29:21 +0200 Subject: [PATCH 01/10] introduce caching plain sql and plan --- exec/java-exec/pom.xml | 6 ++ .../drill/exec/cache/CustomCacheManager.java | 79 +++++++++++++++++++ .../exec/planner/sql/DrillSqlWorker.java | 47 ++++++++++- .../sql/handlers/DefaultSqlHandler.java | 78 +++++++++++++++--- .../drill/exec/work/foreman/Foreman.java | 62 ++++++++++----- 5 files changed, 241 insertions(+), 31 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index fe2c229a9a0..6b1dd64d304 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -697,6 +697,12 @@ swagger-jaxrs2-servlet-initializer-v2-jakarta ${swagger.version} + + + com.github.ben-manes.caffeine + caffeine + 2.9.3 + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java new file mode 100644 index 00000000000..0e219ca90d7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java @@ -0,0 +1,79 @@ +package org.apache.drill.exec.cache; + +import java.util.concurrent.TimeUnit; + +import org.apache.calcite.rel.RelNode; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler.CacheKey; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +public class CustomCacheManager { + private static final Logger logger = LoggerFactory.getLogger(CustomCacheManager.class); + + private static Cache queryCache; + private static Cache transformCache; + + private static int queryMaxEntries; + private static int queryTtlMinutes; + private static int transformMaxEntries; + private static int transformTtlMinutes; + + static { + loadConfig(); + } + + private static void loadConfig() { + DrillConfig config = DrillConfig.create(); + + queryMaxEntries = getConfigInt(config, "custom.cache.query.max_entries", 100); + queryTtlMinutes = getConfigInt(config, "custom.cache.query.ttl_minutes", 300); + transformMaxEntries = getConfigInt(config, "custom.cache.transform.max_entries", 100); + transformTtlMinutes = getConfigInt(config, "custom.cache.transform.ttl_minutes", 300); + + queryCache = Caffeine.newBuilder() + .maximumSize(queryMaxEntries) + .expireAfterWrite(queryTtlMinutes, TimeUnit.MINUTES) + .recordStats() + .build(); + + transformCache = Caffeine.newBuilder() + .maximumSize(transformMaxEntries) + .expireAfterWrite(transformTtlMinutes, TimeUnit.MINUTES) + .recordStats() + .build(); + } + + private static int getConfigInt(DrillConfig config, String path, int defaultValue) { + return config.hasPath(path) ? 
config.getInt(path) : defaultValue; + } + + public static PhysicalPlan getQueryPlan(String sql) { + return queryCache.getIfPresent(sql); + } + + public static void putQueryPlan(String sql, PhysicalPlan plan) { + queryCache.put(sql, plan); + } + + public static RelNode getTransformedPlan(CacheKey key) { + return transformCache.getIfPresent(key); + } + + public static void putTransformedPlan(CacheKey key, RelNode plan) { + transformCache.put(key, plan); + } + + public static void logCacheStats() { + logger.info("Query Cache Stats: " + queryCache.stats()); + logger.info("Query Cache Size: " + queryCache.estimatedSize()); + + logger.info("Transform Cache Stats: " + transformCache.stats()); + logger.info("Transform Cache Size: " + transformCache.estimatedSize()); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index c706f8f3733..b1d1097a6d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -18,6 +18,9 @@ package org.apache.drill.exec.planner.sql; import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.apache.calcite.sql.SqlDescribeSchema; import org.apache.calcite.sql.SqlKind; @@ -29,12 +32,14 @@ import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; +import org.apache.calcite.util.Litmus; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.MetadataException; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.ops.QueryContext.SqlStatementType; import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.sql.conversion.SqlConverter; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler; import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; @@ -52,7 +57,6 @@ import org.apache.drill.exec.planner.sql.parser.DrillSqlDescribeTable; import org.apache.drill.exec.planner.sql.parser.DrillSqlResetOption; import org.apache.drill.exec.planner.sql.parser.SqlSchema; -import org.apache.drill.exec.planner.sql.conversion.SqlConverter; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; @@ -128,6 +132,8 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe try { return getPhysicalPlan(context, sql, textPlan, retryAttempts); } catch (Exception e) { +logger.info("DrillSqlWorker.convertPlan() retrying???: attempt # {}", retryAttempts); +e.printStackTrace(System.out); logger.trace("There was an error during conversion into physical plan.", e); // It is prohibited to retry query planning for ANALYZE statement since it changes @@ -176,9 +182,11 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Pointer textPlan, long retryAttempts) throws ForemanSetupException, RelConversionException, IOException, ValidationException { try { + 
logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", retryAttempts); return getQueryPlan(context, sql, textPlan); } catch (Exception e) { Throwable rootCause = Throwables.getRootCause(e); + logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", rootCause.getMessage()); // Calcite wraps exceptions thrown during planning, so checks whether original exception is OutdatedMetadataException if (rootCause instanceof MetadataException) { // resets SqlStatementType to avoid errors when it is set during further attempts @@ -216,12 +224,21 @@ private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Po * @param textPlan text plan * @return query physical plan */ + + private static ConcurrentMap getQueryPlanCache = new ConcurrentHashMap<>(); + private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer textPlan) throws ForemanSetupException, RelConversionException, IOException, ValidationException { final SqlConverter parser = new SqlConverter(context); injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class); final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql); + QueryPlanCacheKey queryPlanCacheKey = new QueryPlanCacheKey(sqlNode); + + if(getQueryPlanCache.containsKey(queryPlanCacheKey)) { + logger.info("Using getQueryPlanCache"); + return getQueryPlanCache.get(queryPlanCacheKey); + } final AbstractSqlHandler handler; final SqlHandlerConfig config = new SqlHandlerConfig(context, parser); @@ -286,6 +303,8 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point handler = new DefaultSqlHandler(config, textPlan); context.setSQLStatementType(SqlStatementType.OTHER); } + + // Determines whether result set should be returned for the query based on return result set option and sql node kind. // Overrides the option on a query level if it differs from the current value. 
@@ -295,7 +314,31 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point context.getOptions().setLocalOption(ExecConstants.RETURN_RESULT_SET_FOR_DDL, true); } - return handler.getPlan(sqlNode); + PhysicalPlan physicalPlan = handler.getPlan(sqlNode); + getQueryPlanCache.put(queryPlanCacheKey, physicalPlan); + return physicalPlan; + } + + private static class QueryPlanCacheKey { + private final SqlNode sqlNode; + + public QueryPlanCacheKey(SqlNode sqlNode) { + this.sqlNode = sqlNode; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + QueryPlanCacheKey cacheKey = (QueryPlanCacheKey) o; + return sqlNode.equalsDeep(cacheKey.sqlNode, Litmus.IGNORE); + } + + @Override + public int hashCode() { + return Objects.hash(sqlNode); + } + } private static boolean isAutoLimitShouldBeApplied(SqlNode sqlNode, int queryMaxRows) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 6cc4d3bc4bc..bf07e883d5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -20,15 +20,12 @@ import java.io.IOException; import java.util.Collection; import java.util.List; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import com.fasterxml.jackson.databind.ser.PropertyFilter; -import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; -import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; -import org.apache.drill.exec.util.Utilities; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; import org.apache.calcite.plan.RelOptCostImpl; import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelOptRule; @@ -66,6 +63,7 @@ import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder; import org.apache.drill.common.logical.PlanProperties.PlanType; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.cache.CustomCacheManager; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; @@ -104,13 +102,20 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.StoragePlugin; import org.apache.drill.exec.util.Pointer; +import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.apache.drill.exec.work.foreman.SqlUnsupportedException; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; + +import com.fasterxml.jackson.databind.ser.PropertyFilter; 
+import com.fasterxml.jackson.databind.ser.impl.SimpleBeanPropertyFilter; +import com.fasterxml.jackson.databind.ser.impl.SimpleFilterProvider; public class DefaultSqlHandler extends AbstractSqlHandler { private static final Logger logger = LoggerFactory.getLogger(DefaultSqlHandler.class); @@ -250,7 +255,6 @@ protected DrillRel convertToRawDrel(final RelNode relNode) throws SqlUnsupported // hep is enabled and hep pruning is enabled. intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode); - } else { // Only hep is enabled final RelNode intermediateNode = @@ -361,16 +365,64 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode * @param log Whether to log the planning phase. * @return The transformed relnode. */ + + + // A simple cache key class that uses the relevant parameters + public static class CacheKey { + private final PlannerPhase phase; + private PlannerType plannerType; + private RelNode input; + private RelTraitSet targetTraits; + + public CacheKey(PlannerType plannerType, PlannerPhase phase, RelNode input, RelTraitSet targetTraits) { + this.plannerType = plannerType; + this.phase = phase; + this.input = input; + this.targetTraits = targetTraits; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CacheKey cacheKey = (CacheKey) o; + logger.info("Compare phase {} {} ,{} ", phase.equals(cacheKey.phase), phase.name(), cacheKey.phase.name()); + logger.info("Compare plannerType {} {} {}", plannerType.equals(cacheKey.plannerType), plannerType.name() , cacheKey.plannerType.name()); + logger.info("Compare input {}", input.deepEquals(cacheKey.input)); + return phase.name().equals(cacheKey.phase.name()) && + plannerType.name().equals(cacheKey.plannerType.name()) && + input.deepEquals(cacheKey.input) && + targetTraits.equals(cacheKey.targetTraits); + } + + @Override + public int hashCode() { + return Objects.hash(phase.name(), plannerType.name(), input.deepHashCode(), targetTraits); + } + + } + + protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode input, RelTraitSet targetTraits, boolean log) { final Stopwatch watch = Stopwatch.createStarted(); final RuleSet rules = config.getRules(phase, input); final RelTraitSet toTraits = targetTraits.simplify(); + + // Create a cache key based on the input parameters + CacheKey key = new CacheKey(plannerType, phase, input, targetTraits); + RelNode cachedResult = CustomCacheManager.getTransformedPlan(key); + if (cachedResult != null) { + CustomCacheManager.logCacheStats(); + return cachedResult; + } + final RelNode output; switch (plannerType) { case HEP_BOTTOM_UP: case HEP: { + logger.info("DefaultSqlHandler.transform()"); final HepProgramBuilder hepPgmBldr = new HepProgramBuilder(); if (plannerType == PlannerType.HEP_BOTTOM_UP) { hepPgmBldr.addMatchOrder(HepMatchOrder.BOTTOM_UP); @@ -402,13 +454,19 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode Preconditions.checkArgument(planner instanceof VolcanoPlanner, "Cluster is expected to be constructed using VolcanoPlanner. 
Was actually of type %s.", planner.getClass() .getName()); + logger.info("DefaultSqlHandler.transform() program.run( before"); output = program.run(planner, input, toTraits, ImmutableList.of(), ImmutableList.of()); + logger.info("DefaultSqlHandler.transform() program.run( after"); break; } } + // Store the result in the cache before returning + CustomCacheManager.putTransformedPlan(key, output); + CustomCacheManager.logCacheStats(); + if (log) { log(plannerType, phase, output, logger, watch); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index a099b96b123..2f6bfebabca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -17,19 +17,19 @@ */ package org.apache.drill.exec.work.foreman; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.drill.common.util.JacksonUtils; -import org.apache.drill.exec.work.filter.RuntimeFilterRouter; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.protobuf.InvalidProtocolBufferException; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; +import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM; + +import java.io.IOException; +import java.util.Date; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; +import org.apache.drill.common.util.JacksonUtils; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.cache.CustomCacheManager; import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.QueryContext; @@ -62,17 +62,20 @@ import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; +import org.apache.drill.exec.work.filter.RuntimeFilterRouter; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Date; -import java.util.List; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.protobuf.InvalidProtocolBufferException; -import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; /** * Foreman manages all the fragments (local and remote) for a single query where this @@ -269,9 +272,11 @@ public void run() { final String sql = queryRequest.getPlan(); // log query id, username and query text before starting any real work. 
Also, put // them together such that it is easy to search based on query id + long start = new Date().getTime(); logger.info("Query text for query with id {} issued by {}: {}", queryIdString, queryContext.getQueryUserName(), sql); runSQL(sql); + logger.info("RunSQL is executed within {}", new Date().getTime() - start); break; case EXECUTION: runFragment(queryRequest.getFragmentsList()); @@ -410,7 +415,9 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep } private void runPhysicalPlan(final PhysicalPlan plan, Pointer textPlan) throws ExecutionSetupException { - validatePlan(plan); + logger.info("validatePlan(plan); before"); + validatePlan(plan); + logger.info("validatePlan(plan); after"); queryRM.visitAbstractPlan(plan); final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); @@ -481,6 +488,7 @@ private void runFragment(List fragmentsList) throws ExecutionSetup * Moves query to RUNNING state. */ private void startQueryProcessing() { + logger.info("Starting query processing"); enqueue(); runFragments(); queryStateProcessor.moveToState(QueryState.RUNNING, null); @@ -589,11 +597,27 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) { queryId, queryWorkUnit.stringifyFragments())); } - private void runSQL(final String sql) throws ExecutionSetupException { - final Pointer textPlan = new Pointer<>(); - final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); - runPhysicalPlan(plan, textPlan); - } + private void runSQL(final String sql) throws ExecutionSetupException { + final Pointer textPlan = new Pointer<>(); + logger.info("DrillSqlWorker.getPlan( before"); + + PhysicalPlan plan = CustomCacheManager.getQueryPlan(sql); + + if (plan == null) { + logger.info("Cache miss, generating new plan"); + plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); + } else { + logger.info("Using cached plan"); + } + + if(sql.trim().startsWith("SELECT")) { + CustomCacheManager.putQueryPlan(sql, plan); + CustomCacheManager.logCacheStats(); + } + + logger.info("DrillSqlWorker.getPlan( after"); + runPhysicalPlan(plan, textPlan); + } private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException { if (logger.isDebugEnabled()) { From 01a90724d23ab83627ec3e5d2e66318078be6fc7 Mon Sep 17 00:00:00 2001 From: vdegans Date: Mon, 18 Aug 2025 11:31:16 +0200 Subject: [PATCH 02/10] remove logs --- .../main/java/org/apache/drill/exec/work/foreman/Foreman.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 2f6bfebabca..65033a30f7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -415,9 +415,7 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep } private void runPhysicalPlan(final PhysicalPlan plan, Pointer textPlan) throws ExecutionSetupException { - logger.info("validatePlan(plan); before"); validatePlan(plan); - logger.info("validatePlan(plan); after"); queryRM.visitAbstractPlan(plan); final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); @@ -599,7 +597,6 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) { private void runSQL(final String sql) throws ExecutionSetupException { final Pointer textPlan = new Pointer<>(); - logger.info("DrillSqlWorker.getPlan( before"); PhysicalPlan plan = 
CustomCacheManager.getQueryPlan(sql); @@ -615,7 +612,6 @@ private void runSQL(final String sql) throws ExecutionSetupException { CustomCacheManager.logCacheStats(); } - logger.info("DrillSqlWorker.getPlan( after"); runPhysicalPlan(plan, textPlan); } From dcbab614f268f2e5b8bd6cc13fd4b540357b39c0 Mon Sep 17 00:00:00 2001 From: vdegans Date: Thu, 28 Aug 2025 08:38:27 +0200 Subject: [PATCH 03/10] fix imports --- .../sql/handlers/DefaultSqlHandler.java | 24 +++++++++---------- .../drill/exec/work/foreman/Foreman.java | 6 ++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index bf07e883d5a..24e3e747254 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -105,11 +105,11 @@ import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.apache.drill.exec.work.foreman.SqlUnsupportedException; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Sets; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -365,7 +365,7 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode * @param log Whether to log the planning phase. * @return The transformed relnode. 
*/ - + // A simple cache key class that uses the relevant parameters public static class CacheKey { @@ -389,12 +389,12 @@ public boolean equals(Object o) { logger.info("Compare phase {} {} ,{} ", phase.equals(cacheKey.phase), phase.name(), cacheKey.phase.name()); logger.info("Compare plannerType {} {} {}", plannerType.equals(cacheKey.plannerType), plannerType.name() , cacheKey.plannerType.name()); logger.info("Compare input {}", input.deepEquals(cacheKey.input)); - return phase.name().equals(cacheKey.phase.name()) && + return phase.name().equals(cacheKey.phase.name()) && plannerType.name().equals(cacheKey.plannerType.name()) && - input.deepEquals(cacheKey.input) && + input.deepEquals(cacheKey.input) && targetTraits.equals(cacheKey.targetTraits); } - + @Override public int hashCode() { return Objects.hash(phase.name(), plannerType.name(), input.deepHashCode(), targetTraits); @@ -402,13 +402,13 @@ public int hashCode() { } - + protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode input, RelTraitSet targetTraits, boolean log) { final Stopwatch watch = Stopwatch.createStarted(); final RuleSet rules = config.getRules(phase, input); final RelTraitSet toTraits = targetTraits.simplify(); - + // Create a cache key based on the input parameters CacheKey key = new CacheKey(plannerType, phase, input, targetTraits); @@ -417,7 +417,7 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode CustomCacheManager.logCacheStats(); return cachedResult; } - + final RelNode output; switch (plannerType) { case HEP_BOTTOM_UP: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 65033a30f7d..96706188699 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -66,8 +66,8 @@ import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -606,7 +606,7 @@ private void runSQL(final String sql) throws ExecutionSetupException { } else { logger.info("Using cached plan"); } - + if(sql.trim().startsWith("SELECT")) { CustomCacheManager.putQueryPlan(sql, plan); CustomCacheManager.logCacheStats(); From 8236f03b1c569fe3b4386f8d37061ee93aaa1ac6 Mon Sep 17 00:00:00 2001 From: vdegans Date: Thu, 28 Aug 2025 10:33:18 +0200 Subject: [PATCH 04/10] fix styling --- .../drill/exec/cache/CustomCacheManager.java | 127 +++++++++--------- .../exec/planner/sql/DrillSqlWorker.java | 35 ++--- .../sql/handlers/DefaultSqlHandler.java | 28 ++-- .../drill/exec/work/foreman/Foreman.java | 34 ++--- 4 files changed, 114 insertions(+), 110 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java index 0e219ca90d7..7996c71a135 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java @@ -13,67 +13,66 @@ import com.github.benmanes.caffeine.cache.Caffeine; public class CustomCacheManager { - private static final Logger logger = LoggerFactory.getLogger(CustomCacheManager.class); - - private static Cache queryCache; - private static Cache transformCache; - - private static int queryMaxEntries; - private static int queryTtlMinutes; - private static int transformMaxEntries; - private static int transformTtlMinutes; - - static { - loadConfig(); - } - - private static void loadConfig() { - DrillConfig config = DrillConfig.create(); - - queryMaxEntries = getConfigInt(config, "custom.cache.query.max_entries", 100); - queryTtlMinutes = getConfigInt(config, "custom.cache.query.ttl_minutes", 300); - transformMaxEntries = getConfigInt(config, "custom.cache.transform.max_entries", 100); - transformTtlMinutes = getConfigInt(config, "custom.cache.transform.ttl_minutes", 300); - - queryCache = Caffeine.newBuilder() - .maximumSize(queryMaxEntries) - .expireAfterWrite(queryTtlMinutes, TimeUnit.MINUTES) - .recordStats() - .build(); - - transformCache = Caffeine.newBuilder() - .maximumSize(transformMaxEntries) - .expireAfterWrite(transformTtlMinutes, TimeUnit.MINUTES) - .recordStats() - .build(); - } - - private static int getConfigInt(DrillConfig config, String path, int defaultValue) { - return config.hasPath(path) ? config.getInt(path) : defaultValue; - } - - public static PhysicalPlan getQueryPlan(String sql) { - return queryCache.getIfPresent(sql); - } - - public static void putQueryPlan(String sql, PhysicalPlan plan) { - queryCache.put(sql, plan); - } - - public static RelNode getTransformedPlan(CacheKey key) { - return transformCache.getIfPresent(key); - } - - public static void putTransformedPlan(CacheKey key, RelNode plan) { - transformCache.put(key, plan); - } - - public static void logCacheStats() { - logger.info("Query Cache Stats: " + queryCache.stats()); - logger.info("Query Cache Size: " + queryCache.estimatedSize()); - - logger.info("Transform Cache Stats: " + transformCache.stats()); - logger.info("Transform Cache Size: " + transformCache.estimatedSize()); - } - -} + private static final Logger logger = LoggerFactory.getLogger(CustomCacheManager.class); + + private static Cache queryCache; + private static Cache transformCache; + + private static int queryMaxEntries; + private static int queryTtlMinutes; + private static int transformMaxEntries; + private static int transformTtlMinutes; + + static { + loadConfig(); + } + + private static void loadConfig() { + DrillConfig config = DrillConfig.create(); + + queryMaxEntries = getConfigInt(config, "custom.cache.query.max_entries", 100); + queryTtlMinutes = getConfigInt(config, "custom.cache.query.ttl_minutes", 300); + transformMaxEntries = getConfigInt(config, "custom.cache.transform.max_entries", 100); + transformTtlMinutes = getConfigInt(config, "custom.cache.transform.ttl_minutes", 300); + + queryCache = Caffeine.newBuilder() + .maximumSize(queryMaxEntries) + .expireAfterWrite(queryTtlMinutes, TimeUnit.MINUTES) + .recordStats() + .build(); + + transformCache = Caffeine.newBuilder() + .maximumSize(transformMaxEntries) + .expireAfterWrite(transformTtlMinutes, TimeUnit.MINUTES) + .recordStats() + .build(); + } + + private static int getConfigInt(DrillConfig config, String path, int defaultValue) { + return config.hasPath(path) ? 
config.getInt(path) : defaultValue; + } + + public static PhysicalPlan getQueryPlan(String sql) { + return queryCache.getIfPresent(sql); + } + + public static void putQueryPlan(String sql, PhysicalPlan plan) { + queryCache.put(sql, plan); + } + + public static RelNode getTransformedPlan(CacheKey key) { + return transformCache.getIfPresent(key); + } + + public static void putTransformedPlan(CacheKey key, RelNode plan) { + transformCache.put(key, plan); + } + + public static void logCacheStats() { + logger.info("Query Cache Stats: " + queryCache.stats()); + logger.info("Query Cache Size: " + queryCache.estimatedSize()); + + logger.info("Transform Cache Stats: " + transformCache.stats()); + logger.info("Transform Cache Size: " + transformCache.estimatedSize()); + } +} \ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index b1d1097a6d2..30e157a1852 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -182,7 +182,7 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Pointer textPlan, long retryAttempts) throws ForemanSetupException, RelConversionException, IOException, ValidationException { try { - logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", retryAttempts); + logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", retryAttempts); return getQueryPlan(context, sql, textPlan); } catch (Exception e) { Throwable rootCause = Throwables.getRootCause(e); @@ -224,9 +224,9 @@ private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql, Po * @param textPlan text plan * @return query physical plan */ - + private static ConcurrentMap getQueryPlanCache = new ConcurrentHashMap<>(); - + private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer textPlan) throws ForemanSetupException, RelConversionException, IOException, ValidationException { @@ -234,10 +234,10 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class); final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql); QueryPlanCacheKey queryPlanCacheKey = new QueryPlanCacheKey(sqlNode); - + if(getQueryPlanCache.containsKey(queryPlanCacheKey)) { - logger.info("Using getQueryPlanCache"); - return getQueryPlanCache.get(queryPlanCacheKey); + logger.info("Using getQueryPlanCache"); + return getQueryPlanCache.get(queryPlanCacheKey); } final AbstractSqlHandler handler; final SqlHandlerConfig config = new SqlHandlerConfig(context, parser); @@ -303,8 +303,8 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point handler = new DefaultSqlHandler(config, textPlan); context.setSQLStatementType(SqlStatementType.OTHER); } - - + + // Determines whether result set should be returned for the query based on return result set option and sql node kind. // Overrides the option on a query level if it differs from the current value. 
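For reference, the Caffeine builder pattern used in CustomCacheManager above combines size-based eviction, time-based expiry, and hit/miss accounting. A self-contained sketch of the same builder shape and its observable behavior (the bounds match the loadConfig defaults; the String key/value types are stand-ins for SQL text and plans):

import java.util.concurrent.TimeUnit;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class CaffeineCacheDemo {
  public static void main(String[] args) {
    // Same builder shape as CustomCacheManager: bounded size, write TTL, stats recording.
    Cache<String, String> cache = Caffeine.newBuilder()
        .maximumSize(100)                        // evicts entries (frequency/recency based) once size exceeds 100
        .expireAfterWrite(300, TimeUnit.MINUTES) // entries expire 300 minutes after insertion
        .recordStats()                           // enables the counters behind cache.stats()
        .build();

    cache.put("SELECT 1", "cached-plan");
    System.out.println(cache.getIfPresent("SELECT 1")); // hit: prints cached-plan
    System.out.println(cache.getIfPresent("SELECT 2")); // miss: prints null
    System.out.println(cache.stats());                  // CacheStats{hitCount=1, missCount=1, ...}
  }
}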
@@ -318,7 +318,7 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point getQueryPlanCache.put(queryPlanCacheKey, physicalPlan); return physicalPlan; } - + private static class QueryPlanCacheKey { private final SqlNode sqlNode; @@ -328,17 +328,20 @@ public QueryPlanCacheKey(SqlNode sqlNode) { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } QueryPlanCacheKey cacheKey = (QueryPlanCacheKey) o; return sqlNode.equalsDeep(cacheKey.sqlNode, Litmus.IGNORE); } - - @Override - public int hashCode() { - return Objects.hash(sqlNode); - } + @Override + public int hashCode() { + return Objects.hash(sqlNode); + } } private static boolean isAutoLimitShouldBeApplied(SqlNode sqlNode, int queryMaxRows) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index 24e3e747254..d1fb5d52280 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -21,8 +21,6 @@ import java.util.Collection; import java.util.List; import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -383,22 +381,26 @@ public CacheKey(PlannerType plannerType, PlannerPhase phase, RelNode input, RelT @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } CacheKey cacheKey = (CacheKey) o; - logger.info("Compare phase {} {} ,{} ", phase.equals(cacheKey.phase), phase.name(), cacheKey.phase.name()); - logger.info("Compare plannerType {} {} {}", plannerType.equals(cacheKey.plannerType), plannerType.name() , cacheKey.plannerType.name()); + logger.info("Compare phase {} {}, {} ", phase.equals(cacheKey.phase), phase.name(), cacheKey.phase.name()); + logger.info("Compare plannerType {} {} {}", plannerType.equals(cacheKey.plannerType), plannerType.name(), cacheKey.plannerType.name()); logger.info("Compare input {}", input.deepEquals(cacheKey.input)); return phase.name().equals(cacheKey.phase.name()) && - plannerType.name().equals(cacheKey.plannerType.name()) && - input.deepEquals(cacheKey.input) && - targetTraits.equals(cacheKey.targetTraits); + plannerType.name().equals(cacheKey.plannerType.name()) && + input.deepEquals(cacheKey.input) && + targetTraits.equals(cacheKey.targetTraits); } @Override - public int hashCode() { - return Objects.hash(phase.name(), plannerType.name(), input.deepHashCode(), targetTraits); - } + public int hashCode() { + return Objects.hash(phase.name(), plannerType.name(), input.deepHashCode(), targetTraits); + } } @@ -422,7 +424,7 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode switch (plannerType) { case HEP_BOTTOM_UP: case HEP: { - logger.info("DefaultSqlHandler.transform()"); + logger.info("DefaultSqlHandler.transform()"); final HepProgramBuilder hepPgmBldr = new HepProgramBuilder(); if (plannerType == PlannerType.HEP_BOTTOM_UP) { 
hepPgmBldr.addMatchOrder(HepMatchOrder.BOTTOM_UP); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 96706188699..07af9936ebc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -415,7 +415,7 @@ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupExcep } private void runPhysicalPlan(final PhysicalPlan plan, Pointer textPlan) throws ExecutionSetupException { - validatePlan(plan); + validatePlan(plan); queryRM.visitAbstractPlan(plan); final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); @@ -486,7 +486,7 @@ private void runFragment(List fragmentsList) throws ExecutionSetup * Moves query to RUNNING state. */ private void startQueryProcessing() { - logger.info("Starting query processing"); + logger.info("Starting query processing"); enqueue(); runFragments(); queryStateProcessor.moveToState(QueryState.RUNNING, null); @@ -595,25 +595,25 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) { queryId, queryWorkUnit.stringifyFragments())); } - private void runSQL(final String sql) throws ExecutionSetupException { - final Pointer textPlan = new Pointer<>(); + private void runSQL(final String sql) throws ExecutionSetupException { + final Pointer textPlan = new Pointer<>(); - PhysicalPlan plan = CustomCacheManager.getQueryPlan(sql); + PhysicalPlan plan = CustomCacheManager.getQueryPlan(sql); - if (plan == null) { - logger.info("Cache miss, generating new plan"); - plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); - } else { - logger.info("Using cached plan"); - } + if (plan == null) { + logger.info("Cache miss, generating new plan"); + plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); + } else { + logger.info("Using cached plan"); + } - if(sql.trim().startsWith("SELECT")) { - CustomCacheManager.putQueryPlan(sql, plan); - CustomCacheManager.logCacheStats(); - } + if(sql.trim().startsWith("SELECT")) { + CustomCacheManager.putQueryPlan(sql, plan); + CustomCacheManager.logCacheStats(); + } - runPhysicalPlan(plan, textPlan); - } + runPhysicalPlan(plan, textPlan); + } private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException { if (logger.isDebugEnabled()) { From 35d6d45e792f5f53b1bc0126221061bc9942aba5 Mon Sep 17 00:00:00 2001 From: vdegans Date: Wed, 10 Sep 2025 17:04:09 +0200 Subject: [PATCH 05/10] Add cache settings --- .../drill/exec/cache/CustomCacheManager.java | 12 ++--- .../planner/physical/PlannerSettings.java | 18 +++++++ .../sql/handlers/DefaultSqlHandler.java | 25 +++++++--- .../server/options/SystemOptionManager.java | 4 ++ .../drill/exec/work/foreman/Foreman.java | 49 +++++++++++++++---- .../src/main/resources/drill-module.conf | 6 +++ 6 files changed, 91 insertions(+), 23 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java index 7996c71a135..2c039f3df93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java @@ -30,14 +30,14 @@ public class CustomCacheManager { private static void loadConfig() { DrillConfig config = DrillConfig.create(); - queryMaxEntries = getConfigInt(config, "custom.cache.query.max_entries", 100); 
- queryTtlMinutes = getConfigInt(config, "custom.cache.query.ttl_minutes", 300); - transformMaxEntries = getConfigInt(config, "custom.cache.transform.max_entries", 100); - transformTtlMinutes = getConfigInt(config, "custom.cache.transform.ttl_minutes", 300); + queryMaxEntries = getConfigInt(config, "planner.query.cache.max_entries_amount", 100); + queryTtlMinutes = getConfigInt(config, "planner.query.cache.plan_cache_ttl_minutes", 300); + transformMaxEntries = getConfigInt(config, "planner.transform.cache.max_entries_amount", 100); + transformTtlMinutes = getConfigInt(config, "planner.transform.plan_cache_ttl_minutes", 300); queryCache = Caffeine.newBuilder() .maximumSize(queryMaxEntries) - .expireAfterWrite(queryTtlMinutes, TimeUnit.MINUTES) + .expireAfterWrite(queryTtlMinutes, TimeUnit.MILLISECONDS) .recordStats() .build(); @@ -75,4 +75,4 @@ public static void logCacheStats() { logger.info("Transform Cache Stats: " + transformCache.stats()); logger.info("Transform Cache Size: " + transformCache.estimatedSize()); } -} \ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 6fa145a1b01..e8ab405c557 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -132,6 +132,20 @@ public class PlannerSettings implements Context{ public static final String UNIONALL_DISTRIBUTE_KEY = "planner.enable_unionall_distribute"; public static final BooleanValidator UNIONALL_DISTRIBUTE = new BooleanValidator(UNIONALL_DISTRIBUTE_KEY, null); + public static final BooleanValidator PLAN_CACHE = new BooleanValidator("planner.cache.enable", + new OptionDescription("Enables caching of generated query plans in memory, so repeated queries can bypass the planning phase and execute faster.") + ); + + // Only settable in config, due to pub-sub requirements for recreating the cache on value change + // public static final RangeLongValidator PLAN_CACHE_TTL = new RangeLongValidator("planner.cache.ttl_minutes", + // 0, Long.MAX_VALUE, + // new OptionDescription("Time-to-live for cached query plans in minutes. Plans older than this are evicted. Default is 0 (disabled)") + // ); + // public static final RangeLongValidator MAX_CACHE_ENTRIES = new RangeLongValidator("planner.cache.max_entries", + // 1, Long.MAX_VALUE, + // new OptionDescription("Maximum total number of entries for cached query plans. 
When exceeded, least recently used plans are evicted.") + // ); + // ------------------------------------------- Index planning related options BEGIN -------------------------------------------------------------- public static final String USE_SIMPLE_OPTIMIZER_KEY = "planner.use_simple_optimizer"; public static final BooleanValidator USE_SIMPLE_OPTIMIZER = new BooleanValidator(USE_SIMPLE_OPTIMIZER_KEY, @@ -416,6 +430,10 @@ public boolean isUnionAllDistributeEnabled() { return options.getOption(UNIONALL_DISTRIBUTE); } + public boolean isPlanCacheEnabled() { + return options.getOption(PLAN_CACHE); + } + public boolean isParquetRowGroupFilterPushdownPlanningEnabled() { return options.getOption(PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index d1fb5d52280..428102b6a34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -410,14 +410,20 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode final Stopwatch watch = Stopwatch.createStarted(); final RuleSet rules = config.getRules(phase, input); final RelTraitSet toTraits = targetTraits.simplify(); + final OptionManager options = context.getOptions(); + final boolean planCacheEnabled = options.getOption(PlannerSettings.PLAN_CACHE); - // Create a cache key based on the input parameters - CacheKey key = new CacheKey(plannerType, phase, input, targetTraits); + CacheKey key = null; - RelNode cachedResult = CustomCacheManager.getTransformedPlan(key); - if (cachedResult != null) { + if (planCacheEnabled) { + // Create a cache key based on the input parameters + key = new CacheKey(plannerType, phase, input, targetTraits); + + RelNode cachedResult = CustomCacheManager.getTransformedPlan(key); + if (cachedResult != null) { CustomCacheManager.logCacheStats(); return cachedResult; + } } final RelNode output; @@ -465,9 +471,14 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode } } - // Store the result in the cache before returning - CustomCacheManager.putTransformedPlan(key, output); - CustomCacheManager.logCacheStats(); + if (planCacheEnabled) { + logger.info("planCache enabled, storing transformedplan"); + // Store the result in the cache before returning + CustomCacheManager.putTransformedPlan(key, output); + CustomCacheManager.logCacheStats(); + } else { + logger.info("planCache disabled, not storing transformedplan"); + } if (log) { log(plannerType, phase, output, logger, watch); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 3cee4096e09..11ffe93ecd3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -82,6 +82,10 @@ public static CaseInsensitiveMap createDefaultOptionDefinition // here. 
@SuppressWarnings("deprecation") final OptionDefinition[] definitions = new OptionDefinition[]{ + new OptionDefinition(PlannerSettings.PLAN_CACHE), + // new OptionDefinition(PlannerSettings.PLAN_CACHE_TTL), + // new OptionDefinition(PlannerSettings.MAX_CACHE_ENTRIES), + new OptionDefinition(PlannerSettings.CONSTANT_FOLDING), new OptionDefinition(PlannerSettings.EXCHANGE), new OptionDefinition(PlannerSettings.HASHAGG), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 07af9936ebc..0e5e0ba89b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -56,6 +57,7 @@ import org.apache.drill.exec.rpc.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.FailureUtils; +import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionSet; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; @@ -595,23 +597,50 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) { queryId, queryWorkUnit.stringifyFragments())); } - private void runSQL(final String sql) throws ExecutionSetupException { - final Pointer textPlan = new Pointer<>(); + private PhysicalPlan getOrBuildPlan(String sql, Pointer textPlan, boolean planCacheEnabled) + throws ExecutionSetupException { - PhysicalPlan plan = CustomCacheManager.getQueryPlan(sql); + PhysicalPlan plan = null; + + if (planCacheEnabled) { + logger.info("Cache enabled, checking entries"); + plan = CustomCacheManager.getQueryPlan(sql); + + if (plan == null) { + logger.info("Cache miss, generating new plan"); + plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); + + if (isCacheableQuery(sql)) { + CustomCacheManager.putQueryPlan(sql, plan); + CustomCacheManager.logCacheStats(); + } + } else { + logger.info("Using cached plan"); + } - if (plan == null) { - logger.info("Cache miss, generating new plan"); - plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); } else { - logger.info("Using cached plan"); + plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); } - if(sql.trim().startsWith("SELECT")) { - CustomCacheManager.putQueryPlan(sql, plan); - CustomCacheManager.logCacheStats(); + return plan; + } + + private boolean isCacheableQuery(String sql) { + return sql.trim().toUpperCase().startsWith("SELECT"); + } + + private void runSQL(final String sql) throws ExecutionSetupException { + final Pointer textPlan = new Pointer<>(); + final OptionManager options = queryContext.getOptions(); + final boolean planCacheEnabled = options.getOption(PlannerSettings.PLAN_CACHE); + if (planCacheEnabled) { + logger.info("PlanCache is enabled"); + } else { + logger.info("PlanCache is disabled"); } + PhysicalPlan plan = getOrBuildPlan(sql, textPlan, planCacheEnabled); + runPhysicalPlan(plan, textPlan); } diff --git 
a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 7541a99e2dd..55757039682 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -697,6 +697,12 @@ drill.exec.options: { planner.width.max_per_node: 0, planner.width.max_per_query: 1000, + planner.cache.enable: true, + planner.query.cache.ttl_minutes: 0, + planner.transform.cache.ttl_minutes: 0, + planner.query.cache.max_entries_amount: 100, + planner.transform.cache.max_entries_amount: 100, + prepare.statement.create_timeout_ms: 30000, security.admin.user_groups: "%drill_process_user_groups%", security.admin.users: "%drill_process_user%", From 30c6624ce9f8f8949000b4bfbb8912fbd27ba9ad Mon Sep 17 00:00:00 2001 From: vdegans Date: Thu, 11 Sep 2025 14:13:13 +0200 Subject: [PATCH 06/10] fix property name --- .../drill/exec/cache/CustomCacheManager.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java index 2c039f3df93..028f2db0a8b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java @@ -33,11 +33,11 @@ private static void loadConfig() { queryMaxEntries = getConfigInt(config, "planner.query.cache.max_entries_amount", 100); queryTtlMinutes = getConfigInt(config, "planner.query.cache.plan_cache_ttl_minutes", 300); transformMaxEntries = getConfigInt(config, "planner.transform.cache.max_entries_amount", 100); - transformTtlMinutes = getConfigInt(config, "planner.transform.plan_cache_ttl_minutes", 300); + transformTtlMinutes = getConfigInt(config, "planner.transform.cache.plan_cache_ttl_minutes", 300); queryCache = Caffeine.newBuilder() .maximumSize(queryMaxEntries) - .expireAfterWrite(queryTtlMinutes, TimeUnit.MILLISECONDS) + .expireAfterWrite(queryTtlMinutes, TimeUnit.MINUTES) .recordStats() .build(); @@ -49,7 +49,15 @@ private static void loadConfig() { } private static int getConfigInt(DrillConfig config, String path, int defaultValue) { - return config.hasPath(path) ? config.getInt(path) : defaultValue; + logger.info("Fetching: " + path); + Boolean pathFound = config.hasPath(path); + int value = pathFound ? config.getInt(path) : defaultValue; + if (!pathFound) { + logger.info("Using default value: " + defaultValue); + } else { + logger.info("Using found value: " + value); + } + return value; } public static PhysicalPlan getQueryPlan(String sql) { From 11c0aa1a57238eed19c0d46eed2fe9c45a83aaa1 Mon Sep 17 00:00:00 2001 From: vdegans Date: Thu, 9 Oct 2025 13:44:45 +0200 Subject: [PATCH 07/10] add apache licence --- .../drill/exec/cache/CustomCacheManager.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java index 028f2db0a8b..c82cba092ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.drill.exec.cache; import java.util.concurrent.TimeUnit; From 08d621a8b89dc35fc54f6ce44e81ad9f7c1bbb60 Mon Sep 17 00:00:00 2001 From: vdegans Date: Thu, 9 Oct 2025 15:42:07 +0200 Subject: [PATCH 08/10] exclude caffeine to reduce jar size --- exec/jdbc-all/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml index 161fa59126d..ddd61836467 100644 --- a/exec/jdbc-all/pom.xml +++ b/exec/jdbc-all/pom.xml @@ -436,6 +436,7 @@ antlr:* com.beust:* com.dropbox.* + com.github.ben-manes.caffeine:caffeine com.github.stefanbirkner com.google.code.findbugs:jsr305:* com.googlecode.json-simple:* From 59223a59534aa4b8fd1e74b76e9dbd85819b535f Mon Sep 17 00:00:00 2001 From: vdegans Date: Thu, 6 Nov 2025 15:41:24 +0100 Subject: [PATCH 09/10] disable plan cache by default --- exec/java-exec/src/main/resources/drill-module.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 55757039682..836c09d525e 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -697,7 +697,7 @@ drill.exec.options: { planner.width.max_per_node: 0, planner.width.max_per_query: 1000, - planner.cache.enable: true, + planner.cache.enable: false, planner.query.cache.ttl_minutes: 0, planner.transform.cache.ttl_minutes: 0, planner.query.cache.max_entries_amount: 100, From c70a84f5bc22a64e4271feae2f96c6f4e68afdee Mon Sep 17 00:00:00 2001 From: cgivre Date: Wed, 4 Feb 2026 10:39:00 -0500 Subject: [PATCH 10/10] Refactor query plan caching: fix static initializer and consolidate logic - Fix CustomCacheManager static initializer that crashed tests during class load by using lazy initialization with double-checked locking - Consolidate caching logic: remove duplicate ConcurrentHashMap cache from DrillSqlWorker and redundant caching in Foreman, use CustomCacheManager only - Clean up verbose debug logging (remove logger.info calls, printStackTrace) - Add clearCaches() and reset() methods for testing support - Only cache SELECT queries (not DDL/DML statements) --- .../drill/exec/cache/CustomCacheManager.java | 99 ++++++++++++++----- .../exec/planner/sql/DrillSqlWorker.java | 72 ++++++-------- .../sql/handlers/DefaultSqlHandler.java | 19 +--- .../drill/exec/work/foreman/Foreman.java | 46 +-------- 4 files changed, 108 insertions(+), 128 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java index c82cba092ea..712e53f3a76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java +++ 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java
index c82cba092ea..712e53f3a76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/CustomCacheManager.java
@@ -29,28 +29,46 @@
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 
+/**
+ * Manages caches for query plans and transform results to improve query planning performance.
+ * Uses Caffeine cache with configurable TTL and maximum size.
+ */
 public class CustomCacheManager {
   private static final Logger logger = LoggerFactory.getLogger(CustomCacheManager.class);
 
-  private static Cache<String, PhysicalPlan> queryCache;
-  private static Cache<CacheKey, RelNode> transformCache;
+  private static volatile Cache<String, PhysicalPlan> queryCache;
+  private static volatile Cache<CacheKey, RelNode> transformCache;
+  private static volatile boolean initialized = false;
+
+  private static final int DEFAULT_MAX_ENTRIES = 100;
+  private static final int DEFAULT_TTL_MINUTES = 300;
 
-  private static int queryMaxEntries;
-  private static int queryTtlMinutes;
-  private static int transformMaxEntries;
-  private static int transformTtlMinutes;
+  private CustomCacheManager() {
+    // Utility class
+  }
 
-  static {
-    loadConfig();
+  /**
+   * Lazily initializes the caches if not already initialized.
+   * Uses double-checked locking for thread safety.
+   */
+  private static void ensureInitialized() {
+    if (!initialized) {
+      synchronized (CustomCacheManager.class) {
+        if (!initialized) {
+          initializeCaches();
+          initialized = true;
+        }
+      }
+    }
   }
 
-  private static void loadConfig() {
+  private static void initializeCaches() {
     DrillConfig config = DrillConfig.create();
 
-    queryMaxEntries = getConfigInt(config, "planner.query.cache.max_entries_amount", 100);
-    queryTtlMinutes = getConfigInt(config, "planner.query.cache.plan_cache_ttl_minutes", 300);
-    transformMaxEntries = getConfigInt(config, "planner.transform.cache.max_entries_amount", 100);
-    transformTtlMinutes = getConfigInt(config, "planner.transform.cache.plan_cache_ttl_minutes", 300);
+    int queryMaxEntries = getConfigInt(config, "planner.query.cache.max_entries_amount", DEFAULT_MAX_ENTRIES);
+    int queryTtlMinutes = getConfigInt(config, "planner.query.cache.plan_cache_ttl_minutes", DEFAULT_TTL_MINUTES);
+    int transformMaxEntries = getConfigInt(config, "planner.transform.cache.max_entries_amount", DEFAULT_MAX_ENTRIES);
+    int transformTtlMinutes = getConfigInt(config, "planner.transform.cache.plan_cache_ttl_minutes", DEFAULT_TTL_MINUTES);
 
     queryCache = Caffeine.newBuilder()
         .maximumSize(queryMaxEntries)
@@ -63,41 +81,72 @@ private static void loadConfig() {
         .expireAfterWrite(transformTtlMinutes, TimeUnit.MINUTES)
         .recordStats()
         .build();
+
+    logger.debug("Query plan cache initialized with maxEntries={}, ttlMinutes={}", queryMaxEntries, queryTtlMinutes);
+    logger.debug("Transform cache initialized with maxEntries={}, ttlMinutes={}", transformMaxEntries, transformTtlMinutes);
   }
 
   private static int getConfigInt(DrillConfig config, String path, int defaultValue) {
-    logger.info("Fetching: " + path);
-    Boolean pathFound = config.hasPath(path);
-    int value = pathFound ? config.getInt(path) : defaultValue;
-    if (!pathFound) {
-      logger.info("Using default value: " + defaultValue);
-    } else {
-      logger.info("Using found value: " + value);
+    if (config.hasPath(path)) {
+      int value = config.getInt(path);
+      logger.debug("Config {}: {}", path, value);
+      return value;
     }
-    return value;
+    logger.debug("Config {} not found, using default: {}", path, defaultValue);
+    return defaultValue;
   }
 
   public static PhysicalPlan getQueryPlan(String sql) {
+    ensureInitialized();
     return queryCache.getIfPresent(sql);
   }
 
   public static void putQueryPlan(String sql, PhysicalPlan plan) {
+    ensureInitialized();
     queryCache.put(sql, plan);
   }
 
   public static RelNode getTransformedPlan(CacheKey key) {
+    ensureInitialized();
     return transformCache.getIfPresent(key);
   }
 
   public static void putTransformedPlan(CacheKey key, RelNode plan) {
+    ensureInitialized();
     transformCache.put(key, plan);
   }
 
   public static void logCacheStats() {
-    logger.info("Query Cache Stats: " + queryCache.stats());
-    logger.info("Query Cache Size: " + queryCache.estimatedSize());
+    ensureInitialized();
+    if (logger.isDebugEnabled()) {
+      logger.debug("Query Cache Stats: {}", queryCache.stats());
+      logger.debug("Query Cache Size: {}", queryCache.estimatedSize());
+      logger.debug("Transform Cache Stats: {}", transformCache.stats());
+      logger.debug("Transform Cache Size: {}", transformCache.estimatedSize());
+    }
+  }
 
-    logger.info("Transform Cache Stats: " + transformCache.stats());
-    logger.info("Transform Cache Size: " + transformCache.estimatedSize());
+  /**
+   * Clears both caches. Useful for testing.
+   */
+  public static void clearCaches() {
+    if (initialized) {
+      queryCache.invalidateAll();
+      transformCache.invalidateAll();
+    }
+  }
+
+  /**
+   * Resets the cache manager, forcing reinitialization on next use.
+   * Useful for testing with different configurations.
+   */
+  public static synchronized void reset() {
+    if (initialized) {
+      queryCache.invalidateAll();
+      transformCache.invalidateAll();
+      queryCache = null;
+      transformCache = null;
+      initialized = false;
+    }
   }
 
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 30e157a1852..103a5684ce6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -18,9 +18,6 @@
 package org.apache.drill.exec.planner.sql;
 
 import java.io.IOException;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.calcite.sql.SqlDescribeSchema;
 import org.apache.calcite.sql.SqlKind;
@@ -32,13 +29,14 @@
 import org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.calcite.tools.RelConversionException;
 import org.apache.calcite.tools.ValidationException;
-import org.apache.calcite.util.Litmus;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.CustomCacheManager;
 import org.apache.drill.exec.exception.MetadataException;
 import org.apache.drill.exec.ops.QueryContext;
 import org.apache.drill.exec.ops.QueryContext.SqlStatementType;
 import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.conversion.SqlConverter;
 import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
 import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler;
@@ -132,8 +130,6 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe
     try {
       return getPhysicalPlan(context, sql, textPlan, retryAttempts);
     } catch (Exception e) {
-logger.info("DrillSqlWorker.convertPlan() retrying???: attempt # {}", retryAttempts);
-e.printStackTrace(System.out);
       logger.trace("There was an error during conversion into physical plan.", e);
 
       // It is prohibited to retry query planning for ANALYZE statement since it changes
@@ -182,11 +178,9 @@ private static PhysicalPlan convertPlan(QueryContext context, String sql, Pointe
   private static PhysicalPlan getPhysicalPlan(QueryContext context, String sql,
       Pointer<String> textPlan, long retryAttempts) throws ForemanSetupException, RelConversionException, IOException, ValidationException {
     try {
-      logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", retryAttempts);
       return getQueryPlan(context, sql, textPlan);
     } catch (Exception e) {
       Throwable rootCause = Throwables.getRootCause(e);
-      logger.info("DrillSqlWorker.getPhysicalPlan() is called {}", rootCause.getMessage());
       // Calcite wraps exceptions thrown during planning, so checks whether original exception is OutdatedMetadataException
       if (rootCause instanceof MetadataException) {
         // resets SqlStatementType to avoid errors when it is set during further attempts
@@ -218,27 +212,30 @@
   /**
    * Converts sql query string into query physical plan.
+   * If plan caching is enabled, attempts to retrieve a cached plan first.
   *
   * @param context query context
   * @param sql sql query
   * @param textPlan text plan
   * @return query physical plan
   */
-
-  private static ConcurrentMap<QueryPlanCacheKey, PhysicalPlan> getQueryPlanCache = new ConcurrentHashMap<>();
-
   private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan)
     throws ForemanSetupException, RelConversionException, IOException, ValidationException {
+    final boolean planCacheEnabled = context.getOptions().getOption(PlannerSettings.PLAN_CACHE);
+
+    // Check cache first if enabled
+    if (planCacheEnabled) {
+      PhysicalPlan cachedPlan = CustomCacheManager.getQueryPlan(sql);
+      if (cachedPlan != null) {
+        logger.debug("Using cached query plan for SQL: {}", sql.substring(0, Math.min(sql.length(), 100)));
+        return cachedPlan;
+      }
+    }
+
     final SqlConverter parser = new SqlConverter(context);
     injector.injectChecked(context.getExecutionControls(), "sql-parsing", ForemanSetupException.class);
 
     final SqlNode sqlNode = checkAndApplyAutoLimit(parser, context, sql);
-    QueryPlanCacheKey queryPlanCacheKey = new QueryPlanCacheKey(sqlNode);
-
-    if(getQueryPlanCache.containsKey(queryPlanCacheKey)) {
-      logger.info("Using getQueryPlanCache");
-      return getQueryPlanCache.get(queryPlanCacheKey);
-    }
 
     final AbstractSqlHandler handler;
     final SqlHandlerConfig config = new SqlHandlerConfig(context, parser);
@@ -304,8 +301,6 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
         context.setSQLStatementType(SqlStatementType.OTHER);
     }
 
-
-
     // Determines whether result set should be returned for the query based on return result set option and sql node kind.
     // Overrides the option on a query level if it differs from the current value.
     boolean currentReturnResultValue = context.getOptions().getBoolean(ExecConstants.RETURN_RESULT_SET_FOR_DDL);
@@ -315,33 +310,24 @@ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Point
     }
 
     PhysicalPlan physicalPlan = handler.getPlan(sqlNode);
-    getQueryPlanCache.put(queryPlanCacheKey, physicalPlan);
-    return physicalPlan;
-  }
 
-  private static class QueryPlanCacheKey {
-    private final SqlNode sqlNode;
-
-    public QueryPlanCacheKey(SqlNode sqlNode) {
-      this.sqlNode = sqlNode;
-    }
+    // Cache the plan if caching is enabled and this is a cacheable query type
+    if (planCacheEnabled && isCacheableQuery(sqlNode)) {
+      CustomCacheManager.putQueryPlan(sql, physicalPlan);
+      logger.debug("Cached query plan for SQL: {}", sql.substring(0, Math.min(sql.length(), 100)));
+    }
 
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      QueryPlanCacheKey cacheKey = (QueryPlanCacheKey) o;
-      return sqlNode.equalsDeep(cacheKey.sqlNode, Litmus.IGNORE);
-    }
+    return physicalPlan;
+  }
 
-    @Override
-    public int hashCode() {
-      return Objects.hash(sqlNode);
-    }
+  /**
+   * Determines if a query type should be cached.
+   * Only SELECT queries are cached; DDL and other modifying statements should not be cached.
+   */
+  private static boolean isCacheableQuery(SqlNode sqlNode) {
+    SqlKind kind = sqlNode.getKind();
+    // Only cache SELECT queries, not DDL or DML
+    return kind == SqlKind.SELECT || kind == SqlKind.ORDER_BY || kind == SqlKind.UNION;
   }
 
   private static boolean isAutoLimitShouldBeApplied(SqlNode sqlNode, int queryMaxRows) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 428102b6a34..41966793727 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -388,11 +388,8 @@ public boolean equals(Object o) {
         return false;
       }
       CacheKey cacheKey = (CacheKey) o;
-      logger.info("Compare phase {} {}, {} ", phase.equals(cacheKey.phase), phase.name(), cacheKey.phase.name());
-      logger.info("Compare plannerType {} {} {}", plannerType.equals(cacheKey.plannerType), plannerType.name(), cacheKey.plannerType.name());
-      logger.info("Compare input {}", input.deepEquals(cacheKey.input));
-      return phase.name().equals(cacheKey.phase.name()) &&
-          plannerType.name().equals(cacheKey.plannerType.name()) &&
+      return phase == cacheKey.phase &&
+          plannerType == cacheKey.plannerType &&
           input.deepEquals(cacheKey.input) &&
           targetTraits.equals(cacheKey.targetTraits);
     }
@@ -421,7 +418,7 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode
 
       RelNode cachedResult = CustomCacheManager.getTransformedPlan(key);
       if (cachedResult != null) {
-        CustomCacheManager.logCacheStats();
+        logger.debug("Cache hit for transform phase: {}", phase);
         return cachedResult;
       }
     }
@@ -430,7 +427,6 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode
     switch (plannerType) {
       case HEP_BOTTOM_UP:
       case HEP: {
-        logger.info("DefaultSqlHandler.transform()");
         final HepProgramBuilder hepPgmBldr = new HepProgramBuilder();
         if (plannerType == PlannerType.HEP_BOTTOM_UP) {
           hepPgmBldr.addMatchOrder(HepMatchOrder.BOTTOM_UP);
@@ -462,22 +458,15 @@ protected RelNode transform(PlannerType plannerType, PlannerPhase phase, RelNode
         Preconditions.checkArgument(planner instanceof VolcanoPlanner,
             "Cluster is expected to be constructed using VolcanoPlanner. Was actually of type %s.", planner.getClass()
             .getName());
-        logger.info("DefaultSqlHandler.transform() program.run( before");
         output = program.run(planner, input, toTraits,
             ImmutableList.of(), ImmutableList.of());
-        logger.info("DefaultSqlHandler.transform() program.run( after");
-
         break;
       }
     }
 
     if (planCacheEnabled) {
-      logger.info("planCache enabled, storing transformedplan");
-      // Store the result in the cache before returning
       CustomCacheManager.putTransformedPlan(key, output);
-      CustomCacheManager.logCacheStats();
-    } else {
-      logger.info("planCache disabled, not storing transformedplan");
+      logger.debug("Cached transform result for phase: {}", phase);
     }
 
     if (log) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 0e5e0ba89b7..181817c22d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -29,7 +29,6 @@
 import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
 import org.apache.drill.common.util.JacksonUtils;
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.CustomCacheManager;
 import org.apache.drill.exec.exception.OptimizerException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.ops.QueryContext;
@@ -39,7 +38,6 @@
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.planner.fragment.Fragment;
 import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.planner.sql.DirectPlan;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -57,7 +55,6 @@
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.FailureUtils;
-import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.server.options.OptionSet;
 import org.apache.drill.exec.testing.ControlsInjector;
 import org.apache.drill.exec.testing.ControlsInjectorFactory;
@@ -597,50 +594,9 @@ private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
         queryId, queryWorkUnit.stringifyFragments()));
   }
 
-  private PhysicalPlan getOrBuildPlan(String sql, Pointer<String> textPlan, boolean planCacheEnabled)
-      throws ExecutionSetupException {
-
-    PhysicalPlan plan = null;
-
-    if (planCacheEnabled) {
-      logger.info("Cache enabled, checking entries");
-      plan = CustomCacheManager.getQueryPlan(sql);
-
-      if (plan == null) {
-        logger.info("Cache miss, generating new plan");
-        plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
-
-        if (isCacheableQuery(sql)) {
-          CustomCacheManager.putQueryPlan(sql, plan);
-          CustomCacheManager.logCacheStats();
-        }
-      } else {
-        logger.info("Using cached plan");
-      }
-
-    } else {
-      plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan);
-    }
-
-    return plan;
-  }
-
-  private boolean isCacheableQuery(String sql) {
-    return sql.trim().toUpperCase().startsWith("SELECT");
-  }
-
   private void runSQL(final String sql) throws ExecutionSetupException {
     final Pointer<String> textPlan = new Pointer<>();
-    final OptionManager options = queryContext.getOptions();
-    final boolean planCacheEnabled = options.getOption(PlannerSettings.PLAN_CACHE);
-    if (planCacheEnabled) {
-      logger.info("PlanCache is enabled");
- } else { - logger.info("PlanCache is disabled"); - } - - PhysicalPlan plan = getOrBuildPlan(sql, textPlan, planCacheEnabled); - + final PhysicalPlan plan = DrillSqlWorker.getPlan(queryContext, sql, textPlan); runPhysicalPlan(plan, textPlan); }
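End to end, the series can be exercised from any Drill client roughly as follows; a sketch, assuming planner.cache.enable is the system option backing PlannerSettings.PLAN_CACHE as the drill-module.conf entry above suggests (the query is illustrative):

    ALTER SYSTEM SET `planner.cache.enable` = true;

    -- First execution: cache miss; the plan is built and, because the statement
    -- parses to a cacheable kind (see isCacheableQuery above), stored keyed by
    -- the raw SQL text.
    SELECT full_name FROM cp.`employee.json` LIMIT 10;

    -- Byte-identical re-execution: getQueryPlan() returns the cached
    -- PhysicalPlan before any parsing or transformation work is done.
    SELECT full_name FROM cp.`employee.json` LIMIT 10;

Because the lookup key is the exact SQL string, any whitespace or case difference triggers a fresh planning pass; and per patch 09 the feature stays off unless explicitly enabled.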