From ad7de5c8105414e5f566c5b10ea30fb986c106b8 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Sun, 15 Mar 2026 14:27:01 -0700 Subject: [PATCH 1/4] project_comet_metrics --- dev/ensure-jars-have-correct-contents.sh | 1 + .../apache/comet/CometMetricsListener.scala | 45 ++++++++++++++ .../apache/comet/ExtendedExplainInfo.scala | 19 ++++++ .../apache/comet/rules/CometExecRule.scala | 4 +- .../scala/org/apache/spark/CometSource.scala | 62 +++++++++++++++++++ .../main/scala/org/apache/spark/Plugins.scala | 3 + .../org/apache/spark/CometPluginsSuite.scala | 41 +++++++++++- 7 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 spark/src/main/scala/org/apache/comet/CometMetricsListener.scala create mode 100644 spark/src/main/scala/org/apache/spark/CometSource.scala diff --git a/dev/ensure-jars-have-correct-contents.sh b/dev/ensure-jars-have-correct-contents.sh index 570aeabb2b..084936475d 100755 --- a/dev/ensure-jars-have-correct-contents.sh +++ b/dev/ensure-jars-have-correct-contents.sh @@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$" allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$" allowed_expr+="|^org/apache/spark/CometPlugin.class$" allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$" +allowed_expr+="|^org/apache/spark/CometSource.*$" allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$" allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$" allowed_expr+="|^scala-collection-compat.properties$" diff --git a/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala new file mode 100644 index 0000000000..6bd2bdc53c --- /dev/null +++ b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.comet + +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.CometSource +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.util.QueryExecutionListener + +object CometMetricsListener extends QueryExecutionListener { + + private val registered = new AtomicBoolean(false) + + def ensureRegistered(session: SparkSession): Unit = { + if (registered.compareAndSet(false, true)) { + session.listenerManager.register(this) + } + } + + override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = { + val stats = CometCoverageStats.forPlan(qe.executedPlan) + CometSource.recordStats(stats) + } + + override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {} +} diff --git a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala index f47428e801..d30a1fe788 100644 --- a/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala +++ b/spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala @@ -192,6 +192,25 @@ class CometCoverageStats { } } +object CometCoverageStats { + + /** + * Compute coverage stats for a plan without generating explain string. 
+ */ + def forPlan(plan: SparkPlan): CometCoverageStats = { + val stats = new CometCoverageStats() + val explainInfo = new ExtendedExplainInfo() + explainInfo.generateTreeString( + CometExplainInfo.getActualPlan(plan), + 0, + Seq(), + 0, + new StringBuilder(), + stats) + stats + } +} + object CometExplainInfo { val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo") diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index 76e741e3bf..d4a0eddccc 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo} +import org.apache.comet.{CometConf, CometExplainInfo, CometMetricsListener, ExtendedExplainInfo} import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST} import org.apache.comet.CometSparkSessionExtensions._ import org.apache.comet.rules.CometExecRule.allExecs @@ -387,6 +387,8 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { normalizedPlan } + CometMetricsListener.ensureRegistered(session) + var newPlan = transform(planWithJoinRewritten) // if the plan cannot be run fully natively then explain why (when appropriate diff --git a/spark/src/main/scala/org/apache/spark/CometSource.scala b/spark/src/main/scala/org/apache/spark/CometSource.scala new file mode 100644 index 0000000000..e243b48d0f --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/CometSource.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark + +import org.apache.spark.metrics.source.Source + +import com.codahale.metrics.{Counter, Gauge, MetricRegistry} + +import org.apache.comet.CometCoverageStats + +/** + * Exposes following metrics (hooked from CometCoverageStats) + * - operators.native: Total operators executed natively + * - operators.spark: Total operators that fell back to Spark + * - queries.planned: Total queries processed + * - transitions: Total Spark-to-Comet transitions + * - acceleration.ratio: native / (native + spark) + */ +object CometSource extends Source { + override val sourceName = "comet" + override val metricRegistry = new MetricRegistry() + + val NATIVE_OPERATORS: Counter = + metricRegistry.counter(MetricRegistry.name("operators", "native")) + val SPARK_OPERATORS = metricRegistry.counter(MetricRegistry.name("operators", "spark")) + val QUERIES_PLANNED = metricRegistry.counter(MetricRegistry.name("queries", "planned")) + val TRANSITIONS = metricRegistry.counter(MetricRegistry.name("transitions")) + + metricRegistry.register( + MetricRegistry.name("acceleration", "ratio"), + new Gauge[Double] { + override def getValue: Double = { + val native = NATIVE_OPERATORS.getCount + val total = native + SPARK_OPERATORS.getCount + if (total > 0) native.toDouble / total 
else 0.0 + } + }) + + def recordStats(stats: CometCoverageStats): Unit = { + NATIVE_OPERATORS.inc(stats.cometOperators) + SPARK_OPERATORS.inc(stats.sparkOperators) + TRANSITIONS.inc(stats.transitions) + QUERIES_PLANNED.inc() + } +} diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala index 2529f08cfb..027acf257d 100644 --- a/spark/src/main/scala/org/apache/spark/Plugins.scala +++ b/spark/src/main/scala/org/apache/spark/Plugins.scala @@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl // register CometSparkSessionExtensions if it isn't already registered CometDriverPlugin.registerCometSessionExtension(sc.conf) + // Register Comet metrics source + sc.env.metricsSystem.registerSource(CometSource) + if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) { val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) { sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key) diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala index c493b22f79..e7be7fd0be 100644 --- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala @@ -19,7 +19,9 @@ package org.apache.spark -import org.apache.spark.sql.CometTestBase +import java.io.File + +import org.apache.spark.sql.{CometTestBase, SaveMode} import org.apache.spark.sql.internal.StaticSQLConf class CometPluginsSuite extends CometTestBase { @@ -77,6 +79,43 @@ class CometPluginsSuite extends CometTestBase { } } + test("CometSource metrics are recorded") { + val nativeBefore = CometSource.NATIVE_OPERATORS.getCount + val queriesBefore = CometSource.QUERIES_PLANNED.getCount + + withTempPath { dir => + val path = new File(dir, "test.parquet").toString + spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path) + 
spark.read.parquet(path).filter("id > 500").collect() + } + + assert( + CometSource.QUERIES_PLANNED.getCount > queriesBefore, + "queries.planned should increment after query") + assert( + CometSource.NATIVE_OPERATORS.getCount > nativeBefore, + "operators.native should increment for native execution") + } + + test("metrics not double counted with AQE") { + withSQLConf("spark.sql.adaptive.enabled" -> "true") { + withTempPath { dir => + val path = new File(dir, "test.parquet").toString + spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path) + + val queriesBefore = CometSource.QUERIES_PLANNED.getCount + + spark.read.parquet(path).filter("id > 100").collect() + spark.read.parquet(path).filter("id > 200").collect() + + val queriesAfter = CometSource.QUERIES_PLANNED.getCount + assert( + queriesAfter == queriesBefore + 2, + s"Expected 2 queries, got ${queriesAfter - queriesBefore}") + } + } + } + test("Default Comet memory overhead") { val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead") val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead") From 2eb506f2140db54603a1d7c63e293c4a29c229d1 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Mon, 16 Mar 2026 08:52:24 -0700 Subject: [PATCH 2/4] fix_review_comments --- spark/src/main/scala/org/apache/spark/CometSource.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/CometSource.scala b/spark/src/main/scala/org/apache/spark/CometSource.scala index e243b48d0f..95d7523616 100644 --- a/spark/src/main/scala/org/apache/spark/CometSource.scala +++ b/spark/src/main/scala/org/apache/spark/CometSource.scala @@ -39,9 +39,9 @@ object CometSource extends Source { val NATIVE_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "native")) - val SPARK_OPERATORS = metricRegistry.counter(MetricRegistry.name("operators", "spark")) - val QUERIES_PLANNED = 
metricRegistry.counter(MetricRegistry.name("queries", "planned")) - val TRANSITIONS = metricRegistry.counter(MetricRegistry.name("transitions")) + val SPARK_OPERATORS: Counter = metricRegistry.counter(MetricRegistry.name("operators", "spark")) + val QUERIES_PLANNED: Counter = metricRegistry.counter(MetricRegistry.name("queries", "planned")) + val TRANSITIONS: Counter = metricRegistry.counter(MetricRegistry.name("transitions")) metricRegistry.register( MetricRegistry.name("acceleration", "ratio"), From eaa1dc45bd5775b4c74f6ab49c9f7aaa4cb820e0 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 17 Mar 2026 01:10:08 -0700 Subject: [PATCH 3/4] add_listener_comet_plugin_init --- .../org/apache/comet/CometMetricsListener.scala | 13 +------------ .../org/apache/comet/rules/CometExecRule.scala | 4 +--- spark/src/main/scala/org/apache/spark/Plugins.scala | 7 +++++++ .../scala/org/apache/spark/CometPluginsSuite.scala | 6 +++--- 4 files changed, 12 insertions(+), 18 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala index 6bd2bdc53c..e8907d8264 100644 --- a/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala +++ b/spark/src/main/scala/org/apache/comet/CometMetricsListener.scala @@ -19,22 +19,11 @@ package org.apache.comet -import java.util.concurrent.atomic.AtomicBoolean - import org.apache.spark.CometSource -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.util.QueryExecutionListener -object CometMetricsListener extends QueryExecutionListener { - - private val registered = new AtomicBoolean(false) - - def ensureRegistered(session: SparkSession): Unit = { - if (registered.compareAndSet(false, true)) { - session.listenerManager.register(this) - } - } +class CometMetricsListener extends QueryExecutionListener { override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = { val stats = CometCoverageStats.forPlan(qe.executedPlan) diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala index d4a0eddccc..76e741e3bf 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala @@ -47,7 +47,7 @@ import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ -import org.apache.comet.{CometConf, CometExplainInfo, CometMetricsListener, ExtendedExplainInfo} +import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo} import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST} import org.apache.comet.CometSparkSessionExtensions._ import org.apache.comet.rules.CometExecRule.allExecs @@ -387,8 +387,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] { normalizedPlan } - CometMetricsListener.ensureRegistered(session) - var newPlan = transform(planWithJoinRewritten) // if the plan cannot be run fully natively then explain why (when appropriate diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala index 027acf257d..6ea99fe995 100644 --- a/spark/src/main/scala/org/apache/spark/Plugins.scala +++ b/spark/src/main/scala/org/apache/spark/Plugins.scala @@ -60,6 +60,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl // Register Comet metrics source sc.env.metricsSystem.registerSource(CometSource) + // Register query execution listener via config + CometDriverPlugin.registerQueryExecutionListener(sc.conf) + if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) { val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) { sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key) @@ 
-104,6 +107,10 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl } object CometDriverPlugin extends Logging { + def registerQueryExecutionListener(conf: SparkConf): Unit = { + conf.set("spark.sql.queryExecutionListeners", "org.apache.comet.CometMetricsListener") + } + def registerCometSessionExtension(conf: SparkConf): Unit = { val extensionKey = StaticSQLConf.SPARK_SESSION_EXTENSIONS.key val extensionClass = classOf[CometSparkSessionExtensions].getName diff --git a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala index e7be7fd0be..b1781b2504 100644 --- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala @@ -88,7 +88,7 @@ class CometPluginsSuite extends CometTestBase { spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path) spark.read.parquet(path).filter("id > 500").collect() } - + spark.sparkContext.listenerBus.waitUntilEmpty() assert( CometSource.QUERIES_PLANNED.getCount > queriesBefore, "queries.planned should increment after query") @@ -104,10 +104,10 @@ class CometPluginsSuite extends CometTestBase { spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path) val queriesBefore = CometSource.QUERIES_PLANNED.getCount - + spark.sparkContext.listenerBus.waitUntilEmpty() spark.read.parquet(path).filter("id > 100").collect() spark.read.parquet(path).filter("id > 200").collect() - + spark.sparkContext.listenerBus.waitUntilEmpty() val queriesAfter = CometSource.QUERIES_PLANNED.getCount assert( queriesAfter == queriesBefore + 2, From 2b6dcc1b78721064122ebcf8c8ed3e42e7e5af22 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 18 Mar 2026 15:05:17 -0700 Subject: [PATCH 4/4] fix_review_comments --- .../main/scala/org/apache/spark/Plugins.scala | 26 ++++++++++++++----- .../org/apache/spark/CometPluginsSuite.scala | 2 +- 2 files changed, 20 
insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/Plugins.scala b/spark/src/main/scala/org/apache/spark/Plugins.scala index 6ea99fe995..e03433e7b2 100644 --- a/spark/src/main/scala/org/apache/spark/Plugins.scala +++ b/spark/src/main/scala/org/apache/spark/Plugins.scala @@ -57,11 +57,8 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl // register CometSparkSessionExtensions if it isn't already registered CometDriverPlugin.registerCometSessionExtension(sc.conf) - // Register Comet metrics source - sc.env.metricsSystem.registerSource(CometSource) - - // Register query execution listener via config - CometDriverPlugin.registerQueryExecutionListener(sc.conf) + // Register Comet metrics + CometDriverPlugin.registerCometMetrics(sc) if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) { val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) { @@ -107,8 +104,23 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl } object CometDriverPlugin extends Logging { - def registerQueryExecutionListener(conf: SparkConf): Unit = { - conf.set("spark.sql.queryExecutionListeners", "org.apache.comet.CometMetricsListener") + def registerCometMetrics(sc: SparkContext): Unit = { + sc.env.metricsSystem.registerSource(CometSource) + + val listenerKey = "spark.sql.queryExecutionListeners" + val listenerClass = "org.apache.comet.CometMetricsListener" + val listeners = sc.conf.get(listenerKey, "") + if (listeners.isEmpty) { + logInfo(s"Setting $listenerKey=$listenerClass") + sc.conf.set(listenerKey, listenerClass) + } else { + val currentListeners = listeners.split(",").map(_.trim) + if (!currentListeners.contains(listenerClass)) { + val newValue = s"$listeners,$listenerClass" + logInfo(s"Setting $listenerKey=$newValue") + sc.conf.set(listenerKey, newValue) + } + } } def registerCometSessionExtension(conf: SparkConf): Unit = { diff --git 
a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala index b1781b2504..cab263854f 100644 --- a/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala @@ -103,8 +103,8 @@ class CometPluginsSuite extends CometTestBase { val path = new File(dir, "test.parquet").toString spark.range(10000).toDF("id").write.mode(SaveMode.Overwrite).parquet(path) - val queriesBefore = CometSource.QUERIES_PLANNED.getCount spark.sparkContext.listenerBus.waitUntilEmpty() + val queriesBefore = CometSource.QUERIES_PLANNED.getCount spark.read.parquet(path).filter("id > 100").collect() spark.read.parquet(path).filter("id > 200").collect() spark.sparkContext.listenerBus.waitUntilEmpty()