Skip to content

Commit c0eb149

Browse files
committed
project_comet_metrics
1 parent 9b773f3 commit c0eb149

6 files changed

Lines changed: 112 additions & 2 deletions

File tree

dev/ensure-jars-have-correct-contents.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ allowed_expr+="|^org/apache/spark/sql/$"
9393
allowed_expr+="|^org/apache/spark/sql/ExtendedExplainGenerator.*$"
9494
allowed_expr+="|^org/apache/spark/CometPlugin.class$"
9595
allowed_expr+="|^org/apache/spark/CometDriverPlugin.*$"
96+
allowed_expr+="|^org/apache/spark/CometSource.*$"
9697
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.class$"
9798
allowed_expr+="|^org/apache/spark/CometTaskMemoryManager.*$"
9899
allowed_expr+="|^scala-collection-compat.properties$"

spark/src/main/scala/org/apache/comet/ExtendedExplainInfo.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,25 @@ class CometCoverageStats {
192192
}
193193
}
194194

195+
object CometCoverageStats {

  /**
   * Computes Comet coverage statistics for the given plan by walking its
   * tree through [[ExtendedExplainInfo.generateTreeString]]. The rendered
   * explain text is written into a throwaway builder and discarded; only
   * the populated stats accumulator is returned.
   *
   * @param plan the physical plan to analyze
   * @return coverage stats collected while traversing the plan
   */
  def forPlan(plan: SparkPlan): CometCoverageStats = {
    val coverage = new CometCoverageStats()
    val info = new ExtendedExplainInfo()
    // The explain string itself is not needed here, so the builder is discarded.
    val discardedOutput = new StringBuilder()
    info.generateTreeString(
      CometExplainInfo.getActualPlan(plan),
      0,
      Seq(),
      0,
      discardedOutput,
      coverage)
    coverage
  }
}
213+
195214
object CometExplainInfo {
196215
val EXTENSION_INFO = new TreeNodeTag[Set[String]]("CometExtensionInfo")
197216

spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package org.apache.comet.rules
2121

2222
import scala.collection.mutable.ListBuffer
2323

24+
import org.apache.spark.CometSource
2425
import org.apache.spark.sql.SparkSession
2526
import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, Remainder}
2627
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
@@ -47,7 +48,7 @@ import org.apache.spark.sql.execution.window.WindowExec
4748
import org.apache.spark.sql.internal.SQLConf
4849
import org.apache.spark.sql.types._
4950

50-
import org.apache.comet.{CometConf, CometExplainInfo, ExtendedExplainInfo}
51+
import org.apache.comet.{CometConf, CometCoverageStats, CometExplainInfo, ExtendedExplainInfo}
5152
import org.apache.comet.CometConf.{COMET_SPARK_TO_ARROW_ENABLED, COMET_SPARK_TO_ARROW_SUPPORTED_OPERATOR_LIST}
5253
import org.apache.comet.CometSparkSessionExtensions._
5354
import org.apache.comet.rules.CometExecRule.allExecs
@@ -389,6 +390,10 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
389390

390391
var newPlan = transform(planWithJoinRewritten)
391392

393+
// Record coverage stats for metrics
394+
val stats = CometCoverageStats.forPlan(newPlan)
395+
CometSource.recordStats(stats)
396+
392397
// if the plan cannot be run fully natively then explain why (when appropriate
393398
// config is enabled)
394399
if (CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.get()) {
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.spark
21+
22+
import org.apache.spark.metrics.source.Source
23+
24+
import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
25+
26+
import org.apache.comet.CometCoverageStats
27+
28+
/**
29+
* Exposes following metrics (hooked from CometCoverageStats)
30+
* - operators.native: Total operators executed natively
31+
* - operators.spark: Total operators that fell back to Spark
32+
* - queries.planned: Total queries processed
33+
* - transitions: Total Spark-to-Comet transitions
34+
* - acceleration.ratio: native / (native + spark)
35+
*/
36+
object CometSource extends Source {
  // Explicit types on all public members (Scala style: annotate public API).
  override val sourceName: String = "comet"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  // Monotonically increasing totals, updated once per planned query.
  val NATIVE_OPERATORS: Counter =
    metricRegistry.counter(MetricRegistry.name("operators", "native"))
  val SPARK_OPERATORS: Counter =
    metricRegistry.counter(MetricRegistry.name("operators", "spark"))
  val QUERIES_PLANNED: Counter =
    metricRegistry.counter(MetricRegistry.name("queries", "planned"))
  val TRANSITIONS: Counter =
    metricRegistry.counter(MetricRegistry.name("transitions"))

  // Derived gauge: fraction of operators that ran natively.
  // Guards against division by zero before any stats have been recorded.
  metricRegistry.register(
    MetricRegistry.name("acceleration", "ratio"),
    new Gauge[Double] {
      override def getValue: Double = {
        val native = NATIVE_OPERATORS.getCount
        val total = native + SPARK_OPERATORS.getCount
        if (total > 0) native.toDouble / total else 0.0
      }
    })

  /**
   * Folds one plan's coverage stats into the global counters and bumps the
   * planned-query count. Codahale counters are internally thread-safe, so
   * concurrent planning threads may call this without extra synchronization.
   *
   * @param stats coverage stats collected for a single planned query
   */
  def recordStats(stats: CometCoverageStats): Unit = {
    NATIVE_OPERATORS.inc(stats.cometOperators)
    SPARK_OPERATORS.inc(stats.sparkOperators)
    TRANSITIONS.inc(stats.transitions)
    QUERIES_PLANNED.inc()
  }
}

spark/src/main/scala/org/apache/spark/Plugins.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ class CometDriverPlugin extends DriverPlugin with Logging with ShimCometDriverPl
5757
// register CometSparkSessionExtensions if it isn't already registered
5858
CometDriverPlugin.registerCometSessionExtension(sc.conf)
5959

60+
// Register Comet metrics source
61+
sc.env.metricsSystem.registerSource(CometSource)
62+
6063
if (CometSparkSessionExtensions.shouldOverrideMemoryConf(sc.getConf)) {
6164
val execMemOverhead = if (sc.getConf.contains(EXECUTOR_MEMORY_OVERHEAD.key)) {
6265
sc.getConf.getSizeAsMb(EXECUTOR_MEMORY_OVERHEAD.key)

spark/src/test/scala/org/apache/spark/CometPluginsSuite.scala

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919

2020
package org.apache.spark
2121

22-
import org.apache.spark.sql.CometTestBase
22+
import java.io.File
23+
24+
import org.apache.spark.sql.{CometTestBase, SaveMode}
2325
import org.apache.spark.sql.internal.StaticSQLConf
2426

2527
class CometPluginsSuite extends CometTestBase {
@@ -77,6 +79,24 @@ class CometPluginsSuite extends CometTestBase {
7779
}
7880
}
7981

82+
test("CometSource metrics are recorded") {
  // Snapshot counters first so the assertions are deltas, not absolutes —
  // other tests in the suite may already have incremented them.
  val nativeCountBefore = CometSource.NATIVE_OPERATORS.getCount
  val plannedCountBefore = CometSource.QUERIES_PLANNED.getCount

  withTempPath { dir =>
    val parquetPath = new File(dir, "test.parquet").toString
    spark.range(1000).toDF("id").write.mode(SaveMode.Overwrite).parquet(parquetPath)
    // Running a scan+filter query drives the planner, which records stats.
    spark.read.parquet(parquetPath).filter("id > 500").collect()
  }

  assert(
    CometSource.QUERIES_PLANNED.getCount > plannedCountBefore,
    "queries.planned should increment after query")
  assert(
    CometSource.NATIVE_OPERATORS.getCount > nativeCountBefore,
    "operators.native should increment for native execution")
}
99+
80100
test("Default Comet memory overhead") {
81101
val execMemOverhead1 = spark.conf.get("spark.executor.memoryOverhead")
82102
val execMemOverhead2 = spark.sessionState.conf.getConfString("spark.executor.memoryOverhead")

0 commit comments

Comments (0)