1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions vortex-datafusion/Cargo.toml
@@ -37,12 +37,14 @@ object_store = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "fs"] }
tokio-stream = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
vortex = { workspace = true, features = ["object_store", "tokio"] }
vortex-utils = { workspace = true, features = ["dashmap"] }

[dev-dependencies]
anyhow = { workspace = true }
datafusion = { workspace = true }
datafusion-common = { workspace = true }
insta = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
6 changes: 5 additions & 1 deletion vortex-datafusion/examples/vortex_table.rs
@@ -79,7 +79,11 @@ async fn main() -> anyhow::Result<()> {

ctx.register_table("vortex_tbl", listing_table as _)?;

run_query(&ctx, "SELECT * FROM vortex_tbl").await?;
run_query(
&ctx,
"SELECT * FROM vortex_tbl where numbers % 2 = 0 AND strings LIKE 'b%'",
)
.await?;

Ok(())
}
260 changes: 173 additions & 87 deletions vortex-datafusion/src/convert/exprs.rs
@@ -38,18 +38,6 @@ use vortex::scalar::Scalar;
use crate::convert::FromDataFusion;
use crate::convert::TryFromDataFusion;

/// Tries to convert the expressions into a vortex conjunction. Will return Ok(None) iff the input conjunction is empty.
pub(crate) fn make_vortex_predicate(
predicate: &[&Arc<dyn PhysicalExpr>],
) -> VortexResult<Option<Expression>> {
let exprs = predicate
.iter()
.map(|e| Expression::try_from_df(e.as_ref()))
.collect::<VortexResult<Vec<_>>>()?;

Ok(exprs.into_iter().reduce(and))
}

// TODO(joe): Don't return an error when we have an unsupported node; bubble up "TRUE" (i.e.
// keep everything for that node) up to any enclosing `and` or `or` node.
impl TryFromDataFusion<dyn PhysicalExpr> for Expression {
@@ -245,7 +233,10 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) ->
can_be_pushed_down(in_list.expr(), schema)
&& in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
can_scalar_fn_be_pushed_down(scalar_fn, schema)
// Only get_field expressions should be pushed down. Note that the
// GetFieldFunc call here is well-formed, because the DataFusion planner
// checks that for us before we even get to the DataSource.
ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
} else {
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
false
@@ -283,6 +274,7 @@ fn supported_data_types(dt: &DataType) -> bool {
| Timestamp(_, _)
| Time32(_)
| Time64(_)
| Struct(_)
Contributor Author: I think we should probably expand this list further? I just added Struct to make one of the tests pass.

@robert3005 (Contributor, Nov 20, 2025): I think we are missing List/ListView/FixedSizeList and interval types. Map and Union as well. (See the sketch after this function.)

);

if !is_supported {
@@ -292,75 +284,47 @@ fn supported_data_types(dt: &DataType) -> bool {
is_supported
}
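
Following the review thread above: a minimal sketch of what the candidate extra arms could look like, assuming the same `matches!` over `arrow_schema::DataType` used in this hunk. The variant list comes from robert3005's comment; whether the Vortex conversion actually handles each of these is unverified here, and a real change would likely need to recurse into element and field types rather than accept the variants wholesale.

```rust
use arrow_schema::DataType;

// Hedged sketch only, not part of this diff: possible additional arms for
// supported_data_types, per the review thread. Variant shapes follow
// arrow_schema::DataType; Vortex-side support for each is an assumption.
fn nested_types_supported(dt: &DataType) -> bool {
    use DataType::*;
    matches!(
        dt,
        List(_)
            | LargeList(_)
            | ListView(_)
            | LargeListView(_)
            | FixedSizeList(_, _)
            | Interval(_)
            | Map(_, _)
            | Union(_, _)
    )
}
```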

/// Checks if a GetField scalar function can be pushed down.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool {
let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
else {
// Only get_field pushdown is supported.
return false;
};

let args = get_field_fn.args();
if args.len() != 2 {
tracing::debug!(
"Expected 2 arguments for GetField, not pushing down {} arguments",
args.len()
);
return false;
}
let source_expr = &args[0];
let field_name_expr = &args[1];
let Some(field_name) = field_name_expr
.as_any()
.downcast_ref::<df_expr::Literal>()
.and_then(|lit| lit.value().try_as_str().flatten())
else {
return false;
};

let Ok(source_dt) = source_expr.data_type(schema) else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type for GetField, not pushing down"
);
return false;
};
let DataType::Struct(fields) = source_dt else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type as struct for GetField, not pushing down"
);
return false;
};
fields.find(field_name).is_some()
}

#[cfg(test)]
mod tests {
use std::any::Any;
use std::sync::Arc;

use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::Fields;
use arrow_schema::Schema;
use arrow_schema::SchemaBuilder;
use arrow_schema::SchemaRef;
use arrow_schema::TimeUnit as ArrowTimeUnit;
use datafusion::functions::core::getfield::GetFieldFunc;
use datafusion::logical_expr::ColumnarValue;
use datafusion::logical_expr::Signature;
use datafusion_common::ScalarValue;
use datafusion_common::ToDFSchema;
use datafusion_common::config::ConfigOptions;
use datafusion_datasource::file::FileSource;
use datafusion_expr::Expr;
use datafusion_expr::Operator as DFOperator;
use datafusion_expr::ScalarFunctionArgs;
use datafusion_expr::ScalarUDF;
use datafusion_expr::ScalarUDFImpl;
use datafusion_expr::Volatility;
use datafusion_expr::col;
use datafusion_expr::expr::ScalarFunction;
use datafusion_functions::expr_fn::get_field;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::planner::logical2physical;
use datafusion_physical_plan::expressions as df_expr;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use insta::assert_snapshot;
use rstest::rstest;
use vortex::VortexSessionDefault;
use vortex::expr::Expression;
use vortex::expr::Operator;
use vortex::session::VortexSession;

use super::*;
use crate::VortexSource;
use crate::persistent::cache::VortexFileCache;

#[rstest::fixture]
fn test_schema() -> Schema {
@@ -532,7 +496,7 @@ mod tests {
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false
)]
#[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), false)]
#[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), true)]
// Dictionary types - should be supported if value type is supported
#[case::dict_utf8(
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
@@ -674,33 +638,155 @@ mod tests {
"#);
}

#[rstest]
#[case::valid_field("field1", true)]
#[case::missing_field("nonexistent_field", false)]
fn test_can_be_pushed_down_get_field(#[case] field_name: &str, #[case] expected: bool) {
let struct_fields = Fields::from(vec![
Field::new("field1", DataType::Utf8, true),
Field::new("field2", DataType::Int32, true),
]);
let schema = Schema::new(vec![Field::new(
"my_struct",
DataType::Struct(struct_fields),
true,
)]);
#[test]
fn test_pushdown_nested_filter() {
// schema:
// a: struct
// |- one: i32
// b: struct
// |- two: i32
let mut test_schema = SchemaBuilder::new();
test_schema.push(Field::new_struct(
"a",
vec![Field::new("one", DataType::Int32, false)],
false,
));
test_schema.push(Field::new_struct(
"b",
vec![Field::new("two", DataType::Int32, false)],
false,
));

let struct_col = Arc::new(df_expr::Column::new("my_struct", 0)) as Arc<dyn PhysicalExpr>;
let field_name_lit = Arc::new(df_expr::Literal::new(ScalarValue::Utf8(Some(
field_name.to_string(),
)))) as Arc<dyn PhysicalExpr>;
let test_schema = Arc::new(test_schema.finish());
// Make sure filter is pushed down
let filter = get_field(col("b"), "two").eq(datafusion_expr::lit(10i32));

let get_field_expr = Arc::new(ScalarFunctionExpr::new(
"get_field",
Arc::new(ScalarUDF::from(GetFieldFunc::new())),
vec![struct_col, field_name_lit],
Arc::new(Field::new(field_name, DataType::Utf8, true)),
Arc::new(ConfigOptions::new()),
)) as Arc<dyn PhysicalExpr>;
let df_schema = test_schema.clone().to_dfschema().unwrap();

let physical_filter = logical2physical(&filter, df_schema.as_ref());

let source = vortex_source(&test_schema);

let prop = source
.try_pushdown_filters(vec![physical_filter.clone()], &ConfigOptions::default())
.unwrap();
let updated_source = prop.updated_node.unwrap();
let pushed_filters = updated_source.filter();
assert_eq!(pushed_filters, Some(physical_filter))
}

#[test]
fn test_pushdown_deeply_nested_filter() {
// schema:
// a: struct
// |- b: struct
// |- c: i32
let mut schema = SchemaBuilder::new();

let a = Field::new_struct(
"a",
vec![Field::new_struct(
"b",
vec![Field::new("c", DataType::Int32, false)],
false,
)],
false,
);
schema.push(a);

let schema = Arc::new(schema.finish());
let df_schema = schema.clone().to_dfschema().unwrap();

let source = vortex_source(&schema);

let deep_filter = get_field(get_field(col("a"), "b"), "c").eq(datafusion_expr::lit(10i32));

let physical_filter = logical2physical(&deep_filter, df_schema.as_ref());

let prop = source
.try_pushdown_filters(vec![physical_filter.clone()], &ConfigOptions::default())
.unwrap();
let updated_source = prop.updated_node.unwrap();
let pushed_filters = updated_source.filter();
assert_eq!(pushed_filters, Some(physical_filter))
}

#[test]
fn test_unknown_scalar_function() {
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct UnknownImpl {
signature: Signature,
}

impl ScalarUDFImpl for UnknownImpl {
fn as_any(&self) -> &dyn Any {
self
}

fn name(&self) -> &str {
"unknown"
}

fn signature(&self) -> &Signature {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> datafusion_common::Result<DataType> {
Ok(DataType::Int32)
}

fn invoke_with_args(
&self,
_args: ScalarFunctionArgs,
) -> datafusion_common::Result<ColumnarValue> {
Ok(ColumnarValue::Scalar(ScalarValue::Int32(Some(1))))
}
}

// schema:
// a: struct
// |- b: struct
// |- c: i32
let mut schema = SchemaBuilder::new();

let a = Field::new_struct(
"a",
vec![Field::new_struct(
"b",
vec![Field::new("c", DataType::Int32, false)],
false,
)],
false,
);
schema.push(a);

let schema = Arc::new(schema.finish());
let df_schema = schema.clone().to_dfschema().unwrap();

let source = vortex_source(&schema);

let unknown_func = Expr::ScalarFunction(ScalarFunction {
func: Arc::new(ScalarUDF::new_from_impl(UnknownImpl {
signature: Signature::nullary(Volatility::Immutable),
})),
args: vec![],
});

// Another weird ScalarFunction that we can't push down
let deep_filter = unknown_func.eq(datafusion_expr::lit(10i32));

let physical_filter = logical2physical(&deep_filter, df_schema.as_ref());

let prop = source
.try_pushdown_filters(vec![physical_filter], &ConfigOptions::default())
.unwrap();
assert!(matches!(prop.filters[0], PushedDown::No));
}

fn vortex_source(schema: &SchemaRef) -> Arc<dyn FileSource> {
let session = VortexSession::default();
let cache = VortexFileCache::new(1024, 1024, session.clone());

assert_eq!(can_be_pushed_down(&get_field_expr, &schema), expected);
Arc::new(VortexSource::new(session, cache)).with_schema(schema.clone())
}
}
1 change: 1 addition & 0 deletions vortex-datafusion/src/convert/mod.rs
@@ -4,6 +4,7 @@
use vortex::error::VortexResult;

pub(crate) mod exprs;
pub(crate) mod ranges;
mod scalars;

/// First-party trait for implementing conversion from DataFusion types to Vortex types.