bug: native Iceberg reader errors on residual filter on column after nested type for migrated Parquet files #3860

@mbutrovich

Description

Describe the bug

iceberg-rust's PredicateConverter fails with the error "Leave column id in predicates isn't a root column in Parquet schema" when all three of the following conditions are met:

  1. Migrated table — Parquet files were written by Spark without Iceberg field IDs, then imported via SparkTableUtil.importSparkTable()
  2. Nested types in the schema (struct, array, or map)
  3. Filter predicate on a root column that appears after nested types in the column ordering

Root Cause

The column_map in iceberg-rust's PredicateConverter (crates/iceberg/src/arrow/reader.rs:1609) maps Iceberg field IDs to Parquet leaf column indices. For migrated files (no embedded Iceberg field IDs), iceberg-rust falls back to name-based mapping. This mapping produces incorrect leaf indices when nested types are present, causing a flat column like id to be mapped to a leaf index inside a group (struct/map/array).

The check at reader.rs:1622:

if self.parquet_schema.get_column_root(*column_idx).is_group() {
    return Err(...)
}

then fails because get_column_root() returns the enclosing group rather than id itself.

Note: Tables created directly through Iceberg (which embed field IDs in Parquet metadata) are NOT affected. The bug is specific to the name-mapping fallback path used for migrated files.
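The mis-mapping is easiest to see by enumerating Parquet leaf indices for the repro schema. The sketch below uses a minimal hand-rolled schema tree (illustrative names and shapes, not parquet-rs's or iceberg-rust's actual types) to show that `id`, the fourth top-level field, is leaf index 5 once nested types are flattened; a fallback that conflates top-level field position with leaf index resolves `id` to leaf 3, which lives under the `props` group and trips the `is_group()` check.

```rust
// Minimal hand-rolled schema tree; illustrative, not the actual parquet-rs types.
enum PType {
    Leaf(&'static str),
    Group(&'static str, Vec<PType>),
}

// Depth-first flattening: the order in which Parquet numbers leaf columns.
fn leaves(t: &PType, prefix: &str, out: &mut Vec<String>) {
    match t {
        PType::Leaf(n) => out.push(if prefix.is_empty() {
            n.to_string()
        } else {
            format!("{prefix}.{n}")
        }),
        PType::Group(n, children) => {
            let p = if prefix.is_empty() { n.to_string() } else { format!("{prefix}.{n}") };
            for c in children {
                leaves(c, &p, out);
            }
        }
    }
}

fn leaf_paths() -> Vec<String> {
    // Schema from the repro: nested fields first, flat `id` last
    let schema = vec![
        PType::Group("info", vec![PType::Leaf("age"), PType::Leaf("score")]),
        PType::Group("tags", vec![PType::Group("list", vec![PType::Leaf("element")])]),
        PType::Group(
            "props",
            vec![PType::Group("key_value", vec![PType::Leaf("key"), PType::Leaf("value")])],
        ),
        PType::Leaf("id"),
    ];
    let mut out = Vec::new();
    for t in &schema {
        leaves(t, "", &mut out);
    }
    out
}

fn main() {
    for (i, p) in leaf_paths().iter().enumerate() {
        println!("{i} -> {p}");
    }
}
```

Here `id` is leaf index 5, but it is top-level field 3; leaf index 3 is `props.key_value.key`, whose root column is the `props` group, which is exactly the shape the `is_group()` check rejects.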

Steps to reproduce

test("filter with nested types in migrated table") {
  assume(icebergAvailable, "Iceberg not available in classpath")

  withTempIcebergDir { warehouseDir =>
    withSQLConf(
      "spark.sql.catalog.test_cat" -> "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.test_cat.type" -> "hadoop",
      "spark.sql.catalog.test_cat.warehouse" -> warehouseDir.getAbsolutePath,
      CometConf.COMET_ENABLED.key -> "true",
      CometConf.COMET_EXEC_ENABLED.key -> "true",
      CometConf.COMET_ICEBERG_NATIVE_ENABLED.key -> "true") {

      val dataPath = s"${warehouseDir.getAbsolutePath}/nested_data"

      // Write Parquet WITHOUT Iceberg (simulates pre-migration data)
      // id is last so its leaf index is after all nested type leaves
      spark.sql(s"""
        SELECT
          named_struct('age', id * 10, 'score', id * 1.5) AS info,
          array(id, id + 1) AS tags,
          map('key', id) AS props,
          id
        FROM range(10)
      """).write.parquet(dataPath)

      spark.sql("CREATE NAMESPACE IF NOT EXISTS test_cat.db")
      spark.sql(s"""
        CREATE TABLE test_cat.db.nested_migrate (
          info STRUCT<age: BIGINT, score: DOUBLE>,
          tags ARRAY<BIGINT>,
          props MAP<STRING, BIGINT>,
          id BIGINT
        ) USING iceberg
      """)

      try {
        val tableUtilClass = Class.forName("org.apache.iceberg.spark.SparkTableUtil")
        val sparkCatalog = spark.sessionState.catalogManager
          .catalog("test_cat")
          .asInstanceOf[org.apache.iceberg.spark.SparkCatalog]
        val ident =
          org.apache.spark.sql.connector.catalog.Identifier.of(Array("db"), "nested_migrate")
        val sparkTable = sparkCatalog
          .loadTable(ident)
          .asInstanceOf[org.apache.iceberg.spark.source.SparkTable]
        val table = sparkTable.table()

        val stagingDir = s"${warehouseDir.getAbsolutePath}/staging"
        spark.sql(s"""CREATE TABLE parquet_temp USING parquet LOCATION '$dataPath'""")
        val sourceIdent = new org.apache.spark.sql.catalyst.TableIdentifier("parquet_temp")

        val importMethod = tableUtilClass.getMethod(
          "importSparkTable",
          classOf[org.apache.spark.sql.SparkSession],
          classOf[org.apache.spark.sql.catalyst.TableIdentifier],
          classOf[org.apache.iceberg.Table],
          classOf[String])
        importMethod.invoke(null, spark, sourceIdent, table, stagingDir)

        // Select only flat columns to avoid Spark's Iceberg reader returning
        // null for struct fields in migrated tables (separate Spark bug)
        checkIcebergNativeScan(
          "SELECT id FROM test_cat.db.nested_migrate ORDER BY id")

        // Filter on root column with nested types in migrated table:
        // Parquet files lack Iceberg field IDs, so iceberg-rust falls back to
        // name mapping where column_map resolution is broken for nested types
        checkIcebergNativeScan(
          "SELECT id FROM test_cat.db.nested_migrate WHERE id > 5 ORDER BY id")

        spark.sql("DROP TABLE test_cat.db.nested_migrate")
        spark.sql("DROP TABLE parquet_temp")
      } catch {
        case _: ClassNotFoundException =>
          cancel("SparkTableUtil not available")
      }
    }
  }
}

Expected behavior

Queries with filter predicates should work on migrated tables regardless of whether the schema contains nested types.

Additional context

  • The residual predicate is serialized by Comet from the Iceberg `FileScanTask.residual()` and sent to iceberg-rust for row-group pruning.
  • Without the predicate, the scan works fine — the filter is still applied post-scan by CometFilter.
  • Column ordering matters: if the filtered column appears before all nested types in the schema, the bug does not trigger (the leaf index happens to be correct).

Possible Comet workaround

Skip serializing residual predicates when the table schema contains nested types. This is safe for correctness because the filter is still applied post-scan by CometFilter; the only cost is lost row-group pruning efficiency.
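A hedged sketch of that guard, under assumed types (this is not Comet's actual API; the enum and function names are hypothetical):

```rust
// Hypothetical schema model for illustration; not Comet's or Iceberg's types.
#[allow(dead_code)]
enum DataType {
    Primitive,
    Struct(Vec<DataType>),
    List(Box<DataType>),
    Map(Box<DataType>, Box<DataType>),
}

// Any container at the top level is enough to trigger the bug, so a
// top-level check suffices: deeper containers can only occur inside one.
fn has_nested_types(schema: &[DataType]) -> bool {
    schema.iter().any(|t| !matches!(t, DataType::Primitive))
}

fn should_serialize_residual(schema: &[DataType]) -> bool {
    // Correctness is preserved either way: CometFilter re-applies the
    // predicate post-scan; skipping only forgoes row-group pruning.
    !has_nested_types(schema)
}

fn main() {
    let migrated = [DataType::Struct(vec![DataType::Primitive]), DataType::Primitive];
    let flat = [DataType::Primitive, DataType::Primitive];
    println!("{} {}", should_serialize_residual(&migrated), should_serialize_residual(&flat));
}
```

With the repro's schema (struct, array, map, then `id`), the guard returns false and the residual is dropped; a fully flat schema keeps pruning enabled.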

Metadata

Labels

area:scan (Parquet scan / data reading), crash (Native engine crash/panic/segfault), priority:high (Crashes, panics, segfaults, major functional breakage)
