143 changes: 93 additions & 50 deletions vortex-datafusion/src/persistent/opener.rs
@@ -230,6 +230,11 @@ impl FileOpener for VortexOpener {
DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}"))
})?);

+ // Compute the logical file schema by merging physical file types with logical table types.
+ // This schema has the same field names as logical_schema, but with physical types from the file.
+ let logical_file_schema =
+     compute_logical_file_schema(&physical_file_schema, &logical_schema);
+
if let Some(expr_adapter_factory) = expr_adapter_factory {
let partition_values = partition_fields
.iter()
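
A minimal sketch of what a merge like compute_logical_file_schema could look like, assuming the behavior the comment above describes (same field names as the logical schema, with physical types taken from the file wherever the column exists). The function name and signature here are illustrative, not the PR's actual implementation, which may differ (for example by merging nested types via merge_field_types):

use std::sync::Arc;
use arrow_schema::{Field, Schema, SchemaRef};

// Hypothetical sketch: for each field in the logical (table) schema, keep the
// file's physical type when the file has that column, and fall back to the
// logical field otherwise so that missing columns can later be filled in.
fn logical_file_schema_sketch(physical: &Schema, logical: &Schema) -> SchemaRef {
    let fields: Vec<Field> = logical
        .fields()
        .iter()
        .map(
            |logical_field| match physical.field_with_name(logical_field.name()) {
                Ok(phys) => Field::new(
                    logical_field.name().clone(),
                    phys.data_type().clone(),
                    phys.is_nullable(),
                ),
                Err(_) => logical_field.as_ref().clone(),
            },
        )
        .collect();
    Arc::new(Schema::new(fields))
}
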
@@ -241,13 +246,8 @@ impl FileOpener for VortexOpener {
// for schema evolution and divergence between the table's schema and individual files.
filter = filter
.map(|filter| {
- let logical_file_schema = compute_logical_file_schema(
-     &physical_file_schema.clone(),
-     &logical_schema,
- );
-
let expr = expr_adapter_factory
-     .create(logical_file_schema, physical_file_schema.clone())
+     .create(logical_file_schema.clone(), physical_file_schema.clone())
.with_partition_values(partition_values)
.rewrite(filter)?;

@@ -257,59 +257,25 @@
})
.transpose()?;

- predicate_file_schema = physical_file_schema.clone();
+ predicate_file_schema = physical_file_schema;
}

- // Create the initial mapping from physical file schema to projected schema.
- // This gives us the field reordering and tells us which logical schema fields
- // to select.
- let (_schema_mapping, adapted_projections) =
-     schema_adapter.map_schema(&physical_file_schema)?;
+ // Use the pre-created schema adapter to map logical_file_schema to projected_schema.
+ // Since logical_file_schema has the same field names as logical_schema (which the adapter
+ // was created with), this works correctly and gives us the projection indices.
+ let (schema_mapping, adapted_projections) =
+     schema_adapter.map_schema(&logical_file_schema)?;

- // Build the Vortex projection expression using the adapted projections.
- // This will reorder the fields to match the target order.
+ // Build the Vortex projection expression using field names from logical_file_schema
let fields = adapted_projections
.iter()
-     .map(|idx| {
-         let field = logical_schema.field(*idx);
+     .map(|&idx| {
+         let field = logical_file_schema.field(idx);
FieldName::from(field.name().as_str())
})
.collect::<Vec<_>>();
let projection_expr = select(fields, root());

- // After Vortex applies the projection, the batch will have fields in the target
- // order (matching adapted_projections), but with the physical file types.
- // We need a second schema mapping for type casting only, not reordering.
- // Build a schema that represents what Vortex will return: fields in target order
- // with physical types.
- let projected_physical_fields: Vec<Field> = adapted_projections
-     .iter()
-     .map(|&idx| {
-         let logical_field = logical_schema.field(idx);
-         let field_name = logical_field.name();
-
-         // Find this field in the physical schema to get its physical type
-         physical_file_schema
-             .field_with_name(field_name)
-             .map(|phys_field| {
-                 Field::new(
-                     field_name,
-                     merge_field_types(phys_field, logical_field),
-                     phys_field.is_nullable(),
-                 )
-             })
-             .unwrap_or_else(|_| (*logical_field).clone())
-     })
-     .collect();
-
- let projected_physical_schema =
-     Arc::new(arrow_schema::Schema::new(projected_physical_fields));
-
- // Create a second mapping from the projected physical schema (what Vortex returns)
- // to the final projected schema. This mapping will handle type casting without reordering.
- let (batch_schema_mapping, _) =
-     schema_adapter.map_schema(&projected_physical_schema)?;
-
// We share our layout readers with other partitions in the scan, so we only need to read each layout in each file once.
let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
Entry::Occupied(mut occupied_entry) => {
@@ -404,7 +370,7 @@ impl FileOpener for VortexOpener {
))))
})
.try_flatten()
- .map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
+ .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
.boxed();

Ok(stream)
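
Conceptually, the single schema_mapping kept above now does both jobs per batch: select and reorder the projected columns by name, and cast each one from its physical file type to its logical table type. Below is a rough stand-in for that behavior written against plain arrow-rs, as a hypothetical helper, not DataFusion's actual SchemaMapper:

use std::sync::Arc;
use arrow::array::ArrayRef;
use arrow::compute::cast;
use arrow::error::Result;
use arrow::record_batch::RecordBatch;
use arrow_schema::SchemaRef;

// Hypothetical illustration: look up each target field by name in the
// incoming batch (this reorders), then cast it from the physical type the
// file stored to the logical type the table declares.
fn map_batch_sketch(batch: &RecordBatch, target: &SchemaRef) -> Result<RecordBatch> {
    let columns = target
        .fields()
        .iter()
        .map(|field| {
            let idx = batch.schema().index_of(field.name())?;
            cast(batch.column(idx), field.data_type())
        })
        .collect::<Result<Vec<ArrayRef>>>()?;
    RecordBatch::try_new(Arc::clone(target), columns)
}
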
@@ -870,4 +836,81 @@ mod tests {

Ok(())
}
+
+ #[tokio::test]
+ // Minimal reproducing test for the schema projection bug.
+ // Before the fix, this would fail with a cast error when the file schema
+ // and table schema have different field orders and we project a subset of columns.
+ async fn test_projection_bug_minimal_repro() -> anyhow::Result<()> {
+     let object_store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+     let file_path = "/path/file.vortex";
+
+     // File has columns in order: a, b, c with simple types
+     let batch = record_batch!(
+         ("a", Int32, vec![Some(1)]),
+         ("b", Utf8, vec![Some("test")]),
+         ("c", Int32, vec![Some(2)])
+     )
+     .unwrap();
+     let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?;
+
+     // Table schema has columns in DIFFERENT order: c, a, b
+     // and different types that require casting (Utf8 -> Dictionary)
+     let table_schema = Arc::new(Schema::new(vec![
+         Field::new("c", DataType::Int32, true),
+         Field::new("a", DataType::Int32, true),
+         Field::new(
+             "b",
+             DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
+             true,
+         ),
+     ]));
+
+     // Project columns [0, 2] from table schema, which should give us: c, b
+     // Before the fix, the schema adapter would get confused about which fields
+     // to select from the file, causing incorrect type mappings.
+     let projection = vec![0, 2];
+
+     let opener = VortexOpener {
+         session: SESSION.clone(),
+         object_store: object_store.clone(),
+         projection: Some(projection.into()),
+         filter: None,
+         file_pruning_predicate: None,
+         expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
+         schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
+         partition_fields: vec![],
+         file_cache: VortexFileCache::new(1, 1, SESSION.clone()),
+         logical_schema: table_schema.clone(),
+         batch_size: 100,
+         limit: None,
+         metrics: Default::default(),
+         layout_readers: Default::default(),
+         has_output_ordering: false,
+     };
+
+     // This should succeed and return the correctly projected and cast data
+     let data = opener
+         .open(
+             make_meta(file_path, data_size),
+             PartitionedFile::new(file_path.to_string(), data_size),
+         )?
+         .await?
+         .try_collect::<Vec<_>>()
+         .await?;
+
+     // Verify the columns are in the right order and have the right values
+     use datafusion::arrow::util::pretty::pretty_format_batches_with_options;
+     let format_opts = FormatOptions::new().with_types_info(true);
+     assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r"
+     +-------+--------------------------+
+     | c     | b                        |
+     | Int32 | Dictionary(UInt32, Utf8) |
+     +-------+--------------------------+
+     | 2     | test                     |
+     +-------+--------------------------+
+     ");
+
+     Ok(())
+ }
}