diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
index 864879f12..08a4474bf 100644
--- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
+++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/BuildInfoInSparkUISuite.scala
@@ -46,6 +46,7 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with BaseAuronSQLSuite {
     val listeners = spark.sparkContext.listenerBus.findListenersByClass[AuronSQLAppStatusListener]
     assert(listeners.size === 1)
     val listener = listeners(0)
+    spark.sparkContext.listenerBus.waitUntilEmpty()
     assert(listener.getAuronBuildInfo() == 1)
   }
 
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
index 386977925..6fa5f1a99 100644
--- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
@@ -34,11 +34,10 @@ import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelpe
 import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.LeafExecNode
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -53,7 +52,9 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
     with Logging {
 
   override lazy val metrics: Map[String, SQLMetric] =
-    NativeHelper.getNativeFileScanMetrics(sparkContext)
+    NativeHelper.getNativeFileScanMetrics(sparkContext) ++ Seq(
+      "numPartitions" -> SQLMetrics.createMetric(sparkContext, "Native.partitions_read"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "Native.files_read"))
 
   override val output = basedScan.output
   override val outputPartitioning = basedScan.outputPartitioning
@@ -65,7 +66,11 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
   private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
   private lazy val pruningPredicates: Seq[pb.PhysicalExprNode] = plan.pruningPredicates
 
-  private lazy val partitions: Array[FilePartition] = buildFilePartitions()
+  private lazy val partitions: Array[FilePartition] = {
+    val filePartitions = buildFilePartitions()
+    postDriverMetrics(filePartitions)
+    filePartitions
+  }
   private lazy val fileSizes: Map[String, Long] = buildFileSizes()
 
   private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
@@ -221,6 +226,18 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
       .toMap
   }
 
+  private def postDriverMetrics(filePartitions: Array[FilePartition]): Unit = {
+    val numPartitions = filePartitions.length
+    metrics("numPartitions").add(numPartitions)
+    val numFiles = filePartitions.foldLeft(0L)(_ + _.files.length)
+    metrics("numFiles").add(numFiles)
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(
+      sparkContext,
+      executionId,
+      Seq(metrics("numPartitions"), metrics("numFiles")))
+  }
+
   private def buildFilePartitions(): Array[FilePartition] = {
     // Convert Iceberg file tasks into Spark FilePartition groups for execution.
     if (fileTasks.isEmpty) {
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
index 0c5b752a8..7250d0fef 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -17,6 +17,9 @@ package org.apache.auron.iceberg
 
 import java.util.UUID
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 
@@ -24,9 +27,12 @@ import org.apache.iceberg.{FileFormat, FileScanTask}
 import org.apache.iceberg.data.{GenericAppenderFactory, Record}
 import org.apache.iceberg.deletes.PositionDelete
 import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.auron.iceberg.IcebergScanSupport
+import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 
 class AuronIcebergIntegrationSuite
     extends org.apache.spark.sql.QueryTest
@@ -51,6 +57,52 @@ class AuronIcebergIntegrationSuite
     }
   }
 
+  test("iceberg native scan exposes file scan driver metrics") {
+    withTable("local.db.t_metrics") {
+      sql("create table local.db.t_metrics using iceberg as select 1 as id, 'a' as v")
+      withSQLConf("spark.sql.adaptive.enabled" -> "false") {
+        val df = sql("select * from local.db.t_metrics")
+        val nativeScan = executedNativeIcebergTableScanExec(df)
+        val metricIds = Map(
+          "numPartitions" -> nativeScan.metrics("numPartitions").id,
+          "numFiles" -> nativeScan.metrics("numFiles").id)
+        val driverMetricUpdates = new ConcurrentLinkedQueue[(Long, Long)]()
+        val driverMetricUpdatesPosted = new CountDownLatch(1)
+        val listener = new SparkListener {
+          override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+            case SparkListenerDriverAccumUpdates(_, updates) =>
+              updates.foreach { case (metricId, value) =>
+                driverMetricUpdates.add(metricId -> value)
+              }
+              val updatedMetricIds = driverMetricUpdates.iterator().asScala.map(_._1).toSet
+              if (metricIds.values.forall(updatedMetricIds.contains)) {
+                driverMetricUpdatesPosted.countDown()
+              }
+            case _ =>
+          }
+        }
+
+        spark.sparkContext.addSparkListener(listener)
+        try {
+          checkAnswer(df, Seq(Row(1, "a")))
+          assert(driverMetricUpdatesPosted.await(30, TimeUnit.SECONDS))
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+
+        val driverMetricValues = driverMetricUpdates
+          .iterator()
+          .asScala
+          .toSeq
+          .groupBy(_._1)
+          .mapValues(_.map(_._2).sum)
+          .toMap
+        assert(driverMetricValues.getOrElse(metricIds("numPartitions"), 0L) > 0)
+        assert(driverMetricValues.getOrElse(metricIds("numFiles"), 0L) > 0)
+      }
+    }
+  }
+
   test("iceberg native scan is applied for empty COW table") {
     withTable("local.db.t_empty") {
       sql("""
@@ -345,4 +397,12 @@ class AuronIcebergIntegrationSuite
     df.queryExecution.sparkPlan.collectFirst { case scan: BatchScanExec =>
       IcebergScanSupport.plan(scan)
     }.flatten
+
+  private def executedNativeIcebergTableScanExec(df: DataFrame): NativeIcebergTableScanExec = {
+    val nativeScan = df.queryExecution.executedPlan.collectFirst {
+      case scan: NativeIcebergTableScanExec => scan
+    }
+    assert(nativeScan.nonEmpty)
+    nativeScan.get
+  }
 }
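
Note on the one-line BuildInfoInSparkUISuite hunk: Spark delivers listener events asynchronously on the listener bus, so asserting on listener state immediately after the triggering action is a race; waitUntilEmpty() blocks until every queued event has been handled. Below is a minimal sketch of that synchronization, assuming test code that inspects a SparkListener's state. ListenerSyncSketch is illustrative, not part of the patch; note that listenerBus is private[spark], which is why such code (like the suite itself) must live under an org.apache.spark.* package.

package org.apache.spark.sql.execution

import org.apache.spark.sql.SparkSession

// Illustrative sketch (not part of the patch): drain the asynchronous listener
// bus before asserting on state a SparkListener derives from events. Without
// this, an assertion can run before the event that updates the listener.
object ListenerSyncSketch {
  def awaitListenerEvents(spark: SparkSession): Unit =
    spark.sparkContext.listenerBus.waitUntilEmpty()
}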
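
Note on the NativeIcebergTableScanExec change: values added to a SQLMetric on the driver (as postDriverMetrics does when the file partitions are planned) do not travel through the task-end accumulator path, so they only reach the SQL UI if posted explicitly. SQLMetrics.postDriverMetricUpdates emits the SparkListenerDriverAccumUpdates event the new test listens for, keyed by the current execution ID. A minimal sketch of that pattern, under the assumption of a single metric; DriverMetricsSketch and its method are illustrative, not part of the patch.

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

// Illustrative sketch (not part of the patch): publish a driver-computed
// SQLMetric so the SQL UI can display it.
object DriverMetricsSketch {
  def post(sc: SparkContext, metric: SQLMetric, value: Long): Unit = {
    metric.add(value)
    // EXECUTION_ID_KEY is set as a thread-local property while a SQL query
    // executes; it ties the update to the right query in the UI.
    val executionId = sc.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
    // Emits SparkListenerDriverAccumUpdates on the listener bus.
    SQLMetrics.postDriverMetricUpdates(sc, executionId, Seq(metric))
  }
}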