BuildInfoInSparkUISuite.scala
@@ -46,6 +46,7 @@ class BuildInfoInSparkUISuite extends AuronQueryTest with BaseAuronSQLSuite {
     val listeners = spark.sparkContext.listenerBus.findListenersByClass[AuronSQLAppStatusListener]
     assert(listeners.size === 1)
     val listener = listeners(0)
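+    // Block until all queued listener events have been processed, so the build-info
+    // counter checked below is guaranteed to be up to date.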
+    spark.sparkContext.listenerBus.waitUntilEmpty()
     assert(listener.getAuronBuildInfo() == 1)
   }
 

NativeIcebergTableScanExec.scala
@@ -34,11 +34,10 @@ import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelpe
 import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.execution.LeafExecNode
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
-import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
@@ -53,7 +52,9 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
     with Logging {
 
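+  // Two driver-side counters added alongside the native scan metrics; they show up in
+  // the SQL UI as "Native.partitions_read" and "Native.files_read".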
   override lazy val metrics: Map[String, SQLMetric] =
-    NativeHelper.getNativeFileScanMetrics(sparkContext)
+    NativeHelper.getNativeFileScanMetrics(sparkContext) ++ Seq(
+      "numPartitions" -> SQLMetrics.createMetric(sparkContext, "Native.partitions_read"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "Native.files_read"))
 
   override val output = basedScan.output
   override val outputPartitioning = basedScan.outputPartitioning
@@ -65,7 +66,11 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
   private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
   private lazy val pruningPredicates: Seq[pb.PhysicalExprNode] = plan.pruningPredicates
 
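+  // The lazy val guarantees buildFilePartitions() runs at most once per plan instance,
+  // so the driver metrics below are posted exactly once as a side effect of planning.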
-  private lazy val partitions: Array[FilePartition] = buildFilePartitions()
+  private lazy val partitions: Array[FilePartition] = {
+    val filePartitions = buildFilePartitions()
+    postDriverMetrics(filePartitions)
+    filePartitions
+  }
   private lazy val fileSizes: Map[String, Long] = buildFileSizes()
 
   private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
@@ -221,6 +226,18 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
       .toMap
   }
 
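+  // Driver-side metrics are not reported through task completion, so the planned
+  // partition and file counts are pushed to the UI explicitly via
+  // SQLMetrics.postDriverMetricUpdates for the current SQL execution.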
+  private def postDriverMetrics(filePartitions: Array[FilePartition]): Unit = {
+    val numPartitions = filePartitions.length
+    metrics("numPartitions").add(numPartitions)
+    val numFiles = filePartitions.foldLeft(0L)(_ + _.files.length)
+    metrics("numFiles").add(numFiles)
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(
+      sparkContext,
+      executionId,
+      Seq(metrics("numPartitions"), metrics("numFiles")))
+  }
+
   private def buildFilePartitions(): Array[FilePartition] = {
     // Convert Iceberg file tasks into Spark FilePartition groups for execution.
     if (fileTasks.isEmpty) {
AuronIcebergIntegrationSuite.scala
@@ -17,16 +17,22 @@
 package org.apache.auron.iceberg
 
 import java.util.UUID
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 
 import org.apache.iceberg.{FileFormat, FileScanTask}
 import org.apache.iceberg.data.{GenericAppenderFactory, Record}
 import org.apache.iceberg.deletes.PositionDelete
 import org.apache.iceberg.spark.Spark3Util
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.auron.iceberg.IcebergScanSupport
+import org.apache.spark.sql.execution.auron.plan.NativeIcebergTableScanExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 
 class AuronIcebergIntegrationSuite
   extends org.apache.spark.sql.QueryTest
@@ -51,6 +57,52 @@ class AuronIcebergIntegrationSuite
     }
   }
 
test("iceberg native scan exposes file scan driver metrics") {
withTable("local.db.t_metrics") {
sql("create table local.db.t_metrics using iceberg as select 1 as id, 'a' as v")
withSQLConf("spark.sql.adaptive.enabled" -> "false") {
val df = sql("select * from local.db.t_metrics")
val nativeScan = executedNativeIcebergTableScanExec(df)
val metricIds = Map(
"numPartitions" -> nativeScan.metrics("numPartitions").id,
"numFiles" -> nativeScan.metrics("numFiles").id)
val driverMetricUpdates = new ConcurrentLinkedQueue[(Long, Long)]()
val driverMetricUpdatesPosted = new CountDownLatch(1)
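+        // postDriverMetricUpdates surfaces driver metrics as a SparkListenerDriverAccumUpdates
+        // event; collect its (accumulator id, value) pairs and release the latch once both
+        // expected metric ids have been observed.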
+        val listener = new SparkListener {
+          override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+            case SparkListenerDriverAccumUpdates(_, updates) =>
+              updates.foreach { case (metricId, value) =>
+                driverMetricUpdates.add(metricId -> value)
+              }
+              val updatedMetricIds = driverMetricUpdates.iterator().asScala.map(_._1).toSet
+              if (metricIds.values.forall(updatedMetricIds.contains)) {
+                driverMetricUpdatesPosted.countDown()
+              }
+            case _ =>
+          }
+        }
+
+        spark.sparkContext.addSparkListener(listener)
+        try {
+          checkAnswer(df, Seq(Row(1, "a")))
+          assert(driverMetricUpdatesPosted.await(30, TimeUnit.SECONDS))
+        } finally {
+          spark.sparkContext.removeSparkListener(listener)
+        }
+
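+        // Sum values per accumulator id in case a metric is posted more than once; the
+        // assertions below only require the totals to be positive.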
+        val driverMetricValues = driverMetricUpdates
+          .iterator()
+          .asScala
+          .toSeq
+          .groupBy(_._1)
+          .mapValues(_.map(_._2).sum)
+          .toMap
+        assert(driverMetricValues.getOrElse(metricIds("numPartitions"), 0L) > 0)
+        assert(driverMetricValues.getOrElse(metricIds("numFiles"), 0L) > 0)
+      }
+    }
+  }
+
   test("iceberg native scan is applied for empty COW table") {
     withTable("local.db.t_empty") {
       sql("""
@@ -345,4 +397,12 @@ class AuronIcebergIntegrationSuite
     df.queryExecution.sparkPlan.collectFirst { case scan: BatchScanExec =>
       IcebergScanSupport.plan(scan)
     }.flatten
+
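+  // Locates the native Iceberg scan node in the executed plan, failing the calling test
+  // if Auron did not replace the BatchScanExec with NativeIcebergTableScanExec.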
+  private def executedNativeIcebergTableScanExec(df: DataFrame): NativeIcebergTableScanExec = {
+    val nativeScan = df.queryExecution.executedPlan.collectFirst {
+      case scan: NativeIcebergTableScanExec => scan
+    }
+    assert(nativeScan.nonEmpty)
+    nativeScan.get
+  }
 }