From fe35c6e47805e6ca888e4d65773ebef93c5e9b69 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Wed, 6 May 2026 11:47:29 +0800 Subject: [PATCH 1/3] test --- .../spark/sql/auron/AuronConverters.scala | 38 ++++++++------ thirdparty/auron-iceberg/pom.xml | 5 ++ .../iceberg/IcebergConvertProvider.scala | 17 +++---- .../auron/iceberg/IcebergScanSupport.scala | 49 ++++++++----------- 4 files changed, 57 insertions(+), 52 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index cc12a176a..acb46c314 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -239,23 +239,33 @@ object AuronConverters extends Logging { case exec: ForceNativeExecutionWrapperBase => exec case exec => - extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match { - case Some(provider) => tryConvert(exec, provider.convert) - case None => - Shims.get.convertMoreSparkPlan(exec) match { - case Some(exec) => - exec.setTagValue(convertibleTag, true) - exec.setTagValue(convertStrategyTag, AlwaysConvert) - exec - case None => - if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader + try { + extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match { + case Some(provider) => tryConvert(exec, provider.convert) + case None => + Shims.get.convertMoreSparkPlan(exec) match { + case Some(exec) => exec.setTagValue(convertibleTag, true) exec.setTagValue(convertStrategyTag, AlwaysConvert) exec - } else { - addNeverConvertReasonTag(exec) - } - } + case None => + if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader + exec.setTagValue(convertibleTag, true) + exec.setTagValue(convertStrategyTag, AlwaysConvert) + exec + } else { + addNeverConvertReasonTag(exec) + } + } + } + } catch { + case e @ (_: NotImplementedError | _: AssertionError | _: Exception) => + exec.setTagValue(convertibleTag, false) + exec.setTagValue(convertStrategyTag, NeverConvert) + exec.setTagValue( + neverConvertReasonTag, + s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}") + exec } } } diff --git a/thirdparty/auron-iceberg/pom.xml b/thirdparty/auron-iceberg/pom.xml index bb686f481..ec5b3db23 100644 --- a/thirdparty/auron-iceberg/pom.xml +++ b/thirdparty/auron-iceberg/pom.xml @@ -81,6 +81,11 @@ ${project.version} test + + org.scala-lang + scala-library + provided + diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index aae3f576e..ba352f40e 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -30,16 +30,13 @@ class IcebergConvertProvider extends AuronConvertProvider with Logging { override def isEnabled: Boolean = { val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get() - if (!enabled) { - return false - } - if (!sparkCompatible) { - logWarning( - s"Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. " + - s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).") - return false - } - true + assert(!enabled, "Conversion disabled: auron.enable.iceberg.scan=false.") + assert( + !sparkCompatible, + s"Conversion disabled: Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. " + + s"Su" + + s"pported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).") + enabled } override def isSupported(exec: SparkPlan): Boolean = { diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 1d9efbc3b..48bfc737f 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -47,36 +47,31 @@ object IcebergScanSupport extends Logging { val scan = exec.scan val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. - if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) { - return None - } + assert(!scanClassName.startsWith("org.apache.iceberg.spark.source."), "Not Iceberg scans.") // Changelog scan carries row-level changes; not supported by native COW-only path. - if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") { - return None - } + assert( + scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan", + "Not Iceberg cow table.") val readSchema = scan.readSchema val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema) // Native scan can project file-level metadata columns such as _file via partition values. // Metadata columns that require per-row materialization (for example _pos) still fallback. - if (unsupportedMetadataColumns.nonEmpty) { - return None - } + assert(unsupportedMetadataColumns.nonEmpty, "Has per-row materialization (for example _pos).") val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn)) // Supported metadata columns are materialized via per-file constant values rather than // read from the Iceberg data file payload. val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn)) - if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) { - return None - } + assert( + !fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), + "Has Iceberg data file payload.") - if (!partitionSchema.fields.forall(field => - NativeConverters.isTypeSupported(field.dataType))) { - return None - } + assert( + !partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), + "Has unsupported schema type.") val partitions = inputPartitions(exec) // Empty scan (e.g. empty table) should still build a plan to return no rows. @@ -94,27 +89,25 @@ object IcebergScanSupport extends Logging { val icebergPartitions = partitions.flatMap(icebergPartition) // All partitions must be Iceberg SparkInputPartition; otherwise fallback. - if (icebergPartitions.size != partitions.size) { - return None - } + assert( + icebergPartitions.size != partitions.size, + "All partitions must be Iceberg SparkInputPartition.") val fileTasks = icebergPartitions.flatMap(_.fileTasks) // Native scan does not apply delete files; only allow pure data files (COW). - if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) { - return None - } + assert( + !fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty), + "Not Iceberg cow table.") // Native scan handles a single file format; mixed formats must fallback. val formats = fileTasks.map(_.file().format()).distinct - if (formats.size > 1) { - return None - } + assert(formats.size > 1, "Not all data file format is a single file format.") val format = formats.headOption.getOrElse(FileFormat.PARQUET) - if (format != FileFormat.PARQUET && format != FileFormat.ORC) { - return None - } + assert( + format != FileFormat.PARQUET && format != FileFormat.ORC, + "Only support parquet or orc.") val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) Some( From f87a2a3cec6c2c07822d31495c92bd1472931bda Mon Sep 17 00:00:00 2001 From: guihuawen Date: Wed, 6 May 2026 18:23:43 +0800 Subject: [PATCH 2/3] test --- .../apache/spark/sql/auron/NativeHelper.scala | 2 +- .../iceberg/IcebergConvertProvider.scala | 4 ++-- .../auron/iceberg/IcebergScanSupport.scala | 20 ++++++++++--------- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala index e16656471..7a1e34724 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala @@ -74,7 +74,7 @@ object NativeHelper extends Logging { val heapMemory = Runtime.getRuntime.maxMemory() val offheapMemory = totalMemory - heapMemory logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString( - heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") + heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") offheapMemory } diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala index ba352f40e..baa9d69da 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergConvertProvider.scala @@ -30,9 +30,9 @@ class IcebergConvertProvider extends AuronConvertProvider with Logging { override def isEnabled: Boolean = { val enabled = SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get() - assert(!enabled, "Conversion disabled: auron.enable.iceberg.scan=false.") + assert(enabled, "Conversion disabled: auron.enable.iceberg.scan=false.") assert( - !sparkCompatible, + sparkCompatible, s"Conversion disabled: Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. " + s"Su" + s"pported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).") diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index 48bfc737f..ba6dbc1e9 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -47,18 +47,20 @@ object IcebergScanSupport extends Logging { val scan = exec.scan val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. - assert(!scanClassName.startsWith("org.apache.iceberg.spark.source."), "Not Iceberg scans.") + assert(scanClassName.startsWith("org.apache.iceberg.spark.source."), "Not Iceberg scans.") // Changelog scan carries row-level changes; not supported by native COW-only path. assert( - scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan", + !(scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan"), "Not Iceberg cow table.") val readSchema = scan.readSchema val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema) // Native scan can project file-level metadata columns such as _file via partition values. // Metadata columns that require per-row materialization (for example _pos) still fallback. - assert(unsupportedMetadataColumns.nonEmpty, "Has per-row materialization (for example _pos).") + assert( + !(unsupportedMetadataColumns.nonEmpty), + "Has per-row materialization (for example _pos).") val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn)) // Supported metadata columns are materialized via per-file constant values rather than @@ -66,11 +68,11 @@ object IcebergScanSupport extends Logging { val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn)) assert( - !fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), + fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), "Has Iceberg data file payload.") assert( - !partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), + partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), "Has unsupported schema type.") val partitions = inputPartitions(exec) @@ -90,23 +92,23 @@ object IcebergScanSupport extends Logging { val icebergPartitions = partitions.flatMap(icebergPartition) // All partitions must be Iceberg SparkInputPartition; otherwise fallback. assert( - icebergPartitions.size != partitions.size, + icebergPartitions.size == partitions.size, "All partitions must be Iceberg SparkInputPartition.") val fileTasks = icebergPartitions.flatMap(_.fileTasks) // Native scan does not apply delete files; only allow pure data files (COW). assert( - !fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty), + fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty), "Not Iceberg cow table.") // Native scan handles a single file format; mixed formats must fallback. val formats = fileTasks.map(_.file().format()).distinct - assert(formats.size > 1, "Not all data file format is a single file format.") + assert(!(formats.size > 1), "Not all data file format is a single file format.") val format = formats.headOption.getOrElse(FileFormat.PARQUET) assert( - format != FileFormat.PARQUET && format != FileFormat.ORC, + !(format != FileFormat.PARQUET && format != FileFormat.ORC), "Only support parquet or orc.") val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema) From 8dd34b4f44920fd21a1553ba279fa71398cfd5ec Mon Sep 17 00:00:00 2001 From: guihuawen Date: Wed, 6 May 2026 18:44:47 +0800 Subject: [PATCH 3/3] [AURON #2238] Add support for auron.never.convert.reason in Iceberg scan scenarios --- .../spark/sql/auron/iceberg/IcebergScanSupport.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala index ba6dbc1e9..78f42cb10 100644 --- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala +++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala @@ -47,12 +47,12 @@ object IcebergScanSupport extends Logging { val scan = exec.scan val scanClassName = scan.getClass.getName // Only handle Iceberg scans; other sources must stay on Spark's path. - assert(scanClassName.startsWith("org.apache.iceberg.spark.source."), "Not Iceberg scans.") + assert(scanClassName.startsWith("org.apache.iceberg.spark.source."), "Not iceberg scans.") // Changelog scan carries row-level changes; not supported by native COW-only path. assert( !(scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan"), - "Not Iceberg cow table.") + "Not iceberg cow table.") val readSchema = scan.readSchema val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema) @@ -69,7 +69,7 @@ object IcebergScanSupport extends Logging { assert( fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), - "Has Iceberg data file payload.") + "Has iceberg data file payload.") assert( partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)), @@ -100,7 +100,7 @@ object IcebergScanSupport extends Logging { // Native scan does not apply delete files; only allow pure data files (COW). assert( fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty), - "Not Iceberg cow table.") + "Not iceberg cow table.") // Native scan handles a single file format; mixed formats must fallback. val formats = fileTasks.map(_.file().format()).distinct