From 4a309d4c12e3f5a904df8d2d8a7a5dfa185c98fb Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Thu, 7 May 2026 22:10:09 +0530
Subject: [PATCH 1/3] feat(arrow): route reader-supplied virtual metadata
 columns through RecordBatchTransformer (#1765)

---
 crates/iceberg/src/arrow/reader/mod.rs        |   1 +
 .../src/arrow/reader/virtual_columns.rs       | 110 ++++++++++++++++++
 .../src/arrow/record_batch_transformer.rs     |  44 ++++++-
 crates/iceberg/src/metadata_columns.rs        |   6 +
 4 files changed, 159 insertions(+), 2 deletions(-)
 create mode 100644 crates/iceberg/src/arrow/reader/virtual_columns.rs

diff --git a/crates/iceberg/src/arrow/reader/mod.rs b/crates/iceberg/src/arrow/reader/mod.rs
index c6c41accb7..5badd80d22 100644
--- a/crates/iceberg/src/arrow/reader/mod.rs
+++ b/crates/iceberg/src/arrow/reader/mod.rs
@@ -40,6 +40,7 @@ mod positional_deletes;
 mod predicate_visitor;
 mod projection;
 mod row_filter;
+mod virtual_columns;
 pub use file_reader::ArrowFileReader;
 pub(crate) use options::ParquetReadOptions;
 use predicate_visitor::{CollectFieldIdVisitor, PredicateConverter};
diff --git a/crates/iceberg/src/arrow/reader/virtual_columns.rs b/crates/iceberg/src/arrow/reader/virtual_columns.rs
new file mode 100644
index 0000000000..55fc616369
--- /dev/null
+++ b/crates/iceberg/src/arrow/reader/virtual_columns.rs
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Maps Iceberg reserved metadata field ids to Apache Arrow virtual columns
+//! consumed via [`parquet::arrow::arrow_reader::ArrowReaderOptions::with_virtual_columns`].
+//!
+//! Today only `_pos` is wired (mapped to the `RowNumber` extension type).
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use arrow_schema::{DataType, Field, FieldRef};
+use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, RowNumber};
+
+use crate::Result;
+use crate::error::Error;
+use crate::metadata_columns::{
+    RESERVED_COL_NAME_POS, RESERVED_FIELD_ID_POS, is_reader_supplied_metadata_field,
+};
+
+/// Returns the Arrow virtual columns to request for the given projected
+/// Iceberg field ids; empty when none are reader-supplied.
+pub(crate) fn collect_arrow_virtual_columns(project_field_ids: &[i32]) -> Result<Vec<FieldRef>> {
+    project_field_ids
+        .iter()
+        .copied()
+        .filter(|id| is_reader_supplied_metadata_field(*id))
+        .map(iceberg_metadata_field_to_arrow_virtual)
+        .collect()
+}
+
+/// Builds the Arrow `Field` for a single reader-supplied metadata field id.
+///
+/// The `PARQUET:field_id` metadata key lets `RecordBatchTransformer` route
+/// the column by id; the extension type makes arrow-rs accept it as a
+/// virtual column.
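+///
+/// A sketch of the `_pos` case, mirroring the unit tests below (marked
+/// `ignore` since the helper is private to this module):
+///
+/// ```ignore
+/// let pos = iceberg_metadata_field_to_arrow_virtual(RESERVED_FIELD_ID_POS)?;
+/// assert_eq!(pos.name(), RESERVED_COL_NAME_POS);
+/// assert_eq!(pos.data_type(), &DataType::Int64);
+/// assert!(!pos.is_nullable());
+/// ```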
+fn iceberg_metadata_field_to_arrow_virtual(field_id: i32) -> Result<FieldRef> {
+    let metadata = HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), field_id.to_string())]);
+
+    let mut field = match field_id {
+        RESERVED_FIELD_ID_POS => {
+            Field::new(RESERVED_COL_NAME_POS, DataType::Int64, false).with_metadata(metadata)
+        }
+        _ => {
+            return Err(Error::new(
+                crate::ErrorKind::Unexpected,
+                format!(
+                    "Iceberg metadata field id {field_id} is not produced by the Parquet reader"
+                ),
+            ));
+        }
+    };
+
+    if field_id == RESERVED_FIELD_ID_POS {
+        field.try_with_extension_type(RowNumber)?;
+    }
+
+    Ok(Arc::new(field))
+}
+
+#[cfg(test)]
+mod tests {
+    use parquet::arrow::is_virtual_column;
+
+    use super::*;
+    use crate::metadata_columns::RESERVED_FIELD_ID_FILE;
+
+    #[test]
+    fn collect_returns_empty_for_no_metadata_fields() {
+        let virtuals = collect_arrow_virtual_columns(&[1, 2, 3]).unwrap();
+        assert!(virtuals.is_empty());
+    }
+
+    #[test]
+    fn collect_returns_pos_field_when_requested() {
+        let virtuals = collect_arrow_virtual_columns(&[1, RESERVED_FIELD_ID_POS, 2]).unwrap();
+        assert_eq!(virtuals.len(), 1);
+
+        let pos = &virtuals[0];
+        assert_eq!(pos.name(), RESERVED_COL_NAME_POS);
+        assert_eq!(pos.data_type(), &DataType::Int64);
+        assert!(!pos.is_nullable());
+        assert!(is_virtual_column(pos));
+        assert_eq!(
+            pos.metadata().get(PARQUET_FIELD_ID_META_KEY),
+            Some(&RESERVED_FIELD_ID_POS.to_string()),
+        );
+    }
+
+    #[test]
+    fn collect_skips_constant_metadata_fields() {
+        // `_file` is a metadata column but is added as a constant, not by the reader.
+        let virtuals = collect_arrow_virtual_columns(&[RESERVED_FIELD_ID_FILE, 1]).unwrap();
+        assert!(virtuals.is_empty());
+    }
+}
diff --git a/crates/iceberg/src/arrow/record_batch_transformer.rs b/crates/iceberg/src/arrow/record_batch_transformer.rs
index 439358435c..0d521c5685 100644
--- a/crates/iceberg/src/arrow/record_batch_transformer.rs
+++ b/crates/iceberg/src/arrow/record_batch_transformer.rs
@@ -28,8 +28,8 @@ use arrow_schema::{
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use crate::arrow::value::{create_primitive_array_repeated, create_primitive_array_single_element};
-use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema};
-use crate::metadata_columns::get_metadata_field;
+use crate::arrow::{datum_to_arrow_type_with_ree, schema_to_arrow_schema, type_to_arrow_type};
+use crate::metadata_columns::{get_metadata_field, is_reader_supplied_metadata_field};
 use crate::spec::{
     Datum, Literal, PartitionSpec, PrimitiveLiteral, Schema as IcebergSchema, Struct, Transform,
 };
@@ -394,6 +394,19 @@
                         .with_metadata(field.metadata().clone());
                     Ok(Arc::new(constant_field))
                 }
+            } else if is_reader_supplied_metadata_field(*field_id) {
+                // Reader-supplied virtual field (e.g. `_pos`) is absent from
+                // the snapshot schema; build its target field from the
+                // Iceberg metadata-column definition.
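+                // For `_pos` this works out to roughly the following (a
+                // sketch; the actual name, type, and nullability come from
+                // the reserved definition in metadata_columns.rs):
+                //   Field::new("_pos", DataType::Int64, false)
+                //       .with_metadata({"PARQUET:field_id": RESERVED_FIELD_ID_POS})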
+ let iceberg_field = get_metadata_field(*field_id)?; + let arrow_type = type_to_arrow_type(&iceberg_field.field_type)?; + let arrow_field = + Field::new(&iceberg_field.name, arrow_type, !iceberg_field.required) + .with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + iceberg_field.id.to_string(), + )])); + Ok(Arc::new(arrow_field)) } else { // Regular field - use schema as-is Ok(field_id_to_mapped_schema_map @@ -456,6 +469,16 @@ impl RecordBatchTransformer { return SchemaComparison::Different; } + // Same type at the same position with different field ids means + // the columns are actually reordered; force the field-id-aware path. + if let (Some(source_id), Some(target_id)) = ( + source_field.metadata().get(PARQUET_FIELD_ID_META_KEY), + target_field.metadata().get(PARQUET_FIELD_ID_META_KEY), + ) && source_id != target_id + { + return SchemaComparison::Different; + } + if source_field.name() != target_field.name() { names_changed = true; } @@ -493,6 +516,23 @@ impl RecordBatchTransformer { }); } + if is_reader_supplied_metadata_field(*field_id) { + // Routed by field id from the source batch (parquet virtual column). + let (_, source_index) = field_id_to_source_schema_map + .get(field_id) + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!( + "reader-supplied virtual field id {field_id} not found in source batch" + ), + ) + })?; + return Ok(ColumnSource::PassThrough { + source_index: *source_index, + }); + } + let (target_field, _) = field_id_to_mapped_schema_map .get(field_id) diff --git a/crates/iceberg/src/metadata_columns.rs b/crates/iceberg/src/metadata_columns.rs index b622a76edc..f91b183d78 100644 --- a/crates/iceberg/src/metadata_columns.rs +++ b/crates/iceberg/src/metadata_columns.rs @@ -453,6 +453,12 @@ pub fn get_metadata_field_id(column_name: &str) -> Result { } } +/// Returns `true` if the metadata field is produced by the file reader +/// (e.g. `_pos` via Parquet virtual columns) rather than added as a constant. +pub fn is_reader_supplied_metadata_field(field_id: i32) -> bool { + matches!(field_id, RESERVED_FIELD_ID_POS) +} + /// Checks if a field ID is a metadata field. /// /// # Arguments From e86fdd392c01a92c84e8c062c3c7a93d6056bd5a Mon Sep 17 00:00:00 2001 From: shekharrajak Date: Thu, 7 May 2026 22:10:09 +0530 Subject: [PATCH 2/3] feat(arrow): wire `_pos` through the parquet read pipeline (#1765) --- crates/iceberg/src/arrow/reader/pipeline.rs | 80 ++++++++++++------- crates/iceberg/src/arrow/reader/projection.rs | 3 +- 2 files changed, 52 insertions(+), 31 deletions(-) diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index 8ecee294c4..3f73dbeb20 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -27,6 +27,7 @@ use futures::{StreamExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder}; +use super::virtual_columns::collect_arrow_virtual_columns; use super::{ ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema, apply_name_mapping_to_arrow_schema, @@ -133,6 +134,9 @@ impl FileScanTaskReader { .next() .is_some_and(|f| f.metadata().get(PARQUET_FIELD_ID_META_KEY).is_none()); + // Reader-supplied metadata columns (e.g. `_pos`) requested via projection. 
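+        // E.g. project_field_ids = [1, RESERVED_FIELD_ID_POS, 2] yields just
+        // the `_pos` virtual field; plain data-field ids are skipped (see the
+        // unit tests in virtual_columns.rs for the exact contract).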
+ let virtual_columns = collect_arrow_virtual_columns(task.project_field_ids())?; + // Three-branch schema resolution strategy matching Java's ReadConf constructor // // Per Iceberg spec Column Projection rules: @@ -148,44 +152,60 @@ impl FileScanTaskReader { // - Branch 1: hasIds(fileSchema) → trust embedded field IDs, use pruneColumns() // - Branch 2: nameMapping present → applyNameMapping(), then pruneColumns() // - Branch 3: fallback → addFallbackIds(), then pruneColumnsFallback() - let arrow_metadata = if missing_field_ids { - // Parquet file lacks field IDs - must assign them before reading - let arrow_schema = if let Some(name_mapping) = &task.name_mapping { - // Branch 2: Apply name mapping to assign correct Iceberg field IDs - // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id - // to columns without field id" - // Corresponds to Java's ParquetSchemaUtil.applyNameMapping() - apply_name_mapping_to_arrow_schema( - Arc::clone(arrow_metadata.schema()), - name_mapping, - )? + let arrow_metadata = + if missing_field_ids { + // Parquet file lacks field IDs - must assign them before reading + let arrow_schema = if let Some(name_mapping) = &task.name_mapping { + // Branch 2: Apply name mapping to assign correct Iceberg field IDs + // Per spec rule #2: "Use schema.name-mapping.default metadata to map field id + // to columns without field id" + // Corresponds to Java's ParquetSchemaUtil.applyNameMapping() + apply_name_mapping_to_arrow_schema( + Arc::clone(arrow_metadata.schema()), + name_mapping, + )? + } else { + // Branch 3: No name mapping - use position-based fallback IDs + // Corresponds to Java's ParquetSchemaUtil.addFallbackIds() + add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema()) + }; + + let options = ArrowReaderOptions::new() + .with_schema(arrow_schema) + .with_virtual_columns(virtual_columns.clone())?; + ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create ArrowReaderMetadata with field ID schema", + ) + .with_source(e) + })? + } else if !virtual_columns.is_empty() { + // Branch 1 with virtual columns: rebuild metadata so arrow-rs emits them. + let options = + ArrowReaderOptions::new().with_virtual_columns(virtual_columns.clone())?; + ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options) + .map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to create ArrowReaderMetadata with virtual columns", + ) + .with_source(e) + })? } else { - // Branch 3: No name mapping - use position-based fallback IDs - // Corresponds to Java's ParquetSchemaUtil.addFallbackIds() - add_fallback_field_ids_to_arrow_schema(arrow_metadata.schema()) + // Branch 1: File has embedded field IDs - trust them + arrow_metadata }; - let options = ArrowReaderOptions::new().with_schema(arrow_schema); - ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err( - |e| { - Error::new( - ErrorKind::Unexpected, - "Failed to create ArrowReaderMetadata with field ID schema", - ) - .with_source(e) - }, - )? - } else { - // Branch 1: File has embedded field IDs - trust them - arrow_metadata - }; - // Coerce INT96 timestamp columns to the resolution specified by the Iceberg schema. // This must happen before building the stream reader to avoid i64 overflow in arrow-rs. 
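+        // Note: each branch above builds ArrowReaderOptions::new() from
+        // scratch, so the coercion path below must re-attach the virtual
+        // columns alongside the coerced schema or they would be dropped.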
         let arrow_metadata = if let Some(coerced_schema) =
             coerce_int96_timestamps(arrow_metadata.schema(), &task.schema)
         {
-            let options = ArrowReaderOptions::new().with_schema(Arc::clone(&coerced_schema));
+            let options = ArrowReaderOptions::new()
+                .with_schema(Arc::clone(&coerced_schema))
+                .with_virtual_columns(virtual_columns.clone())?;
             ArrowReaderMetadata::try_new(Arc::clone(arrow_metadata.metadata()), options).map_err(
                 |e| {
                     Error::new(
                         ErrorKind::Unexpected,
diff --git a/crates/iceberg/src/arrow/reader/projection.rs b/crates/iceberg/src/arrow/reader/projection.rs
index deae027e14..af83226518 100644
--- a/crates/iceberg/src/arrow/reader/projection.rs
+++ b/crates/iceberg/src/arrow/reader/projection.rs
@@ -229,7 +229,8 @@ impl ArrowReader {
             if parquet_pos < parquet_root_fields.len() {
                 root_indices.push(parquet_pos);
             }
-            // RecordBatchTransformer adds missing columns with NULL values
+            // Missing columns are filled post-read by RecordBatchTransformer
+            // (with NULLs) or by the parquet reader's virtual-columns mechanism.
 
         if root_indices.is_empty() {

From a2b88868eb261b3c9a35841cf61fc50e6ebb3aed Mon Sep 17 00:00:00 2001
From: shekharrajak
Date: Thu, 7 May 2026 22:10:10 +0530
Subject: [PATCH 3/3] test(scan): cover `_pos` virtual column end-to-end (#1765)

---
 crates/iceberg/src/scan/mod.rs | 138 ++++++++++++++++++++++++++++++++-
 1 file changed, 137 insertions(+), 1 deletion(-)

diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 27f479183a..718a8c5b0a 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -592,7 +592,7 @@ pub mod tests {
     use crate::arrow::ArrowReaderBuilder;
     use crate::expr::{BoundPredicate, Reference};
     use crate::io::{FileIO, OutputFile};
-    use crate::metadata_columns::RESERVED_COL_NAME_FILE;
+    use crate::metadata_columns::{RESERVED_COL_NAME_FILE, RESERVED_COL_NAME_POS};
     use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, ManifestEntry,
@@ -2162,6 +2162,142 @@
         );
     }
 
+    /// Collects `_pos` values from every batch as a sorted multiset.
+    fn collect_sorted_pos(batches: &[RecordBatch]) -> Vec<i64> {
+        let mut values: Vec<i64> = batches
+            .iter()
+            .flat_map(|b| {
+                let pos_col = b.column_by_name(RESERVED_COL_NAME_POS).unwrap();
+                let int64 = pos_col.as_any().downcast_ref::<Int64Array>().unwrap();
+                (0..int64.len()).map(|i| int64.value(i)).collect::<Vec<_>>()
+            })
+            .collect();
+        values.sort();
+        values
+    }
+
+    #[tokio::test]
+    async fn test_select_pos_column_basic() {
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["x", RESERVED_COL_NAME_POS, "z"])
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = table_scan
+            .to_arrow()
+            .await
+            .unwrap()
+            .try_collect()
+            .await
+            .unwrap();
+
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), "x");
+        assert_eq!(schema.field(1).name(), RESERVED_COL_NAME_POS);
+        assert_eq!(schema.field(2).name(), "z");
+        assert!(matches!(
+            schema.field(1).data_type(),
+            arrow_schema::DataType::Int64
+        ));
+        assert!(!schema.field(1).is_nullable());
+
+        // Two 1024-row files; expect each position 0..1024 twice.
+        let mut expected: Vec<i64> = (0..1024).chain(0..1024).collect();
+        expected.sort();
+        assert_eq!(collect_sorted_pos(&batches), expected);
+    }
+
+    #[tokio::test]
+    async fn test_pos_column_with_predicate_pushdown() {
+        // y is [2; 512] + [3; 200] + [4; 300] + [5; 12]; `y < 3` keeps
+        // positions 0..511. Retained `_pos` values must be original file
+        // positions, not 0..N/2.
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        let table_scan = fixture
+            .table
+            .scan()
+            .select(["y", RESERVED_COL_NAME_POS])
+            .with_filter(Reference::new("y").less_than(Datum::long(3)))
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = table_scan
+            .to_arrow()
+            .await
+            .unwrap()
+            .try_collect()
+            .await
+            .unwrap();
+
+        let mut expected: Vec<i64> = (0..512).chain(0..512).collect();
+        expected.sort();
+        assert_eq!(
+            collect_sorted_pos(&batches),
+            expected,
+            "_pos must reflect original-file row indices, not post-filter indices"
+        );
+
+        for batch in &batches {
+            let y = batch
+                .column_by_name("y")
+                .unwrap()
+                .as_any()
+                .downcast_ref::<Int64Array>()
+                .unwrap();
+            for i in 0..y.len() {
+                assert_eq!(y.value(i), 2);
+            }
+        }
+    }
+
+    #[tokio::test]
+    async fn test_pos_column_with_row_selection_preserves_file_positions() {
+        // `y >= 5` keeps positions 1012..1023 (last 12 rows). Exercises the
+        // RowSelection path used by positional deletes: `_pos` must survive
+        // a contiguous-prefix skip and coexist with `_file`.
+        let mut fixture = TableTestFixture::new();
+        fixture.setup_manifest_files().await;
+
+        let table_scan = fixture
+            .table
+            .scan()
+            .select([RESERVED_COL_NAME_POS, "y", RESERVED_COL_NAME_FILE])
+            .with_filter(Reference::new("y").greater_than_or_equal_to(Datum::long(5)))
+            .with_row_selection_enabled(true)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = table_scan
+            .to_arrow()
+            .await
+            .unwrap()
+            .try_collect()
+            .await
+            .unwrap();
+
+        let schema = batches[0].schema();
+        assert_eq!(schema.field(0).name(), RESERVED_COL_NAME_POS);
+        assert_eq!(schema.field(1).name(), "y");
+        assert_eq!(schema.field(2).name(), RESERVED_COL_NAME_FILE);
+
+        let mut expected: Vec<i64> = (1012..1024).chain(1012..1024).collect();
+        expected.sort();
+        assert_eq!(
+            collect_sorted_pos(&batches),
+            expected,
+            "_pos must survive row-selection-based skipping with original positions"
+        );
+    }
+
     #[tokio::test]
     async fn test_scan_deadlock() {
         let mut fixture = TableTestFixture::new();