Skip to content
Open

test #2237

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,23 +239,33 @@ object AuronConverters extends Logging {

case exec: ForceNativeExecutionWrapperBase => exec
case exec =>
extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match {
case Some(provider) => tryConvert(exec, provider.convert)
case None =>
Shims.get.convertMoreSparkPlan(exec) match {
case Some(exec) =>
exec.setTagValue(convertibleTag, true)
exec.setTagValue(convertStrategyTag, AlwaysConvert)
exec
case None =>
if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader
try {
extConvertProviders.find(h => h.isEnabled && h.isSupported(exec)) match {
case Some(provider) => tryConvert(exec, provider.convert)
case None =>
Shims.get.convertMoreSparkPlan(exec) match {
case Some(exec) =>
exec.setTagValue(convertibleTag, true)
exec.setTagValue(convertStrategyTag, AlwaysConvert)
exec
} else {
addNeverConvertReasonTag(exec)
}
}
case None =>
if (Shims.get.isNative(exec)) { // for QueryStageInput and CustomShuffleReader
exec.setTagValue(convertibleTag, true)
exec.setTagValue(convertStrategyTag, AlwaysConvert)
exec
} else {
addNeverConvertReasonTag(exec)
}
}
}
} catch {
case e @ (_: NotImplementedError | _: AssertionError | _: Exception) =>
exec.setTagValue(convertibleTag, false)
exec.setTagValue(convertStrategyTag, NeverConvert)
exec.setTagValue(
neverConvertReasonTag,
s"Falling back exec: ${exec.getClass.getSimpleName}: ${e.getMessage}")
exec
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions thirdparty/auron-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,13 @@ class IcebergConvertProvider extends AuronConvertProvider with Logging {

/**
 * Reports whether the Iceberg native scan conversion may be attempted.
 *
 * The result is `true` only when the `ENABLE_ICEBERG_SCAN` configuration is on and
 * `sparkCompatible` holds. When the configuration is on but the running Spark version is
 * not compatible, a warning naming the supported range is logged before returning `false`.
 *
 * This is a plain boolean predicate: it reports "disabled" by returning `false` and never
 * by throwing. The previous body followed the guard clauses with `assert(!enabled, ...)`
 * and `assert(!sparkCompatible, ...)`, whose conditions were inverted, so the method raised
 * an `AssertionError` on the one path that should have returned `true`.
 *
 * @return `true` if the feature flag is enabled and the Spark version is supported,
 *         `false` otherwise.
 */
override def isEnabled: Boolean = {
  if (!SparkAuronConfiguration.ENABLE_ICEBERG_SCAN.get()) {
    // Feature flag is off: skip silently, without evaluating the version check.
    false
  } else if (!sparkCompatible) {
    logWarning(
      s"Disable Iceberg native scan: Spark $SPARK_VERSION is not supported. " +
        s"Supported Spark versions: 3.4 to 4.0 (Iceberg ${icebergVersionOrUnknown}).")
    false
  } else {
    true
  }
}

override def isSupported(exec: SparkPlan): Boolean = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,36 +47,31 @@ object IcebergScanSupport extends Logging {
val scan = exec.scan
val scanClassName = scan.getClass.getName
// Only handle Iceberg scans; other sources must stay on Spark's path.
if (!scanClassName.startsWith("org.apache.iceberg.spark.source.")) {
return None
}
assert(!scanClassName.startsWith("org.apache.iceberg.spark.source."), "Not Iceberg scans.")

// Changelog scan carries row-level changes; not supported by native COW-only path.
if (scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan") {
return None
}
assert(
scanClassName == "org.apache.iceberg.spark.source.SparkChangelogScan",
"Not Iceberg cow table.")

val readSchema = scan.readSchema
val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
// Native scan can project file-level metadata columns such as _file via partition values.
// Metadata columns that require per-row materialization (for example _pos) still fallback.
if (unsupportedMetadataColumns.nonEmpty) {
return None
}
assert(unsupportedMetadataColumns.nonEmpty, "Has per-row materialization (for example _pos).")

val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
// Supported metadata columns are materialized via per-file constant values rather than
// read from the Iceberg data file payload.
val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))

if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
return None
}
assert(
!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)),
"Has Iceberg data file payload.")

if (!partitionSchema.fields.forall(field =>
NativeConverters.isTypeSupported(field.dataType))) {
return None
}
assert(
!partitionSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType)),
"Has unsupported schema type.")

val partitions = inputPartitions(exec)
// Empty scan (e.g. empty table) should still build a plan to return no rows.
Expand All @@ -94,27 +89,25 @@ object IcebergScanSupport extends Logging {

val icebergPartitions = partitions.flatMap(icebergPartition)
// All partitions must be Iceberg SparkInputPartition; otherwise fallback.
if (icebergPartitions.size != partitions.size) {
return None
}
assert(
icebergPartitions.size != partitions.size,
"All partitions must be Iceberg SparkInputPartition.")

val fileTasks = icebergPartitions.flatMap(_.fileTasks)

// Native scan does not apply delete files; only allow pure data files (COW).
if (!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty)) {
return None
}
assert(
!fileTasks.forall(task => task.deletes() == null || task.deletes().isEmpty),
"Not Iceberg cow table.")

// Native scan handles a single file format; mixed formats must fallback.
val formats = fileTasks.map(_.file().format()).distinct
if (formats.size > 1) {
return None
}
assert(formats.size > 1, "Not all data file format is a single file format.")

val format = formats.headOption.getOrElse(FileFormat.PARQUET)
if (format != FileFormat.PARQUET && format != FileFormat.ORC) {
return None
}
assert(
format != FileFormat.PARQUET && format != FileFormat.ORC,
"Only support parquet or orc.")

val pruningPredicates = collectPruningPredicates(scan.asInstanceOf[AnyRef], readSchema)
Some(
Expand Down
Loading