diff --git a/Cargo.lock b/Cargo.lock index 64327fb86ac..4a8a6424c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8554,6 +8554,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "url", "vortex", "vortex-utils", "walkdir", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index c929b1befc1..cb0e2998e56 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -37,12 +37,14 @@ object_store = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } tokio-stream = { workspace = true } tracing = { workspace = true } +url = { workspace = true } vortex = { workspace = true, features = ["object_store", "tokio"] } vortex-utils = { workspace = true, features = ["dashmap"] } [dev-dependencies] anyhow = { workspace = true } datafusion = { workspace = true } +datafusion-common = { workspace = true } insta = { workspace = true } rstest = { workspace = true } tempfile = { workspace = true } diff --git a/vortex-datafusion/examples/vortex_table.rs b/vortex-datafusion/examples/vortex_table.rs index b98bbbe1106..7f96ffb13fc 100644 --- a/vortex-datafusion/examples/vortex_table.rs +++ b/vortex-datafusion/examples/vortex_table.rs @@ -79,7 +79,11 @@ async fn main() -> anyhow::Result<()> { ctx.register_table("vortex_tbl", listing_table as _)?; - run_query(&ctx, "SELECT * FROM vortex_tbl").await?; + run_query( + &ctx, + "SELECT * FROM vortex_tbl where numbers % 2 = 0 AND strings LIKE 'b%'", + ) + .await?; Ok(()) } diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index cdd194482c8..13e248f9946 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -38,18 +38,6 @@ use vortex::scalar::Scalar; use crate::convert::FromDataFusion; use crate::convert::TryFromDataFusion; -/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty. -pub(crate) fn make_vortex_predicate( - predicate: &[&Arc], -) -> VortexResult> { - let exprs = predicate - .iter() - .map(|e| Expression::try_from_df(e.as_ref())) - .collect::>>()?; - - Ok(exprs.into_iter().reduce(and)) -} - // TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep // for that node, up to any `and` or `or` node. impl TryFromDataFusion for Expression { @@ -245,7 +233,10 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> can_be_pushed_down(in_list.expr(), schema) && in_list.list().iter().all(|e| can_be_pushed_down(e, schema)) } else if let Some(scalar_fn) = expr.downcast_ref::() { - can_scalar_fn_be_pushed_down(scalar_fn, schema) + // Only get_field expressions should be pushed down. Note, we know that + // the GetFieldFunc call should be well-formed, because the DataFusion planner + // checks that for us before we even get to the DataSource. + ScalarFunctionExpr::try_downcast_func::(scalar_fn).is_some() } else { tracing::debug!(%df_expr, "DataFusion expression can't be pushed down"); false @@ -283,6 +274,7 @@ fn supported_data_types(dt: &DataType) -> bool { | Timestamp(_, _) | Time32(_) | Time64(_) + | Struct(_) ); if !is_supported { @@ -292,75 +284,47 @@ fn supported_data_types(dt: &DataType) -> bool { is_supported } -/// Checks if a GetField scalar function can be pushed down. 
-fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { - let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::(scalar_fn) - else { - // Only get_field pushdown is supported. - return false; - }; - - let args = get_field_fn.args(); - if args.len() != 2 { - tracing::debug!( - "Expected 2 arguments for GetField, not pushing down {} arguments", - args.len() - ); - return false; - } - let source_expr = &args[0]; - let field_name_expr = &args[1]; - let Some(field_name) = field_name_expr - .as_any() - .downcast_ref::() - .and_then(|lit| lit.value().try_as_str().flatten()) - else { - return false; - }; - - let Ok(source_dt) = source_expr.data_type(schema) else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type for GetField, not pushing down" - ); - return false; - }; - let DataType::Struct(fields) = source_dt else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type as struct for GetField, not pushing down" - ); - return false; - }; - fields.find(field_name).is_some() -} - #[cfg(test)] mod tests { + use std::any::Any; use std::sync::Arc; use arrow_schema::DataType; use arrow_schema::Field; - use arrow_schema::Fields; use arrow_schema::Schema; + use arrow_schema::SchemaBuilder; + use arrow_schema::SchemaRef; use arrow_schema::TimeUnit as ArrowTimeUnit; use datafusion::functions::core::getfield::GetFieldFunc; + use datafusion::logical_expr::ColumnarValue; + use datafusion::logical_expr::Signature; use datafusion_common::ScalarValue; + use datafusion_common::ToDFSchema; use datafusion_common::config::ConfigOptions; + use datafusion_datasource::file::FileSource; + use datafusion_expr::Expr; use datafusion_expr::Operator as DFOperator; + use datafusion_expr::ScalarFunctionArgs; use datafusion_expr::ScalarUDF; + use datafusion_expr::ScalarUDFImpl; + use datafusion_expr::Volatility; + use datafusion_expr::col; + use datafusion_expr::expr::ScalarFunction; + use datafusion_functions::expr_fn::get_field; use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::expressions as df_expr; + use datafusion_physical_plan::filter_pushdown::PushedDown; use insta::assert_snapshot; use rstest::rstest; + use vortex::VortexSessionDefault; use vortex::expr::Expression; use vortex::expr::Operator; + use vortex::session::VortexSession; use super::*; + use crate::VortexSource; + use crate::persistent::cache::VortexFileCache; #[rstest::fixture] fn test_schema() -> Schema { @@ -532,7 +496,7 @@ mod tests { DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), false )] - #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), false)] + #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), true)] // Dictionary types - should be supported if value type is supported #[case::dict_utf8( DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), @@ -674,33 +638,155 @@ mod tests { "#); } - #[rstest] - #[case::valid_field("field1", true)] - #[case::missing_field("nonexistent_field", false)] - fn test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) { - let struct_fields = Fields::from(vec![ - Field::new("field1", DataType::Utf8, true), - Field::new("field2", DataType::Int32, true), - ]); - let schema = 
Schema::new(vec![Field::new( - "my_struct", - DataType::Struct(struct_fields), - true, - )]); + #[test] + fn test_pushdown_nested_filter() { + // schema: + // a: struct + // |- one: i32 + // b:struct + // |- two: i32 + let mut test_schema = SchemaBuilder::new(); + test_schema.push(Field::new_struct( + "a", + vec![Field::new("one", DataType::Int32, false)], + false, + )); + test_schema.push(Field::new_struct( + "b", + vec![Field::new("two", DataType::Int32, false)], + false, + )); - let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc; - let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some( - field_name.to_string(), - )))) as Arc; + let test_schema = Arc::new(test_schema.finish()); + // Make sure filter is pushed down + let filter = get_field(col("b"), "two").eq(datafusion_expr::lit(10i32)); - let get_field_expr = Arc::new(ScalarFunctionExpr::new( - "get_field", - Arc::new(ScalarUDF::from(GetFieldFunc::new())), - vec![struct_col, field_name_lit], - Arc::new(Field::new(field_name, DataType::Utf8, true)), - Arc::new(ConfigOptions::new()), - )) as Arc; + let df_schema = test_schema.clone().to_dfschema().unwrap(); + + let physical_filter = logical2physical(&filter, df_schema.as_ref()); + + let source = vortex_source(&test_schema); + + let prop = source + .try_pushdown_filters(vec![physical_filter.clone()], &ConfigOptions::default()) + .unwrap(); + let updated_source = prop.updated_node.unwrap(); + let pushed_filters = updated_source.filter(); + assert_eq!(pushed_filters, Some(physical_filter)) + } + + #[test] + fn test_pushdown_deeply_nested_filter() { + // schema: + // a: struct + // |- b: struct + // |- c: i32 + let mut schema = SchemaBuilder::new(); + + let a = Field::new_struct( + "a", + vec![Field::new_struct( + "b", + vec![Field::new("c", DataType::Int32, false)], + false, + )], + false, + ); + schema.push(a); + + let schema = Arc::new(schema.finish()); + let df_schema = schema.clone().to_dfschema().unwrap(); + + let source = vortex_source(&schema); + + let deep_filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32)); + + let physical_filter = logical2physical(&deep_filter, df_schema.as_ref()); + + let prop = source + .try_pushdown_filters(vec![physical_filter.clone()], &ConfigOptions::default()) + .unwrap(); + let updated_source = prop.updated_node.unwrap(); + let pushed_filters = updated_source.filter(); + assert_eq!(pushed_filters, Some(physical_filter)) + } + + #[test] + fn test_unknown_scalar_function() { + #[derive(Debug, PartialEq, Eq, Hash)] + pub struct UnknownImpl { + signature: Signature, + } + + impl ScalarUDFImpl for UnknownImpl { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "unknown" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result { + Ok(DataType::Int32) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))) + } + } + + // schema: + // a: struct + // |- b: struct + // |- c: i32 + let mut schema = SchemaBuilder::new(); + + let a = Field::new_struct( + "a", + vec![Field::new_struct( + "b", + vec![Field::new("c", DataType::Int32, false)], + false, + )], + false, + ); + schema.push(a); + + let schema = Arc::new(schema.finish()); + let df_schema = schema.clone().to_dfschema().unwrap(); + + let source = vortex_source(&schema); + + let unknown_func = Expr::ScalarFunction(ScalarFunction { + 
func: Arc::new(ScalarUDF::new_from_impl(UnknownImpl { + signature: Signature::nullary(Volatility::Immutable), + })), + args: vec![], + }); + + // Another weird ScalarFunction that we can't push down + let deep_filter = unknown_func.eq(datafusion_expr::lit(10i32)); + + let physical_filter = logical2physical(&deep_filter, df_schema.as_ref()); + + let prop = source + .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) + .unwrap(); + assert!(matches!(prop.filters[0], PushedDown::No)); + } + + fn vortex_source(schema: &SchemaRef) -> Arc { + let session = VortexSession::default(); + let cache = VortexFileCache::new(1024, 1024, session.clone()); - assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected); + Arc::new(VortexSource::new(session, cache)).with_schema(schema.clone()) } } diff --git a/vortex-datafusion/src/convert/mod.rs b/vortex-datafusion/src/convert/mod.rs index d40be1d38fc..1fbdf1a30c8 100644 --- a/vortex-datafusion/src/convert/mod.rs +++ b/vortex-datafusion/src/convert/mod.rs @@ -4,6 +4,7 @@ use vortex::error::VortexResult; pub(crate) mod exprs; +pub(crate) mod ranges; mod scalars; /// First-party trait for implementing conversion from DataFusion types to Vortex types. diff --git a/vortex-datafusion/src/convert/ranges.rs b/vortex-datafusion/src/convert/ranges.rs new file mode 100644 index 00000000000..5bb995fe8b0 --- /dev/null +++ b/vortex-datafusion/src/convert/ranges.rs @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use datafusion_datasource::FileRange; +use vortex::ArrayRef; +use vortex::scan::ScanBuilder; + +/// If the file has a [`FileRange`](datafusion::datasource::listing::FileRange), we translate it into a row range in the file for the scan. 
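+/// Row positions are estimated from the byte range by assuming roughly uniformly sized rows
+/// (average row size = `total_size / row_count`), so consecutive byte ranges map to
+/// consecutive, non-overlapping row ranges.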
+pub(crate) fn apply_byte_range(
+    file_range: FileRange,
+    total_size: u64,
+    row_count: u64,
+    scan_builder: ScanBuilder<ArrayRef>,
+) -> ScanBuilder<ArrayRef> {
+    let row_range = byte_range_to_row_range(
+        file_range.start as u64..file_range.end as u64,
+        row_count,
+        total_size,
+    );
+
+    scan_builder.with_row_range(row_range)
+}
+
+fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
+    let average_row = total_size / row_count;
+    assert!(average_row > 0, "A row must always have at least one byte");
+
+    let start_row = byte_range.start / average_row;
+    let end_row = byte_range.end / average_row;
+
+    // We take the min here as `end_row` might overshoot
+    start_row..u64::min(row_count, end_row)
+}
+
+#[cfg(test)]
+mod tests {
+    use std::ops::Range;
+
+    use itertools::Itertools;
+    use rstest::rstest;
+
+    use crate::convert::ranges::byte_range_to_row_range;
+
+    #[rstest]
+    #[case(0..100, 100, 100, 0..100)]
+    #[case(0..105, 100, 105, 0..100)]
+    #[case(0..50, 100, 105, 0..50)]
+    #[case(50..105, 100, 105, 50..100)]
+    #[case(0..1, 4, 8, 0..0)]
+    #[case(1..8, 4, 8, 0..4)]
+    fn test_range_translation(
+        #[case] byte_range: Range<u64>,
+        #[case] row_count: u64,
+        #[case] total_size: u64,
+        #[case] expected: Range<u64>,
+    ) {
+        assert_eq!(
+            byte_range_to_row_range(byte_range, row_count, total_size),
+            expected
+        );
+    }
+
+    #[test]
+    fn test_consecutive_ranges() {
+        let row_count = 100;
+        let total_size = 429;
+        let bytes_a = 0..143;
+        let bytes_b = 143..286;
+        let bytes_c = 286..429;
+
+        let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size);
+        let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size);
+        let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size);
+
+        assert_eq!(rows_a.end - rows_a.start, 35);
+        assert_eq!(rows_b.end - rows_b.start, 36);
+        assert_eq!(rows_c.end - rows_c.start, 29);
+
+        assert_eq!(rows_a.start, 0);
+        assert_eq!(rows_c.end, 100);
+        for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() {
+            assert_eq!(left.end, right.start);
+        }
+    }
+}
diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs
index 9be23b7d804..4e9458758df 100644
--- a/vortex-datafusion/src/persistent/mod.rs
+++ b/vortex-datafusion/src/persistent/mod.rs
@@ -2,7 +2,7 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
 //! Persistent implementation of a Vortex table provider.
-mod cache; +pub(crate) mod cache; mod format; pub mod metrics; mod opener; diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index d20844efbc0..2e59c59fa54 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::ops::Range; use std::sync::Arc; use std::sync::Weak; @@ -12,13 +11,15 @@ use arrow_schema::SchemaRef; use datafusion_common::DataFusionError; use datafusion_common::Result as DFResult; use datafusion_common::arrow::array::RecordBatch; -use datafusion_datasource::FileRange; +use datafusion_common::arrow::compute::filter_record_batch; +use datafusion_common::cast::as_boolean_array; use datafusion_datasource::PartitionedFile; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::conjunction; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr::split_conjunction; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -32,11 +33,11 @@ use futures::stream; use object_store::ObjectStore; use object_store::path::Path; use tracing::Instrument; -use vortex::ArrayRef; use vortex::dtype::FieldName; use vortex::error::VortexError; +use vortex::expr; +use vortex::expr::Expression; use vortex::expr::root; -use vortex::expr::select; use vortex::layout::LayoutReader; use vortex::metrics::VortexMetrics; use vortex::scan::ScanBuilder; @@ -45,8 +46,9 @@ use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::dash_map::Entry; use super::cache::VortexFileCache; +use crate::convert::TryFromDataFusion; use crate::convert::exprs::can_be_pushed_down; -use crate::convert::exprs::make_vortex_predicate; +use crate::convert::ranges::apply_byte_range; #[derive(Clone)] pub(crate) struct VortexOpener { @@ -230,6 +232,9 @@ impl FileOpener for VortexOpener { DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}")) })?); + let logical_file_schema = + compute_logical_file_schema(&physical_file_schema, &logical_schema); + if let Some(expr_adapter_factory) = expr_adapter_factory { let partition_values = partition_fields .iter() @@ -241,13 +246,8 @@ impl FileOpener for VortexOpener { // for schema evolution and divergence between the table's schema and individual files. filter = filter .map(|filter| { - let logical_file_schema = compute_logical_file_schema( - &physical_file_schema.clone(), - &logical_schema, - ); - let expr = expr_adapter_factory - .create(logical_file_schema, physical_file_schema.clone()) + .create(logical_file_schema.clone(), physical_file_schema.clone()) .with_partition_values(partition_values) .rewrite(filter)?; @@ -263,52 +263,19 @@ impl FileOpener for VortexOpener { // Create the initial mapping from physical file schema to projected schema. // This gives us the field reordering and tells us which logical schema fields // to select. - let (_schema_mapping, adapted_projections) = - schema_adapter.map_schema(&physical_file_schema)?; + let (schema_mapping, adapted_projections) = + schema_adapter.map_schema(&logical_file_schema)?; // Build the Vortex projection expression using the adapted projections. 
         // This will reorder the fields to match the target order.
         let fields = adapted_projections
             .iter()
-            .map(|idx| {
-                let field = logical_schema.field(*idx);
+            .map(|&idx| {
+                let field = logical_file_schema.field(idx);
                 FieldName::from(field.name().as_str())
             })
             .collect::<Vec<_>>();
-        let projection_expr = select(fields, root());
-
-        // After Vortex applies the projection, the batch will have fields in the target
-        // order (matching adapted_projections), but with the physical file types.
-        // We need a second schema mapping for type casting only, not reordering.
-        // Build a schema that represents what Vortex will return: fields in target order
-        // with physical types.
-        let projected_physical_fields: Vec<Field> = adapted_projections
-            .iter()
-            .map(|&idx| {
-                let logical_field = logical_schema.field(idx);
-                let field_name = logical_field.name();
-
-                // Find this field in the physical schema to get its physical type
-                physical_file_schema
-                    .field_with_name(field_name)
-                    .map(|phys_field| {
-                        Field::new(
-                            field_name,
-                            merge_field_types(phys_field, logical_field),
-                            phys_field.is_nullable(),
-                        )
-                    })
-                    .unwrap_or_else(|_| (*logical_field).clone())
-            })
-            .collect();
-
-        let projected_physical_schema =
-            Arc::new(arrow_schema::Schema::new(projected_physical_fields));
-
-        // Create a second mapping from the projected physical schema (what Vortex returns)
-        // to the final projected schema. This mapping will handle type casting without reordering.
-        let (batch_schema_mapping, _) =
-            schema_adapter.map_schema(&projected_physical_schema)?;
+        let projection_expr = expr::select(fields, root());
 
         // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once.
         let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
@@ -348,20 +315,49 @@
             );
         }
 
-        let filter = filter
-            .and_then(|f| {
-                let exprs = split_conjunction(&f)
-                    .into_iter()
-                    .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema))
-                    .collect::<Vec<_>>();
+        // Split the filter expressions into those that can be applied within the file scan,
+        // and those that need to be applied afterward in-memory.
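+        // For example, a predicate over a column that exists in the table schema but is
+        // missing from this particular file (and is filled in by the schema adapter) can't
+        // be evaluated inside the Vortex scan, so it runs against the decoded record batches.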
+        let mut pushed_filters = Vec::new();
+        let mut post_filters = Vec::new();
+
+        if let Some(filter) = filter {
+            for expr in split_conjunction(&filter) {
+                if can_be_pushed_down(expr, &predicate_file_schema)
+                    && let Ok(vortex_expr) = Expression::try_from_df(expr.as_ref())
+                {
+                    pushed_filters.push(vortex_expr);
+                } else {
+                    post_filters.push(expr.clone());
+                }
+            }
+        }
+
+        let pushed_filter = pushed_filters.into_iter().reduce(expr::and);
+        let post_filter: Box<dyn Fn(DFResult<RecordBatch>) -> DFResult<RecordBatch> + Send> =
+            if post_filters.is_empty() {
+                Box::new(|batch: DFResult<RecordBatch>| batch)
+            } else {
+                let conjunction = conjunction(post_filters.clone());
+                Box::new(
+                    move |batch: DFResult<RecordBatch>| -> DFResult<RecordBatch> {
+                        let batch = batch?;
+                        let filter = conjunction.evaluate(&batch)?;
+                        let filter = filter.into_array(batch.num_rows())?;
+                        let filter = as_boolean_array(&filter)?;
+                        filter_record_batch(&batch, filter).map_err(DataFusionError::from)
+                    },
+                )
+            };
-                make_vortex_predicate(&exprs).transpose()
-            })
-            .transpose()
-            .map_err(|e| DataFusionError::External(e.into()))?;
+        tracing::debug!(
+            ?pushed_filter,
+            ?post_filters,
+            ?projection_expr,
+            "opening file with predicates and projection"
+        );
 
         if let Some(limit) = limit
-            && filter.is_none()
+            && pushed_filter.is_none()
         {
             scan_builder = scan_builder.with_limit(limit);
         }
@@ -369,7 +365,7 @@
         let stream = scan_builder
             .with_metrics(metrics)
             .with_projection(projection_expr)
-            .with_some_filter(filter)
+            .with_some_filter(pushed_filter)
             .with_ordered(has_output_ordering)
             .map(|chunk| RecordBatch::try_from(chunk.as_ref()))
             .into_stream()
@@ -404,7 +400,12 @@
                 ))))
             })
             .try_flatten()
-            .map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b)))
+            .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b)))
+            // Apply the post-filter step, which will execute any filters that couldn't
+            // be pushed down for this file. This is applicable for any filters over fields
+            // missing from the file schema that exist in the table schema, and are filled in
+            // from the schema adapter.
+            .map(post_filter)
             .boxed();
 
         Ok(stream)
@@ -414,274 +415,230 @@
     }
 }
 
-/// If the file has a [`FileRange`](datafusion::datasource::listing::FileRange), we translate it into a row range in the file for the scan.
-fn apply_byte_range( - file_range: FileRange, - total_size: u64, - row_count: u64, - scan_builder: ScanBuilder, -) -> ScanBuilder { - let row_range = byte_range_to_row_range( - file_range.start as u64..file_range.end as u64, - row_count, - total_size, - ); - - scan_builder.with_row_range(row_range) -} - -fn byte_range_to_row_range(byte_range: Range, row_count: u64, total_size: u64) -> Range { - let average_row = total_size / row_count; - assert!(average_row > 0, "A row must always have at least one byte"); - - let start_row = byte_range.start / average_row; - let end_row = byte_range.end / average_row; - - // We take the min here as `end_row` might overshoot - start_row..u64::min(row_count, end_row) -} - #[cfg(test)] mod tests { - use std::sync::LazyLock; + use std::str::FromStr; use arrow_schema::Fields; + use chrono::DateTime; use chrono::Utc; use datafusion::arrow::array::RecordBatch; - use datafusion::arrow::array::StringArray; use datafusion::arrow::array::StructArray; use datafusion::arrow::datatypes::DataType; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::util::display::FormatOptions; + use datafusion::arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::record_batch; - use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion::logical_expr::col; use datafusion::logical_expr::lit; use datafusion::physical_expr::planner::logical2physical; - use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory; - use datafusion::scalar::ScalarValue; + use datafusion::prelude::SessionContext; + use datafusion_common::config::ConfigOptions; + use datafusion_common::create_array; + use datafusion_datasource::file::FileSource; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_physical_plan::filter_pushdown::PushedDown; + use futures::pin_mut; use insta::assert_snapshot; - use itertools::Itertools; use object_store::ObjectMeta; use object_store::memory::InMemory; use rstest::rstest; + use url::Url; + use vortex::ArrayRef; use vortex::VortexSessionDefault; use vortex::arrow::FromArrowArray; use vortex::file::WriteOptionsSessionExt; use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; use vortex::session::VortexSession; + use vortex_utils::aliases::hash_map::HashMap; use super::*; + use crate::VortexSource; - static SESSION: LazyLock = LazyLock::new(VortexSession::default); - - #[rstest] - #[case(0..100, 100, 100, 0..100)] - #[case(0..105, 100, 105, 0..100)] - #[case(0..50, 100, 105, 0..50)] - #[case(50..105, 100, 105, 50..100)] - #[case(0..1, 4, 8, 0..0)] - #[case(1..8, 4, 8, 0..4)] - fn test_range_translation( - #[case] byte_range: Range, - #[case] row_count: u64, - #[case] total_size: u64, - #[case] expected: Range, - ) { - assert_eq!( - byte_range_to_row_range(byte_range, row_count, total_size), - expected - ); + /// Fixtures used for integration testing the FileSource and FileOpener + struct TestFixtures { + object_store: Arc, + // We need to return this to the caller to prevent the session context from + // being dropped and the object_store from being removed + #[allow(dead_code)] + session_context: SessionContext, + source: Arc, + files: Files, } - #[test] - fn test_consecutive_ranges() { - let row_count = 100; - let total_size = 429; - let bytes_a = 0..143; - let bytes_b = 143..286; - let bytes_c = 286..429; - - let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size); - let rows_b = 
byte_range_to_row_range(bytes_b, row_count, total_size); - let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size); - - assert_eq!(rows_a.end - rows_a.start, 35); - assert_eq!(rows_b.end - rows_b.start, 36); - assert_eq!(rows_c.end - rows_c.start, 29); - - assert_eq!(rows_a.start, 0); - assert_eq!(rows_c.end, 100); - for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() { - assert_eq!(left.end, right.start); - } + struct Files { + file_meta: HashMap, } - async fn write_arrow_to_vortex( - object_store: Arc, - path: &str, - rb: RecordBatch, - ) -> anyhow::Result { - let array = ArrayRef::from_arrow(rb, false); - let path = Path::parse(path)?; - - let mut write = ObjectStoreWriter::new(object_store, &path).await?; - let summary = SESSION - .write_options() - .write(&mut write, array.to_array_stream()) - .await?; - write.shutdown().await?; - - Ok(summary.size()) + impl Files { + fn get(&self, path: &str) -> (FileMeta, PartitionedFile) { + let file = self + .file_meta + .get(path) + .unwrap_or_else(|| panic!("Missing file {}", path)); + ( + file.clone(), + PartitionedFile::new(path, file.object_meta.size), + ) + } } - fn make_meta(path: &str, data_size: u64) -> FileMeta { - FileMeta { - object_meta: ObjectMeta { - location: Path::from(path), - last_modified: Utc::now(), - size: data_size, - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, + // Make a set of files and record batches + async fn make_source( + files: HashMap, + file_schema: &SchemaRef, + ) -> anyhow::Result { + let session = VortexSession::default(); + + let ctx = SessionContext::new(); + + let store: Arc = Arc::new(InMemory::new()); + + ctx.register_object_store(&Url::from_str("s3://in-memory")?, store.clone()); + + // "write" all the record batches to the named file paths + let mut file_meta = HashMap::with_capacity(files.len()); + + for (path_str, rb) in files.iter() { + let array = ArrayRef::from_arrow(rb, false); + let path = Path::from_url_path(path_str.as_str())?; + let mut write = ObjectStoreWriter::new(store.clone(), &path).await?; + let summary = session + .write_options() + .write(&mut write, array.to_array_stream()) + .await?; + write.shutdown().await?; + + file_meta.insert( + path_str.clone(), + FileMeta::from(ObjectMeta { + location: path.clone(), + size: summary.size(), + e_tag: None, + version: None, + last_modified: DateTime::::from_timestamp_secs(0).unwrap(), + }), + ); } + + let source = VortexSource::new( + session.clone(), + VortexFileCache::new(1024, 1024, session.clone()), + ); + let source = source + .with_schema(Arc::clone(file_schema)) + .with_batch_size(100); + + Ok(TestFixtures { + session_context: ctx, + object_store: store, + files: Files { file_meta }, + source, + }) } #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))] - // If we don't have a physical expr adapter, we just drop filters on partition values - #[case(None, (1, 3), (1, 3))] #[tokio::test] - async fn test_adapter_optimization_partition_column( - #[case] expr_adapter_factory: Option>, - #[case] expected_result1: (usize, usize), - #[case] expected_result2: (usize, usize), - ) -> anyhow::Result<()> { - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "part=1/file.vortex"; - let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - let data_size = - write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - - let file_schema = batch.schema(); - let mut file 
= PartitionedFile::new(file_path.to_string(), data_size); - file.partition_values = vec![ScalarValue::Int32(Some(1))]; + async fn test_do_not_pushdown_filter_on_partition_columns() -> anyhow::Result<()> { + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?; + let file_schema = batch.schema().clone(); + let files = HashMap::from_iter([("part=1/file.vortex".to_string(), batch)]); let table_schema = Arc::new(Schema::new(vec![ Field::new("part", DataType::Int32, false), Field::new("a", DataType::Int32, false), ])); - let make_opener = |filter| VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0].into()), - filter: Some(filter), - file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: file_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let TestFixtures { + source, + object_store, + files, + .. + } = make_source(files, &file_schema).await?; - // filter matches partition value - let filter = col("part").eq(lit(1)); - let filter = logical2physical(&filter, table_schema.as_ref()); + // Attempting to push filters over partitions should fail. + let filter_partition_col = col("part").eq(lit(1i32)); + let filter_partition_col = logical2physical(&filter_partition_col, table_schema.as_ref()); - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); + let push_filters = + source.try_pushdown_filters(vec![filter_partition_col], &ConfigOptions::default())?; - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); + let source = push_filters.updated_node.unwrap(); - assert_eq!((num_batches, num_rows), expected_result1); + assert!(matches!(push_filters.filters[0], PushedDown::No)); - // filter doesn't matches partition value - let filter = col("part").eq(lit(2)); - let filter = logical2physical(&filter, table_schema.as_ref()); + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + file_schema.clone(), + source.clone(), + ) + .build(); - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); + // Create an opener with this + let opener = source.create_file_opener(object_store.clone(), &base_config, 0); - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); - assert_eq!((num_batches, num_rows), expected_result2); + let (file1, part_file1) = files.get("part=1/file.vortex"); + + let open_result = opener.open(file1, part_file1)?.await?; + + pin_mut!(open_result); + let mut rbs = open_result.try_collect::>().await?; + assert_eq!(rbs.len(), 1); + + let rb = rbs.pop().unwrap(); + assert_eq!(rb.num_rows(), 3); + let expected = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?; + + assert_eq!(rb, expected); Ok(()) } - #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _))] - // If we don't have a physical expr adapter, we just drop filters on partition values. 
- // This is currently not supported, the work to support it requires to rewrite the predicate with appropriate casts. // Seems like datafusion is moving towards having DefaultPhysicalExprAdapterFactory be always provided, which would make it work OOTB. // See: https://github.com/apache/datafusion/issues/16800 - // #[case(None)] #[tokio::test] - async fn test_open_files_different_table_schema( - #[case] expr_adapter_factory: Option>, - ) -> anyhow::Result<()> { - use datafusion::arrow::util::pretty::pretty_format_batches_with_options; - - let object_store = Arc::new(InMemory::new()) as Arc; - let file1_path = "/path/file1.vortex"; - let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - let data_size1 = write_arrow_to_vortex(object_store.clone(), file1_path, batch1).await?; - let file1 = PartitionedFile::new(file1_path.to_string(), data_size1); - - let file2_path = "/path/file2.vortex"; - let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)])).unwrap(); - let data_size2 = write_arrow_to_vortex(object_store.clone(), file2_path, batch2).await?; - let file2 = PartitionedFile::new(file1_path.to_string(), data_size1); + async fn test_open_files_different_table_schema() -> anyhow::Result<()> { + let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?; + let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)]))?; + let files = HashMap::from_iter([ + ("path/file1.vortex".to_string(), batch1), + ("path/file2.vortex".to_string(), batch2), + ]); // Table schema has can accommodate both files let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); - let make_opener = |filter| VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0].into()), - filter: Some(filter), - file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let TestFixtures { + object_store, + source, + files, + .. + } = make_source(files, &table_schema).await?; + + let (file1, part_file1) = files.get("path/file1.vortex"); + let (file2, part_file2) = files.get("path/file2.vortex"); + // Try and push filters into the source. let filter = col("a").lt(lit(100_i32)); let filter = logical2physical(&filter, table_schema.as_ref()); + let pushdown_result = + source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?; - let opener1 = make_opener(filter.clone()); - let stream = opener1 - .open(make_meta(file1_path, data_size1), file1)? - .await?; + let source = pushdown_result.updated_node.unwrap(); + + assert_eq!(source.filter(), Some(filter)); + + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + table_schema.clone(), + source.clone(), + ) + .build(); + + let opener = source.create_file_opener(object_store.clone(), &base_config, 0); + let stream = opener.open(file1, part_file1)?.await?; let format_opts = FormatOptions::new().with_types_info(true); @@ -697,11 +654,7 @@ mod tests { +-------+ "); - let opener2 = make_opener(filter.clone()); - let stream = opener2 - .open(make_meta(file2_path, data_size2), file2)? 
- .await?; - + let stream = opener.open(file2, part_file2)?.await?; let data = stream.try_collect::>().await?; assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r" +-------+ @@ -725,18 +678,30 @@ mod tests { async fn test_schema_different_column_order() -> anyhow::Result<()> { use datafusion::arrow::util::pretty::pretty_format_batches_with_options; - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "/path/file.vortex"; - - // File has columns in order: c, b, a + // File has field order c,b,a let batch = record_batch!( ("c", Int32, vec![Some(300), Some(301), Some(302)]), ("b", Int32, vec![Some(200), Some(201), Some(202)]), ("a", Int32, vec![Some(100), Some(101), Some(102)]) - ) - .unwrap(); - let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?; - let file = PartitionedFile::new(file_path.to_string(), data_size); + )?; + + // table schema has field order a,b,c + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + ])); + + let files = HashMap::from_iter([("path/file1.vortex".to_string(), batch)]); + + let TestFixtures { + source, + files, + object_store, + .. + } = make_source(files, &table_schema).await?; + + let (file1, part_file1) = files.get("path/file1.vortex"); // Table schema has columns in different order: a, b, c let table_schema = Arc::new(Schema::new(vec![ @@ -745,26 +710,16 @@ mod tests { Field::new("c", DataType::Int32, true), ])); - let opener = VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0, 1, 2].into()), - filter: None, - file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + table_schema.clone(), + source.clone(), + ) + .build(); + let opener = source.create_file_opener(object_store, &base_config, 0); // The opener should successfully open the file and reorder columns - let stream = opener.open(make_meta(file_path, data_size), file)?.await?; + let stream = opener.open(file1, part_file1)?.await?; let format_opts = FormatOptions::new().with_types_info(true); let data = stream.try_collect::>().await?; @@ -790,83 +745,94 @@ mod tests { // a nested schema mismatch between the physical file schema and logical // table schema. 
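+    // The schema adapter is expected to fill the missing nested `field3` with nulls in the
+    // batches returned by the opener.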
async fn test_adapter_logical_physical_struct_mismatch() -> anyhow::Result<()> { - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "/path/file.vortex"; - let file_struct_fields = Fields::from(vec![ - Field::new("field1", DataType::Utf8, true), - Field::new("field2", DataType::Utf8, true), - ]); + let field1 = create_array!(Utf8, vec![Some("value1"), Some("value2"), Some("value3")]); + let field2 = create_array!(Utf8, vec![Some("a"), Some("b"), Some("c")]); + let struct_array = StructArray::new( - file_struct_fields.clone(), - vec![ - Arc::new(StringArray::from(vec!["value1", "value2", "value3"])), - Arc::new(StringArray::from(vec!["a", "b", "c"])), - ], + Fields::from(vec![ + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), + ]), + vec![field1.clone(), field2.clone()], None, ); - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new( - "my_struct", - DataType::Struct(file_struct_fields), - true, - )])), - vec![Arc::new(struct_array)], - )?; - let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?; - // Table schema has an extra utf8 field. + // file schema reflects the data + let file_schema = Arc::new(Schema::new(vec![Field::new( + "my_struct", + DataType::Struct(Fields::from(vec![ + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), + ])), + true, + )])); + + // Table schema has an extra inner utf8 field. let table_schema = Arc::new(Schema::new(vec![Field::new( "my_struct", DataType::Struct(Fields::from(vec![ - Field::new( - "field1", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), - true, - ), - Field::new( - "field2", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), - true, - ), + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), Field::new("field3", DataType::Utf8, true), ])), true, )])); - let opener = VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: None, - filter: Some(logical2physical( - &col("my_struct").is_not_null(), - &table_schema, - )), - file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema, - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let batch = RecordBatch::try_new(file_schema.clone(), vec![Arc::new(struct_array)])?; + + let files = HashMap::from_iter([("path/file.vortex".to_string(), batch.clone())]); + + let TestFixtures { + source, + files, + object_store, + .. 
+ } = make_source(files, &table_schema).await?; + + let filter = logical2physical(&col("my_struct").is_not_null(), &table_schema); + let pushdown_result = + source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?; + + let source = pushdown_result.updated_node.unwrap(); + assert_eq!(source.filter(), Some(filter)); + + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + table_schema.clone(), + source.clone(), + ) + .build(); + + let opener = source.create_file_opener(object_store.clone(), &base_config, 0); + + let (file, part_file) = files.get("path/file.vortex"); - // The opener should be able to open the file with a filter on the - // struct column. let data = opener - .open( - make_meta(file_path, data_size), - PartitionedFile::new(file_path.to_string(), data_size), - )? + .open(file, part_file)? .await? .try_collect::>() .await?; assert_eq!(data.len(), 1); - assert_eq!(data[0].num_rows(), 3); + + // The opener will return batches that have been adapted with the extra "field3" with + // nulls added. + let field3 = create_array!(Utf8, vec![Option::::None; 3]); + let table_struct = StructArray::new( + Fields::from(vec![ + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), + Field::new("field3", DataType::Utf8, true), + ]), + vec![field1, field2, field3], + None, + ); + + let batch_with_nulls = + RecordBatch::try_new(table_schema.clone(), vec![Arc::new(table_struct)])?; + + // The opener returns us a stream where field3 is replaced with nulls + assert_eq!(data[0], batch_with_nulls); Ok(()) } diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index d42436eb652..b4dd9207f96 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -52,11 +52,11 @@ pub struct VortexSource { pub(crate) full_predicate: Option, /// Subset of predicates that can be pushed down into Vortex scan operations. /// These are expressions that Vortex can efficiently evaluate during scanning. - pub(crate) vortex_predicate: Option, + pub(crate) pushed_predicate: Option, pub(crate) batch_size: Option, pub(crate) projected_statistics: Option, /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema. 
-    pub(crate) arrow_file_schema: Option<SchemaRef>,
+    pub(crate) table_schema: Option<SchemaRef>,
     pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
     pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
     _unused_df_metrics: ExecutionPlanMetricsSet,
@@ -72,10 +72,10 @@
             session,
             file_cache,
             full_predicate: None,
-            vortex_predicate: None,
+            pushed_predicate: None,
             batch_size: None,
             projected_statistics: None,
-            arrow_file_schema: None,
+            table_schema: None,
             schema_adapter_factory: None,
             expr_adapter_factory: None,
             _unused_df_metrics: Default::default(),
@@ -144,7 +144,7 @@
             session: self.session.clone(),
             object_store,
             projection,
-            filter: self.vortex_predicate.clone(),
+            filter: self.pushed_predicate.clone(),
             file_pruning_predicate: self.full_predicate.clone(),
             expr_adapter_factory,
             schema_adapter_factory,
@@ -173,7 +173,7 @@
     fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
         let mut source = self.clone();
-        source.arrow_file_schema = Some(schema);
+        source.table_schema = Some(schema);
         Arc::new(source)
     }
@@ -188,7 +188,7 @@
     }
 
     fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
-        self.vortex_predicate.clone()
+        self.pushed_predicate.clone()
     }
 
     fn metrics(&self) -> &ExecutionPlanMetricsSet {
@@ -201,7 +201,7 @@
             .clone()
             .vortex_expect("projected_statistics must be set");
 
-        if self.vortex_predicate.is_some() {
+        if self.pushed_predicate.is_some() {
            Ok(statistics.to_inexact())
         } else {
            Ok(statistics)
@@ -215,13 +215,13 @@
     fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
         match t {
             DisplayFormatType::Default | DisplayFormatType::Verbose => {
-                if let Some(ref predicate) = self.vortex_predicate {
+                if let Some(ref predicate) = self.pushed_predicate {
                     write!(f, ", predicate: {predicate}")?;
                 }
             }
             // Use TreeRender style key=value formatting to display the predicate
             DisplayFormatType::TreeRender => {
-                if let Some(ref predicate) = self.vortex_predicate {
+                if let Some(ref predicate) = self.pushed_predicate {
                     writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
                 };
             }
@@ -240,7 +240,8 @@
             ));
         }
 
-        let Some(schema) = self.arrow_file_schema.as_ref() else {
+        // only try and push filters if we know the schema
+        let Some(schema) = self.table_schema.as_ref() else {
             return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                 vec![PushedDown::No; filters.len()],
             ));
@@ -257,6 +258,7 @@
             None => Some(conjunction(filters.clone())),
         };
 
+        // Update the predicate with any pushed filters
         let supported_filters = filters
             .into_iter()
             .map(|expr| {
@@ -286,14 +288,14 @@
             })
             .cloned();
 
-        let predicate = match source.vortex_predicate {
+        let predicate = match source.pushed_predicate {
             Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
             None => conjunction(supported),
         };
 
-        tracing::debug!(%predicate, "Saving predicate");
+        tracing::debug!(%predicate, "updating predicate with new filters");
 
-        source.vortex_predicate = Some(predicate);
+        source.pushed_predicate = Some(predicate);
 
         Ok(FilterPushdownPropagation::with_parent_pushdown_result(
             supported_filters.iter().map(|f| f.discriminant).collect(),