From 82a80b665f69e001b101171b99af4685ac34097a Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 19 Nov 2025 14:35:07 -0500 Subject: [PATCH 01/16] fix: filter pushdown for nested fields In #5295, we accidentally broke nested filter pushdown. The issue is that FileSource::try_pushdown_filters seems like it's meant to evaluate using the whole file schema, rather than any projected schema. As an example, in the Github Archive benchmark dataset, we have the following query, which should trivially pushdown and be pruned, executing about 30ms or so: ``` SELECT COUNT(*) from events WHERE payload.ref = 'refs/head/main' ``` However, after this change, pushdown of this field was failing, pushing query time up 100x. The root cause is that the old logic attempted to apply the file schema to the source_expr directly. Concretely, for the gharchive query, the whole expression is something like: ```text BinaryExpr { lhs: GetField { source_expr: Column { name: "payload", index: 0 }, field_expr: Literal { value: "ref" } } rhs: Literal { value: "refs/head/main" } operator: Eq } ``` The issue is that the column index 0 is wrong for the whole file. Instead, we need to recursively ensure that the source_expr is a valid sequence of Column and GetField expressions that resolve properly. Note how we already were doing this for checking if a standalone Column expression can be pushed down: ``` } else if let Some(col) = expr.downcast_ref::() { schema .field_with_name(col.name()) .ok() .is_some_and(|field| supported_data_types(field.data_type())) ``` Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 85 ++++++++++++-------------- 1 file changed, 40 insertions(+), 45 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index cdd194482c8..bc06336a543 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -244,8 +244,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> } else if let Some(in_list) = expr.downcast_ref::() { can_be_pushed_down(in_list.expr(), schema) && in_list.list().iter().all(|e| can_be_pushed_down(e, schema)) - } else if let Some(scalar_fn) = expr.downcast_ref::() { - can_scalar_fn_be_pushed_down(scalar_fn, schema) + } else if expr.downcast_ref::().is_some() { + get_source_data_type(df_expr, schema).is_some() } else { tracing::debug!(%df_expr, "DataFusion expression can't be pushed down"); false @@ -292,51 +292,46 @@ fn supported_data_types(dt: &DataType) -> bool { is_supported } -/// Checks if a GetField scalar function can be pushed down. -fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool { - let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::(scalar_fn) - else { - // Only get_field pushdown is supported. - return false; - }; +/// Evaluate the source `expr` within the scope of `schema` and return its data type. If the source +/// expression is not composed of valid field accesses that we can pushdown to Vortex, fail. +fn get_source_data_type(expr: &Arc, schema: &Schema) -> Option { + if let Some(col) = expr.as_any().downcast_ref::() { + // Column expression handler + let Ok(field) = schema.field_with_name(col.name()) else { + return None; + }; + + // Get back the data type here instead. 
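For intuition, here is a minimal standalone sketch of the same resolution idea: walk the nested field path against the file schema by name instead of trusting the column index baked into the expression. It uses plain arrow-schema types and a string path rather than DataFusion's `Column`/`GetFieldFunc` tree, so the function name `resolve_nested_type` and the example schema are illustrative assumptions, not code from this patch.

```
use arrow_schema::{DataType, Field, Fields, Schema};

/// Resolve `root.path[0].path[1]...` against `schema`, descending through struct fields
/// by name. Returns the leaf data type, or `None` if any step is missing or not a
/// struct, in which case the filter would not be pushed down.
fn resolve_nested_type(schema: &Schema, root: &str, path: &[&str]) -> Option<DataType> {
    let mut dt = schema.field_with_name(root).ok()?.data_type().clone();
    for &name in path {
        let DataType::Struct(fields) = dt else {
            return None;
        };
        dt = fields.find(name)?.1.data_type().clone();
    }
    Some(dt)
}

fn main() {
    // Schema shaped like the gharchive example: `payload` is a struct with a utf8 `ref` field.
    let payload_fields = Fields::from(vec![Field::new("ref", DataType::Utf8, true)]);
    let schema = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("payload", DataType::Struct(payload_fields), true),
    ]);

    // `payload.ref` resolves to Utf8 no matter where `payload` sits in the file schema.
    assert_eq!(
        resolve_nested_type(&schema, "payload", &["ref"]),
        Some(DataType::Utf8)
    );
    // Missing fields and non-struct steps resolve to None, so they are not pushed down.
    assert_eq!(resolve_nested_type(&schema, "payload", &["sha"]), None);
    assert_eq!(resolve_nested_type(&schema, "id", &["x"]), None);
}
```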
+ Some(field.data_type().clone()) + } else if let Some(scalar_fn) = expr.as_any().downcast_ref::() { + // Struct field access handler + let get_field_fn = ScalarFunctionExpr::try_downcast_func::(scalar_fn)?; + + let args = get_field_fn.args(); + if args.len() != 2 { + return None; + } - let args = get_field_fn.args(); - if args.len() != 2 { - tracing::debug!( - "Expected 2 arguments for GetField, not pushing down {} arguments", - args.len() - ); - return false; + let source = &args[0]; + let field_name_expr = &args[1]; + + let DataType::Struct(fields) = get_source_data_type(source, schema)? else { + return None; + }; + + let field_name = field_name_expr + .as_any() + .downcast_ref::() + .and_then(|l| l.value().try_as_str()) + .flatten()?; + + // Extract the named field from the struct type + fields + .find(field_name) + .map(|(_, dt)| dt.data_type().clone()) + } else { + None } - let source_expr = &args[0]; - let field_name_expr = &args[1]; - let Some(field_name) = field_name_expr - .as_any() - .downcast_ref::() - .and_then(|lit| lit.value().try_as_str().flatten()) - else { - return false; - }; - - let Ok(source_dt) = source_expr.data_type(schema) else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type for GetField, not pushing down" - ); - return false; - }; - let DataType::Struct(fields) = source_dt else { - tracing::debug!( - field_name = field_name, - schema = ?schema, - source_expr = ?source_expr, - "Failed to get source type as struct for GetField, not pushing down" - ); - return false; - }; - fields.find(field_name).is_some() } #[cfg(test)] From af45f36512c7f032c2ac148efe10eb3d32c98ec2 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 19 Nov 2025 16:41:51 -0500 Subject: [PATCH 02/16] remove, add tests Signed-off-by: Andrew Duffy --- vortex-datafusion/Cargo.toml | 1 + vortex-datafusion/src/convert/exprs.rs | 231 ++++++++++++++++-------- vortex-datafusion/src/persistent/mod.rs | 2 +- 3 files changed, 162 insertions(+), 72 deletions(-) diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index c929b1befc1..f0f3bc363d3 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -43,6 +43,7 @@ vortex-utils = { workspace = true, features = ["dashmap"] } [dev-dependencies] anyhow = { workspace = true } datafusion = { workspace = true } +datafusion-common = { workspace = true } insta = { workspace = true } rstest = { workspace = true } tempfile = { workspace = true } diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index bc06336a543..57c011a057f 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -244,8 +244,11 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> } else if let Some(in_list) = expr.downcast_ref::() { can_be_pushed_down(in_list.expr(), schema) && in_list.list().iter().all(|e| can_be_pushed_down(e, schema)) - } else if expr.downcast_ref::().is_some() { - get_source_data_type(df_expr, schema).is_some() + } else if let Some(scalar_fn) = expr.downcast_ref::() { + // Only get_field expressions should be pushed down. Note, we know that + // the GetFieldFunc call should be well-formed, because the DataFusion planner + // checks that for us before we even get to the DataSource. 
+ ScalarFunctionExpr::try_downcast_func::(scalar_fn).is_some() } else { tracing::debug!(%df_expr, "DataFusion expression can't be pushed down"); false @@ -292,70 +295,46 @@ fn supported_data_types(dt: &DataType) -> bool { is_supported } -/// Evaluate the source `expr` within the scope of `schema` and return its data type. If the source -/// expression is not composed of valid field accesses that we can pushdown to Vortex, fail. -fn get_source_data_type(expr: &Arc, schema: &Schema) -> Option { - if let Some(col) = expr.as_any().downcast_ref::() { - // Column expression handler - let Ok(field) = schema.field_with_name(col.name()) else { - return None; - }; - - // Get back the data type here instead. - Some(field.data_type().clone()) - } else if let Some(scalar_fn) = expr.as_any().downcast_ref::() { - // Struct field access handler - let get_field_fn = ScalarFunctionExpr::try_downcast_func::(scalar_fn)?; - - let args = get_field_fn.args(); - if args.len() != 2 { - return None; - } - - let source = &args[0]; - let field_name_expr = &args[1]; - - let DataType::Struct(fields) = get_source_data_type(source, schema)? else { - return None; - }; - - let field_name = field_name_expr - .as_any() - .downcast_ref::() - .and_then(|l| l.value().try_as_str()) - .flatten()?; - - // Extract the named field from the struct type - fields - .find(field_name) - .map(|(_, dt)| dt.data_type().clone()) - } else { - None - } -} - #[cfg(test)] mod tests { + use std::any::Any; use std::sync::Arc; use arrow_schema::DataType; use arrow_schema::Field; - use arrow_schema::Fields; use arrow_schema::Schema; + use arrow_schema::SchemaBuilder; + use arrow_schema::SchemaRef; use arrow_schema::TimeUnit as ArrowTimeUnit; use datafusion::functions::core::getfield::GetFieldFunc; + use datafusion::logical_expr::ColumnarValue; + use datafusion::logical_expr::Signature; use datafusion_common::ScalarValue; + use datafusion_common::ToDFSchema; use datafusion_common::config::ConfigOptions; + use datafusion_datasource::file::FileSource; + use datafusion_expr::Expr; use datafusion_expr::Operator as DFOperator; + use datafusion_expr::ScalarFunctionArgs; use datafusion_expr::ScalarUDF; + use datafusion_expr::ScalarUDFImpl; + use datafusion_expr::Volatility; + use datafusion_expr::col; + use datafusion_expr::execution_props::ExecutionProps; + use datafusion_expr::expr::ScalarFunction; + use datafusion_functions::expr_fn::get_field; use datafusion_physical_expr::PhysicalExpr; + use datafusion_physical_expr::create_physical_expr; use datafusion_physical_plan::expressions as df_expr; + use datafusion_physical_plan::filter_pushdown::PushedDown; use insta::assert_snapshot; use rstest::rstest; use vortex::expr::Expression; use vortex::expr::Operator; use super::*; + use crate::VortexSource; + use crate::persistent::cache::VortexFileCache; #[rstest::fixture] fn test_schema() -> Schema { @@ -527,7 +506,8 @@ mod tests { DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), false )] - #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), false)] + #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into() + ), false)] // Dictionary types - should be supported if value type is supported #[case::dict_utf8( DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), @@ -669,33 +649,142 @@ mod tests { "#); } - #[rstest] - #[case::valid_field("field1", true)] - #[case::missing_field("nonexistent_field", false)] - fn 
test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) { - let struct_fields = Fields::from(vec![ - Field::new("field1", DataType::Utf8, true), - Field::new("field2", DataType::Int32, true), - ]); - let schema = Schema::new(vec![Field::new( - "my_struct", - DataType::Struct(struct_fields), - true, - )]); + #[test] + fn test_pushdown_nested_filter() { + // schema: + // a: struct + // |- one: i32 + // b:struct + // |- two: i32 + let mut test_schema = SchemaBuilder::new(); + test_schema.push(Field::new_struct( + "a", + vec![Field::new("one", DataType::Int32, false)], + false, + )); + test_schema.push(Field::new_struct( + "b", + vec![Field::new("two", DataType::Int32, false)], + false, + )); - let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc; - let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some( - field_name.to_string(), - )))) as Arc; + let test_schema = Arc::new(test_schema.finish()); + // Make sure filter is pushed down + let filter = get_field(col("b"), "two").eq(datafusion_expr::lit(10i32)); - let get_field_expr = Arc::new(ScalarFunctionExpr::new( - "get_field", - Arc::new(ScalarUDF::from(GetFieldFunc::new())), - vec![struct_col, field_name_lit], - Arc::new(Field::new(field_name, DataType::Utf8, true)), - Arc::new(ConfigOptions::new()), - )) as Arc; + let df_schema = test_schema.clone().to_dfschema().unwrap(); + + let physical_filter = + create_physical_expr(&filter, &df_schema, &ExecutionProps::default()).unwrap(); + + let source = vortex_source(&test_schema); + + let prop = source + .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) + .unwrap(); + assert!(matches!(prop.filters[0], PushedDown::Yes)); + } + + #[test] + fn test_pushdown_deeply_nested_filter() { + // schema: + // a: struct + // |- b: struct + // |- c: i32 + let mut schema = SchemaBuilder::new(); + + let c = Field::new("c", DataType::Int32, false); + let b = Field::new_struct("b", vec![c], false); + let a = Field::new_struct("a", vec![b], false); + schema.push(a); + + let schema = Arc::new(schema.finish()); + let df_schema = schema.clone().to_dfschema().unwrap(); + + let source = vortex_source(&schema); + + let deep_filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32)); + + let physical_filter = + create_physical_expr(&deep_filter, &df_schema, &ExecutionProps::default()).unwrap(); + + let prop = source + .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) + .unwrap(); + assert!(matches!(prop.filters[0], PushedDown::Yes)); + } + + #[test] + fn test_unknown_scalar_function() { + #[derive(Debug, PartialEq, Eq, Hash)] + pub struct UnknownImpl { + signature: Signature, + } + + impl ScalarUDFImpl for UnknownImpl { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "unknown" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result { + Ok(DataType::Int32) + } + + fn invoke_with_args( + &self, + _args: ScalarFunctionArgs, + ) -> datafusion_common::Result { + Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(1)))) + } + } + + // schema: + // a: struct + // |- b: struct + // |- c: i32 + let mut schema = SchemaBuilder::new(); + + let c = Field::new("c", DataType::Int32, false); + let b = Field::new_struct("b", vec![c], false); + let a = Field::new_struct("a", vec![b], false); + schema.push(a); + + let schema = Arc::new(schema.finish()); + let df_schema = 
schema.clone().to_dfschema().unwrap(); + + let source = vortex_source(&schema); + + let unknown_func = Expr::ScalarFunction(ScalarFunction { + func: Arc::new(ScalarUDF::new_from_impl(UnknownImpl { + signature: Signature::nullary(Volatility::Immutable), + })), + args: vec![], + }); + + // Another weird ScalarFunction that we can't push down + let deep_filter = unknown_func.eq(datafusion_expr::lit(10i32)); + + let physical_filter = + create_physical_expr(&deep_filter, &df_schema, &ExecutionProps::default()).unwrap(); + + let prop = source + .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) + .unwrap(); + assert!(matches!(prop.filters[0], PushedDown::No)); + } + + fn vortex_source(schema: &SchemaRef) -> Arc { + let session = VortexSession::default(); + let cache = VortexFileCache::new(1024, 1024, session.clone()); - assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected); + Arc::new(VortexSource::new(session.clone(), cache)).with_schema(schema.clone()) } } diff --git a/vortex-datafusion/src/persistent/mod.rs b/vortex-datafusion/src/persistent/mod.rs index 9be23b7d804..4e9458758df 100644 --- a/vortex-datafusion/src/persistent/mod.rs +++ b/vortex-datafusion/src/persistent/mod.rs @@ -2,7 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors //! Persistent implementation of a Vortex table provider. -mod cache; +pub(crate) mod cache; mod format; pub mod metrics; mod opener; From c05233cd41da1383e446c8ed968ccdab42576dd8 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 19 Nov 2025 16:47:22 -0500 Subject: [PATCH 03/16] clippy Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 57c011a057f..6045fa047b0 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -693,9 +693,15 @@ mod tests { // |- c: i32 let mut schema = SchemaBuilder::new(); - let c = Field::new("c", DataType::Int32, false); - let b = Field::new_struct("b", vec![c], false); - let a = Field::new_struct("a", vec![b], false); + let a = Field::new_struct( + "a", + vec![Field::new_struct( + "b", + vec![Field::new("c", DataType::Int32, false)], + false, + )], + false, + ); schema.push(a); let schema = Arc::new(schema.finish()); @@ -752,9 +758,15 @@ mod tests { // |- c: i32 let mut schema = SchemaBuilder::new(); - let c = Field::new("c", DataType::Int32, false); - let b = Field::new_struct("b", vec![c], false); - let a = Field::new_struct("a", vec![b], false); + let a = Field::new_struct( + "a", + vec![Field::new_struct( + "b", + vec![Field::new("c", DataType::Int32, false)], + false, + )], + false, + ); schema.push(a); let schema = Arc::new(schema.finish()); @@ -785,6 +797,6 @@ mod tests { let session = VortexSession::default(); let cache = VortexFileCache::new(1024, 1024, session.clone()); - Arc::new(VortexSource::new(session.clone(), cache)).with_schema(schema.clone()) + Arc::new(VortexSource::new(session, cache)).with_schema(schema.clone()) } } From 6987ba5faf9d2017e1a0b8d39ee869ce6465c8fe Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 20 Nov 2025 14:04:19 -0500 Subject: [PATCH 04/16] fail in planner if filter can no longer be pushed Signed-off-by: Andrew Duffy --- vortex-datafusion/src/persistent/opener.rs | 28 ++++++++++++---------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/vortex-datafusion/src/persistent/opener.rs 
b/vortex-datafusion/src/persistent/opener.rs index d20844efbc0..86a7a18379a 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -12,8 +12,7 @@ use arrow_schema::SchemaRef; use datafusion_common::DataFusionError; use datafusion_common::Result as DFResult; use datafusion_common::arrow::array::RecordBatch; -use datafusion_datasource::FileRange; -use datafusion_datasource::PartitionedFile; +use datafusion_common::internal_datafusion_err; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; @@ -348,17 +347,22 @@ impl FileOpener for VortexOpener { ); } - let filter = filter - .and_then(|f| { - let exprs = split_conjunction(&f) - .into_iter() - .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema)) - .collect::>(); + let filter = match filter { + None => None, + Some(f) => { + let exprs = split_conjunction(&f).into_iter().collect::>(); - make_vortex_predicate(&exprs).transpose() - }) - .transpose() - .map_err(|e| DataFusionError::External(e.into()))?; + for expr in &exprs { + if !can_be_pushed_down(expr, &predicate_file_schema) { + internal_datafusion_err!("DataFusion predicate {expr} cannot be pushed down to Vortex file {} with schema {predicate_file_schema}", + file_meta.object_meta.location); + } + } + + make_vortex_predicate(&exprs) + .map_err(|e| DataFusionError::External(e.into()))? + } + }; if let Some(limit) = limit && filter.is_none() From 11afbc9548e71e113d0070c7dea476d46a292106 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 20 Nov 2025 16:17:12 -0500 Subject: [PATCH 05/16] cleanup Signed-off-by: Andrew Duffy --- Cargo.lock | 1 + vortex-datafusion/Cargo.toml | 1 + vortex-datafusion/src/convert/exprs.rs | 3 + vortex-datafusion/src/convert/mod.rs | 1 + vortex-datafusion/src/convert/ranges.rs | 87 ++++ vortex-datafusion/src/persistent/opener.rs | 494 ++++++++++----------- 6 files changed, 316 insertions(+), 271 deletions(-) create mode 100644 vortex-datafusion/src/convert/ranges.rs diff --git a/Cargo.lock b/Cargo.lock index 64327fb86ac..4a8a6424c92 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8554,6 +8554,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "url", "vortex", "vortex-utils", "walkdir", diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index f0f3bc363d3..84dfd6c592d 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -39,6 +39,7 @@ tokio-stream = { workspace = true } tracing = { workspace = true } vortex = { workspace = true, features = ["object_store", "tokio"] } vortex-utils = { workspace = true, features = ["dashmap"] } +url = "2.5.7" [dev-dependencies] anyhow = { workspace = true } diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 6045fa047b0..d4ba58086f7 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -223,6 +223,8 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> return false; } + let _expr_str = format!("{:?}", df_expr); + let expr = df_expr.as_any(); if let Some(binary) = expr.downcast_ref::() { can_binary_be_pushed_down(binary, schema) @@ -286,6 +288,7 @@ fn supported_data_types(dt: &DataType) -> bool { | Timestamp(_, _) | Time32(_) | Time64(_) + | Struct(_) ); if !is_supported { diff --git a/vortex-datafusion/src/convert/mod.rs b/vortex-datafusion/src/convert/mod.rs index d40be1d38fc..1fbdf1a30c8 100644 
--- a/vortex-datafusion/src/convert/mod.rs +++ b/vortex-datafusion/src/convert/mod.rs @@ -4,6 +4,7 @@ use vortex::error::VortexResult; pub(crate) mod exprs; +pub(crate) mod ranges; mod scalars; /// First-party trait for implementing conversion from DataFusion types to Vortex types. diff --git a/vortex-datafusion/src/convert/ranges.rs b/vortex-datafusion/src/convert/ranges.rs new file mode 100644 index 00000000000..5bb995fe8b0 --- /dev/null +++ b/vortex-datafusion/src/convert/ranges.rs @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::ops::Range; + +use datafusion_datasource::FileRange; +use vortex::ArrayRef; +use vortex::scan::ScanBuilder; + +/// If the file has a [`FileRange`](datafusion::datasource::listing::FileRange), we translate it into a row range in the file for the scan. +pub(crate) fn apply_byte_range( + file_range: FileRange, + total_size: u64, + row_count: u64, + scan_builder: ScanBuilder, +) -> ScanBuilder { + let row_range = byte_range_to_row_range( + file_range.start as u64..file_range.end as u64, + row_count, + total_size, + ); + + scan_builder.with_row_range(row_range) +} + +fn byte_range_to_row_range(byte_range: Range, row_count: u64, total_size: u64) -> Range { + let average_row = total_size / row_count; + assert!(average_row > 0, "A row must always have at least one byte"); + + let start_row = byte_range.start / average_row; + let end_row = byte_range.end / average_row; + + // We take the min here as `end_row` might overshoot + start_row..u64::min(row_count, end_row) +} + +#[cfg(test)] +mod tests { + use std::ops::Range; + + use itertools::Itertools; + use rstest::rstest; + + use crate::convert::ranges::byte_range_to_row_range; + + #[rstest] + #[case(0..100, 100, 100, 0..100)] + #[case(0..105, 100, 105, 0..100)] + #[case(0..50, 100, 105, 0..50)] + #[case(50..105, 100, 105, 50..100)] + #[case(0..1, 4, 8, 0..0)] + #[case(1..8, 4, 8, 0..4)] + fn test_range_translation( + #[case] byte_range: Range, + #[case] row_count: u64, + #[case] total_size: u64, + #[case] expected: Range, + ) { + assert_eq!( + byte_range_to_row_range(byte_range, row_count, total_size), + expected + ); + } + + #[test] + fn test_consecutive_ranges() { + let row_count = 100; + let total_size = 429; + let bytes_a = 0..143; + let bytes_b = 143..286; + let bytes_c = 286..429; + + let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size); + let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size); + let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size); + + assert_eq!(rows_a.end - rows_a.start, 35); + assert_eq!(rows_b.end - rows_b.start, 36); + assert_eq!(rows_c.end - rows_c.start, 29); + + assert_eq!(rows_a.start, 0); + assert_eq!(rows_c.end, 100); + for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() { + assert_eq!(left.end, right.start); + } + } +} diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 86a7a18379a..df87285ca3d 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -use std::ops::Range; use std::sync::Arc; use std::sync::Weak; @@ -13,6 +12,7 @@ use datafusion_common::DataFusionError; use datafusion_common::Result as DFResult; use datafusion_common::arrow::array::RecordBatch; use datafusion_common::internal_datafusion_err; +use 
datafusion_datasource::PartitionedFile; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; @@ -31,7 +31,6 @@ use futures::stream; use object_store::ObjectStore; use object_store::path::Path; use tracing::Instrument; -use vortex::ArrayRef; use vortex::dtype::FieldName; use vortex::error::VortexError; use vortex::expr::root; @@ -46,6 +45,7 @@ use vortex_utils::aliases::dash_map::Entry; use super::cache::VortexFileCache; use crate::convert::exprs::can_be_pushed_down; use crate::convert::exprs::make_vortex_predicate; +use crate::convert::ranges::apply_byte_range; #[derive(Clone)] pub(crate) struct VortexOpener { @@ -413,279 +413,222 @@ impl FileOpener for VortexOpener { Ok(stream) } - .in_current_span() - .boxed()) + .in_current_span() + .boxed()) } } -/// If the file has a [`FileRange`](datafusion::datasource::listing::FileRange), we translate it into a row range in the file for the scan. -fn apply_byte_range( - file_range: FileRange, - total_size: u64, - row_count: u64, - scan_builder: ScanBuilder, -) -> ScanBuilder { - let row_range = byte_range_to_row_range( - file_range.start as u64..file_range.end as u64, - row_count, - total_size, - ); - - scan_builder.with_row_range(row_range) -} - -fn byte_range_to_row_range(byte_range: Range, row_count: u64, total_size: u64) -> Range { - let average_row = total_size / row_count; - assert!(average_row > 0, "A row must always have at least one byte"); - - let start_row = byte_range.start / average_row; - let end_row = byte_range.end / average_row; - - // We take the min here as `end_row` might overshoot - start_row..u64::min(row_count, end_row) -} - #[cfg(test)] mod tests { - use std::sync::LazyLock; + use std::str::FromStr; use arrow_schema::Fields; + use chrono::DateTime; use chrono::Utc; use datafusion::arrow::array::RecordBatch; - use datafusion::arrow::array::StringArray; use datafusion::arrow::array::StructArray; use datafusion::arrow::datatypes::DataType; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::util::display::FormatOptions; + use datafusion::arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::record_batch; use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion::logical_expr::col; use datafusion::logical_expr::lit; use datafusion::physical_expr::planner::logical2physical; - use datafusion::physical_expr_adapter::DefaultPhysicalExprAdapterFactory; - use datafusion::scalar::ScalarValue; + use datafusion::prelude::SessionContext; + use datafusion_common::config::ConfigOptions; + use datafusion_common::create_array; + use datafusion_datasource::file::FileSource; + use datafusion_datasource::file_scan_config::FileScanConfigBuilder; + use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_physical_plan::filter_pushdown::PushedDown; + use futures::pin_mut; use insta::assert_snapshot; - use itertools::Itertools; use object_store::ObjectMeta; use object_store::memory::InMemory; use rstest::rstest; + use url::Url; + use vortex::ArrayRef; use vortex::VortexSessionDefault; use vortex::arrow::FromArrowArray; use vortex::file::WriteOptionsSessionExt; use vortex::io::ObjectStoreWriter; use vortex::io::VortexWrite; use vortex::session::VortexSession; + use vortex_utils::aliases::hash_map::HashMap; use super::*; + use crate::VortexSource; - static SESSION: LazyLock = LazyLock::new(VortexSession::default); - - #[rstest] - #[case(0..100, 100, 100, 0..100)] - 
#[case(0..105, 100, 105, 0..100)] - #[case(0..50, 100, 105, 0..50)] - #[case(50..105, 100, 105, 50..100)] - #[case(0..1, 4, 8, 0..0)] - #[case(1..8, 4, 8, 0..4)] - fn test_range_translation( - #[case] byte_range: Range, - #[case] row_count: u64, - #[case] total_size: u64, - #[case] expected: Range, - ) { - assert_eq!( - byte_range_to_row_range(byte_range, row_count, total_size), - expected - ); - } - - #[test] - fn test_consecutive_ranges() { - let row_count = 100; - let total_size = 429; - let bytes_a = 0..143; - let bytes_b = 143..286; - let bytes_c = 286..429; - - let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size); - let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size); - let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size); - - assert_eq!(rows_a.end - rows_a.start, 35); - assert_eq!(rows_b.end - rows_b.start, 36); - assert_eq!(rows_c.end - rows_c.start, 29); - - assert_eq!(rows_a.start, 0); - assert_eq!(rows_c.end, 100); - for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() { - assert_eq!(left.end, right.start); - } - } - - async fn write_arrow_to_vortex( + struct TestFixtures { object_store: Arc, - path: &str, - rb: RecordBatch, - ) -> anyhow::Result { - let array = ArrayRef::from_arrow(rb, false); - let path = Path::parse(path)?; - - let mut write = ObjectStoreWriter::new(object_store, &path).await?; - let summary = SESSION - .write_options() - .write(&mut write, array.to_array_stream()) - .await?; - write.shutdown().await?; - - Ok(summary.size()) + // We need to return this to the caller to prevent the session context from + // being dropped and the object_store from being removed + #[allow(dead_code)] + session_context: SessionContext, + source: Arc, + file_meta: HashMap, } - fn make_meta(path: &str, data_size: u64) -> FileMeta { - FileMeta { - object_meta: ObjectMeta { - location: Path::from(path), - last_modified: Utc::now(), - size: data_size, - e_tag: None, - version: None, - }, - range: None, - extensions: None, - metadata_size_hint: None, + // Make a set of files and record batches + async fn make_source( + files: HashMap, + file_schema: &SchemaRef, + ) -> anyhow::Result { + let session = VortexSession::default(); + + let ctx = SessionContext::new(); + + let store: Arc = Arc::new(InMemory::new()); + + ctx.register_object_store(&Url::from_str("s3://in-memory")?, store.clone()); + + // "write" all the record batches to the named file paths + let mut file_meta = HashMap::with_capacity(files.len()); + + // TODO: make file schema by superset of fields? 
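If the TODO above were pursued, one possible shape (an assumption for illustration, not part of this patch) is to let arrow merge the per-batch schemas into a shared file schema:

```
use std::sync::Arc;

use arrow_schema::ArrowError;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Schema;
use arrow_schema::SchemaRef;

/// Build a superset schema from every batch's schema, letting arrow reconcile
/// duplicate fields. Purely illustrative.
fn superset_schema(
    schemas: impl IntoIterator<Item = SchemaRef>,
) -> Result<SchemaRef, ArrowError> {
    let merged = Schema::try_merge(schemas.into_iter().map(|s| s.as_ref().clone()))?;
    Ok(Arc::new(merged))
}

fn main() -> Result<(), ArrowError> {
    let a = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let b = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, true),
        Field::new("b", DataType::Utf8, true),
    ]));
    let merged = superset_schema([a, b])?;
    assert_eq!(merged.fields().len(), 2);
    Ok(())
}
```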
+ for (path_str, rb) in files.iter() { + let array = ArrayRef::from_arrow(rb, false); + let path = Path::from_url_path(path_str.as_str())?; + let mut write = ObjectStoreWriter::new(store.clone(), &path).await?; + let summary = session + .write_options() + .write(&mut write, array.to_array_stream()) + .await?; + write.shutdown().await?; + + file_meta.insert( + path_str.clone(), + FileMeta::from(ObjectMeta { + location: path.clone(), + size: summary.size(), + e_tag: None, + version: None, + last_modified: DateTime::::from_timestamp_secs(0).unwrap(), + }), + ); } + + let source = VortexSource::new( + session.clone(), + VortexFileCache::new(1024, 1024, session.clone()), + ); + let source = source + .with_schema(Arc::clone(file_schema)) + .with_batch_size(100); + + Ok(TestFixtures { + session_context: ctx, + object_store: store, + file_meta, + source, + }) } #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), (1, 3), (0, 0))] - // If we don't have a physical expr adapter, we just drop filters on partition values - #[case(None, (1, 3), (1, 3))] #[tokio::test] - async fn test_adapter_optimization_partition_column( - #[case] expr_adapter_factory: Option>, - #[case] expected_result1: (usize, usize), - #[case] expected_result2: (usize, usize), - ) -> anyhow::Result<()> { - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "part=1/file.vortex"; - let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - let data_size = - write_arrow_to_vortex(object_store.clone(), file_path, batch.clone()).await?; - - let file_schema = batch.schema(); - let mut file = PartitionedFile::new(file_path.to_string(), data_size); - file.partition_values = vec![ScalarValue::Int32(Some(1))]; + async fn test_do_not_pushdown_filter_on_partition_columns() -> anyhow::Result<()> { + let batch = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?; + let file_schema = batch.schema().clone(); + let files = HashMap::from_iter([("part=1/file.vortex".to_string(), batch)]); let table_schema = Arc::new(Schema::new(vec![ Field::new("part", DataType::Int32, false), Field::new("a", DataType::Int32, false), ])); - let make_opener = |filter| VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0].into()), - filter: Some(filter), - file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![Arc::new(Field::new("part", DataType::Int32, false))], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: file_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let TestFixtures { + source, + object_store, + file_meta, + .. + } = make_source(files, &file_schema).await?; - // filter matches partition value - let filter = col("part").eq(lit(1)); - let filter = logical2physical(&filter, table_schema.as_ref()); + // Attempting to push filters over partitions should fail. 
+ let filter_partition_col = col("part").eq(lit(1i32)); + let filter_partition_col = logical2physical(&filter_partition_col, table_schema.as_ref()); - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); + let push_filters = + source.try_pushdown_filters(vec![filter_partition_col], &ConfigOptions::default())?; - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); + assert!(matches!(push_filters.filters[0], PushedDown::No)); - assert_eq!((num_batches, num_rows), expected_result1); + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + file_schema.clone(), + source.clone(), + ) + .build(); - // filter doesn't matches partition value - let filter = col("part").eq(lit(2)); - let filter = logical2physical(&filter, table_schema.as_ref()); + // Create an opener with this + let opener = source.create_file_opener(object_store.clone(), &base_config, 0); - let opener = make_opener(filter); - let stream = opener - .open(make_meta(file_path, data_size), file.clone()) - .unwrap() - .await - .unwrap(); + let file1 = file_meta.get("part=1/file.vortex").unwrap().clone(); + let part_file1 = PartitionedFile::new("part=1/file.vortex", file1.object_meta.size); - let data = stream.try_collect::>().await?; - let num_batches = data.len(); - let num_rows = data.iter().map(|rb| rb.num_rows()).sum::(); - assert_eq!((num_batches, num_rows), expected_result2); + let open_result = opener.open(file1, part_file1)?.await?; + + pin_mut!(open_result); + let mut rbs = open_result.try_collect::>().await?; + assert_eq!(rbs.len(), 1); + + let rb = rbs.pop().unwrap(); + assert_eq!(rb.num_rows(), 3); + let expected = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?; + + assert_eq!(rb, expected); Ok(()) } - #[rstest] - #[case(Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _))] - // If we don't have a physical expr adapter, we just drop filters on partition values. - // This is currently not supported, the work to support it requires to rewrite the predicate with appropriate casts. // Seems like datafusion is moving towards having DefaultPhysicalExprAdapterFactory be always provided, which would make it work OOTB. 
// See: https://github.com/apache/datafusion/issues/16800 - // #[case(None)] #[tokio::test] - async fn test_open_files_different_table_schema( - #[case] expr_adapter_factory: Option>, - ) -> anyhow::Result<()> { - use datafusion::arrow::util::pretty::pretty_format_batches_with_options; - - let object_store = Arc::new(InMemory::new()) as Arc; - let file1_path = "/path/file1.vortex"; - let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)])).unwrap(); - let data_size1 = write_arrow_to_vortex(object_store.clone(), file1_path, batch1).await?; - let file1 = PartitionedFile::new(file1_path.to_string(), data_size1); - - let file2_path = "/path/file2.vortex"; - let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)])).unwrap(); - let data_size2 = write_arrow_to_vortex(object_store.clone(), file2_path, batch2).await?; - let file2 = PartitionedFile::new(file1_path.to_string(), data_size1); + async fn test_open_files_different_table_schema() -> anyhow::Result<()> { + let batch1 = record_batch!(("a", Int32, vec![Some(1), Some(2), Some(3)]))?; + let batch2 = record_batch!(("a", Int16, vec![Some(-1), Some(-2), Some(-3)]))?; + let files = HashMap::from_iter([ + ("path/file1.vortex".to_string(), batch1), + ("path/file2.vortex".to_string(), batch2), + ]); // Table schema has can accommodate both files let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); - let make_opener = |filter| VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0].into()), - filter: Some(filter), - file_pruning_predicate: None, - expr_adapter_factory: expr_adapter_factory.clone(), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let TestFixtures { + file_meta, + object_store, + source, + .. + } = make_source(files, &table_schema).await?; + let file1 = file_meta.get("path/file1.vortex").unwrap().clone(); + let part_file1 = PartitionedFile::new("path/file1.vortex", file1.object_meta.size); + let file2 = file_meta.get("path/file2.vortex").unwrap().clone(); + let part_file2 = PartitionedFile::new("path/file2.vortex", file1.object_meta.size); + + // Try and push filters into the source. let filter = col("a").lt(lit(100_i32)); let filter = logical2physical(&filter, table_schema.as_ref()); + let pushdown_result = + source.try_pushdown_filters(vec![filter], &ConfigOptions::default())?; + // filter should've succeeded pushing + assert!(matches!(pushdown_result.filters[0], PushedDown::Yes)); + + // // Use a SchemaAdapter to allow for reading with evolution. + // Why does this not matter? + // let schema_adapter = DefaultSchemaAdapterFactory::default(); + + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + table_schema.clone(), + source.clone(), + ) + .build(); - let opener1 = make_opener(filter.clone()); - let stream = opener1 - .open(make_meta(file1_path, data_size1), file1)? 
- .await?; + let opener = source.create_file_opener(object_store.clone(), &base_config, 0); + let stream = opener.open(file1, part_file1)?.await?; let format_opts = FormatOptions::new().with_types_info(true); @@ -701,11 +644,7 @@ mod tests { +-------+ "); - let opener2 = make_opener(filter.clone()); - let stream = opener2 - .open(make_meta(file2_path, data_size2), file2)? - .await?; - + let stream = opener.open(file2, part_file2)?.await?; let data = stream.try_collect::>().await?; assert_snapshot!(pretty_format_batches_with_options(&data, &format_opts)?.to_string(), @r" +-------+ @@ -721,6 +660,7 @@ mod tests { Ok(()) } + // #[tokio::test] // This test verifies that files with different column order than the // table schema can be opened without errors. The fix ensures that the @@ -794,83 +734,95 @@ mod tests { // a nested schema mismatch between the physical file schema and logical // table schema. async fn test_adapter_logical_physical_struct_mismatch() -> anyhow::Result<()> { - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "/path/file.vortex"; - let file_struct_fields = Fields::from(vec![ - Field::new("field1", DataType::Utf8, true), - Field::new("field2", DataType::Utf8, true), - ]); + let field1 = create_array!(Utf8, vec![Some("value1"), Some("value2"), Some("value3")]); + let field2 = create_array!(Utf8, vec![Some("a"), Some("b"), Some("c")]); + let struct_array = StructArray::new( - file_struct_fields.clone(), - vec![ - Arc::new(StringArray::from(vec!["value1", "value2", "value3"])), - Arc::new(StringArray::from(vec!["a", "b", "c"])), - ], + Fields::from(vec![ + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), + ]), + vec![field1.clone(), field2.clone()], None, ); - let batch = RecordBatch::try_new( - Arc::new(Schema::new(vec![Field::new( - "my_struct", - DataType::Struct(file_struct_fields), - true, - )])), - vec![Arc::new(struct_array)], - )?; - let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?; - // Table schema has an extra utf8 field. + // file schema reflects the data + let file_schema = Arc::new(Schema::new(vec![Field::new( + "my_struct", + DataType::Struct(Fields::from(vec![ + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), + ])), + true, + )])); + + // Table schema has an extra inner utf8 field. 
let table_schema = Arc::new(Schema::new(vec![Field::new( "my_struct", DataType::Struct(Fields::from(vec![ - Field::new( - "field1", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), - true, - ), - Field::new( - "field2", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), - true, - ), + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), Field::new("field3", DataType::Utf8, true), ])), true, )])); - let opener = VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: None, - filter: Some(logical2physical( - &col("my_struct").is_not_null(), - &table_schema, - )), - file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema, - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let batch = RecordBatch::try_new(file_schema.clone(), vec![Arc::new(struct_array)])?; + + let files = HashMap::from_iter([("path/file.vortex".to_string(), batch.clone())]); + + let TestFixtures { + source, + file_meta, + object_store, + .. + } = make_source(files, &table_schema).await?; + + let filter = logical2physical(&col("my_struct").is_not_null(), &table_schema); + let pushdown_result = + source.try_pushdown_filters(vec![filter], &ConfigOptions::default())?; + + // The filter should not have been pushed + assert!(matches!(pushdown_result.filters[0], PushedDown::Yes)); + + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + table_schema.clone(), + source.clone(), + ) + .build(); + + let opener = source.create_file_opener(object_store.clone(), &base_config, 0); + + let file = file_meta.get("path/file.vortex").unwrap().clone(); + let part_file = PartitionedFile::new("path/file.vortex", file.object_meta.size); - // The opener should be able to open the file with a filter on the - // struct column. let data = opener - .open( - make_meta(file_path, data_size), - PartitionedFile::new(file_path.to_string(), data_size), - )? + .open(file, part_file)? .await? .try_collect::>() .await?; assert_eq!(data.len(), 1); - assert_eq!(data[0].num_rows(), 3); + + // The opener will return batches that have been adapted with the extra "field3" with + // nulls added. 
+ let field3 = create_array!(Utf8, vec![Option::::None; 3]); + let table_struct = StructArray::new( + Fields::from(vec![ + Field::new("field1", DataType::Utf8, true), + Field::new("field2", DataType::Utf8, true), + Field::new("field3", DataType::Utf8, true), + ]), + vec![field1, field2, field3], + None, + ); + + let batch_with_nulls = + RecordBatch::try_new(table_schema.clone(), vec![Arc::new(table_struct)])?; + + // The opener returns us a stream where field3 is replaced with nulls + assert_eq!(data[0], batch_with_nulls); Ok(()) } From 5ad5366fe18e820493d2ac1d973f051200921d0b Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Thu, 20 Nov 2025 16:21:04 -0500 Subject: [PATCH 06/16] replace with logical2physical Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index d4ba58086f7..4c3ac6f6cf1 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -328,6 +328,8 @@ mod tests { use datafusion_functions::expr_fn::get_field; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_expr; + use datafusion_physical_expr::planner::logical2physical; + use datafusion_physical_expr::{PhysicalExpr, create_physical_expr}; use datafusion_physical_plan::expressions as df_expr; use datafusion_physical_plan::filter_pushdown::PushedDown; use insta::assert_snapshot; @@ -677,8 +679,7 @@ mod tests { let df_schema = test_schema.clone().to_dfschema().unwrap(); - let physical_filter = - create_physical_expr(&filter, &df_schema, &ExecutionProps::default()).unwrap(); + let physical_filter = logical2physical(&filter, df_schema.as_ref()).unwrap(); let source = vortex_source(&test_schema); @@ -714,8 +715,7 @@ mod tests { let deep_filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32)); - let physical_filter = - create_physical_expr(&deep_filter, &df_schema, &ExecutionProps::default()).unwrap(); + let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()).unwrap(); let prop = source .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) @@ -787,8 +787,7 @@ mod tests { // Another weird ScalarFunction that we can't push down let deep_filter = unknown_func.eq(datafusion_expr::lit(10i32)); - let physical_filter = - create_physical_expr(&deep_filter, &df_schema, &ExecutionProps::default()).unwrap(); + let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()).unwrap(); let prop = source .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) From 5ddd409e58384c8e817b07e6692a7ae99422323e Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Tue, 25 Nov 2025 10:57:48 +0000 Subject: [PATCH 07/16] chore: apply imports_granularity = "Item" everywhere expect bench-vortex (#5521) requires some non-auto changes to to `bench-vortex` --------- Signed-off-by: Joe Isaacs --- vortex-array/src/arrays/struct_/vtable/reduce.rs | 2 +- vortex-array/src/compute/zip.rs | 3 ++- vortex-array/src/expr/analysis/immediate_access.rs | 6 +++--- vortex-datafusion/src/convert/exprs.rs | 4 +++- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/vortex-array/src/arrays/struct_/vtable/reduce.rs b/vortex-array/src/arrays/struct_/vtable/reduce.rs index 10a15f0b657..aca50b6ba0a 100644 --- a/vortex-array/src/arrays/struct_/vtable/reduce.rs +++ b/vortex-array/src/arrays/struct_/vtable/reduce.rs @@ -16,12 +16,12 @@ use 
crate::IntoArray; use crate::arrays::ExprArray; use crate::arrays::StructArray; use crate::expr::Expression; -use crate::expr::analysis::immediate_access::annotate_scope_access; use crate::expr::col; use crate::expr::root; use crate::expr::session::ExprSession; use crate::expr::transform::ExprOptimizer; use crate::expr::transform::PartitionedExpr; +use crate::expr::transform::immediate_access::annotate_scope_access; use crate::expr::transform::partition; use crate::expr::transform::replace; use crate::expr::transform::replace_root_fields; diff --git a/vortex-array/src/compute/zip.rs b/vortex-array/src/compute/zip.rs index 86b585eda89..1ca875b8bda 100644 --- a/vortex-array/src/compute/zip.rs +++ b/vortex-array/src/compute/zip.rs @@ -19,6 +19,7 @@ use super::cast; use crate::Array; use crate::ArrayRef; use crate::builders::ArrayBuilder; +use crate::builders::VarBinViewBuilder; use crate::builders::builder_with_capacity; use crate::compute::ComputeFn; use crate::compute::Kernel; @@ -260,7 +261,7 @@ mod tests { use crate::builders::ArrayBuilder; use crate::builders::BufferGrowthStrategy; use crate::builders::VarBinViewBuilder; - use crate::compute::zip; + use crate::compute::zip::VarBinViewBuilder; #[test] fn test_zip_basic() { diff --git a/vortex-array/src/expr/analysis/immediate_access.rs b/vortex-array/src/expr/analysis/immediate_access.rs index c05679b2c98..d2f01eab008 100644 --- a/vortex-array/src/expr/analysis/immediate_access.rs +++ b/vortex-array/src/expr/analysis/immediate_access.rs @@ -7,12 +7,12 @@ use vortex_error::VortexExpect; use vortex_utils::aliases::hash_set::HashSet; use crate::expr::Expression; -use crate::expr::analysis::AnnotationFn; -use crate::expr::analysis::Annotations; -use crate::expr::descendent_annotations; use crate::expr::exprs::get_item::GetItem; use crate::expr::exprs::root::Root; use crate::expr::exprs::select::Select; +use crate::expr::transform::annotations::AnnotationFn; +use crate::expr::transform::annotations::Annotations; +use crate::expr::transform::annotations::descendent_annotations; pub type FieldAccesses<'a> = Annotations<'a, FieldName>; diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 4c3ac6f6cf1..3d3709ac4a1 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -305,6 +305,7 @@ mod tests { use arrow_schema::DataType; use arrow_schema::Field; + use arrow_schema::Fields; use arrow_schema::Schema; use arrow_schema::SchemaBuilder; use arrow_schema::SchemaRef; @@ -329,13 +330,14 @@ mod tests { use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr::planner::logical2physical; - use datafusion_physical_expr::{PhysicalExpr, create_physical_expr}; use datafusion_physical_plan::expressions as df_expr; use datafusion_physical_plan::filter_pushdown::PushedDown; use insta::assert_snapshot; use rstest::rstest; + use vortex::VortexSessionDefault; use vortex::expr::Expression; use vortex::expr::Operator; + use vortex::session::VortexSession; use super::*; use crate::VortexSource; From 078c66180107e1f35604165ca2d4be1f45475ed7 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 14:43:44 -0500 Subject: [PATCH 08/16] remove dead diff Signed-off-by: Andrew Duffy --- vortex-array/src/arrays/struct_/vtable/reduce.rs | 2 +- vortex-array/src/compute/zip.rs | 3 +-- vortex-array/src/expr/analysis/immediate_access.rs | 6 +++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git 
a/vortex-array/src/arrays/struct_/vtable/reduce.rs b/vortex-array/src/arrays/struct_/vtable/reduce.rs index aca50b6ba0a..10a15f0b657 100644 --- a/vortex-array/src/arrays/struct_/vtable/reduce.rs +++ b/vortex-array/src/arrays/struct_/vtable/reduce.rs @@ -16,12 +16,12 @@ use crate::IntoArray; use crate::arrays::ExprArray; use crate::arrays::StructArray; use crate::expr::Expression; +use crate::expr::analysis::immediate_access::annotate_scope_access; use crate::expr::col; use crate::expr::root; use crate::expr::session::ExprSession; use crate::expr::transform::ExprOptimizer; use crate::expr::transform::PartitionedExpr; -use crate::expr::transform::immediate_access::annotate_scope_access; use crate::expr::transform::partition; use crate::expr::transform::replace; use crate::expr::transform::replace_root_fields; diff --git a/vortex-array/src/compute/zip.rs b/vortex-array/src/compute/zip.rs index 1ca875b8bda..86b585eda89 100644 --- a/vortex-array/src/compute/zip.rs +++ b/vortex-array/src/compute/zip.rs @@ -19,7 +19,6 @@ use super::cast; use crate::Array; use crate::ArrayRef; use crate::builders::ArrayBuilder; -use crate::builders::VarBinViewBuilder; use crate::builders::builder_with_capacity; use crate::compute::ComputeFn; use crate::compute::Kernel; @@ -261,7 +260,7 @@ mod tests { use crate::builders::ArrayBuilder; use crate::builders::BufferGrowthStrategy; use crate::builders::VarBinViewBuilder; - use crate::compute::zip::VarBinViewBuilder; + use crate::compute::zip; #[test] fn test_zip_basic() { diff --git a/vortex-array/src/expr/analysis/immediate_access.rs b/vortex-array/src/expr/analysis/immediate_access.rs index d2f01eab008..c05679b2c98 100644 --- a/vortex-array/src/expr/analysis/immediate_access.rs +++ b/vortex-array/src/expr/analysis/immediate_access.rs @@ -7,12 +7,12 @@ use vortex_error::VortexExpect; use vortex_utils::aliases::hash_set::HashSet; use crate::expr::Expression; +use crate::expr::analysis::AnnotationFn; +use crate::expr::analysis::Annotations; +use crate::expr::descendent_annotations; use crate::expr::exprs::get_item::GetItem; use crate::expr::exprs::root::Root; use crate::expr::exprs::select::Select; -use crate::expr::transform::annotations::AnnotationFn; -use crate::expr::transform::annotations::Annotations; -use crate::expr::transform::annotations::descendent_annotations; pub type FieldAccesses<'a> = Annotations<'a, FieldName>; From 4264c02ebd5f8c55120aa531eb05b9203075a17a Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 15:32:29 -0500 Subject: [PATCH 09/16] fix up Signed-off-by: Andrew Duffy --- vortex-datafusion/examples/vortex_table.rs | 6 +- vortex-datafusion/src/convert/exprs.rs | 9 +- vortex-datafusion/src/persistent/opener.rs | 128 +++++++++++---------- 3 files changed, 74 insertions(+), 69 deletions(-) diff --git a/vortex-datafusion/examples/vortex_table.rs b/vortex-datafusion/examples/vortex_table.rs index b98bbbe1106..7f96ffb13fc 100644 --- a/vortex-datafusion/examples/vortex_table.rs +++ b/vortex-datafusion/examples/vortex_table.rs @@ -79,7 +79,11 @@ async fn main() -> anyhow::Result<()> { ctx.register_table("vortex_tbl", listing_table as _)?; - run_query(&ctx, "SELECT * FROM vortex_tbl").await?; + run_query( + &ctx, + "SELECT * FROM vortex_tbl where numbers % 2 = 0 AND strings LIKE 'b%'", + ) + .await?; Ok(()) } diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 3d3709ac4a1..88bc5f90d42 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -305,7 
+305,6 @@ mod tests { use arrow_schema::DataType; use arrow_schema::Field; - use arrow_schema::Fields; use arrow_schema::Schema; use arrow_schema::SchemaBuilder; use arrow_schema::SchemaRef; @@ -324,11 +323,9 @@ mod tests { use datafusion_expr::ScalarUDFImpl; use datafusion_expr::Volatility; use datafusion_expr::col; - use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::expr::ScalarFunction; use datafusion_functions::expr_fn::get_field; use datafusion_physical_expr::PhysicalExpr; - use datafusion_physical_expr::create_physical_expr; use datafusion_physical_expr::planner::logical2physical; use datafusion_physical_plan::expressions as df_expr; use datafusion_physical_plan::filter_pushdown::PushedDown; @@ -681,7 +678,7 @@ mod tests { let df_schema = test_schema.clone().to_dfschema().unwrap(); - let physical_filter = logical2physical(&filter, df_schema.as_ref()).unwrap(); + let physical_filter = logical2physical(&filter, df_schema.as_ref()); let source = vortex_source(&test_schema); @@ -717,7 +714,7 @@ mod tests { let deep_filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32)); - let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()).unwrap(); + let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()); let prop = source .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) @@ -789,7 +786,7 @@ mod tests { // Another weird ScalarFunction that we can't push down let deep_filter = unknown_func.eq(datafusion_expr::lit(10i32)); - let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()).unwrap(); + let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()); let prop = source .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index df87285ca3d..70b5655d8c7 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -11,7 +11,6 @@ use arrow_schema::SchemaRef; use datafusion_common::DataFusionError; use datafusion_common::Result as DFResult; use datafusion_common::arrow::array::RecordBatch; -use datafusion_common::internal_datafusion_err; use datafusion_datasource::PartitionedFile; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; @@ -347,22 +346,17 @@ impl FileOpener for VortexOpener { ); } - let filter = match filter { - None => None, - Some(f) => { - let exprs = split_conjunction(&f).into_iter().collect::>(); + let filter = filter + .and_then(|f| { + let exprs = split_conjunction(&f) + .into_iter() + .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema)) + .collect::>(); - for expr in &exprs { - if !can_be_pushed_down(expr, &predicate_file_schema) { - internal_datafusion_err!("DataFusion predicate {expr} cannot be pushed down to Vortex file {} with schema {predicate_file_schema}", - file_meta.object_meta.location); - } - } - - make_vortex_predicate(&exprs) - .map_err(|e| DataFusionError::External(e.into()))? 
- } - }; + make_vortex_predicate(&exprs).transpose() + }) + .transpose() + .map_err(|e| DataFusionError::External(e.into()))?; if let Some(limit) = limit && filter.is_none() @@ -413,8 +407,8 @@ impl FileOpener for VortexOpener { Ok(stream) } - .in_current_span() - .boxed()) + .in_current_span() + .boxed()) } } @@ -432,7 +426,6 @@ mod tests { use datafusion::arrow::util::display::FormatOptions; use datafusion::arrow::util::pretty::pretty_format_batches_with_options; use datafusion::common::record_batch; - use datafusion::datasource::schema_adapter::DefaultSchemaAdapterFactory; use datafusion::logical_expr::col; use datafusion::logical_expr::lit; use datafusion::physical_expr::planner::logical2physical; @@ -461,6 +454,7 @@ mod tests { use super::*; use crate::VortexSource; + /// Fixtures used for integration testing the FileSource and FileOpener struct TestFixtures { object_store: Arc, // We need to return this to the caller to prevent the session context from @@ -468,9 +462,26 @@ mod tests { #[allow(dead_code)] session_context: SessionContext, source: Arc, + files: Files, + } + + struct Files { file_meta: HashMap, } + impl Files { + fn get(&self, path: &str) -> (FileMeta, PartitionedFile) { + let file = self + .file_meta + .get(path) + .unwrap_or_else(|| panic!("Missing file {}", path)); + ( + file.clone(), + PartitionedFile::new(path, file.object_meta.size), + ) + } + } + // Make a set of files and record batches async fn make_source( files: HashMap, @@ -521,7 +532,7 @@ mod tests { Ok(TestFixtures { session_context: ctx, object_store: store, - file_meta, + files: Files { file_meta }, source, }) } @@ -541,7 +552,7 @@ mod tests { let TestFixtures { source, object_store, - file_meta, + files, .. } = make_source(files, &file_schema).await?; @@ -564,8 +575,7 @@ mod tests { // Create an opener with this let opener = source.create_file_opener(object_store.clone(), &base_config, 0); - let file1 = file_meta.get("part=1/file.vortex").unwrap().clone(); - let part_file1 = PartitionedFile::new("part=1/file.vortex", file1.object_meta.size); + let (file1, part_file1) = files.get("part=1/file.vortex"); let open_result = opener.open(file1, part_file1)?.await?; @@ -597,16 +607,14 @@ mod tests { let table_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])); let TestFixtures { - file_meta, object_store, source, + files, .. } = make_source(files, &table_schema).await?; - let file1 = file_meta.get("path/file1.vortex").unwrap().clone(); - let part_file1 = PartitionedFile::new("path/file1.vortex", file1.object_meta.size); - let file2 = file_meta.get("path/file2.vortex").unwrap().clone(); - let part_file2 = PartitionedFile::new("path/file2.vortex", file1.object_meta.size); + let (file1, part_file1) = files.get("path/file1.vortex"); + let (file2, part_file2) = files.get("path/file2.vortex"); // Try and push filters into the source. let filter = col("a").lt(lit(100_i32)); @@ -616,10 +624,6 @@ mod tests { // filter should've succeeded pushing assert!(matches!(pushdown_result.filters[0], PushedDown::Yes)); - // // Use a SchemaAdapter to allow for reading with evolution. - // Why does this not matter? - // let schema_adapter = DefaultSchemaAdapterFactory::default(); - let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("s3://in-memory")?, table_schema.clone(), @@ -660,7 +664,6 @@ mod tests { Ok(()) } - // #[tokio::test] // This test verifies that files with different column order than the // table schema can be opened without errors. 
The fix ensures that the @@ -669,18 +672,30 @@ mod tests { async fn test_schema_different_column_order() -> anyhow::Result<()> { use datafusion::arrow::util::pretty::pretty_format_batches_with_options; - let object_store = Arc::new(InMemory::new()) as Arc; - let file_path = "/path/file.vortex"; - - // File has columns in order: c, b, a + // File has field order c,b,a let batch = record_batch!( ("c", Int32, vec![Some(300), Some(301), Some(302)]), ("b", Int32, vec![Some(200), Some(201), Some(202)]), ("a", Int32, vec![Some(100), Some(101), Some(102)]) - ) - .unwrap(); - let data_size = write_arrow_to_vortex(object_store.clone(), file_path, batch).await?; - let file = PartitionedFile::new(file_path.to_string(), data_size); + )?; + + // table schema has field order a,b,c + let table_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, true), + Field::new("b", DataType::Int32, true), + Field::new("c", DataType::Int32, true), + ])); + + let files = HashMap::from_iter([("path/file1.vortex".to_string(), batch)]); + + let TestFixtures { + source, + files, + object_store, + .. + } = make_source(files, &table_schema).await?; + + let (file1, part_file1) = files.get("path/file1.vortex"); // Table schema has columns in different order: a, b, c let table_schema = Arc::new(Schema::new(vec![ @@ -689,26 +704,16 @@ mod tests { Field::new("c", DataType::Int32, true), ])); - let opener = VortexOpener { - session: SESSION.clone(), - object_store: object_store.clone(), - projection: Some([0, 1, 2].into()), - filter: None, - file_pruning_predicate: None, - expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _), - schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory), - partition_fields: vec![], - file_cache: VortexFileCache::new(1, 1, SESSION.clone()), - logical_schema: table_schema.clone(), - batch_size: 100, - limit: None, - metrics: Default::default(), - layout_readers: Default::default(), - has_output_ordering: false, - }; + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("s3://in-memory")?, + table_schema.clone(), + source.clone(), + ) + .build(); + let opener = source.create_file_opener(object_store, &base_config, 0); // The opener should successfully open the file and reorder columns - let stream = opener.open(make_meta(file_path, data_size), file)?.await?; + let stream = opener.open(file1, part_file1)?.await?; let format_opts = FormatOptions::new().with_types_info(true); let data = stream.try_collect::>().await?; @@ -773,7 +778,7 @@ mod tests { let TestFixtures { source, - file_meta, + files, object_store, .. } = make_source(files, &table_schema).await?; @@ -794,8 +799,7 @@ mod tests { let opener = source.create_file_opener(object_store.clone(), &base_config, 0); - let file = file_meta.get("path/file.vortex").unwrap().clone(); - let part_file = PartitionedFile::new("path/file.vortex", file.object_meta.size); + let (file, part_file) = files.get("path/file.vortex"); let data = opener .open(file, part_file)? 
From 12cdfd3a53e3160cef45ac0e663da25a5b842ab9 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 16:06:16 -0500 Subject: [PATCH 10/16] fixup Signed-off-by: Andrew Duffy --- vortex-datafusion/src/persistent/opener.rs | 43 ++++++++++-- vortex-datafusion/src/persistent/source.rs | 76 +++++++++------------- 2 files changed, 68 insertions(+), 51 deletions(-) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 70b5655d8c7..9f345fd3451 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -435,6 +435,8 @@ mod tests { use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; + use datafusion_physical_expr::utils::conjunction; + use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; use futures::pin_mut; use insta::assert_snapshot; @@ -454,6 +456,30 @@ mod tests { use super::*; use crate::VortexSource; + fn check_pushdown_result( + filters: Vec, + expected_pushed_filters: impl IntoIterator, + pushdown_result: &FilterPushdownPropagation>, + ) { + assert_eq!(filters.len(), pushdown_result.filters.len()); + + for filter in &pushdown_result.filters { + assert!(matches!(filter, PushedDown::No)); + } + + let updated_src = pushdown_result + .updated_node + .as_ref() + .expect("try_pushdown_filters for VortexSource should always return updated node"); + let vortex_src = updated_src + .as_any() + .downcast_ref::() + .expect("downcast to VortexSource"); + + let expected = conjunction(expected_pushed_filters); + assert_eq!(Some(expected), vortex_src.pushed_predicate); + } + /// Fixtures used for integration testing the FileSource and FileOpener struct TestFixtures { object_store: Arc, @@ -563,6 +589,8 @@ mod tests { let push_filters = source.try_pushdown_filters(vec![filter_partition_col], &ConfigOptions::default())?; + let source = push_filters.updated_node.unwrap(); + assert!(matches!(push_filters.filters[0], PushedDown::No)); let base_config = FileScanConfigBuilder::new( @@ -620,9 +648,11 @@ mod tests { let filter = col("a").lt(lit(100_i32)); let filter = logical2physical(&filter, table_schema.as_ref()); let pushdown_result = - source.try_pushdown_filters(vec![filter], &ConfigOptions::default())?; - // filter should've succeeded pushing - assert!(matches!(pushdown_result.filters[0], PushedDown::Yes)); + source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?; + + check_pushdown_result(vec![filter.clone()], vec![filter.clone()], &pushdown_result); + + let source = pushdown_result.updated_node.unwrap(); let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("s3://in-memory")?, @@ -785,10 +815,11 @@ mod tests { let filter = logical2physical(&col("my_struct").is_not_null(), &table_schema); let pushdown_result = - source.try_pushdown_filters(vec![filter], &ConfigOptions::default())?; + source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?; + + check_pushdown_result(vec![filter.clone()], vec![filter], &pushdown_result); - // The filter should not have been pushed - assert!(matches!(pushdown_result.filters[0], PushedDown::Yes)); + let source = pushdown_result.updated_node.unwrap(); let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("s3://in-memory")?, diff --git a/vortex-datafusion/src/persistent/source.rs 
b/vortex-datafusion/src/persistent/source.rs index d42436eb652..4d5061c6199 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -24,7 +24,6 @@ use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; -use datafusion_physical_plan::filter_pushdown::PushedDownPredicate; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use object_store::ObjectStore; use object_store::path::Path; @@ -52,11 +51,11 @@ pub struct VortexSource { pub(crate) full_predicate: Option, /// Subset of predicates that can be pushed down into Vortex scan operations. /// These are expressions that Vortex can efficiently evaluate during scanning. - pub(crate) vortex_predicate: Option, + pub(crate) pushed_predicate: Option, pub(crate) batch_size: Option, pub(crate) projected_statistics: Option, /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema. - pub(crate) arrow_file_schema: Option, + pub(crate) table_schema: Option, pub(crate) schema_adapter_factory: Option>, pub(crate) expr_adapter_factory: Option>, _unused_df_metrics: ExecutionPlanMetricsSet, @@ -72,10 +71,10 @@ impl VortexSource { session, file_cache, full_predicate: None, - vortex_predicate: None, + pushed_predicate: None, batch_size: None, projected_statistics: None, - arrow_file_schema: None, + table_schema: None, schema_adapter_factory: None, expr_adapter_factory: None, _unused_df_metrics: Default::default(), @@ -144,7 +143,7 @@ impl FileSource for VortexSource { session: self.session.clone(), object_store, projection, - filter: self.vortex_predicate.clone(), + filter: self.pushed_predicate.clone(), file_pruning_predicate: self.full_predicate.clone(), expr_adapter_factory, schema_adapter_factory, @@ -173,7 +172,7 @@ impl FileSource for VortexSource { fn with_schema(&self, schema: SchemaRef) -> Arc { let mut source = self.clone(); - source.arrow_file_schema = Some(schema); + source.table_schema = Some(schema); Arc::new(source) } @@ -188,7 +187,7 @@ impl FileSource for VortexSource { } fn filter(&self) -> Option> { - self.vortex_predicate.clone() + self.pushed_predicate.clone() } fn metrics(&self) -> &ExecutionPlanMetricsSet { @@ -201,7 +200,7 @@ impl FileSource for VortexSource { .clone() .vortex_expect("projected_statistics must be set"); - if self.vortex_predicate.is_some() { + if self.pushed_predicate.is_some() { Ok(statistics.to_inexact()) } else { Ok(statistics) @@ -215,13 +214,13 @@ impl FileSource for VortexSource { fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { - if let Some(ref predicate) = self.vortex_predicate { + if let Some(ref predicate) = self.pushed_predicate { write!(f, ", predicate: {predicate}")?; } } // Use TreeRender style key=value formatting to display the predicate DisplayFormatType::TreeRender => { - if let Some(ref predicate) = self.vortex_predicate { + if let Some(ref predicate) = self.pushed_predicate { writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?; }; } @@ -234,13 +233,16 @@ impl FileSource for VortexSource { filters: Vec>, _config: &ConfigOptions, ) -> DFResult>> { + let num_filters = filters.len(); + if filters.is_empty() { return Ok(FilterPushdownPropagation::with_parent_pushdown_result( vec![], 
)); } - let Some(schema) = self.arrow_file_schema.as_ref() else { + // only try and push filters if we know the schema + let Some(schema) = self.table_schema.as_ref() else { return Ok(FilterPushdownPropagation::with_parent_pushdown_result( vec![PushedDown::No; filters.len()], )); @@ -257,47 +259,31 @@ impl FileSource for VortexSource { None => Some(conjunction(filters.clone())), }; + // Update the predicate with any pushed filters let supported_filters = filters .into_iter() - .map(|expr| { - if can_be_pushed_down(&expr, schema) { - PushedDownPredicate::supported(expr) - } else { - PushedDownPredicate::unsupported(expr) - } - }) + .filter(|expr| can_be_pushed_down(&expr, schema)) .collect::>(); - if supported_filters - .iter() - .all(|p| matches!(p.discriminant, PushedDown::No)) - { - return Ok(FilterPushdownPropagation::with_parent_pushdown_result( - vec![PushedDown::No; supported_filters.len()], - ) - .with_updated_node(Arc::new(source) as _)); - } - - let supported = supported_filters - .iter() - .filter_map(|p| match p.discriminant { - PushedDown::Yes => Some(&p.predicate), - PushedDown::No => None, - }) - .cloned(); - - let predicate = match source.vortex_predicate { - Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)), - None => conjunction(supported), + let predicate = match source.pushed_predicate { + Some(predicate) => conjunction(std::iter::once(predicate).chain(supported_filters)), + None => conjunction(supported_filters), }; - tracing::debug!(%predicate, "Saving predicate"); + tracing::debug!(%predicate, "updating predicate with new filters"); - source.vortex_predicate = Some(predicate); + source.pushed_predicate = Some(predicate); - Ok(FilterPushdownPropagation::with_parent_pushdown_result( - supported_filters.iter().map(|f| f.discriminant).collect(), - ) + // NOTE: we always report no pushdown to DataFusion, which forces it to postfilter our + // results. Due to schema evolution and schema adapters/expression adapters, we can't + // guarantee that filters over missing columns can be executed directly in Vortex. + // + // But, we still return the updated source node so that the filters are used for + // zone map pruning. 
+ Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![ + PushedDown::No; + num_filters + ]) .with_updated_node(Arc::new(source) as _)) } From 96e4422ef7ee5553138ce1c0275def13947afb3c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 16:09:36 -0500 Subject: [PATCH 11/16] more Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 6 ++---- vortex-datafusion/src/persistent/source.rs | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 88bc5f90d42..61f2f0e9489 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -223,8 +223,6 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> return false; } - let _expr_str = format!("{:?}", df_expr); - let expr = df_expr.as_any(); if let Some(binary) = expr.downcast_ref::() { can_binary_be_pushed_down(binary, schema) @@ -714,7 +712,7 @@ mod tests { let deep_filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32)); - let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()); + let physical_filter = logical2physical(&deep_filter, df_schema.as_ref()); let prop = source .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) @@ -786,7 +784,7 @@ mod tests { // Another weird ScalarFunction that we can't push down let deep_filter = unknown_func.eq(datafusion_expr::lit(10i32)); - let physical_filter = logical2physical(&deep_filter, &df_schema.as_ref()); + let physical_filter = logical2physical(&deep_filter, df_schema.as_ref()); let prop = source .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index 4d5061c6199..dd4651b71fb 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -262,7 +262,7 @@ impl FileSource for VortexSource { // Update the predicate with any pushed filters let supported_filters = filters .into_iter() - .filter(|expr| can_be_pushed_down(&expr, schema)) + .filter(|expr| can_be_pushed_down(expr, schema)) .collect::>(); let predicate = match source.pushed_predicate { From 8ae6e01a9ff01dcd944c565058ceee23ca5b08e4 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 16:12:06 -0500 Subject: [PATCH 12/16] url Signed-off-by: Andrew Duffy --- vortex-datafusion/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 84dfd6c592d..cb0e2998e56 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -37,9 +37,9 @@ object_store = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "fs"] } tokio-stream = { workspace = true } tracing = { workspace = true } +url = { workspace = true } vortex = { workspace = true, features = ["object_store", "tokio"] } vortex-utils = { workspace = true, features = ["dashmap"] } -url = "2.5.7" [dev-dependencies] anyhow = { workspace = true } From a562994c8a22f6a39ce89cd8536a5f10cedf0437 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 16:20:05 -0500 Subject: [PATCH 13/16] comment Signed-off-by: Andrew Duffy --- vortex-datafusion/src/persistent/opener.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 
9f345fd3451..0c5e5d15ba4 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -524,7 +524,6 @@ mod tests { // "write" all the record batches to the named file paths let mut file_meta = HashMap::with_capacity(files.len()); - // TODO: make file schema by superset of fields? for (path_str, rb) in files.iter() { let array = ArrayRef::from_arrow(rb, false); let path = Path::from_url_path(path_str.as_str())?; From fda14c1a6cf05508ede73a1caeea47e686b5b855 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 17:05:58 -0500 Subject: [PATCH 14/16] fix tests Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index 61f2f0e9489..d71fcd5af7f 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -508,8 +508,7 @@ mod tests { DataType::List(Arc::new(Field::new("item", DataType::Int32, true))), false )] - #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into() - ), false)] + #[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), true)] // Dictionary types - should be supported if value type is supported #[case::dict_utf8( DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)), @@ -681,9 +680,11 @@ mod tests { let source = vortex_source(&test_schema); let prop = source - .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) + .try_pushdown_filters(vec![physical_filter.clone()], &ConfigOptions::default()) .unwrap(); - assert!(matches!(prop.filters[0], PushedDown::Yes)); + let updated_source = prop.updated_node.unwrap(); + let pushed_filters = updated_source.filter(); + assert_eq!(pushed_filters, Some(physical_filter)) } #[test] @@ -715,9 +716,11 @@ mod tests { let physical_filter = logical2physical(&deep_filter, df_schema.as_ref()); let prop = source - .try_pushdown_filters(vec![physical_filter], &ConfigOptions::default()) + .try_pushdown_filters(vec![physical_filter.clone()], &ConfigOptions::default()) .unwrap(); - assert!(matches!(prop.filters[0], PushedDown::Yes)); + let updated_source = prop.updated_node.unwrap(); + let pushed_filters = updated_source.filter(); + assert_eq!(pushed_filters, Some(physical_filter)) } #[test] From 17a608928a24b42852af7d16fb2b78ed28591ede Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Nov 2025 17:09:05 -0500 Subject: [PATCH 15/16] fix Signed-off-by: Andrew Duffy --- vortex-datafusion/src/persistent/opener.rs | 40 ++++++---------------- 1 file changed, 10 insertions(+), 30 deletions(-) diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index 0c5e5d15ba4..f4f6585dec4 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -358,6 +358,13 @@ impl FileOpener for VortexOpener { .transpose() .map_err(|e| DataFusionError::External(e.into()))?; + tracing::debug!( + ?filter, + ?projection, + ?projection_expr, + "opening file with predicate and projection" + ); + if let Some(limit) = limit && filter.is_none() { @@ -435,8 +442,6 @@ mod tests { use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfigBuilder; use datafusion_execution::object_store::ObjectStoreUrl; - use datafusion_physical_expr::utils::conjunction; - use 
datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; use futures::pin_mut; use insta::assert_snapshot; @@ -456,30 +461,6 @@ mod tests { use super::*; use crate::VortexSource; - fn check_pushdown_result( - filters: Vec, - expected_pushed_filters: impl IntoIterator, - pushdown_result: &FilterPushdownPropagation>, - ) { - assert_eq!(filters.len(), pushdown_result.filters.len()); - - for filter in &pushdown_result.filters { - assert!(matches!(filter, PushedDown::No)); - } - - let updated_src = pushdown_result - .updated_node - .as_ref() - .expect("try_pushdown_filters for VortexSource should always return updated node"); - let vortex_src = updated_src - .as_any() - .downcast_ref::() - .expect("downcast to VortexSource"); - - let expected = conjunction(expected_pushed_filters); - assert_eq!(Some(expected), vortex_src.pushed_predicate); - } - /// Fixtures used for integration testing the FileSource and FileOpener struct TestFixtures { object_store: Arc, @@ -649,10 +630,10 @@ mod tests { let pushdown_result = source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?; - check_pushdown_result(vec![filter.clone()], vec![filter.clone()], &pushdown_result); - let source = pushdown_result.updated_node.unwrap(); + assert_eq!(source.filter(), Some(filter)); + let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("s3://in-memory")?, table_schema.clone(), @@ -816,9 +797,8 @@ mod tests { let pushdown_result = source.try_pushdown_filters(vec![filter.clone()], &ConfigOptions::default())?; - check_pushdown_result(vec![filter.clone()], vec![filter], &pushdown_result); - let source = pushdown_result.updated_node.unwrap(); + assert_eq!(source.filter(), Some(filter)); let base_config = FileScanConfigBuilder::new( ObjectStoreUrl::parse("s3://in-memory")?, From 373a365e28942d2bb83454f7af6b1f15cb86bc84 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Nov 2025 10:35:54 -0500 Subject: [PATCH 16/16] handle post-filtering within opener Signed-off-by: Andrew Duffy --- vortex-datafusion/src/convert/exprs.rs | 12 --- vortex-datafusion/src/persistent/opener.rs | 120 ++++++++++----------- vortex-datafusion/src/persistent/source.rs | 46 +++++--- 3 files changed, 89 insertions(+), 89 deletions(-) diff --git a/vortex-datafusion/src/convert/exprs.rs b/vortex-datafusion/src/convert/exprs.rs index d71fcd5af7f..13e248f9946 100644 --- a/vortex-datafusion/src/convert/exprs.rs +++ b/vortex-datafusion/src/convert/exprs.rs @@ -38,18 +38,6 @@ use vortex::scalar::Scalar; use crate::convert::FromDataFusion; use crate::convert::TryFromDataFusion; -/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty. -pub(crate) fn make_vortex_predicate( - predicate: &[&Arc], -) -> VortexResult> { - let exprs = predicate - .iter() - .map(|e| Expression::try_from_df(e.as_ref())) - .collect::>>()?; - - Ok(exprs.into_iter().reduce(and)) -} - // TODO(joe): Don't return an error when we have an unsupported node, bubble up "TRUE" as in keep // for that node, up to any `and` or `or` node. 
impl TryFromDataFusion for Expression { diff --git a/vortex-datafusion/src/persistent/opener.rs b/vortex-datafusion/src/persistent/opener.rs index f4f6585dec4..2e59c59fa54 100644 --- a/vortex-datafusion/src/persistent/opener.rs +++ b/vortex-datafusion/src/persistent/opener.rs @@ -11,12 +11,15 @@ use arrow_schema::SchemaRef; use datafusion_common::DataFusionError; use datafusion_common::Result as DFResult; use datafusion_common::arrow::array::RecordBatch; +use datafusion_common::arrow::compute::filter_record_batch; +use datafusion_common::cast::as_boolean_array; use datafusion_datasource::PartitionedFile; use datafusion_datasource::file_meta::FileMeta; use datafusion_datasource::file_stream::FileOpenFuture; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::schema_adapter::SchemaAdapterFactory; use datafusion_physical_expr::PhysicalExprRef; +use datafusion_physical_expr::conjunction; use datafusion_physical_expr::simplifier::PhysicalExprSimplifier; use datafusion_physical_expr::split_conjunction; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; @@ -32,8 +35,9 @@ use object_store::path::Path; use tracing::Instrument; use vortex::dtype::FieldName; use vortex::error::VortexError; +use vortex::expr; +use vortex::expr::Expression; use vortex::expr::root; -use vortex::expr::select; use vortex::layout::LayoutReader; use vortex::metrics::VortexMetrics; use vortex::scan::ScanBuilder; @@ -42,8 +46,8 @@ use vortex_utils::aliases::dash_map::DashMap; use vortex_utils::aliases::dash_map::Entry; use super::cache::VortexFileCache; +use crate::convert::TryFromDataFusion; use crate::convert::exprs::can_be_pushed_down; -use crate::convert::exprs::make_vortex_predicate; use crate::convert::ranges::apply_byte_range; #[derive(Clone)] @@ -228,6 +232,9 @@ impl FileOpener for VortexOpener { DataFusionError::Execution(format!("Failed to convert file schema to arrow: {e}")) })?); + let logical_file_schema = + compute_logical_file_schema(&physical_file_schema, &logical_schema); + if let Some(expr_adapter_factory) = expr_adapter_factory { let partition_values = partition_fields .iter() @@ -239,13 +246,8 @@ impl FileOpener for VortexOpener { // for schema evolution and divergence between the table's schema and individual files. filter = filter .map(|filter| { - let logical_file_schema = compute_logical_file_schema( - &physical_file_schema.clone(), - &logical_schema, - ); - let expr = expr_adapter_factory - .create(logical_file_schema, physical_file_schema.clone()) + .create(logical_file_schema.clone(), physical_file_schema.clone()) .with_partition_values(partition_values) .rewrite(filter)?; @@ -261,52 +263,19 @@ impl FileOpener for VortexOpener { // Create the initial mapping from physical file schema to projected schema. // This gives us the field reordering and tells us which logical schema fields // to select. - let (_schema_mapping, adapted_projections) = - schema_adapter.map_schema(&physical_file_schema)?; + let (schema_mapping, adapted_projections) = + schema_adapter.map_schema(&logical_file_schema)?; // Build the Vortex projection expression using the adapted projections. // This will reorder the fields to match the target order. 
let fields = adapted_projections .iter() - .map(|idx| { - let field = logical_schema.field(*idx); + .map(|&idx| { + let field = logical_file_schema.field(idx); FieldName::from(field.name().as_str()) }) .collect::>(); - let projection_expr = select(fields, root()); - - // After Vortex applies the projection, the batch will have fields in the target - // order (matching adapted_projections), but with the physical file types. - // We need a second schema mapping for type casting only, not reordering. - // Build a schema that represents what Vortex will return: fields in target order - // with physical types. - let projected_physical_fields: Vec = adapted_projections - .iter() - .map(|&idx| { - let logical_field = logical_schema.field(idx); - let field_name = logical_field.name(); - - // Find this field in the physical schema to get its physical type - physical_file_schema - .field_with_name(field_name) - .map(|phys_field| { - Field::new( - field_name, - merge_field_types(phys_field, logical_field), - phys_field.is_nullable(), - ) - }) - .unwrap_or_else(|_| (*logical_field).clone()) - }) - .collect(); - - let projected_physical_schema = - Arc::new(arrow_schema::Schema::new(projected_physical_fields)); - - // Create a second mapping from the projected physical schema (what Vortex returns) - // to the final projected schema. This mapping will handle type casting without reordering. - let (batch_schema_mapping, _) = - schema_adapter.map_schema(&projected_physical_schema)?; + let projection_expr = expr::select(fields, root()); // We share our layout readers with others partitions in the scan, so we can only need to read each layout in each file once. let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) { @@ -346,27 +315,49 @@ impl FileOpener for VortexOpener { ); } - let filter = filter - .and_then(|f| { - let exprs = split_conjunction(&f) - .into_iter() - .filter(|expr| can_be_pushed_down(expr, &predicate_file_schema)) - .collect::>(); + // Split the filter expressions into those that can be applied within the file scan, + // and those that need to be applied afterward in-memory. 
+ let mut pushed_filters = Vec::new(); + let mut post_filters = Vec::new(); + + if let Some(filter) = filter { + for expr in split_conjunction(&filter) { + if can_be_pushed_down(expr, &predicate_file_schema) + && let Ok(vortex_expr) = Expression::try_from_df(expr.as_ref()) + { + pushed_filters.push(vortex_expr); + } else { + post_filters.push(expr.clone()); + } + } + } - make_vortex_predicate(&exprs).transpose() - }) - .transpose() - .map_err(|e| DataFusionError::External(e.into()))?; + let pushed_filter = pushed_filters.into_iter().reduce(expr::and); + let post_filter: Box) -> DFResult + Send> = + if post_filters.is_empty() { + Box::new(|batch: DFResult| batch) + } else { + let conjunction = conjunction(post_filters.clone()); + Box::new( + move |batch: DFResult| -> DFResult { + let batch = batch?; + let filter = conjunction.evaluate(&batch)?; + let filter = filter.into_array(batch.num_rows())?; + let filter = as_boolean_array(&filter)?; + filter_record_batch(&batch, filter).map_err(DataFusionError::from) + }, + ) + }; tracing::debug!( - ?filter, - ?projection, + ?pushed_filter, + ?post_filters, ?projection_expr, - "opening file with predicate and projection" + "opening file with predicates and projection" ); if let Some(limit) = limit - && filter.is_none() + && pushed_filter.is_none() { scan_builder = scan_builder.with_limit(limit); } @@ -374,7 +365,7 @@ impl FileOpener for VortexOpener { let stream = scan_builder .with_metrics(metrics) .with_projection(projection_expr) - .with_some_filter(filter) + .with_some_filter(pushed_filter) .with_ordered(has_output_ordering) .map(|chunk| RecordBatch::try_from(chunk.as_ref())) .into_stream() @@ -409,7 +400,12 @@ impl FileOpener for VortexOpener { )))) }) .try_flatten() - .map(move |batch| batch.and_then(|b| batch_schema_mapping.map_batch(b))) + .map(move |batch| batch.and_then(|b| schema_mapping.map_batch(b))) + // Apply the post-filter step, which will execute any filters that couldn't + // be pushed down for this file. This is applicable for any filters over fields + // missing from the file schema that exist in the table schema, and are filled in + // from the schema adapter. 
+ .map(post_filter) .boxed(); Ok(stream) diff --git a/vortex-datafusion/src/persistent/source.rs b/vortex-datafusion/src/persistent/source.rs index dd4651b71fb..b4dd9207f96 100644 --- a/vortex-datafusion/src/persistent/source.rs +++ b/vortex-datafusion/src/persistent/source.rs @@ -24,6 +24,7 @@ use datafusion_physical_plan::DisplayFormatType; use datafusion_physical_plan::PhysicalExpr; use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::filter_pushdown::PushedDown; +use datafusion_physical_plan::filter_pushdown::PushedDownPredicate; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use object_store::ObjectStore; use object_store::path::Path; @@ -233,8 +234,6 @@ impl FileSource for VortexSource { filters: Vec>, _config: &ConfigOptions, ) -> DFResult>> { - let num_filters = filters.len(); - if filters.is_empty() { return Ok(FilterPushdownPropagation::with_parent_pushdown_result( vec![], @@ -262,28 +261,45 @@ impl FileSource for VortexSource { // Update the predicate with any pushed filters let supported_filters = filters .into_iter() - .filter(|expr| can_be_pushed_down(expr, schema)) + .map(|expr| { + if can_be_pushed_down(&expr, schema) { + PushedDownPredicate::supported(expr) + } else { + PushedDownPredicate::unsupported(expr) + } + }) .collect::>(); + if supported_filters + .iter() + .all(|p| matches!(p.discriminant, PushedDown::No)) + { + return Ok(FilterPushdownPropagation::with_parent_pushdown_result( + vec![PushedDown::No; supported_filters.len()], + ) + .with_updated_node(Arc::new(source) as _)); + } + + let supported = supported_filters + .iter() + .filter_map(|p| match p.discriminant { + PushedDown::Yes => Some(&p.predicate), + PushedDown::No => None, + }) + .cloned(); + let predicate = match source.pushed_predicate { - Some(predicate) => conjunction(std::iter::once(predicate).chain(supported_filters)), - None => conjunction(supported_filters), + Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)), + None => conjunction(supported), }; tracing::debug!(%predicate, "updating predicate with new filters"); source.pushed_predicate = Some(predicate); - // NOTE: we always report no pushdown to DataFusion, which forces it to postfilter our - // results. Due to schema evolution and schema adapters/expression adapters, we can't - // guarantee that filters over missing columns can be executed directly in Vortex. - // - // But, we still return the updated source node so that the filters are used for - // zone map pruning. - Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![ - PushedDown::No; - num_filters - ]) + Ok(FilterPushdownPropagation::with_parent_pushdown_result( + supported_filters.iter().map(|f| f.discriminant).collect(), + ) .with_updated_node(Arc::new(source) as _)) }
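
For readers following the final patch, the net effect of the opener change is a two-stage predicate: conjuncts that `can_be_pushed_down` accepts are converted to Vortex expressions and evaluated inside the file scan (where they also drive zone-map pruning), while the remainder is applied to the decoded record batches afterward. The sketch below restates that split and the residual step outside the patch; it is illustrative only, the helper names (`split_predicate`, `apply_residual_filter`, `can_push`) are not part of the codebase, and it assumes only the DataFusion/arrow APIs the patch itself imports (`split_conjunction`, `as_boolean_array`, `filter_record_batch`).

```rust
use datafusion_common::Result as DFResult;
use datafusion_common::arrow::array::RecordBatch;
use datafusion_common::arrow::compute::filter_record_batch;
use datafusion_common::cast::as_boolean_array;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::split_conjunction;

/// Illustrative helper: partition a conjunctive predicate into the conjuncts the
/// file scan can evaluate natively and the residual that must run on decoded batches.
/// `can_push` stands in for `can_be_pushed_down(expr, &predicate_file_schema)`.
fn split_predicate(
    predicate: &PhysicalExprRef,
    can_push: impl Fn(&PhysicalExprRef) -> bool,
) -> (Vec<PhysicalExprRef>, Vec<PhysicalExprRef>) {
    split_conjunction(predicate)
        .into_iter()
        .cloned()
        .partition(|conjunct| can_push(conjunct))
}

/// Illustrative helper: apply the residual conjunction to a batch returned by the
/// scan, keeping only rows where it evaluates to true (nulls are dropped by the
/// Arrow filter kernel, matching SQL WHERE semantics).
fn apply_residual_filter(batch: RecordBatch, residual: &PhysicalExprRef) -> DFResult<RecordBatch> {
    // `into_array` broadcasts a scalar result (e.g. a constant predicate) to the batch length.
    let mask = residual.evaluate(&batch)?.into_array(batch.num_rows())?;
    let mask = as_boolean_array(&mask)?;
    Ok(filter_record_batch(&batch, mask)?)
}
```

In the patch itself this logic lives in `VortexOpener::open`: the pushed conjuncts are folded with `expr::and` and handed to the `ScanBuilder`, while the residual conjuncts are folded with `conjunction` into the `post_filter` closure that is mapped over the output stream, which is also why the row-count `limit` is only applied when no filter was pushed.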