From 0a336bdf4d1d41c49bffdc3cdbde14b48f759ef7 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Nov 2025 12:36:38 -0500 Subject: [PATCH 1/9] add test to demonstrate schema evolution fail Signed-off-by: Andrew Duffy --- Cargo.lock | 1 + vortex-datafusion/Cargo.toml | 1 + vortex-datafusion/tests/schema_evolution.rs | 149 ++++++++++++++++++++ 3 files changed, 151 insertions(+) create mode 100644 vortex-datafusion/tests/schema_evolution.rs diff --git a/Cargo.lock b/Cargo.lock index 64327fb86ac..4a8a6424c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8554,6 +8554,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "url", "vortex", "vortex-utils", "walkdir", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index c929b1befc1..03bb7491157 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -37,6 +37,7 @@ object_store = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } tokio-stream = { workspace = true } tracing = { workspace = true } +url = { workspace = true } vortex = { workspace = true, features = ["object_store", "tokio"] } vortex-utils = { workspace = true, features = ["dashmap"] } diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs new file mode 100644 index 00000000000..e18979d9721 --- /dev/null +++ b/vortex-datafusion/tests/schema_evolution.rs @@ -0,0 +1,149 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Test that checks we can evolve schemas in a cmpatible way across files. + +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::array::StringViewArray; +use datafusion::arrow::compute::concat_batches; +use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::execution::SessionStateBuilder; +use datafusion::execution::context::SessionContext; +use datafusion_common::arrow::array::{ArrayRef as ArrowArrayRef, RecordBatch}; +use datafusion_common::record_batch; +use datafusion_datasource::ListingTableUrl; +use datafusion_expr::col; +use object_store::ObjectStore; +use object_store::memory::InMemory; +use object_store::path::Path; +use std::sync::{Arc, LazyLock}; +use url::Url; +use vortex::arrow::FromArrowArray; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::{ObjectStoreWriter, VortexWrite}; +use vortex::session::VortexSession; +use vortex::{ArrayRef, VortexSessionDefault}; +use vortex_datafusion::{VortexFormat, VortexFormatFactory}; + +static SESSION: LazyLock = LazyLock::new(|| VortexSession::default()); + +fn register_vortex_format_factory( + factory: VortexFormatFactory, + session_state_builder: &mut SessionStateBuilder, +) { + if let Some(table_factories) = session_state_builder.table_factories() { + table_factories.insert( + datafusion::common::GetExt::get_ext(&factory).to_uppercase(), // Has to be uppercase + Arc::new(datafusion::datasource::provider::DefaultTableFactory::new()), + ); + } + + if let Some(file_formats) = session_state_builder.file_formats() { + file_formats.push(Arc::new(factory)); + } +} + +fn make_session_ctx() -> (SessionContext, Arc) { + let factory: VortexFormatFactory = VortexFormatFactory::new(); + let mut session_state_builder = SessionStateBuilder::new().with_default_features(); + register_vortex_format_factory(factory, &mut session_state_builder); + let ctx = SessionContext::new_with_state(session_state_builder.build()); + let store = 
Arc::new(InMemory::new()); + ctx.register_object_store(&Url::parse("s3://in-memory/").unwrap(), store.clone()); + + (ctx, store) +} + +async fn write_file(store: &Arc, path: &str, records: &RecordBatch) { + let array = ArrayRef::from_arrow(records, false); + let path = Path::from_url_path(path).unwrap(); + let mut write = ObjectStoreWriter::new(store.clone(), &path).await.unwrap(); + SESSION + .write_options() + .write(&mut write, array.to_array_stream()) + .await + .unwrap(); + write.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_filter_with_schema_evolution() { + let (ctx, store) = make_session_ctx(); + + // file1 only contains field "a" + write_file( + &store, + "files/file1.vortex", + &record_batch!(("a", Utf8, vec![Some("one"), Some("two"), Some("three")])).unwrap(), + ) + .await; + + // file2 only contains field "b" + write_file( + &store, + "files/file2.vortex", + &record_batch!(("b", Utf8, vec![Some("four"), Some("five"), Some("six")])).unwrap(), + ) + .await; + + // Read the table back as Vortex + let table_url = ListingTableUrl::parse("s3://in-memory/files").unwrap(); + let list_opts = ListingOptions::new(Arc::new(VortexFormat::new(SESSION.clone()))) + .with_session_config_options(ctx.state().config()) + .with_file_extension("vortex"); + + let table = ListingTable::try_new( + ListingTableConfig::new(table_url) + .with_listing_options(list_opts) + .infer_schema(&ctx.state()) + .await + .unwrap(), + ) + .unwrap(); + + let table = Arc::new(table); + + let df = ctx.read_table(table).unwrap(); + + let table_schema = Arc::new(df.schema().as_arrow().clone()); + + assert_eq!( + table_schema.as_ref(), + &Schema::new(vec![ + Field::new("a", DataType::Utf8View, true), + Field::new("b", DataType::Utf8View, true), + ]) + ); + + // Filter the result to only the values from the first file + let result = df + .filter(col("a").is_not_null()) + .unwrap() + .collect() + .await + .unwrap(); + let table = concat_batches(&table_schema, result.iter()).unwrap(); + + // We read back the full table, with nulls filled in for missing fields + assert_eq!( + table, + record_batch( + &table_schema, + vec![ + Arc::new(StringViewArray::from(vec![ + Some("one"), + Some("two"), + Some("three"), + ])) as ArrowArrayRef, + Arc::new(StringViewArray::from(vec![Option::<&str>::None, None, None])) as ArrowArrayRef, + ] + ) + ); +} + +fn record_batch( + schema: &SchemaRef, + fields: impl IntoIterator, +) -> RecordBatch { + RecordBatch::try_new(schema.clone(), fields.into_iter().collect()).unwrap() +} From e549411e2a9c55f1f9940b8ff84527107d1e4526 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Nov 2025 12:54:26 -0500 Subject: [PATCH 2/9] fix test Signed-off-by: Andrew Duffy --- vortex-datafusion/tests/schema_evolution.rs | 31 +++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs index e18979d9721..aa64486f289 100644 --- a/vortex-datafusion/tests/schema_evolution.rs +++ b/vortex-datafusion/tests/schema_evolution.rs @@ -3,27 +3,38 @@ //! Test that checks we can evolve schemas in a cmpatible way across files. 
-use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use std::sync::Arc; +use std::sync::LazyLock; + +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use arrow_schema::SchemaRef; use datafusion::arrow::array::StringViewArray; use datafusion::arrow::compute::concat_batches; -use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::listing::ListingTable; +use datafusion::datasource::listing::ListingTableConfig; use datafusion::execution::SessionStateBuilder; use datafusion::execution::context::SessionContext; -use datafusion_common::arrow::array::{ArrayRef as ArrowArrayRef, RecordBatch}; +use datafusion_common::arrow::array::ArrayRef as ArrowArrayRef; +use datafusion_common::arrow::array::RecordBatch; use datafusion_common::record_batch; use datafusion_datasource::ListingTableUrl; use datafusion_expr::col; use object_store::ObjectStore; use object_store::memory::InMemory; use object_store::path::Path; -use std::sync::{Arc, LazyLock}; use url::Url; +use vortex::ArrayRef; +use vortex::VortexSessionDefault; use vortex::arrow::FromArrowArray; use vortex::file::WriteOptionsSessionExt; -use vortex::io::{ObjectStoreWriter, VortexWrite}; +use vortex::io::ObjectStoreWriter; +use vortex::io::VortexWrite; use vortex::session::VortexSession; -use vortex::{ArrayRef, VortexSessionDefault}; -use vortex_datafusion::{VortexFormat, VortexFormatFactory}; +use vortex_datafusion::VortexFormat; +use vortex_datafusion::VortexFormatFactory; static SESSION: LazyLock = LazyLock::new(|| VortexSession::default()); @@ -135,7 +146,11 @@ async fn test_filter_with_schema_evolution() { Some("two"), Some("three"), ])) as ArrowArrayRef, - Arc::new(StringViewArray::from(vec![Option::<&str>::None, None, None])) as ArrowArrayRef, + Arc::new(StringViewArray::from(vec![ + Option::<&str>::None, + None, + None + ])) as ArrowArrayRef, ] ) ); From df70214788a0f2b202c0aaabdb6a3889a5446273 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Nov 2025 17:13:40 -0500 Subject: [PATCH 3/9] lots of things Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 100 +----- vortex-datafusion/src/persistent/adapter.rs | 323 ++++++++++++++++++ vortex-datafusion/src/persistent/mod.rs | 1 + vortex-datafusion/src/persistent/opener.rs | 355 ++++++-------------- vortex-datafusion/src/persistent/source.rs | 63 +--- vortex-datafusion/tests/schema_evolution.rs | 3 +- 6 files changed, 446 insertions(+), 399 deletions(-) create mode 100644 vortex-datafusion/src/persistent/adapter.rs diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index cdd194482c8..4af8e38f7a5 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -9,22 +9,17 @@ use datafusion_expr::Operator as DFOperator; use datafusion_functions::core::getfield::GetFieldFunc; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::ScalarFunctionExpr; -use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr; +use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; use datafusion_physical_plan::expressions as df_expr; use itertools::Itertools; use vortex::compute::LikeOptions; +use vortex::dtype::arrow::FromArrowType; use vortex::dtype::DType; use vortex::dtype::Nullability; -use vortex::dtype::arrow::FromArrowType; -use 
vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; -use vortex::expr::Binary; -use vortex::expr::Expression; -use vortex::expr::Like; -use vortex::expr::Operator; -use vortex::expr::VTableExt; +use vortex::error::VortexResult; use vortex::expr::and; use vortex::expr::cast; use vortex::expr::get_item; @@ -33,6 +28,11 @@ use vortex::expr::list_contains; use vortex::expr::lit; use vortex::expr::not; use vortex::expr::root; +use vortex::expr::Binary; +use vortex::expr::Expression; +use vortex::expr::Like; +use vortex::expr::Operator; +use vortex::expr::VTableExt; use vortex::scalar::Scalar; use crate::convert::FromDataFusion; @@ -244,8 +244,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> } else if let Some(in_list) = expr.downcast_ref::() { can_be_pushed_down(in_list.expr(), schema) && in_list.list().iter().all(|e| can_be_pushed_down(e, schema)) - } else if let Some(scalar_fn) = expr.downcast_ref::() { - can_scalar_fn_be_pushed_down(scalar_fn, schema) + } else if ScalarFunctionExpr::try_downcast_func::(df_expr.as_ref()).is_some() { + true } else { tracing::debug!(%df_expr, "DataFusion expression can't be pushed down"); false @@ -292,65 +292,17 @@ fn supported_data_types(dt: &DataType) -> bool { is_supported } -/// Checks if a GetField scalar function can be pushed down. -fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { - let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::(scalar_fn) - else { - // Only get_field pushdown is supported. - return false; - }; - - let args = get_field_fn.args(); - if args.len() != 2 { - tracing::debug!( - "Expected 2 arguments for GetField, not pushing down {} arguments", - args.len() - ); - return false; - } - let source_expr = &args[0]; - let field_name_expr = &args[1]; - let Some(field_name) = field_name_expr - .as_any() - .downcast_ref::() - .and_then(|lit| lit.value().try_as_str().flatten()) - else { - return false; - }; - - let Ok(source_dt) = source_expr.data_type(schema) else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type for GetField, not pushing down" - ); - return false; - }; - let DataType::Struct(fields) = source_dt else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type as struct for GetField, not pushing down" - ); - return false; - }; - fields.find(field_name).is_some() -} - #[cfg(test)] mod tests { use std::sync::Arc; use arrow_schema::DataType; use arrow_schema::Field; - use arrow_schema::Fields; use arrow_schema::Schema; use arrow_schema::TimeUnit as ArrowTimeUnit; use datafusion::functions::core::getfield::GetFieldFunc; - use datafusion_common::ScalarValue; use datafusion_common::config::ConfigOptions; + use datafusion_common::ScalarValue; use datafusion_expr::Operator as DFOperator; use datafusion_expr::ScalarUDF; use datafusion_physical_expr::PhysicalExpr; @@ -673,34 +625,4 @@ mod tests { └── input: vortex.root "#); } - - #[rstest] - #[case::valid_field("field1", true)] - #[case::missing_field("nonexistent_field", false)] - fn test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) { - let struct_fields = Fields::from(vec![ - Field::new("field1", DataType::Utf8, true), - Field::new("field2", DataType::Int32, true), - ]); - let schema = Schema::new(vec![Field::new( - "my_struct", - DataType::Struct(struct_fields), - true, - )]); - - 
let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc; - let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some( - field_name.to_string(), - )))) as Arc; - - let get_field_expr = Arc::new(ScalarFunctionExpr::new( - "get_field", - Arc::new(ScalarUDF::from(GetFieldFunc::new())), - vec![struct_col, field_name_lit], - Arc::new(Field::new(field_name, DataType::Utf8, true)), - Arc::new(ConfigOptions::new()), - )) as Arc; - - assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected); - } } diff --git a/vortex-datafusion/src/persistent/adapter.rs b/vortex-datafusion/src/persistent/adapter.rs new file mode 100644 index 00000000000..c3b64e2a5a2 --- /dev/null +++ b/vortex-datafusion/src/persistent/adapter.rs @@ -0,0 +1,323 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared utilities and custom implementations for schema and expression adapters. +//! +//! Most of this code is taken from one of two files: +//! +//! `datafusion/physical-expr-adapter/src/schema_rewriter.rs` -> everything related to the +//! DefaultPhysicalExprAdapter +//! +//! `datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private) + +use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; +use datafusion_common::ScalarValue; +use datafusion_common::arrow::compute::can_cast_types; +use datafusion_common::nested_struct::validate_struct_compatibility; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{exec_err, plan_err}; +use datafusion_functions::core::getfield::GetFieldFunc; +use datafusion_physical_expr::expressions::{CastExpr, Column}; +use datafusion_physical_expr::{ScalarFunctionExpr, expressions}; +use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +use std::fmt::Debug; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub struct DefaultPhysicalExprAdapterFactory; + +impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory { + fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + Arc::new(DefaultPhysicalExprAdapter::new( + logical_file_schema, + physical_file_schema, + )) + } +} +#[derive(Debug, Clone)] +pub struct DefaultPhysicalExprAdapter { + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + partition_values: Vec<(FieldRef, ScalarValue)>, +} + +impl DefaultPhysicalExprAdapter { + /// Create a new instance of the default physical expression adapter. + /// + /// This adapter rewrites expressions to match the physical schema of the file being scanned, + /// handling type mismatches and missing columns by filling them with default values. 
+ pub fn new(logical_file_schema: SchemaRef, physical_file_schema: SchemaRef) -> Self { + Self { + logical_file_schema, + physical_file_schema, + partition_values: Vec::new(), + } + } +} + +impl PhysicalExprAdapter for DefaultPhysicalExprAdapter { + fn rewrite( + &self, + expr: Arc, + ) -> datafusion_common::Result> { + let rewriter = DefaultPhysicalExprAdapterRewriter { + logical_file_schema: &self.logical_file_schema, + physical_file_schema: &self.physical_file_schema, + partition_fields: &self.partition_values, + }; + expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr))) + .data() + } + + fn with_partition_values( + &self, + partition_values: Vec<(FieldRef, ScalarValue)>, + ) -> Arc { + Arc::new(Self { + partition_values, + ..self.clone() + }) + } +} + +/// This is derived from the standard library, with the exception that it fixes handling of +/// rewriting expressions with struct fields. +pub(crate) struct DefaultPhysicalExprAdapterRewriter<'a> { + logical_file_schema: &'a Schema, + physical_file_schema: &'a Schema, + partition_fields: &'a [(FieldRef, ScalarValue)], +} + +impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { + fn rewrite_expr( + &self, + expr: Arc, + ) -> datafusion_common::Result>> { + if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? { + return Ok(Transformed::yes(transformed)); + } + + if let Some(column) = expr.as_any().downcast_ref::() { + return self.rewrite_column(Arc::clone(&expr), column); + } + + Ok(Transformed::no(expr)) + } + + /// Attempt to rewrite struct field access expressions to return null if the field does not exist in the physical schema. + /// Note that this does *not* handle nested struct fields, only top-level struct field access. + /// See for more details. + fn try_rewrite_struct_field_access( + &self, + expr: &Arc, + ) -> datafusion_common::Result>> { + let get_field_expr = + match ScalarFunctionExpr::try_downcast_func::(expr.as_ref()) { + Some(expr) => expr, + None => return Ok(None), + }; + + let source_expr = match get_field_expr.args().first() { + Some(expr) => expr, + None => return Ok(None), + }; + + let field_name_expr = match get_field_expr.args().get(1) { + Some(expr) => expr, + None => return Ok(None), + }; + + let lit = match field_name_expr + .as_any() + .downcast_ref::() + { + Some(lit) => lit, + None => return Ok(None), + }; + + let field_name = match lit.value().try_as_str().flatten() { + Some(name) => name, + None => return Ok(None), + }; + + let column = match source_expr.as_any().downcast_ref::() { + Some(column) => column, + None => return Ok(None), + }; + + let physical_field = match self.physical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(_) => return Ok(None), + }; + + let physical_struct_fields = match physical_field.data_type() { + DataType::Struct(fields) => fields, + _ => return Ok(None), + }; + + if physical_struct_fields + .iter() + .any(|f| f.name() == field_name) + { + return Ok(None); + } + + let logical_field = match self.logical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(_) => return Ok(None), + }; + + let logical_struct_fields = match logical_field.data_type() { + DataType::Struct(fields) => fields, + _ => return Ok(None), + }; + + let logical_struct_field = match logical_struct_fields + .iter() + .find(|f| f.name() == field_name) + { + Some(field) => field, + None => return Ok(None), + }; + + let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?; + Ok(Some(expressions::lit(null_value))) + } + + fn 
rewrite_column( + &self, + expr: Arc, + column: &Column, + ) -> datafusion_common::Result>> { + // Get the logical field for this column if it exists in the logical schema + let logical_field = match self.logical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(e) => { + // If the column is a partition field, we can use the partition value + if let Some(partition_value) = self.get_partition_value(column.name()) { + return Ok(Transformed::yes(expressions::lit(partition_value))); + } + // This can be hit if a custom rewrite injected a reference to a column that doesn't exist in the logical schema. + // For example, a pre-computed column that is kept only in the physical schema. + // If the column exists in the physical schema, we can still use it. + if let Ok(physical_field) = self.physical_file_schema.field_with_name(column.name()) + { + // If the column exists in the physical schema, we can use it in place of the logical column. + // This is nice to users because if they do a rewrite that results in something like `physical_int32_col = 123u64` + // we'll at least handle the casts for them. + physical_field + } else { + // A completely unknown column that doesn't exist in either schema! + // This should probably never be hit unless something upstream broke, but nonetheless it's better + // for us to return a handleable error than to panic / do something unexpected. + return Err(e.into()); + } + } + }; + + // Check if the column exists in the physical schema + let physical_column_index = match self.physical_file_schema.index_of(column.name()) { + Ok(index) => index, + Err(_) => { + if !logical_field.is_nullable() { + return exec_err!( + "Non-nullable column '{}' is missing from the physical schema", + column.name() + ); + } + // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. + // TODO: do we need to sync this with what the `SchemaAdapter` actually does? + // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else! + // See https://github.com/apache/datafusion/issues/16527 + let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?; + return Ok(Transformed::yes(expressions::lit(null_value))); + } + }; + let physical_field = self.physical_file_schema.field(physical_column_index); + + let column = match ( + column.index() == physical_column_index, + logical_field.data_type() == physical_field.data_type(), + ) { + // If the column index matches and the data types match, we can use the column as is + (true, true) => return Ok(Transformed::no(expr)), + // If the indexes or data types do not match, we need to create a new column expression + (true, _) => column.clone(), + (false, _) => Column::new_with_schema(logical_field.name(), self.physical_file_schema)?, + }; + + if logical_field.data_type() == physical_field.data_type() { + // If the data types match, we can use the column as is + return Ok(Transformed::yes(Arc::new(column))); + } + + // This has been changed to replace the DF upstream version which just does can_cast_types, + // which ignores struct fields with compatible but differing columns. 
+ let is_compatible = can_cast_field(physical_field, logical_field)?; + if !is_compatible { + return exec_err!( + "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", + column.name(), + physical_field.data_type(), + logical_field.data_type() + ); + } + + let cast_expr = Arc::new(CastExpr::new( + Arc::new(column), + logical_field.data_type().clone(), + None, + )); + + Ok(Transformed::yes(cast_expr)) + } + + fn get_partition_value(&self, column_name: &str) -> Option { + self.partition_fields + .iter() + .find(|(field, _)| field.name() == column_name) + .map(|(_, value)| value.clone()) + } +} + +pub(crate) fn can_cast_field( + file_field: &Field, + table_field: &Field, +) -> datafusion_common::Result { + match (file_field.data_type(), table_field.data_type()) { + (DataType::Struct(source_fields), DataType::Struct(target_fields)) => { + validate_struct_compatibility(source_fields, target_fields) + } + _ => { + if can_cast_types(file_field.data_type(), table_field.data_type()) { + Ok(true) + } else { + plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) + } + } + } +} diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 9be23b7d804..692cd1ec7db 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -8,6 +8,7 @@ pub mod metrics; mod opener; mod sink; mod source; +pub(crate) mod adapter; pub use format::VortexFormat; pub use format::VortexFormatFactory; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index a0fa396b0c4..13741ec401f 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -2,11 +2,10 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::ops::Range; -use std::sync::Arc; use std::sync::Weak; +use std::sync::{Arc, LazyLock}; use arrow_schema::ArrowError; -use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::SchemaRef; use datafusion_common::DataFusionError; @@ -17,7 +16,7 @@ use datafusion_datasource::PartitionedFile; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; +use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory}; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr::split_conjunction; @@ -48,11 +47,18 @@ use super::cache::VortexFileCache; use crate::convert::exprs::can_be_pushed_down; use crate::convert::exprs::make_vortex_predicate; +static DEFAULT_EXPR_ADAPTER: LazyLock> = + LazyLock::new(|| Arc::new(crate::adapter::DefaultPhysicalExprAdapterFactory)); + +static DEFAULT_SCHEMA_ADAPTER: LazyLock> = + LazyLock::new(|| Arc::new(DefaultSchemaAdapterFactory)); + #[derive(Clone)] pub(crate) struct VortexOpener { pub session: VortexSession, pub object_store: Arc, - /// Projection by index of the file's columns + /// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is + /// all fields in the final scan result not including the partition columns. pub projection: Option>, /// Filter expression optimized for pushdown into Vortex scan operations. 
/// This may be a subset of file_pruning_predicate containing only expressions @@ -61,103 +67,27 @@ pub(crate) struct VortexOpener { /// Filter expression used by DataFusion's FilePruner to eliminate files based on /// statistics and partition values without opening them. pub file_pruning_predicate: Option, - pub expr_adapter_factory: Option>, - pub schema_adapter_factory: Arc, /// Hive-style partitioning columns pub partition_fields: Vec>, pub file_cache: VortexFileCache, - /// This is the table's schema without partition columns. It might be different than - /// the physical schema, and the stream's type will be a projection of it. - pub logical_schema: SchemaRef, + /// This is the table's schema without partition columns. It may contain fields which do + /// not exist in the file, and are supplied by the `schema_adapter_factory`. + pub table_schema: SchemaRef, + /// A hint for the desired row count of record batches returned from the scan. pub batch_size: usize, + /// If provided, the scan will not return more than this many rows. pub limit: Option, + /// A metrics object for tracking performance of the scan. pub metrics: VortexMetrics, + /// A shared cache of file readers. + /// + /// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache + /// a file reader the first time we read a file. pub layout_readers: Arc>>, /// Whether the query has output ordering specified pub has_output_ordering: bool, } -/// Merges the data types of two fields, preferring the logical type from the -/// table field. -fn merge_field_types(physical_field: &Field, table_field: &Field) -> DataType { - match (physical_field.data_type(), table_field.data_type()) { - (DataType::Struct(phys_fields), DataType::Struct(table_fields)) => { - let merged_fields = merge_fields(phys_fields, table_fields); - DataType::Struct(merged_fields.into()) - } - (DataType::List(phys_field), DataType::List(table_field)) => { - DataType::List(Arc::new(Field::new( - phys_field.name(), - merge_field_types(phys_field, table_field), - phys_field.is_nullable(), - ))) - } - (DataType::LargeList(phys_field), DataType::LargeList(table_field)) => { - DataType::LargeList(Arc::new(Field::new( - phys_field.name(), - merge_field_types(phys_field, table_field), - phys_field.is_nullable(), - ))) - } - _ => table_field.data_type().clone(), - } -} - -/// Merges two field collections, using logical types from table_fields where available. -/// Falls back to physical field types when no matching table field is found. -fn merge_fields( - physical_fields: &arrow_schema::Fields, - table_fields: &arrow_schema::Fields, -) -> Vec { - physical_fields - .iter() - .map(|phys_field| { - table_fields - .iter() - .find(|f| f.name() == phys_field.name()) - .map(|table_field| { - Field::new( - phys_field.name(), - merge_field_types(phys_field, table_field), - phys_field.is_nullable(), - ) - }) - .unwrap_or_else(|| (**phys_field).clone()) - }) - .collect() -} - -/// Computes a logical file schema from the physical file schema and the table -/// schema. -/// -/// For each field in the physical file schema, looks up the corresponding field -/// in the table schema and uses its logical type. 
-fn compute_logical_file_schema( - physical_file_schema: &SchemaRef, - table_schema: &SchemaRef, -) -> SchemaRef { - let logical_fields: Vec = physical_file_schema - .fields() - .iter() - .map(|physical_field| { - table_schema - .fields() - .find(physical_field.name()) - .map(|(_, table_field)| { - Field::new( - physical_field.name(), - merge_field_types(physical_field, table_field), - physical_field.is_nullable(), - ) - .with_metadata(physical_field.metadata().clone()) - }) - .unwrap_or_else(|| (**physical_field).clone()) - }) - .collect(); - - Arc::new(arrow_schema::Schema::new(logical_fields)) -} - impl FileOpener for VortexOpener { fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> DFResult { let session = self.session.clone(); @@ -165,26 +95,31 @@ impl FileOpener for VortexOpener { let projection = self.projection.clone(); let mut filter = self.filter.clone(); let file_pruning_predicate = self.file_pruning_predicate.clone(); - let expr_adapter_factory = self.expr_adapter_factory.clone(); + + // We create our custom expression and schema adapters. The schema adapter is just the + // DF default adapter. Our expression adapter is currently built to work around a bug + // in the upstream adapter. + // See + let expr_adapter_factory = DEFAULT_EXPR_ADAPTER.clone(); + let schema_adapter_factory = DEFAULT_SCHEMA_ADAPTER.clone(); + let partition_fields = self.partition_fields.clone(); let file_cache = self.file_cache.clone(); - let logical_schema = self.logical_schema.clone(); + let table_schema = self.table_schema.clone(); let batch_size = self.batch_size; let limit = self.limit; let metrics = self.metrics.clone(); let layout_reader = self.layout_readers.clone(); let has_output_ordering = self.has_output_ordering; - let projected_schema = match projection.as_ref() { - None => logical_schema.clone(), - Some(indices) => Arc::new(logical_schema.project(indices)?), + let projected_table_schema = match projection.as_ref() { + None => table_schema.clone(), + Some(indices) => Arc::new(table_schema.project(indices)?), }; - let mut predicate_file_schema = logical_schema.clone(); - - let schema_adapter = self - .schema_adapter_factory - .create(projected_schema, logical_schema.clone()); + // We know the file schema before we open it. + let schema_adapter = + schema_adapter_factory.create(projected_table_schema.clone(), table_schema.clone()); Ok(async move { // Create FilePruner when we have a predicate and either dynamic expressions @@ -200,7 +135,7 @@ impl FileOpener for VortexOpener { (is_dynamic_physical_expr(&predicate) | file.has_statistics()).then_some( FilePruner::new( predicate.clone(), - &logical_schema, + &table_schema, partition_fields.clone(), file.clone(), Count::default(), @@ -230,51 +165,45 @@ impl FileOpener for VortexOpener { DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}")) })?); - // Compute the logical file schema by merging physical file types with logical table types. - // This schema has the same field names as logical_schema, but with physical types from the file. - let logical_file_schema = - compute_logical_file_schema(&physical_file_schema, &logical_schema); - - if let Some(expr_adapter_factory) = expr_adapter_factory { - let partition_values = partition_fields - .iter() - .cloned() - .zip(file.partition_values) - .collect::>(); - - // The adapter rewrites the expression to the local file schema, allowing - // for schema evolution and divergence between the table's schema and individual files. 
- filter = filter - .map(|filter| { - let expr = expr_adapter_factory - .create(logical_file_schema.clone(), physical_file_schema.clone()) - .with_partition_values(partition_values) - .rewrite(filter)?; - - // Expression might now reference columns that don't exist in the file, so we can give it - // another simplification pass. - PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr) - }) - .transpose()?; - - predicate_file_schema = physical_file_schema; - } + // Compute the schema we use to scan the file. This will include any fields + // from the table_schema that also exist in the file schema. + // let logical_file_schema = + // compute_logical_file_schema(&physical_file_schema, &table_schema); - // Use the pre-created schema adapter to map logical_file_schema to projected_schema. - // Since logical_file_schema has the same field names as logical_schema (which the adapter - // was created with), this works correctly and gives us the projection indices. - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&logical_file_schema)?; - - // Build the Vortex projection expression using field names from logical_file_schema - let fields = adapted_projections + let partition_values = partition_fields .iter() - .map(|&idx| { - let field = logical_file_schema.field(idx); - FieldName::from(field.name().as_str()) - }) + .cloned() + .zip(file.partition_values) .collect::>(); - let projection_expr = select(fields, root()); + + // The adapter rewrites the expression to the local file schema, allowing + // for schema evolution and divergence between the table's schema and individual files. + filter = filter + .map(|filter| { + // Rewrite the filter to cast into the logical file schema field types + let expr = expr_adapter_factory + .create(table_schema.clone(), physical_file_schema.clone()) + .with_partition_values(partition_values) + .rewrite(filter) + .expect("rewrite"); + + // Expression might now reference columns that don't exist in the file, so we can give it + // another simplification pass. + PhysicalExprSimplifier::new(table_schema.as_ref()).simplify(expr) + }) + .transpose()?; + + let predicate_file_schema = physical_file_schema; + + let (schema_mapping, adapted_projections) = + schema_adapter.map_schema(&predicate_file_schema)?; + + // We use the field names from pushdown expression instead. + let field_names: Vec = adapted_projections + .into_iter() + .map(|index| FieldName::from(predicate_file_schema.field(index).name().as_str())) + .collect(); + let projection_expr = select(field_names, root()); // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once. 
let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) { @@ -416,16 +345,15 @@ mod tests { use datafusion::arrow::array::RecordBatch; use datafusion::arrow::array::StringArray; use datafusion::arrow::array::StructArray; + use datafusion::arrow::compute::cast; use datafusion::arrow::datatypes::DataType; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::util::display::FormatOptions; use datafusion::common::record_batch; - use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion::logical_expr::col; use datafusion::logical_expr::lit; use datafusion::physical_expr::planner::logical2physical; - use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory; - use datafusion::scalar::ScalarValue; + use datafusion_common::create_array; use insta::assert_snapshot; use itertools::Itertools; use object_store::ObjectMeta; @@ -517,96 +445,8 @@ mod tests { } } - #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))] - // If we don't have a physical expr adapter, we just drop filters on partition values - #[case(None, (1, 3), (1, 3))] - #[tokio::test] - async fn test_adapter_optimization_partition_column( - #[case] expr_adapter_factory: Option>, - #[case] expected_result1: (usize, usize), - #[case] expected_result2: (usize, usize), - ) -> anyhow::Result<()> { - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "part=1/file.vortex"; - let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - let data_size = - write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - - let file_schema = batch.schema(); - let mut file = PartitionedFile::new(file_path.to_string(), data_size); - file.partition_values = vec![ScalarValue::Int32(Some(1))]; - - let table_schema = Arc::new(Schema::new(vec![ - Field::new("part", DataType::Int32, false), - Field::new("a", DataType::Int32, false), - ])); - - let make_opener = |filter| VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0].into()), - filter: Some(filter), - file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: file_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; - - // filter matches partition value - let filter = col("part").eq(lit(1)); - let filter = logical2physical(&filter, table_schema.as_ref()); - - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); - - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); - - assert_eq!((num_batches, num_rows), expected_result1); - - // filter doesn't matches partition value - let filter = col("part").eq(lit(2)); - let filter = logical2physical(&filter, table_schema.as_ref()); - - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); - - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); - 
assert_eq!((num_batches, num_rows), expected_result2); - - Ok(()) - } - - #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _))] - // If we don't have a physical expr adapter, we just drop filters on partition values. - // This is currently not supported, the work to support it requires to rewrite the predicate with appropriate casts. - // Seems like datafusion is moving towards having DefaultPhysicalExprAdapterFactory be always provided, which would make it work OOTB. - // See: https://github.com/apache/datafusion/issues/16800 - // #[case(None)] #[tokio::test] - async fn test_open_files_different_table_schema( - #[case] expr_adapter_factory: Option>, - ) -> anyhow::Result<()> { + async fn test_open_files_different_table_schema() -> anyhow::Result<()> { use datafusion::arrow::util::pretty::pretty_format_batches_with_options; let object_store = Arc::new(InMemory::new()) as Arc; @@ -629,11 +469,9 @@ mod tests { projection: Some([0].into()), filter: Some(filter), file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), @@ -717,11 +555,9 @@ mod tests { projection: Some([0, 1, 2].into()), filter: None, file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), @@ -808,11 +644,9 @@ mod tests { &table_schema, )), file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema, + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), @@ -834,6 +668,35 @@ mod tests { assert_eq!(data.len(), 1); assert_eq!(data[0].num_rows(), 3); + // Output should match + let expected_field1 = cast( + create_array!(Utf8, vec![Some("value1"), Some("value2"), Some("value3")]).as_ref(), + &DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + ) + .unwrap(); + let expected_field2 = cast( + create_array!(Utf8, vec![Some("a"), Some("b"), Some("c")]).as_ref(), + &DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + ) + .unwrap(); + let expected_field3 = Arc::new(StringArray::new_null(3)); + + let my_struct = StructArray::new( + vec![ + Field::new("field1", expected_field1.data_type().clone(), true), + Field::new("field2", expected_field1.data_type().clone(), true), + Field::new("field3", DataType::Utf8, true), + ] + .into(), + vec![expected_field1, expected_field2, expected_field3], + None, + ); + + let expected_batch = + RecordBatch::try_new(table_schema.clone(), vec![Arc::new(my_struct)]).unwrap(); + + assert_eq!(data[0], expected_batch); + Ok(()) } @@ -877,11 +740,9 @@ mod tests { projection: Some(projection.into()), filter: None, file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - 
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index d42436eb652..4247f9610ae 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -13,12 +13,8 @@ use datafusion_common::config::ConfigOptions; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::conjunction; -use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; -use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; @@ -57,8 +53,6 @@ pub struct VortexSource { pub(crate) projected_statistics: Option, /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema. pub(crate) arrow_file_schema: Option, - pub(crate) schema_adapter_factory: Option>, - pub(crate) expr_adapter_factory: Option>, _unused_df_metrics: ExecutionPlanMetricsSet, /// Shared layout readers, the source only lives as long as one scan. /// @@ -76,25 +70,10 @@ impl VortexSource { batch_size: None, projected_statistics: None, arrow_file_schema: None, - schema_adapter_factory: None, - expr_adapter_factory: None, _unused_df_metrics: Default::default(), layout_readers: Arc::new(DashMap::default()), } } - - /// Sets a [`PhysicalExprAdapterFactory`] for the [`VortexSource`]. - /// Currently, this must be provided in order to filter columns in files that have a different data type from the unified table schema. - /// - /// This factory will take precedence when opening files over instances provided by the [`FileScanConfig`]. - pub fn with_expr_adapter_factory( - &self, - expr_adapter_factory: Arc, - ) -> Arc { - let mut source = self.clone(); - source.expr_adapter_factory = Some(expr_adapter_factory); - Arc::new(source) - } } impl FileSource for VortexSource { @@ -113,31 +92,6 @@ impl FileSource for VortexSource { .batch_size .vortex_expect("batch_size must be supplied to VortexSource"); - let expr_adapter = self - .expr_adapter_factory - .as_ref() - .or(base_config.expr_adapter_factory.as_ref()); - let schema_adapter = self.schema_adapter_factory.as_ref(); - - // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details. 
- let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) { - (Some(expr_adapter), Some(schema_adapter)) => { - (Some(expr_adapter.clone()), schema_adapter.clone()) - } - (Some(expr_adapter), None) => ( - Some(expr_adapter.clone()), - Arc::new(DefaultSchemaAdapterFactory) as _, - ), - (None, Some(schema_adapter)) => { - // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory` - (None, schema_adapter.clone()) - } - (None, None) => ( - Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - Arc::new(DefaultSchemaAdapterFactory) as _, - ), - }; - let projection = base_config.file_column_projection_indices().map(Arc::from); let opener = VortexOpener { @@ -146,10 +100,8 @@ impl FileSource for VortexSource { projection, filter: self.vortex_predicate.clone(), file_pruning_predicate: self.full_predicate.clone(), - expr_adapter_factory, - schema_adapter_factory, partition_fields: base_config.table_partition_cols.clone(), - logical_schema: base_config.file_schema.clone(), + table_schema: base_config.file_schema.clone(), file_cache: self.file_cache.clone(), batch_size, limit: base_config.limit, @@ -300,17 +252,4 @@ impl FileSource for VortexSource { ) .with_updated_node(Arc::new(source) as _)) } - - fn with_schema_adapter_factory( - &self, - factory: Arc, - ) -> DFResult> { - let mut source = self.clone(); - source.schema_adapter_factory = Some(factory); - Ok(Arc::new(source)) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } } diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs index aa64486f289..d0ade2ebc6c 100644 --- a/vortex-datafusion/tests/schema_evolution.rs +++ b/vortex-datafusion/tests/schema_evolution.rs @@ -118,6 +118,7 @@ async fn test_filter_with_schema_evolution() { let table_schema = Arc::new(df.schema().as_arrow().clone()); + // Table schema contains both fields assert_eq!( table_schema.as_ref(), &Schema::new(vec![ @@ -126,7 +127,7 @@ async fn test_filter_with_schema_evolution() { ]) ); - // Filter the result to only the values from the first file + // Filter the result to only ones with a column, i.e. only file1 let result = df .filter(col("a").is_not_null()) .unwrap() From 3c852271f1a5ea1e918aaa85b44ed3547dee2001 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Nov 2025 19:01:02 -0500 Subject: [PATCH 4/9] think this does it? 
Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 18 +++++++------- vortex-datafusion/src/persistent/adapter.rs | 27 +++++++++++++++------ vortex-datafusion/src/persistent/mod.rs | 2 +- vortex-datafusion/src/persistent/opener.rs | 22 ++++++++--------- vortex-datafusion/tests/schema_evolution.rs | 2 ++ 5 files changed, 42 insertions(+), 29 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 4af8e38f7a5..30ac7e1fe73 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -9,17 +9,22 @@ use datafusion_expr::Operator as DFOperator; use datafusion_functions::core::getfield::GetFieldFunc; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::ScalarFunctionExpr; -use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr; use datafusion_physical_expr_common::physical_expr::PhysicalExprRef; +use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr; use datafusion_physical_plan::expressions as df_expr; use itertools::Itertools; use vortex::compute::LikeOptions; -use vortex::dtype::arrow::FromArrowType; use vortex::dtype::DType; use vortex::dtype::Nullability; +use vortex::dtype::arrow::FromArrowType; +use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_err; -use vortex::error::VortexResult; +use vortex::expr::Binary; +use vortex::expr::Expression; +use vortex::expr::Like; +use vortex::expr::Operator; +use vortex::expr::VTableExt; use vortex::expr::and; use vortex::expr::cast; use vortex::expr::get_item; @@ -28,11 +33,6 @@ use vortex::expr::list_contains; use vortex::expr::lit; use vortex::expr::not; use vortex::expr::root; -use vortex::expr::Binary; -use vortex::expr::Expression; -use vortex::expr::Like; -use vortex::expr::Operator; -use vortex::expr::VTableExt; use vortex::scalar::Scalar; use crate::convert::FromDataFusion; @@ -301,8 +301,8 @@ mod tests { use arrow_schema::Schema; use arrow_schema::TimeUnit as ArrowTimeUnit; use datafusion::functions::core::getfield::GetFieldFunc; - use datafusion_common::config::ConfigOptions; use datafusion_common::ScalarValue; + use datafusion_common::config::ConfigOptions; use datafusion_expr::Operator as DFOperator; use datafusion_expr::ScalarUDF; use datafusion_physical_expr::PhysicalExpr; diff --git a/vortex-datafusion/src/persistent/adapter.rs b/vortex-datafusion/src/persistent/adapter.rs index c3b64e2a5a2..086f7b56e4e 100644 --- a/vortex-datafusion/src/persistent/adapter.rs +++ b/vortex-datafusion/src/persistent/adapter.rs @@ -24,19 +24,30 @@ //! //! 
`datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private) -use arrow_schema::{DataType, Field, FieldRef, Schema, SchemaRef}; +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::FieldRef; +use arrow_schema::Schema; +use arrow_schema::SchemaRef; use datafusion_common::ScalarValue; use datafusion_common::arrow::compute::can_cast_types; +use datafusion_common::exec_err; use datafusion_common::nested_struct::validate_struct_compatibility; -use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; -use datafusion_common::{exec_err, plan_err}; +use datafusion_common::plan_err; +use datafusion_common::tree_node::Transformed; +use datafusion_common::tree_node::TransformedResult; +use datafusion_common::tree_node::TreeNode; use datafusion_functions::core::getfield::GetFieldFunc; -use datafusion_physical_expr::expressions::{CastExpr, Column}; -use datafusion_physical_expr::{ScalarFunctionExpr, expressions}; -use datafusion_physical_expr_adapter::{PhysicalExprAdapter, PhysicalExprAdapterFactory}; +use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::expressions; +use datafusion_physical_expr::expressions::CastExpr; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr_adapter::PhysicalExprAdapter; +use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; -use std::fmt::Debug; -use std::sync::Arc; #[derive(Debug, Clone)] pub struct DefaultPhysicalExprAdapterFactory; diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 692cd1ec7db..208c5a6a403 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -2,13 +2,13 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors //! Persistent implementation of a Vortex table provider. +pub(crate) mod adapter; mod cache; mod format; pub mod metrics; mod opener; mod sink; mod source; -pub(crate) mod adapter; pub use format::VortexFormat; pub use format::VortexFormatFactory; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 13741ec401f..ea9ed5c5fa0 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -2,8 +2,9 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::ops::Range; +use std::sync::Arc; +use std::sync::LazyLock; use std::sync::Weak; -use std::sync::{Arc, LazyLock}; use arrow_schema::ArrowError; use arrow_schema::Field; @@ -16,7 +17,8 @@ use datafusion_datasource::PartitionedFile; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory}; +use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; +use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr::split_conjunction; @@ -180,28 +182,26 @@ impl FileOpener for VortexOpener { // for schema evolution and divergence between the table's schema and individual files. 
filter = filter .map(|filter| { - // Rewrite the filter to cast into the logical file schema field types + // Rewrite the filter to properly handle values in the table schema let expr = expr_adapter_factory .create(table_schema.clone(), physical_file_schema.clone()) .with_partition_values(partition_values) - .rewrite(filter) - .expect("rewrite"); + .rewrite(filter)?; // Expression might now reference columns that don't exist in the file, so we can give it // another simplification pass. - PhysicalExprSimplifier::new(table_schema.as_ref()).simplify(expr) + PhysicalExprSimplifier::new(physical_file_schema.as_ref()) + .simplify(expr.clone()) }) .transpose()?; - let predicate_file_schema = physical_file_schema; - let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&predicate_file_schema)?; + schema_adapter.map_schema(&physical_file_schema)?; // We use the field names from pushdown expression instead. let field_names: Vec = adapted_projections .into_iter() - .map(|index| FieldName::from(predicate_file_schema.field(index).name().as_str())) + .map(|index| FieldName::from(physical_file_schema.field(index).name().as_str())) .collect(); let projection_expr = select(field_names, root()); @@ -247,7 +247,7 @@ impl FileOpener for VortexOpener { .and_then(|f| { let exprs = split_conjunction(&f) .into_iter() - .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema)) + .filter(|expr| can_be_pushed_down(expr, &physical_file_schema)) .collect::>(); make_vortex_predicate(&exprs).transpose() diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs index d0ade2ebc6c..05a0cf70b58 100644 --- a/vortex-datafusion/tests/schema_evolution.rs +++ b/vortex-datafusion/tests/schema_evolution.rs @@ -142,11 +142,13 @@ async fn test_filter_with_schema_evolution() { record_batch( &table_schema, vec![ + // a Arc::new(StringViewArray::from(vec![ Some("one"), Some("two"), Some("three"), ])) as ArrowArrayRef, + // b Arc::new(StringViewArray::from(vec![ Option::<&str>::None, None, From ba46169b4f966d0acc1e82f42f31de607830a44b Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Nov 2025 19:01:38 -0500 Subject: [PATCH 5/9] comment Signed-off-by: Andrew Duffy --- vortex-datafusion/src/persistent/adapter.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/vortex-datafusion/src/persistent/adapter.rs b/vortex-datafusion/src/persistent/adapter.rs index 086f7b56e4e..b3c826fc0a8 100644 --- a/vortex-datafusion/src/persistent/adapter.rs +++ b/vortex-datafusion/src/persistent/adapter.rs @@ -23,6 +23,8 @@ //! DefaultPhysicalExprAdapter //! //! `datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private) +//! +//! See https://github.com/apache/datafusion/issues/18957 use std::fmt::Debug; use std::sync::Arc; From b9efe821683a5997c9873906ae38fb3dbdfb3c6a Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Nov 2025 19:11:57 -0500 Subject: [PATCH 6/9] last Signed-off-by: Andrew Duffy --- vortex-datafusion/src/persistent/opener.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index ea9ed5c5fa0..18f34302493 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -167,11 +167,6 @@ impl FileOpener for VortexOpener { DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}")) })?); - // Compute the schema we use to scan the file. 
-        // from the table_schema that also exist in the file schema.
-        // let logical_file_schema =
-        //     compute_logical_file_schema(&physical_file_schema, &table_schema);
-
         let partition_values = partition_fields
             .iter()
             .cloned()

From abfadb09b7b1c1ab01a1d70c2da89cfb06c75fda Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Wed, 26 Nov 2025 19:26:01 -0500
Subject: [PATCH 7/9] fix lints and license

Signed-off-by: Andrew Duffy
---
 vortex-datafusion/src/persistent/adapter.rs | 3 +++
 vortex-datafusion/src/persistent/opener.rs  | 2 +-
 vortex-datafusion/tests/schema_evolution.rs | 8 +++++++-
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/vortex-datafusion/src/persistent/adapter.rs b/vortex-datafusion/src/persistent/adapter.rs
index b3c826fc0a8..6ef76e3e47e 100644
--- a/vortex-datafusion/src/persistent/adapter.rs
+++ b/vortex-datafusion/src/persistent/adapter.rs
@@ -1,3 +1,6 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Apache Software Foundation (ASF)
+
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements. See the NOTICE file
 // distributed with this work for additional information
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 18f34302493..64aef4e9152 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -121,7 +121,7 @@ impl FileOpener for VortexOpener {
 
         // We know the file schema before we open it.
         let schema_adapter =
-            schema_adapter_factory.create(projected_table_schema.clone(), table_schema.clone());
+            schema_adapter_factory.create(projected_table_schema, table_schema.clone());
 
         Ok(async move {
             // Create FilePruner when we have a predicate and either dynamic expressions
diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs
index 05a0cf70b58..2a1e9ded633 100644
--- a/vortex-datafusion/tests/schema_evolution.rs
+++ b/vortex-datafusion/tests/schema_evolution.rs
@@ -1,6 +1,12 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
+#![allow(
+    clippy::unwrap_in_result,
+    clippy::unwrap_used,
+    clippy::tests_outside_test_module
+)]
+
 //! Test that checks we can evolve schemas in a cmpatible way across files.
 
 use std::sync::Arc;
@@ -36,7 +42,7 @@ use vortex::session::VortexSession;
 use vortex_datafusion::VortexFormat;
 use vortex_datafusion::VortexFormatFactory;
 
-static SESSION: LazyLock<VortexSession> = LazyLock::new(|| VortexSession::default());
+static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::default);
 
 fn register_vortex_format_factory(
     factory: VortexFormatFactory,

From c79419a5d4d25687ca39a2a11b265ac410a38c80 Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Wed, 26 Nov 2025 19:34:05 -0500
Subject: [PATCH 8/9] docs

Signed-off-by: Andrew Duffy
---
 vortex-datafusion/src/persistent/adapter.rs | 2 +-
 vortex-datafusion/src/persistent/opener.rs  | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/vortex-datafusion/src/persistent/adapter.rs b/vortex-datafusion/src/persistent/adapter.rs
index 6ef76e3e47e..894d1f2ce8f 100644
--- a/vortex-datafusion/src/persistent/adapter.rs
+++ b/vortex-datafusion/src/persistent/adapter.rs
@@ -27,7 +27,7 @@
 //!
 //! `datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private)
 //!
-//! See https://github.com/apache/datafusion/issues/18957
+//! See <https://github.com/apache/datafusion/issues/18957>
 
 use std::fmt::Debug;
 use std::sync::Arc;
diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs
index 64aef4e9152..16395dfdeb0 100644
--- a/vortex-datafusion/src/persistent/opener.rs
+++ b/vortex-datafusion/src/persistent/opener.rs
@@ -101,7 +101,7 @@ impl FileOpener for VortexOpener {
         // We create our custom expression and schema adapters. The schema adapter is just the
         // DF default adapter. Our expression adapter is currently built to work around a bug
         // in the upstream adapter.
-        // See
+        // See: https://github.com/apache/datafusion/issues/17114
         let expr_adapter_factory = DEFAULT_EXPR_ADAPTER.clone();
         let schema_adapter_factory = DEFAULT_SCHEMA_ADAPTER.clone();
 
@@ -119,7 +119,6 @@ impl FileOpener for VortexOpener {
             Some(indices) => Arc::new(table_schema.project(indices)?),
         };
 
-        // We know the file schema before we open it.
         let schema_adapter =
             schema_adapter_factory.create(projected_table_schema, table_schema.clone());
 

From 0cd1a7d0149d9c87accb9085edda0f5601502d42 Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Sun, 30 Nov 2025 16:06:10 -0500
Subject: [PATCH 9/9] add two more tests

Signed-off-by: Andrew Duffy
---
 vortex-datafusion/tests/schema_evolution.rs | 252 +++++++++++++++++++-
 1 file changed, 248 insertions(+), 4 deletions(-)

diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs
index 2a1e9ded633..8fa97774cc2 100644
--- a/vortex-datafusion/tests/schema_evolution.rs
+++ b/vortex-datafusion/tests/schema_evolution.rs
@@ -7,7 +7,7 @@
     clippy::tests_outside_test_module
 )]
 
-//! Test that checks we can evolve schemas in a cmpatible way across files.
+//! Test that checks we can evolve schemas in a compatible way across files.
 
 use std::sync::Arc;
 use std::sync::LazyLock;
@@ -16,18 +16,21 @@ use arrow_schema::DataType;
 use arrow_schema::Field;
 use arrow_schema::Schema;
 use arrow_schema::SchemaRef;
+use datafusion::arrow::array::RecordBatch;
 use datafusion::arrow::array::StringViewArray;
+use datafusion::arrow::array::{Array, Int32Array};
+use datafusion::arrow::array::{ArrayRef as ArrowArrayRef, StructArray};
 use datafusion::arrow::compute::concat_batches;
 use datafusion::datasource::listing::ListingOptions;
 use datafusion::datasource::listing::ListingTable;
 use datafusion::datasource::listing::ListingTableConfig;
 use datafusion::execution::SessionStateBuilder;
 use datafusion::execution::context::SessionContext;
-use datafusion_common::arrow::array::ArrayRef as ArrowArrayRef;
-use datafusion_common::arrow::array::RecordBatch;
-use datafusion_common::record_batch;
+use datafusion_common::{create_array, record_batch};
 use datafusion_datasource::ListingTableUrl;
 use datafusion_expr::col;
+use datafusion_expr::lit;
+use datafusion_functions::expr_fn::get_field;
 use object_store::ObjectStore;
 use object_store::memory::InMemory;
 use object_store::path::Path;
@@ -165,6 +168,247 @@ async fn test_filter_with_schema_evolution() {
     );
 }
 
+#[tokio::test]
+async fn test_filter_schema_evolution_order() {
+    let (ctx, store) = make_session_ctx();
+
+    // file1 only contains field "a"
+    write_file(
+        &store,
+        "files/file1.vortex",
+        &record_batch!(("a", Int32, vec![Some(1), Some(3), Some(5)])).unwrap(),
+    )
+    .await;
+
+    // file2 contains fields "b" and "a", where "a" needs to be upcast at scan time.
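+    // (file1 stores "a" as Int32 and the forced table schema below keeps Int32, so this
+    //  file's Int16 values are widened when scanned.)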
+ write_file( + &store, + "files/file2.vortex", + &record_batch!( + ("b", Utf8, vec![Some("two"), Some("four"), Some("six")]), + ("a", Int16, vec![Some(2), Some(4), Some(6)]) + ) + .unwrap(), + ) + .await; + + // Read the table back as Vortex + let table_url = ListingTableUrl::parse("s3://in-memory/files").unwrap(); + let list_opts = ListingOptions::new(Arc::new(VortexFormat::new(SESSION.clone()))) + .with_session_config_options(ctx.state().config()) + .with_file_extension("vortex"); + + // We force the table schema, because file1/file2 have different types for the "a" column + let read_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8View, true), + ])); + + let table = ListingTable::try_new( + ListingTableConfig::new(table_url) + .with_listing_options(list_opts) + .with_schema(read_schema.clone()), + ) + .unwrap(); + + let table = Arc::new(table); + + let df = ctx.read_table(table.clone()).unwrap(); + + let table_schema = Arc::new(df.schema().as_arrow().clone()); + + // Table schema contains both fields + assert_eq!( + table_schema.as_ref(), + &Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8View, true), + ]) + ); + + // Filter referencing the b column, which only appears in file2 + let result = df + .filter(col("b").eq(lit("two"))) + .unwrap() + .collect() + .await + .unwrap(); + let result = concat_batches(&table_schema, result.iter()).unwrap(); + + assert_eq!( + result, + record_batch( + &table_schema, + vec![ + // a + Arc::new(Int32Array::from(vec![Some(2)])) as ArrowArrayRef, + // b + Arc::new(StringViewArray::from(vec![Some("two"),])) as ArrowArrayRef, + ] + ) + ); + + // Filter on the "a" column, which has different types for each file + let result = ctx + .read_table(table) + .unwrap() + .filter(col("a").gt_eq(lit(3i16))) + .unwrap() + .collect() + .await + .unwrap(); + let table = concat_batches(&table_schema, result.iter()).unwrap(); + + // file1, then file2 + assert_eq!( + table, + record_batch( + &table_schema, + vec![ + // a field: present in both files + Arc::new(Int32Array::from(vec![Some(3), Some(5), Some(4), Some(6)])) + as ArrowArrayRef, + // b field: only present in file2, file1 fills with nulls + Arc::new(StringViewArray::from(vec![ + None, + None, + Some("four"), + Some("six") + ])) as ArrowArrayRef, + ] + ) + ); +} + +#[tokio::test] +async fn test_filter_schema_evolution_struct_fields() { + // Test for correct schema evolution behavior in the presence of nested struct fields. + // We use a hypothetical schema of some observability data with "wide records", struct columns + // with nullable payloads that may or may not be present for every file. 
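+    //
+    // The two files written below have payload structs of different widths:
+    //   host01 payload: { uptime: Int64 }
+    //   host02 payload: { uptime: Int64, instance: Utf8 }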
+
+    let (ctx, store) = make_session_ctx();
+
+    fn make_metrics(
+        hostname: &str,
+        uptime: Vec<i64>,
+        instance: Option<Vec<Option<&str>>>,
+    ) -> RecordBatch {
+        let values_array: ArrowArrayRef = create_array!(Int64, uptime);
+        let payload_array = if let Some(tags) = instance {
+            let tags_array: ArrowArrayRef = create_array!(Utf8, tags);
+            Arc::new(StructArray::new(
+                vec![
+                    Field::new("uptime", DataType::Int64, true),
+                    Field::new("instance", DataType::Utf8, true),
+                ]
+                .into(),
+                vec![values_array, tags_array],
+                None,
+            ))
+        } else {
+            Arc::new(StructArray::new(
+                vec![Field::new("uptime", DataType::Int64, true)].into(),
+                vec![values_array],
+                None,
+            ))
+        };
+
+        let len = payload_array.len();
+        let hostname_array = create_array!(Utf8, vec![Some(hostname); len]);
+
+        let payload_type = payload_array.data_type().clone();
+        let hostname_type = hostname_array.data_type().clone();
+
+        RecordBatch::from(StructArray::new(
+            vec![
+                Field::new("hostname", hostname_type, true),
+                Field::new("payload", payload_type, true),
+            ]
+            .into(),
+            vec![hostname_array, payload_array],
+            None,
+        ))
+    }
+
+    let host01 = make_metrics("host01.local", vec![1, 2, 3, 4], None);
+    let host02 = make_metrics(
+        "host02.local",
+        vec![10, 20, 30, 40],
+        // host02 has new logging code which adds the new "instance" nested field in its payload
+        Some(vec![Some("c6i"), Some("c6i"), Some("m5"), Some("r5")]),
+    );
+
+    // Write metrics files to storage
+    write_file(&store, "files/host01.vortex", &host01).await;
+    write_file(&store, "files/host02.vortex", &host02).await;
+
+    // Read the table back as Vortex
+    let table_url = ListingTableUrl::parse("s3://in-memory/files").unwrap();
+    let list_opts = ListingOptions::new(Arc::new(VortexFormat::new(SESSION.clone())))
+        .with_session_config_options(ctx.state().config())
+        .with_file_extension("vortex");
+
+    // We force the table schema to be the one inclusive of the new instance field.
+    let read_schema = host02.schema();
+
+    let table = ListingTable::try_new(
+        ListingTableConfig::new(table_url)
+            .with_listing_options(list_opts)
+            .with_schema(read_schema.clone()),
+    )
+    .unwrap();
+
+    let table = Arc::new(table);
+
+    let df = ctx.read_table(table.clone()).unwrap();
+
+    let table_schema = Arc::new(df.schema().as_arrow().clone());
+
+    // Table schema contains both fields
+    assert_eq!(table_schema.as_ref(), read_schema.as_ref(),);
+
+    // Scan all the records, NULLs are filled in for nested optional fields.
+    let full_scan = df.collect().await.unwrap();
+    let full_scan = concat_batches(&table_schema, full_scan.iter()).unwrap();
+
+    let expected = concat_batches(
+        &table_schema,
+        &[
+            // host01 with extra nulls for the payload.instance field
+            make_metrics("host01.local", vec![1, 2, 3, 4], Some(vec![None; 4])),
+            host02,
+        ],
+    )
+    .unwrap();
+    assert_eq!(full_scan, expected);
+
+    // run a filter that touches both the payload.uptime AND the payload.instance nested fields
+    let df = ctx.read_table(table.clone()).unwrap();
+    let filtered_scan = df
+        .filter(
+            // payload.instance = 'c6i' OR payload.uptime < 10
+            // We need to perform filtering over nested columns which don't exist in every
+            // file type.
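+            // Expected matches: all four host01 rows (uptime 1-4 < 10, instance is NULL there),
+            // plus the two host02 rows tagged "c6i".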
+            get_field(col("payload"), "instance")
+                .eq(lit("c6i"))
+                .or(get_field(col("payload"), "uptime").lt(lit(10))),
+        )
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let filtered_scan = concat_batches(&table_schema, filtered_scan.iter()).unwrap();
+
+    let expected = concat_batches(
+        &table_schema,
+        &[
+            make_metrics("host01.local", vec![1, 2, 3, 4], Some(vec![None; 4])),
+            make_metrics("host02.local", vec![10, 20], Some(vec![Some("c6i"), Some("c6i")])),
+        ],
+    )
+    .unwrap();
+    assert_eq!(filtered_scan, expected);
+}
+
 fn record_batch(
     schema: &SchemaRef,
     fields: impl IntoIterator<Item = ArrowArrayRef>,