diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index d20844efbc0..a0fa396b0c4 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -230,6 +230,11 @@ impl FileOpener for VortexOpener {
             DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}"))
         })?);
 
+        // Compute the logical file schema by merging physical file types with logical table types.
+        // This schema has the same field names as logical_schema, but with physical types from the file.
+        let logical_file_schema =
+            compute_logical_file_schema(&physical_file_schema, &logical_schema);
+
         if let Some(expr_adapter_factory) = expr_adapter_factory {
             let partition_values = partition_fields
                 .iter()
@@ -241,13 +246,8 @@ impl FileOpener for VortexOpener {
             // for schema evolution and divergence between the table's schema and individual files.
             filter = filter
                 .map(|filter| {
-                    let logical_file_schema = compute_logical_file_schema(
-                        &physical_file_schema.clone(),
-                        &logical_schema,
-                    );
-
                     let expr = expr_adapter_factory
-                        .create(logical_file_schema, physical_file_schema.clone())
+                        .create(logical_file_schema.clone(), physical_file_schema.clone())
                         .with_partition_values(partition_values)
                         .rewrite(filter)?;
 
@@ -257,59 +257,25 @@ impl FileOpener for VortexOpener {
                 })
                 .transpose()?;
 
-            predicate_file_schema = physical_file_schema.clone();
+            predicate_file_schema = physical_file_schema;
         }
 
-        // Create the initial mapping from physical file schema to projected schema.
-        // This gives us the field reordering and tells us which logical schema fields
-        // to select.
-        let (_schema_mapping, adapted_projections) =
-            schema_adapter.map_schema(&physical_file_schema)?;
+        // Use the pre-created schema adapter to map logical_file_schema to projected_schema.
+        // Since logical_file_schema has the same field names as logical_schema (which the adapter
+        // was created with), this works correctly and gives us the projection indices.
+        let (schema_mapping, adapted_projections) =
+            schema_adapter.map_schema(&logical_file_schema)?;
 
-        // Build the Vortex projection expression using the adapted projections.
-        // This will reorder the fields to match the target order.
+        // Build the Vortex projection expression using field names from logical_file_schema
        let fields = adapted_projections
             .iter()
-            .map(|idx| {
-                let field = logical_schema.field(*idx);
+            .map(|&idx| {
+                let field = logical_file_schema.field(idx);
                 FieldName::from(field.name().as_str())
             })
             .collect::<Vec<_>>();
 
         let projection_expr = select(fields, root());
-        // After Vortex applies the projection, the batch will have fields in the target
-        // order (matching adapted_projections), but with the physical file types.
-        // We need a second schema mapping for type casting only, not reordering.
-        // Build a schema that represents what Vortex will return: fields in target order
-        // with physical types.
-        let projected_physical_fields: Vec<Field> = adapted_projections
-            .iter()
-            .map(|&idx| {
-                let logical_field = logical_schema.field(idx);
-                let field_name = logical_field.name();
-
-                // Find this field in the physical schema to get its physical type
-                physical_file_schema
-                    .field_with_name(field_name)
-                    .map(|phys_field| {
-                        Field::new(
-                            field_name,
-                            merge_field_types(phys_field, logical_field),
-                            phys_field.is_nullable(),
-                        )
-                    })
-                    .unwrap_or_else(|_| (*logical_field).clone())
-            })
-            .collect();
-
-        let projected_physical_schema =
-            Arc::new(arrow_schema::Schema::new(projected_physical_fields));
-
-        // Create a second mapping from the projected physical schema (what Vortex returns)
-        // to the final projected schema. This mapping will handle type casting without reordering.
-        let (batch_schema_mapping, _) =
-            schema_adapter.map_schema(&projected_physical_schema)?;
-
         // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
         let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
             Entry::Occupied(mut occupied_entry) => {
@@ -404,7 +370,7 @@ impl FileOpener for VortexOpener {
                 ))))
             })
             .try_flatten()
-            .map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
+            .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
             .boxed();
 
         Ok(stream)
@@ -870,4 +836,81 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    // Minimal reproducing test for the schema projection bug.
+    // Before the fix, this would fail with a cast error when the file schema
+    // and table schema have different field orders and we project a subset of columns.
+    async fn test_projection_bug_minimal_repro() -> anyhow::Result<()> {
+        let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+        let file_path = "/path/file.vortex";
+
+        // File has columns in order: a, b, c with simple types
+        let batch = record_batch!(
+            ("a", Int32, vec![Some(1)]),
+            ("b", Utf8, vec![Some("test")]),
+            ("c", Int32, vec![Some(2)])
+        )
+        .unwrap();
+        let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?;
+
+        // Table schema has columns in DIFFERENT order: c, a, b
+        // and different types that require casting (Utf8 -> Dictionary)
+        let table_schema = Arc::new(Schema::new(vec![
+            Field::new("c", DataType::Int32, true),
+            Field::new("a", DataType::Int32, true),
+            Field::new(
+                "b",
+                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
+                true,
+            ),
+        ]));
+
+        // Project columns [0, 2] from table schema, which should give us: c, b
+        // Before the fix, the schema adapter would get confused about which fields
+        // to select from the file, causing incorrect type mappings.
+        let projection = vec![0, 2];
+
+        let opener = VortexOpener {
+            session: SESSION.clone(),
+            object_store: object_store.clone(),
+            projection: Some(projection.into()),
+            filter: None,
+            file_pruning_predicate: None,
+            expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+            schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+            partition_fields: vec![],
+            file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+            logical_schema: table_schema.clone(),
+            batch_size: 100,
+            limit: None,
+            metrics: Default::default(),
+            layout_readers: Default::default(),
+            has_output_ordering: false,
+        };
+
+        // This should succeed and return the correctly projected and cast data
+        let data = opener
+            .open(
+                make_meta(file_path, data_size),
+                PartitionedFile::new(file_path.to_string(), data_size),
+            )?
+            .await?
+            .try_collect::<Vec<_>>()
+            .await?;
+
+        // Verify the columns are in the right order and have the right values
+        use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+        let format_opts = FormatOptions::new().with_types_info(true);
+        assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+        +-------+--------------------------+
+        | c     | b                        |
+        | Int32 | Dictionary(UInt32, Utf8) |
+        +-------+--------------------------+
+        | 2     | test                     |
+        +-------+--------------------------+
+        ");
+
+        Ok(())
+    }
 }
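
Note on compute_logical_file_schema: the patch relies on it producing a schema with the table's field names and order but the file's physical types, as the new comment in the first hunk describes. Below is a minimal sketch of that kind of merge written directly against arrow-schema; the helper name merged_logical_file_schema and the fall-back-to-the-logical-field behavior for columns absent from the file are illustrative assumptions, not this crate's actual implementation.

use std::sync::Arc;

use arrow_schema::{Field, Schema, SchemaRef};

// Illustrative only: keep the logical (table) schema's field names and order,
// but take each field's physical type from the file when the file has that
// column; fall back to the logical field for columns the file lacks.
fn merged_logical_file_schema(physical: &Schema, logical: &Schema) -> SchemaRef {
    let fields: Vec<Field> = logical
        .fields()
        .iter()
        .map(|logical_field| {
            match physical.field_with_name(logical_field.name()) {
                // Column exists in the file: keep the name, use the file's type.
                Ok(physical_field) => Field::new(
                    logical_field.name(),
                    physical_field.data_type().clone(),
                    physical_field.is_nullable(),
                ),
                // Column missing from the file: keep the logical definition.
                Err(_) => logical_field.as_ref().clone(),
            }
        })
        .collect();
    Arc::new(Schema::new(fields))
}

Under that assumption, merging the test's file schema (a: Int32, b: Utf8, c: Int32) into its table schema (c: Int32, a: Int32, b: Dictionary(UInt32, Utf8)) yields (c: Int32, a: Int32, b: Utf8): the adapter then computes projection indices against the table's field names, and map_batch only has to close the remaining type gap.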
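
With the second mapping gone, the single schema_mapping.map_batch call is what reconciles physical and logical types per batch. For the test above that means casting the file's plain Utf8 column to the table's dictionary type. The sketch below shows that cast in isolation using arrow's cast kernel as re-exported by DataFusion; it illustrates the cast the mapping must perform, not the DefaultSchemaAdapterFactory code path itself.

use std::sync::Arc;

use datafusion::arrow::array::{ArrayRef, StringArray};
use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::DataType;

fn main() {
    // Physical column as stored in the file: plain Utf8.
    let physical: ArrayRef = Arc::new(StringArray::from(vec!["test"]));

    // Logical type declared by the table schema.
    let target = DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8));

    // Utf8 -> Dictionary(UInt32, Utf8) is a supported arrow cast, which is
    // why one schema mapping can hand back batches in the table's types.
    let logical = cast(&physical, &target).expect("supported cast");
    assert_eq!(logical.data_type(), &target);
}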