diff --git a/Cargo.lock b/Cargo.lock index 64327fb86ac..4a8a6424c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8554,6 +8554,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "url", "vortex", "vortex-utils", "walkdir", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index c929b1befc1..03bb7491157 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -37,6 +37,7 @@ object_store = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } tokio-stream = { workspace = true } tracing = { workspace = true } +url = { workspace = true } vortex = { workspace = true, features = ["object_store", "tokio"] } vortex-utils = { workspace = true, features = ["dashmap"] } diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index cdd194482c8..30ac7e1fe73 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -244,8 +244,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> } else if let Some(in_list) = expr.downcast_ref::() { can_be_pushed_down(in_list.expr(), schema) && in_list.list().iter().all(|e| can_be_pushed_down(e, schema)) - } else if let Some(scalar_fn) = expr.downcast_ref::() { - can_scalar_fn_be_pushed_down(scalar_fn, schema) + } else if ScalarFunctionExpr::try_downcast_func::(df_expr.as_ref()).is_some() { + true } else { tracing::debug!(%df_expr, "DataFusion expression can't be pushed down"); false @@ -292,60 +292,12 @@ fn supported_data_types(dt: &DataType) -> bool { is_supported } -/// Checks if a GetField scalar function can be pushed down. -fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { - let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::(scalar_fn) - else { - // Only get_field pushdown is supported. 
- return false; - }; - - let args = get_field_fn.args(); - if args.len() != 2 { - tracing::debug!( - "Expected 2 arguments for GetField, not pushing down {} arguments", - args.len() - ); - return false; - } - let source_expr = &args[0]; - let field_name_expr = &args[1]; - let Some(field_name) = field_name_expr - .as_any() - .downcast_ref::() - .and_then(|lit| lit.value().try_as_str().flatten()) - else { - return false; - }; - - let Ok(source_dt) = source_expr.data_type(schema) else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type for GetField, not pushing down" - ); - return false; - }; - let DataType::Struct(fields) = source_dt else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type as struct for GetField, not pushing down" - ); - return false; - }; - fields.find(field_name).is_some() -} - #[cfg(test)] mod tests { use std::sync::Arc; use arrow_schema::DataType; use arrow_schema::Field; - use arrow_schema::Fields; use arrow_schema::Schema; use arrow_schema::TimeUnit as ArrowTimeUnit; use datafusion::functions::core::getfield::GetFieldFunc; @@ -673,34 +625,4 @@ mod tests { └── input: vortex.root "#); } - - #[rstest] - #[case::valid_field("field1", true)] - #[case::missing_field("nonexistent_field", false)] - fn test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) { - let struct_fields = Fields::from(vec![ - Field::new("field1", DataType::Utf8, true), - Field::new("field2", DataType::Int32, true), - ]); - let schema = Schema::new(vec![Field::new( - "my_struct", - DataType::Struct(struct_fields), - true, - )]); - - let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc; - let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some( - field_name.to_string(), - )))) as Arc; - - let get_field_expr = Arc::new(ScalarFunctionExpr::new( - "get_field", - Arc::new(ScalarUDF::from(GetFieldFunc::new())), - vec![struct_col, field_name_lit], - Arc::new(Field::new(field_name, DataType::Utf8, true)), - Arc::new(ConfigOptions::new()), - )) as Arc; - - assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected); - } } diff --git a/vortex-datafusion/src/persistent/adapter.rs b/vortex-datafusion/src/persistent/adapter.rs new file mode 100644 index 00000000000..894d1f2ce8f --- /dev/null +++ b/vortex-datafusion/src/persistent/adapter.rs @@ -0,0 +1,339 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Apache Software Foundation (ASF) + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared utilities and custom implementations for schema and expression adapters. +//! +//! 
Most of this code is taken from one of two files: +//! +//! `datafusion/physical-expr-adapter/src/schema_rewriter.rs` -> everything related to the +//! DefaultPhysicalExprAdapter +//! +//! `datafusion/datasource/src/schema_adapter.rs` -> for can_cast_field (which is crate-private) +//! +//! See + +use std::fmt::Debug; +use std::sync::Arc; + +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::FieldRef; +use arrow_schema::Schema; +use arrow_schema::SchemaRef; +use datafusion_common::ScalarValue; +use datafusion_common::arrow::compute::can_cast_types; +use datafusion_common::exec_err; +use datafusion_common::nested_struct::validate_struct_compatibility; +use datafusion_common::plan_err; +use datafusion_common::tree_node::Transformed; +use datafusion_common::tree_node::TransformedResult; +use datafusion_common::tree_node::TreeNode; +use datafusion_functions::core::getfield::GetFieldFunc; +use datafusion_physical_expr::ScalarFunctionExpr; +use datafusion_physical_expr::expressions; +use datafusion_physical_expr::expressions::CastExpr; +use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr_adapter::PhysicalExprAdapter; +use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; +use datafusion_physical_expr_common::physical_expr::PhysicalExpr; + +#[derive(Debug, Clone)] +pub struct DefaultPhysicalExprAdapterFactory; + +impl PhysicalExprAdapterFactory for DefaultPhysicalExprAdapterFactory { + fn create( + &self, + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + ) -> Arc { + Arc::new(DefaultPhysicalExprAdapter::new( + logical_file_schema, + physical_file_schema, + )) + } +} +#[derive(Debug, Clone)] +pub struct DefaultPhysicalExprAdapter { + logical_file_schema: SchemaRef, + physical_file_schema: SchemaRef, + partition_values: Vec<(FieldRef, ScalarValue)>, +} + +impl DefaultPhysicalExprAdapter { + /// Create a new instance of the default physical expression adapter. + /// + /// This adapter rewrites expressions to match the physical schema of the file being scanned, + /// handling type mismatches and missing columns by filling them with default values. + pub fn new(logical_file_schema: SchemaRef, physical_file_schema: SchemaRef) -> Self { + Self { + logical_file_schema, + physical_file_schema, + partition_values: Vec::new(), + } + } +} + +impl PhysicalExprAdapter for DefaultPhysicalExprAdapter { + fn rewrite( + &self, + expr: Arc, + ) -> datafusion_common::Result> { + let rewriter = DefaultPhysicalExprAdapterRewriter { + logical_file_schema: &self.logical_file_schema, + physical_file_schema: &self.physical_file_schema, + partition_fields: &self.partition_values, + }; + expr.transform(|expr| rewriter.rewrite_expr(Arc::clone(&expr))) + .data() + } + + fn with_partition_values( + &self, + partition_values: Vec<(FieldRef, ScalarValue)>, + ) -> Arc { + Arc::new(Self { + partition_values, + ..self.clone() + }) + } +} + +/// This is derived from the standard library, with the exception that it fixes handling of +/// rewriting expressions with struct fields. +pub(crate) struct DefaultPhysicalExprAdapterRewriter<'a> { + logical_file_schema: &'a Schema, + physical_file_schema: &'a Schema, + partition_fields: &'a [(FieldRef, ScalarValue)], +} + +impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { + fn rewrite_expr( + &self, + expr: Arc, + ) -> datafusion_common::Result>> { + if let Some(transformed) = self.try_rewrite_struct_field_access(&expr)? 
{ + return Ok(Transformed::yes(transformed)); + } + + if let Some(column) = expr.as_any().downcast_ref::() { + return self.rewrite_column(Arc::clone(&expr), column); + } + + Ok(Transformed::no(expr)) + } + + /// Attempt to rewrite struct field access expressions to return null if the field does not exist in the physical schema. + /// Note that this does *not* handle nested struct fields, only top-level struct field access. + /// See for more details. + fn try_rewrite_struct_field_access( + &self, + expr: &Arc, + ) -> datafusion_common::Result>> { + let get_field_expr = + match ScalarFunctionExpr::try_downcast_func::(expr.as_ref()) { + Some(expr) => expr, + None => return Ok(None), + }; + + let source_expr = match get_field_expr.args().first() { + Some(expr) => expr, + None => return Ok(None), + }; + + let field_name_expr = match get_field_expr.args().get(1) { + Some(expr) => expr, + None => return Ok(None), + }; + + let lit = match field_name_expr + .as_any() + .downcast_ref::() + { + Some(lit) => lit, + None => return Ok(None), + }; + + let field_name = match lit.value().try_as_str().flatten() { + Some(name) => name, + None => return Ok(None), + }; + + let column = match source_expr.as_any().downcast_ref::() { + Some(column) => column, + None => return Ok(None), + }; + + let physical_field = match self.physical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(_) => return Ok(None), + }; + + let physical_struct_fields = match physical_field.data_type() { + DataType::Struct(fields) => fields, + _ => return Ok(None), + }; + + if physical_struct_fields + .iter() + .any(|f| f.name() == field_name) + { + return Ok(None); + } + + let logical_field = match self.logical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(_) => return Ok(None), + }; + + let logical_struct_fields = match logical_field.data_type() { + DataType::Struct(fields) => fields, + _ => return Ok(None), + }; + + let logical_struct_field = match logical_struct_fields + .iter() + .find(|f| f.name() == field_name) + { + Some(field) => field, + None => return Ok(None), + }; + + let null_value = ScalarValue::Null.cast_to(logical_struct_field.data_type())?; + Ok(Some(expressions::lit(null_value))) + } + + fn rewrite_column( + &self, + expr: Arc, + column: &Column, + ) -> datafusion_common::Result>> { + // Get the logical field for this column if it exists in the logical schema + let logical_field = match self.logical_file_schema.field_with_name(column.name()) { + Ok(field) => field, + Err(e) => { + // If the column is a partition field, we can use the partition value + if let Some(partition_value) = self.get_partition_value(column.name()) { + return Ok(Transformed::yes(expressions::lit(partition_value))); + } + // This can be hit if a custom rewrite injected a reference to a column that doesn't exist in the logical schema. + // For example, a pre-computed column that is kept only in the physical schema. + // If the column exists in the physical schema, we can still use it. + if let Ok(physical_field) = self.physical_file_schema.field_with_name(column.name()) + { + // If the column exists in the physical schema, we can use it in place of the logical column. + // This is nice to users because if they do a rewrite that results in something like `physical_int32_col = 123u64` + // we'll at least handle the casts for them. + physical_field + } else { + // A completely unknown column that doesn't exist in either schema! 
+ // This should probably never be hit unless something upstream broke, but nonetheless it's better + // for us to return a handleable error than to panic / do something unexpected. + return Err(e.into()); + } + } + }; + + // Check if the column exists in the physical schema + let physical_column_index = match self.physical_file_schema.index_of(column.name()) { + Ok(index) => index, + Err(_) => { + if !logical_field.is_nullable() { + return exec_err!( + "Non-nullable column '{}' is missing from the physical schema", + column.name() + ); + } + // If the column is missing from the physical schema fill it in with nulls as `SchemaAdapter` would do. + // TODO: do we need to sync this with what the `SchemaAdapter` actually does? + // While the default implementation fills in nulls in theory a custom `SchemaAdapter` could do something else! + // See https://github.com/apache/datafusion/issues/16527 + let null_value = ScalarValue::Null.cast_to(logical_field.data_type())?; + return Ok(Transformed::yes(expressions::lit(null_value))); + } + }; + let physical_field = self.physical_file_schema.field(physical_column_index); + + let column = match ( + column.index() == physical_column_index, + logical_field.data_type() == physical_field.data_type(), + ) { + // If the column index matches and the data types match, we can use the column as is + (true, true) => return Ok(Transformed::no(expr)), + // If the indexes or data types do not match, we need to create a new column expression + (true, _) => column.clone(), + (false, _) => Column::new_with_schema(logical_field.name(), self.physical_file_schema)?, + }; + + if logical_field.data_type() == physical_field.data_type() { + // If the data types match, we can use the column as is + return Ok(Transformed::yes(Arc::new(column))); + } + + // This has been changed to replace the DF upstream version which just does can_cast_types, + // which ignores struct fields with compatible but differing columns. + let is_compatible = can_cast_field(physical_field, logical_field)?; + if !is_compatible { + return exec_err!( + "Cannot cast column '{}' from '{}' (physical data type) to '{}' (logical data type)", + column.name(), + physical_field.data_type(), + logical_field.data_type() + ); + } + + let cast_expr = Arc::new(CastExpr::new( + Arc::new(column), + logical_field.data_type().clone(), + None, + )); + + Ok(Transformed::yes(cast_expr)) + } + + fn get_partition_value(&self, column_name: &str) -> Option { + self.partition_fields + .iter() + .find(|(field, _)| field.name() == column_name) + .map(|(_, value)| value.clone()) + } +} + +pub(crate) fn can_cast_field( + file_field: &Field, + table_field: &Field, +) -> datafusion_common::Result { + match (file_field.data_type(), table_field.data_type()) { + (DataType::Struct(source_fields), DataType::Struct(target_fields)) => { + validate_struct_compatibility(source_fields, target_fields) + } + _ => { + if can_cast_types(file_field.data_type(), table_field.data_type()) { + Ok(true) + } else { + plan_err!( + "Cannot cast file schema field {} of type {:?} to table schema field of type {:?}", + file_field.name(), + file_field.data_type(), + table_field.data_type() + ) + } + } + } +} diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 9be23b7d804..208c5a6a403 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors //! 
Persistent implementation of a Vortex table provider. +pub(crate) mod adapter; mod cache; mod format; pub mod metrics; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index a0fa396b0c4..16395dfdeb0 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -3,10 +3,10 @@ use std::ops::Range; use std::sync::Arc; +use std::sync::LazyLock; use std::sync::Weak; use arrow_schema::ArrowError; -use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::SchemaRef; use datafusion_common::DataFusionError; @@ -17,6 +17,7 @@ use datafusion_datasource::PartitionedFile; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; +use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; @@ -48,11 +49,18 @@ use super::cache::VortexFileCache; use crate::convert::exprs::can_be_pushed_down; use crate::convert::exprs::make_vortex_predicate; +static DEFAULT_EXPR_ADAPTER: LazyLock> = + LazyLock::new(|| Arc::new(crate::adapter::DefaultPhysicalExprAdapterFactory)); + +static DEFAULT_SCHEMA_ADAPTER: LazyLock> = + LazyLock::new(|| Arc::new(DefaultSchemaAdapterFactory)); + #[derive(Clone)] pub(crate) struct VortexOpener { pub session: VortexSession, pub object_store: Arc, - /// Projection by index of the file's columns + /// Optional table schema projection. The indices are w.r.t. the `table_schema`, which is + /// all fields in the final scan result not including the partition columns. pub projection: Option>, /// Filter expression optimized for pushdown into Vortex scan operations. /// This may be a subset of file_pruning_predicate containing only expressions @@ -61,103 +69,27 @@ pub(crate) struct VortexOpener { /// Filter expression used by DataFusion's FilePruner to eliminate files based on /// statistics and partition values without opening them. pub file_pruning_predicate: Option, - pub expr_adapter_factory: Option>, - pub schema_adapter_factory: Arc, /// Hive-style partitioning columns pub partition_fields: Vec>, pub file_cache: VortexFileCache, - /// This is the table's schema without partition columns. It might be different than - /// the physical schema, and the stream's type will be a projection of it. - pub logical_schema: SchemaRef, + /// This is the table's schema without partition columns. It may contain fields which do + /// not exist in the file, and are supplied by the `schema_adapter_factory`. + pub table_schema: SchemaRef, + /// A hint for the desired row count of record batches returned from the scan. pub batch_size: usize, + /// If provided, the scan will not return more than this many rows. pub limit: Option, + /// A metrics object for tracking performance of the scan. pub metrics: VortexMetrics, + /// A shared cache of file readers. + /// + /// To save on the overhead of reparsing FlatBuffers and rebuilding the layout tree, we cache + /// a file reader the first time we read a file. pub layout_readers: Arc>>, /// Whether the query has output ordering specified pub has_output_ordering: bool, } -/// Merges the data types of two fields, preferring the logical type from the -/// table field. 
-fn merge_field_types(physical_field: &Field, table_field: &Field) -> DataType { - match (physical_field.data_type(), table_field.data_type()) { - (DataType::Struct(phys_fields), DataType::Struct(table_fields)) => { - let merged_fields = merge_fields(phys_fields, table_fields); - DataType::Struct(merged_fields.into()) - } - (DataType::List(phys_field), DataType::List(table_field)) => { - DataType::List(Arc::new(Field::new( - phys_field.name(), - merge_field_types(phys_field, table_field), - phys_field.is_nullable(), - ))) - } - (DataType::LargeList(phys_field), DataType::LargeList(table_field)) => { - DataType::LargeList(Arc::new(Field::new( - phys_field.name(), - merge_field_types(phys_field, table_field), - phys_field.is_nullable(), - ))) - } - _ => table_field.data_type().clone(), - } -} - -/// Merges two field collections, using logical types from table_fields where available. -/// Falls back to physical field types when no matching table field is found. -fn merge_fields( - physical_fields: &arrow_schema::Fields, - table_fields: &arrow_schema::Fields, -) -> Vec { - physical_fields - .iter() - .map(|phys_field| { - table_fields - .iter() - .find(|f| f.name() == phys_field.name()) - .map(|table_field| { - Field::new( - phys_field.name(), - merge_field_types(phys_field, table_field), - phys_field.is_nullable(), - ) - }) - .unwrap_or_else(|| (**phys_field).clone()) - }) - .collect() -} - -/// Computes a logical file schema from the physical file schema and the table -/// schema. -/// -/// For each field in the physical file schema, looks up the corresponding field -/// in the table schema and uses its logical type. -fn compute_logical_file_schema( - physical_file_schema: &SchemaRef, - table_schema: &SchemaRef, -) -> SchemaRef { - let logical_fields: Vec = physical_file_schema - .fields() - .iter() - .map(|physical_field| { - table_schema - .fields() - .find(physical_field.name()) - .map(|(_, table_field)| { - Field::new( - physical_field.name(), - merge_field_types(physical_field, table_field), - physical_field.is_nullable(), - ) - .with_metadata(physical_field.metadata().clone()) - }) - .unwrap_or_else(|| (**physical_field).clone()) - }) - .collect(); - - Arc::new(arrow_schema::Schema::new(logical_fields)) -} - impl FileOpener for VortexOpener { fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> DFResult { let session = self.session.clone(); @@ -165,26 +97,30 @@ impl FileOpener for VortexOpener { let projection = self.projection.clone(); let mut filter = self.filter.clone(); let file_pruning_predicate = self.file_pruning_predicate.clone(); - let expr_adapter_factory = self.expr_adapter_factory.clone(); + + // We create our custom expression and schema adapters. The schema adapter is just the + // DF default adapter. Our expression adapter is currently built to work around a bug + // in the upstream adapter. 
+ // See: https://github.com/apache/datafusion/issues/17114 + let expr_adapter_factory = DEFAULT_EXPR_ADAPTER.clone(); + let schema_adapter_factory = DEFAULT_SCHEMA_ADAPTER.clone(); + let partition_fields = self.partition_fields.clone(); let file_cache = self.file_cache.clone(); - let logical_schema = self.logical_schema.clone(); + let table_schema = self.table_schema.clone(); let batch_size = self.batch_size; let limit = self.limit; let metrics = self.metrics.clone(); let layout_reader = self.layout_readers.clone(); let has_output_ordering = self.has_output_ordering; - let projected_schema = match projection.as_ref() { - None => logical_schema.clone(), - Some(indices) => Arc::new(logical_schema.project(indices)?), + let projected_table_schema = match projection.as_ref() { + None => table_schema.clone(), + Some(indices) => Arc::new(table_schema.project(indices)?), }; - let mut predicate_file_schema = logical_schema.clone(); - - let schema_adapter = self - .schema_adapter_factory - .create(projected_schema, logical_schema.clone()); + let schema_adapter = + schema_adapter_factory.create(projected_table_schema, table_schema.clone()); Ok(async move { // Create FilePruner when we have a predicate and either dynamic expressions @@ -200,7 +136,7 @@ impl FileOpener for VortexOpener { (is_dynamic_physical_expr(&predicate) | file.has_statistics()).then_some( FilePruner::new( predicate.clone(), - &logical_schema, + &table_schema, partition_fields.clone(), file.clone(), Count::default(), @@ -230,51 +166,38 @@ impl FileOpener for VortexOpener { DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}")) })?); - // Compute the logical file schema by merging physical file types with logical table types. - // This schema has the same field names as logical_schema, but with physical types from the file. - let logical_file_schema = - compute_logical_file_schema(&physical_file_schema, &logical_schema); - - if let Some(expr_adapter_factory) = expr_adapter_factory { - let partition_values = partition_fields - .iter() - .cloned() - .zip(file.partition_values) - .collect::>(); - - // The adapter rewrites the expression to the local file schema, allowing - // for schema evolution and divergence between the table's schema and individual files. - filter = filter - .map(|filter| { - let expr = expr_adapter_factory - .create(logical_file_schema.clone(), physical_file_schema.clone()) - .with_partition_values(partition_values) - .rewrite(filter)?; - - // Expression might now reference columns that don't exist in the file, so we can give it - // another simplification pass. - PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr) - }) - .transpose()?; - - predicate_file_schema = physical_file_schema; - } + let partition_values = partition_fields + .iter() + .cloned() + .zip(file.partition_values) + .collect::>(); + + // The adapter rewrites the expression to the local file schema, allowing + // for schema evolution and divergence between the table's schema and individual files. + filter = filter + .map(|filter| { + // Rewrite the filter to properly handle values in the table schema + let expr = expr_adapter_factory + .create(table_schema.clone(), physical_file_schema.clone()) + .with_partition_values(partition_values) + .rewrite(filter)?; + + // Expression might now reference columns that don't exist in the file, so we can give it + // another simplification pass. 
+ PhysicalExprSimplifier::new(physical_file_schema.as_ref()) + .simplify(expr.clone()) + }) + .transpose()?; - // Use the pre-created schema adapter to map logical_file_schema to projected_schema. - // Since logical_file_schema has the same field names as logical_schema (which the adapter - // was created with), this works correctly and gives us the projection indices. let (schema_mapping, adapted_projections) = - schema_adapter.map_schema(&logical_file_schema)?; + schema_adapter.map_schema(&physical_file_schema)?; - // Build the Vortex projection expression using field names from logical_file_schema - let fields = adapted_projections - .iter() - .map(|&idx| { - let field = logical_file_schema.field(idx); - FieldName::from(field.name().as_str()) - }) - .collect::>(); - let projection_expr = select(fields, root()); + // We use the field names from pushdown expression instead. + let field_names: Vec = adapted_projections + .into_iter() + .map(|index| FieldName::from(physical_file_schema.field(index).name().as_str())) + .collect(); + let projection_expr = select(field_names, root()); // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once. let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) { @@ -318,7 +241,7 @@ impl FileOpener for VortexOpener { .and_then(|f| { let exprs = split_conjunction(&f) .into_iter() - .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema)) + .filter(|expr| can_be_pushed_down(expr, &physical_file_schema)) .collect::>(); make_vortex_predicate(&exprs).transpose() @@ -416,16 +339,15 @@ mod tests { use datafusion::arrow::array::RecordBatch; use datafusion::arrow::array::StringArray; use datafusion::arrow::array::StructArray; + use datafusion::arrow::compute::cast; use datafusion::arrow::datatypes::DataType; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::util::display::FormatOptions; use datafusion::common::record_batch; - use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion::logical_expr::col; use datafusion::logical_expr::lit; use datafusion::physical_expr::planner::logical2physical; - use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory; - use datafusion::scalar::ScalarValue; + use datafusion_common::create_array; use insta::assert_snapshot; use itertools::Itertools; use object_store::ObjectMeta; @@ -517,96 +439,8 @@ mod tests { } } - #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))] - // If we don't have a physical expr adapter, we just drop filters on partition values - #[case(None, (1, 3), (1, 3))] - #[tokio::test] - async fn test_adapter_optimization_partition_column( - #[case] expr_adapter_factory: Option>, - #[case] expected_result1: (usize, usize), - #[case] expected_result2: (usize, usize), - ) -> anyhow::Result<()> { - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "part=1/file.vortex"; - let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - let data_size = - write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - - let file_schema = batch.schema(); - let mut file = PartitionedFile::new(file_path.to_string(), data_size); - file.partition_values = vec![ScalarValue::Int32(Some(1))]; - - let table_schema = Arc::new(Schema::new(vec![ - Field::new("part", DataType::Int32, false), - Field::new("a", DataType::Int32, false), - ])); - - let make_opener = |filter| 
VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0].into()), - filter: Some(filter), - file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: file_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; - - // filter matches partition value - let filter = col("part").eq(lit(1)); - let filter = logical2physical(&filter, table_schema.as_ref()); - - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); - - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); - - assert_eq!((num_batches, num_rows), expected_result1); - - // filter doesn't matches partition value - let filter = col("part").eq(lit(2)); - let filter = logical2physical(&filter, table_schema.as_ref()); - - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); - - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); - assert_eq!((num_batches, num_rows), expected_result2); - - Ok(()) - } - - #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _))] - // If we don't have a physical expr adapter, we just drop filters on partition values. - // This is currently not supported, the work to support it requires to rewrite the predicate with appropriate casts. - // Seems like datafusion is moving towards having DefaultPhysicalExprAdapterFactory be always provided, which would make it work OOTB. 
- // See: https://github.com/apache/datafusion/issues/16800 - // #[case(None)] #[tokio::test] - async fn test_open_files_different_table_schema( - #[case] expr_adapter_factory: Option>, - ) -> anyhow::Result<()> { + async fn test_open_files_different_table_schema() -> anyhow::Result<()> { use datafusion::arrow::util::pretty::pretty_format_batches_with_options; let object_store = Arc::new(InMemory::new()) as Arc; @@ -629,11 +463,9 @@ mod tests { projection: Some([0].into()), filter: Some(filter), file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), @@ -717,11 +549,9 @@ mod tests { projection: Some([0, 1, 2].into()), filter: None, file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), @@ -808,11 +638,9 @@ mod tests { &table_schema, )), file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema, + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), @@ -834,6 +662,35 @@ mod tests { assert_eq!(data.len(), 1); assert_eq!(data[0].num_rows(), 3); + // Output should match + let expected_field1 = cast( + create_array!(Utf8, vec![Some("value1"), Some("value2"), Some("value3")]).as_ref(), + &DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + ) + .unwrap(); + let expected_field2 = cast( + create_array!(Utf8, vec![Some("a"), Some("b"), Some("c")]).as_ref(), + &DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), + ) + .unwrap(); + let expected_field3 = Arc::new(StringArray::new_null(3)); + + let my_struct = StructArray::new( + vec![ + Field::new("field1", expected_field1.data_type().clone(), true), + Field::new("field2", expected_field1.data_type().clone(), true), + Field::new("field3", DataType::Utf8, true), + ] + .into(), + vec![expected_field1, expected_field2, expected_field3], + None, + ); + + let expected_batch = + RecordBatch::try_new(table_schema.clone(), vec![Arc::new(my_struct)]).unwrap(); + + assert_eq!(data[0], expected_batch); + Ok(()) } @@ -877,11 +734,9 @@ mod tests { projection: Some(projection.into()), filter: None, file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), partition_fields: vec![], file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), + table_schema: table_schema.clone(), batch_size: 100, limit: None, metrics: Default::default(), diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index d42436eb652..4247f9610ae 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ 
b/vortex-datafusion/src/persistent/source.rs @@ -13,12 +13,8 @@ use datafusion_common::config::ConfigOptions; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; use datafusion_datasource::file_stream::FileOpener; -use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory; -use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr::PhysicalExprRef; use datafusion_physical_expr::conjunction; -use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory; -use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; @@ -57,8 +53,6 @@ pub struct VortexSource { pub(crate) projected_statistics: Option, /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema. pub(crate) arrow_file_schema: Option, - pub(crate) schema_adapter_factory: Option>, - pub(crate) expr_adapter_factory: Option>, _unused_df_metrics: ExecutionPlanMetricsSet, /// Shared layout readers, the source only lives as long as one scan. /// @@ -76,25 +70,10 @@ impl VortexSource { batch_size: None, projected_statistics: None, arrow_file_schema: None, - schema_adapter_factory: None, - expr_adapter_factory: None, _unused_df_metrics: Default::default(), layout_readers: Arc::new(DashMap::default()), } } - - /// Sets a [`PhysicalExprAdapterFactory`] for the [`VortexSource`]. - /// Currently, this must be provided in order to filter columns in files that have a different data type from the unified table schema. - /// - /// This factory will take precedence when opening files over instances provided by the [`FileScanConfig`]. - pub fn with_expr_adapter_factory( - &self, - expr_adapter_factory: Arc, - ) -> Arc { - let mut source = self.clone(); - source.expr_adapter_factory = Some(expr_adapter_factory); - Arc::new(source) - } } impl FileSource for VortexSource { @@ -113,31 +92,6 @@ impl FileSource for VortexSource { .batch_size .vortex_expect("batch_size must be supplied to VortexSource"); - let expr_adapter = self - .expr_adapter_factory - .as_ref() - .or(base_config.expr_adapter_factory.as_ref()); - let schema_adapter = self.schema_adapter_factory.as_ref(); - - // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details. 
- let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) { - (Some(expr_adapter), Some(schema_adapter)) => { - (Some(expr_adapter.clone()), schema_adapter.clone()) - } - (Some(expr_adapter), None) => ( - Some(expr_adapter.clone()), - Arc::new(DefaultSchemaAdapterFactory) as _, - ), - (None, Some(schema_adapter)) => { - // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory` - (None, schema_adapter.clone()) - } - (None, None) => ( - Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - Arc::new(DefaultSchemaAdapterFactory) as _, - ), - }; - let projection = base_config.file_column_projection_indices().map(Arc::from); let opener = VortexOpener { @@ -146,10 +100,8 @@ impl FileSource for VortexSource { projection, filter: self.vortex_predicate.clone(), file_pruning_predicate: self.full_predicate.clone(), - expr_adapter_factory, - schema_adapter_factory, partition_fields: base_config.table_partition_cols.clone(), - logical_schema: base_config.file_schema.clone(), + table_schema: base_config.file_schema.clone(), file_cache: self.file_cache.clone(), batch_size, limit: base_config.limit, @@ -300,17 +252,4 @@ impl FileSource for VortexSource { ) .with_updated_node(Arc::new(source) as _)) } - - fn with_schema_adapter_factory( - &self, - factory: Arc, - ) -> DFResult> { - let mut source = self.clone(); - source.schema_adapter_factory = Some(factory); - Ok(Arc::new(source)) - } - - fn schema_adapter_factory(&self) -> Option> { - self.schema_adapter_factory.clone() - } } diff --git a/vortex-datafusion/tests/schema_evolution.rs b/vortex-datafusion/tests/schema_evolution.rs new file mode 100644 index 00000000000..8fa97774cc2 --- /dev/null +++ b/vortex-datafusion/tests/schema_evolution.rs @@ -0,0 +1,417 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#![allow( + clippy::unwrap_in_result, + clippy::unwrap_used, + clippy::tests_outside_test_module +)] + +//! Test that checks we can evolve schemas in a compatible way across files. 
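The tests below cover three evolution paths: a top-level column that is missing from a file comes back as typed nulls, a narrower numeric column (Int16) is cast up to the table type (Int32), and a nested struct field that only exists in newer files is null-filled for older ones. As a minimal sketch of the null-fill rule the new expression adapter applies (the helper name here is hypothetical, not taken from the patch):

use arrow_schema::DataType;
use datafusion_common::{Result, ScalarValue};

/// Hypothetical helper: the value substituted for a field that exists in the table schema
/// but not in a given file's physical schema.
fn missing_field_placeholder(table_type: &DataType) -> Result<ScalarValue> {
    // `ScalarValue::Null.cast_to` yields a null of the requested type, mirroring what
    // `DefaultPhysicalExprAdapterRewriter` does for absent columns and struct fields.
    ScalarValue::Null.cast_to(table_type)
}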
+ +use std::sync::Arc; +use std::sync::LazyLock; + +use arrow_schema::DataType; +use arrow_schema::Field; +use arrow_schema::Schema; +use arrow_schema::SchemaRef; +use datafusion::arrow::array::RecordBatch; +use datafusion::arrow::array::StringViewArray; +use datafusion::arrow::array::{Array, Int32Array}; +use datafusion::arrow::array::{ArrayRef as ArrowArrayRef, StructArray}; +use datafusion::arrow::compute::concat_batches; +use datafusion::datasource::listing::ListingOptions; +use datafusion::datasource::listing::ListingTable; +use datafusion::datasource::listing::ListingTableConfig; +use datafusion::execution::SessionStateBuilder; +use datafusion::execution::context::SessionContext; +use datafusion_common::{create_array, record_batch}; +use datafusion_datasource::ListingTableUrl; +use datafusion_expr::col; +use datafusion_expr::lit; +use datafusion_functions::expr_fn::get_field; +use object_store::ObjectStore; +use object_store::memory::InMemory; +use object_store::path::Path; +use url::Url; +use vortex::ArrayRef; +use vortex::VortexSessionDefault; +use vortex::arrow::FromArrowArray; +use vortex::file::WriteOptionsSessionExt; +use vortex::io::ObjectStoreWriter; +use vortex::io::VortexWrite; +use vortex::session::VortexSession; +use vortex_datafusion::VortexFormat; +use vortex_datafusion::VortexFormatFactory; + +static SESSION: LazyLock = LazyLock::new(VortexSession::default); + +fn register_vortex_format_factory( + factory: VortexFormatFactory, + session_state_builder: &mut SessionStateBuilder, +) { + if let Some(table_factories) = session_state_builder.table_factories() { + table_factories.insert( + datafusion::common::GetExt::get_ext(&factory).to_uppercase(), // Has to be uppercase + Arc::new(datafusion::datasource::provider::DefaultTableFactory::new()), + ); + } + + if let Some(file_formats) = session_state_builder.file_formats() { + file_formats.push(Arc::new(factory)); + } +} + +fn make_session_ctx() -> (SessionContext, Arc) { + let factory: VortexFormatFactory = VortexFormatFactory::new(); + let mut session_state_builder = SessionStateBuilder::new().with_default_features(); + register_vortex_format_factory(factory, &mut session_state_builder); + let ctx = SessionContext::new_with_state(session_state_builder.build()); + let store = Arc::new(InMemory::new()); + ctx.register_object_store(&Url::parse("s3://in-memory/").unwrap(), store.clone()); + + (ctx, store) +} + +async fn write_file(store: &Arc, path: &str, records: &RecordBatch) { + let array = ArrayRef::from_arrow(records, false); + let path = Path::from_url_path(path).unwrap(); + let mut write = ObjectStoreWriter::new(store.clone(), &path).await.unwrap(); + SESSION + .write_options() + .write(&mut write, array.to_array_stream()) + .await + .unwrap(); + write.shutdown().await.unwrap(); +} + +#[tokio::test] +async fn test_filter_with_schema_evolution() { + let (ctx, store) = make_session_ctx(); + + // file1 only contains field "a" + write_file( + &store, + "files/file1.vortex", + &record_batch!(("a", Utf8, vec![Some("one"), Some("two"), Some("three")])).unwrap(), + ) + .await; + + // file2 only contains field "b" + write_file( + &store, + "files/file2.vortex", + &record_batch!(("b", Utf8, vec![Some("four"), Some("five"), Some("six")])).unwrap(), + ) + .await; + + // Read the table back as Vortex + let table_url = ListingTableUrl::parse("s3://in-memory/files").unwrap(); + let list_opts = ListingOptions::new(Arc::new(VortexFormat::new(SESSION.clone()))) + .with_session_config_options(ctx.state().config()) + 
.with_file_extension("vortex"); + + let table = ListingTable::try_new( + ListingTableConfig::new(table_url) + .with_listing_options(list_opts) + .infer_schema(&ctx.state()) + .await + .unwrap(), + ) + .unwrap(); + + let table = Arc::new(table); + + let df = ctx.read_table(table).unwrap(); + + let table_schema = Arc::new(df.schema().as_arrow().clone()); + + // Table schema contains both fields + assert_eq!( + table_schema.as_ref(), + &Schema::new(vec![ + Field::new("a", DataType::Utf8View, true), + Field::new("b", DataType::Utf8View, true), + ]) + ); + + // Filter the result to only ones with a column, i.e. only file1 + let result = df + .filter(col("a").is_not_null()) + .unwrap() + .collect() + .await + .unwrap(); + let table = concat_batches(&table_schema, result.iter()).unwrap(); + + // We read back the full table, with nulls filled in for missing fields + assert_eq!( + table, + record_batch( + &table_schema, + vec![ + // a + Arc::new(StringViewArray::from(vec![ + Some("one"), + Some("two"), + Some("three"), + ])) as ArrowArrayRef, + // b + Arc::new(StringViewArray::from(vec![ + Option::<&str>::None, + None, + None + ])) as ArrowArrayRef, + ] + ) + ); +} + +#[tokio::test] +async fn test_filter_schema_evolution_order() { + let (ctx, store) = make_session_ctx(); + + // file1 only contains field "a" + write_file( + &store, + "files/file1.vortex", + &record_batch!(("a", Int32, vec![Some(1), Some(3), Some(5)])).unwrap(), + ) + .await; + + // file2 containing fields "b" and "a", where "a" needs to be upcast at scan time. + write_file( + &store, + "files/file2.vortex", + &record_batch!( + ("b", Utf8, vec![Some("two"), Some("four"), Some("six")]), + ("a", Int16, vec![Some(2), Some(4), Some(6)]) + ) + .unwrap(), + ) + .await; + + // Read the table back as Vortex + let table_url = ListingTableUrl::parse("s3://in-memory/files").unwrap(); + let list_opts = ListingOptions::new(Arc::new(VortexFormat::new(SESSION.clone()))) + .with_session_config_options(ctx.state().config()) + .with_file_extension("vortex"); + + // We force the table schema, because file1/file2 have different types for the "a" column + let read_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8View, true), + ])); + + let table = ListingTable::try_new( + ListingTableConfig::new(table_url) + .with_listing_options(list_opts) + .with_schema(read_schema.clone()), + ) + .unwrap(); + + let table = Arc::new(table); + + let df = ctx.read_table(table.clone()).unwrap(); + + let table_schema = Arc::new(df.schema().as_arrow().clone()); + + // Table schema contains both fields + assert_eq!( + table_schema.as_ref(), + &Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Utf8View, true), + ]) + ); + + // Filter referencing the b column, which only appears in file2 + let result = df + .filter(col("b").eq(lit("two"))) + .unwrap() + .collect() + .await + .unwrap(); + let result = concat_batches(&table_schema, result.iter()).unwrap(); + + assert_eq!( + result, + record_batch( + &table_schema, + vec![ + // a + Arc::new(Int32Array::from(vec![Some(2)])) as ArrowArrayRef, + // b + Arc::new(StringViewArray::from(vec![Some("two"),])) as ArrowArrayRef, + ] + ) + ); + + // Filter on the "a" column, which has different types for each file + let result = ctx + .read_table(table) + .unwrap() + .filter(col("a").gt_eq(lit(3i16))) + .unwrap() + .collect() + .await + .unwrap(); + let table = concat_batches(&table_schema, result.iter()).unwrap(); + + // file1, then file2 + 
assert_eq!( + table, + record_batch( + &table_schema, + vec![ + // a field: present in both files + Arc::new(Int32Array::from(vec![Some(3), Some(5), Some(4), Some(6)])) + as ArrowArrayRef, + // b field: only present in file2, file1 fills with nulls + Arc::new(StringViewArray::from(vec![ + None, + None, + Some("four"), + Some("six") + ])) as ArrowArrayRef, + ] + ) + ); +} + +#[tokio::test] +async fn test_filter_schema_evolution_struct_fields() { + // Test for correct schema evolution behavior in the presence of nested struct fields. + // We use a hypothetical schema of some observability data with "wide records", struct columns + // with nullable payloads that may or may not be present for every file. + + let (ctx, store) = make_session_ctx(); + + fn make_metrics( + hostname: &str, + uptime: Vec, + instance: Option>>, + ) -> RecordBatch { + let values_array: ArrowArrayRef = create_array!(Int64, uptime); + let payload_array = if let Some(tags) = instance { + let tags_array: ArrowArrayRef = create_array!(Utf8, tags); + Arc::new(StructArray::new( + vec![ + Field::new("uptime", DataType::Int64, true), + Field::new("instance", DataType::Utf8, true), + ] + .into(), + vec![values_array, tags_array], + None, + )) + } else { + Arc::new(StructArray::new( + vec![Field::new("uptime", DataType::Int64, true)].into(), + vec![values_array], + None, + )) + }; + + let len = payload_array.len(); + let hostname_array = create_array!(Utf8, vec![Some(hostname); len]); + + let payload_type = payload_array.data_type().clone(); + let hostname_type = hostname_array.data_type().clone(); + + RecordBatch::from(StructArray::new( + vec![ + Field::new("hostname", hostname_type, true), + Field::new("payload", payload_type, true), + ] + .into(), + vec![hostname_array, payload_array], + None, + )) + } + + let host01 = make_metrics("host01.local", vec![1, 2, 3, 4], None); + let host02 = make_metrics( + "host02.local", + vec![10, 20, 30, 40], + // host02 has new logging code which adds the new "instance" nested field in its payload + Some(vec![Some("c6i"), Some("c6i"), Some("m5"), Some("r5")]), + ); + + // Write metrics files to storage + write_file(&store, "files/host01.vortex", &host01).await; + write_file(&store, "files/host02.vortex", &host02).await; + + // Read the table back as Vortex + let table_url = ListingTableUrl::parse("s3://in-memory/files").unwrap(); + let list_opts = ListingOptions::new(Arc::new(VortexFormat::new(SESSION.clone()))) + .with_session_config_options(ctx.state().config()) + .with_file_extension("vortex"); + + // We force the table schema to be the one inclusive of the new instance field. + let read_schema = host02.schema(); + + let table = ListingTable::try_new( + ListingTableConfig::new(table_url) + .with_listing_options(list_opts) + .with_schema(read_schema.clone()), + ) + .unwrap(); + + let table = Arc::new(table); + + let df = ctx.read_table(table.clone()).unwrap(); + + let table_schema = Arc::new(df.schema().as_arrow().clone()); + + // Table schema contains both fields + assert_eq!(table_schema.as_ref(), read_schema.as_ref(),); + + // Scan all the records, NULLs are filled in for nested optional fields. 
+    let full_scan = df.collect().await.unwrap();
+    let full_scan = concat_batches(&table_schema, full_scan.iter()).unwrap();
+
+    let expected = concat_batches(
+        &table_schema,
+        &[
+            // host01 with extra nulls for the payload.instance field
+            make_metrics("host01.local", vec![1, 2, 3, 4], Some(vec![None; 4])),
+            host02,
+        ],
+    )
+    .unwrap();
+    assert_eq!(full_scan, expected);
+
+    // Run a filter that touches both the payload.uptime AND the payload.instance nested fields.
+    let df = ctx.read_table(table.clone()).unwrap();
+    let filtered_scan = df
+        .filter(
+            // payload.instance = 'c6i' OR payload.uptime < 10
+            // We need to perform filtering over nested columns which don't exist in every
+            // file.
+            get_field(col("payload"), "instance")
+                .eq(lit("c6i"))
+                .or(get_field(col("payload"), "uptime").lt(lit(10))),
+        )
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let filtered_scan = concat_batches(&table_schema, filtered_scan.iter()).unwrap();
+    let expected = concat_batches(
+        &table_schema,
+        &[
+            // host01: all four rows have uptime < 10; the instance field is filled with nulls.
+            make_metrics("host01.local", vec![1, 2, 3, 4], Some(vec![None; 4])),
+            // host02: only the two rows whose instance is 'c6i' survive the filter.
+            make_metrics("host02.local", vec![10, 20], Some(vec![Some("c6i"), Some("c6i")])),
+        ],
+    )
+    .unwrap();
+    assert_eq!(filtered_scan, expected);
+}
+
+fn record_batch(
+    schema: &SchemaRef,
+    fields: impl IntoIterator<Item = ArrowArrayRef>,
+) -> RecordBatch {
+    RecordBatch::try_new(schema.clone(), fields.into_iter().collect()).unwrap()
+}
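For reference, a minimal standalone sketch of the cast-compatibility rule that `can_cast_field` in the new adapter module builds on; the assertions mirror the type pairs these tests depend on (Int16 file data read through an Int32 table schema, and Utf8 file data read through a Utf8View table schema):

use arrow_schema::DataType;
use datafusion_common::arrow::compute::can_cast_types;

fn main() {
    // An older file may store a narrower integer than the table schema declares.
    assert!(can_cast_types(&DataType::Int16, &DataType::Int32));
    // Utf8 columns on disk are readable through a Utf8View table schema.
    assert!(can_cast_types(&DataType::Utf8, &DataType::Utf8View));
}

Struct-to-struct columns bypass `can_cast_types` entirely: `can_cast_field` routes them through `validate_struct_compatibility`, which is what lets a file whose struct column lacks an optional nested field (like the host01 payloads above) still satisfy the table schema.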