Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import org.apache.spark.sql.types.{BinaryType, DataType, DecimalType, StringType
import org.apache.auron.{protobuf => pb}

// fileSchema is read from the data files. partitionSchema carries supported metadata columns
// (for example _file) that are materialized as per-file constant values in the native scan.
// (for example _file and _spec_id) that are materialized as per-file constant values in
// the native scan.
final case class IcebergScanPlan(
fileTasks: Seq[FileScanTask],
fileFormat: FileFormat,
Expand All @@ -58,7 +59,8 @@ object IcebergScanSupport extends Logging {

val readSchema = scan.readSchema
val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
// Native scan can project file-level metadata columns such as _file via partition values.
// Native scan can project file-level metadata columns such as _file and _spec_id
// via partition values.
// Metadata columns that require per-row materialization (for example _pos) still fallback.
if (unsupportedMetadataColumns.nonEmpty) {
return None
Expand Down Expand Up @@ -136,7 +138,8 @@ object IcebergScanSupport extends Logging {
}

// True when the Iceberg metadata column can be materialized by the native scan
// as a per-file constant (partition value); per-row columns such as _pos are not supported.
private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean = {
  val fileConstantColumns =
    Set(MetadataColumns.FILE_PATH.name(), MetadataColumns.SPEC_ID.name())
  fileConstantColumns.contains(field.name)
}

private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
// Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca

private lazy val partitions: Array[FilePartition] = buildFilePartitions()
private lazy val fileSizes: Map[String, Long] = buildFileSizes()
private lazy val fileSpecIds: Map[String, Int] = buildFileSpecIds()

private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
private lazy val nativePartitionSchema: pb.Schema =
Expand Down Expand Up @@ -125,6 +126,10 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
field.name match {
case name if name == MetadataColumns.FILE_PATH.name() =>
NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral
case name if name == MetadataColumns.SPEC_ID.name() =>
NativeConverters
.convertExpr(Literal.create(fileSpecIds(filePath), field.dataType))
.getLiteral
Comment on lines +129 to +132
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fileSpecIds(filePath) uses Map.apply, which will throw a NoSuchElementException with little context if the key isn’t present (e.g., due to path normalization differences between Iceberg DataFile.location() and Spark PartitionedFile.filePath.toString). Consider using getOrElse and throwing an IllegalStateException with a clear message, and/or normalizing the key the same way in both buildFileSpecIds and metadataPartitionValues to guarantee consistent lookups.

Copilot uses AI. Check for mistakes.
case name =>
throw new IllegalStateException(
s"unsupported Iceberg metadata column in native scan: $name")
Expand Down Expand Up @@ -221,6 +226,25 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
.toMap
}

// Build the file-path -> Iceberg partition spec id mapping used to project the
// _spec_id metadata column as a per-file constant. A single data file may be
// split across multiple scan tasks; every task must report the same spec id,
// otherwise the plan is internally inconsistent and we fail fast.
private def buildFileSpecIds(): Map[String, Int] = {
  fileTasks.foldLeft(Map.empty[String, Int]) { (acc, task) =>
    val filePath = task.file().location()
    val specId = task.file().specId()
    acc.get(filePath) match {
      case Some(existingSpecId) if existingSpecId != specId =>
        throw new IllegalStateException(
          s"Inconsistent Iceberg partition spec id for file $filePath: " +
            s"$existingSpecId != $specId")
      case _ =>
        // First sighting, or a repeat with the same spec id — (re)binding is a no-op.
        acc + (filePath -> specId)
    }
  }
}

private def buildFilePartitions(): Array[FilePartition] = {
// Convert Iceberg file tasks into Spark FilePartition groups for execution.
if (fileTasks.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,20 @@ class AuronIcebergIntegrationSuite
}
}

test("iceberg native scan supports _spec_id metadata column") {
  withTable("local.db.t4_spec_id") {
    // Single-row CTAS table; _spec_id must be served by the native scan operator.
    val createTableSql =
      "create table local.db.t4_spec_id using iceberg as select 1 as id, 'a' as v"
    sql(createTableSql)
    checkSparkAnswerAndOperator("select _spec_id from local.db.t4_spec_id")
  }
}

test("iceberg native scan supports data columns with _file and _spec_id metadata columns") {
  withTable("local.db.t4_metadata_mixed") {
    // Mix a data column with both supported metadata columns in one projection.
    val createTableSql =
      "create table local.db.t4_metadata_mixed using iceberg as select 1 as id, 'a' as v"
    sql(createTableSql)
    val projection = "select id, _file, _spec_id from local.db.t4_metadata_mixed"
    checkSparkAnswerAndOperator(projection)
  }
}

test("iceberg native scan supports data columns with _file metadata column") {
withTable("local.db.t4_mixed") {
sql("create table local.db.t4_mixed using iceberg as select 1 as id, 'a' as v")
Expand Down
Loading