5,499 changes: 797 additions & 4,702 deletions Cargo.lock

Large diffs are not rendered by default.

19 changes: 17 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[workspace]
members = [
"bench-vortex",
# "bench-vortex",
"encodings/*",
"fuzz",
"vortex",
@@ -11,7 +11,7 @@ members = [
"vortex-cxx",
"vortex-datafusion",
"vortex-dtype",
"vortex-duckdb",
# "vortex-duckdb",
"vortex-error",
"vortex-ffi",
"vortex-file",
@@ -339,3 +339,18 @@ lto = false
[profile.bench_assert]
debug-assertions = true
inherits = "bench"

[patch.crates-io]
datafusion = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-catalog = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-common = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-common-runtime = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-datasource = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-execution = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-expr = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-functions = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-physical-expr = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-physical-expr-adapter = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-physical-expr-common = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-physical-plan = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
datafusion-pruning = { git = "https://github.com/apache/datafusion", rev = "86011519b5a" }
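
Note: the `[patch.crates-io]` table redirects every `datafusion-*` crate in the workspace's dependency graph, transitive dependencies included, to one git revision; the entries generally must pin the same `rev` so cargo resolves a single copy of each crate. Re-resolving the lockfile against that revision is what produces the large `Cargo.lock` diff above.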
37 changes: 28 additions & 9 deletions vortex-array/src/arrays/struct_/compute/cast.rs
@@ -5,10 +5,12 @@ use itertools::Itertools;
use vortex_dtype::DType;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_scalar::Scalar;

use crate::ArrayRef;
use crate::IntoArray;
use crate::arrays::ConstantArray;
use crate::arrays::StructArray;
use crate::arrays::StructVTable;
use crate::compute::CastKernel;
@@ -28,8 +30,30 @@ impl CastKernel for StructVTable {
.as_struct_fields_opt()
.vortex_expect("struct array must have struct dtype");

if target_sdtype.names() != source_sdtype.names() {
vortex_bail!("cannot cast {} to {}", array.dtype(), dtype);
// Match target fields to source fields by name, re-ordering as needed.
let mut cast_fields = vec![];
for (target_name, target_type) in
target_sdtype.names().iter().zip_eq(target_sdtype.fields())
{
match source_sdtype.find(target_name) {
None => {
// No source field with this name => evolve the schema compatibly.
// If the field is nullable, add a new all-null ConstantArray field of the target type.
vortex_ensure!(
target_type.is_nullable(),
"CAST for struct only supports added nullable fields"
);

cast_fields.push(
ConstantArray::new(Scalar::null(target_type), array.len()).into_array(),
);
}
Some(src_field_idx) => {
// Field exists in the source struct. Cast it to the target type.
let cast_field = cast(array.fields()[src_field_idx].as_ref(), &target_type)?;
cast_fields.push(cast_field);
}
}
}

let validity = array
@@ -39,12 +63,7 @@ impl CastKernel for StructVTable {

StructArray::try_new(
target_sdtype.names().clone(),
array
.fields()
.iter()
.zip_eq(target_sdtype.fields())
.map(|(field, dtype)| cast(field, &dtype))
.collect::<Result<Vec<_>, _>>()?,
cast_fields,
array.len(),
validity,
)
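
The new cast kernel walks the target fields and matches each one to a source field by name: a match is cast to the target type, while a missing field is only accepted when the target type is nullable, and is then filled with an all-null constant column. Below is a minimal standalone sketch of that matching rule, using toy types rather than the vortex APIs (`FieldPlan` and `plan_struct_cast` are illustrative names, not part of the crate):

```rust
/// Per-field plan for casting a struct: either cast an existing source
/// column or fill a brand-new nullable column with nulls.
#[derive(Debug, PartialEq)]
enum FieldPlan {
    CastSource { source_index: usize },
    FillNull,
}

/// Mirrors the match-by-name loop in the new CastKernel: target order wins,
/// and a field missing from the source is only legal when it is nullable.
fn plan_struct_cast(
    source_names: &[&str],
    target_fields: &[(&str, bool)], // (name, is_nullable)
) -> Result<Vec<FieldPlan>, String> {
    target_fields
        .iter()
        .map(|(name, nullable)| match source_names.iter().position(|s| s == name) {
            Some(source_index) => Ok(FieldPlan::CastSource { source_index }),
            None if *nullable => Ok(FieldPlan::FillNull),
            None => Err(format!("cannot cast: added field {name} is not nullable")),
        })
        .collect()
}

fn main() {
    // {a, b} -> {b, a, c?}: re-orders a and b, fills the new nullable c with nulls.
    let plan = plan_struct_cast(&["a", "b"], &[("b", false), ("a", false), ("c", true)]);
    assert_eq!(
        plan.unwrap(),
        vec![
            FieldPlan::CastSource { source_index: 1 },
            FieldPlan::CastSource { source_index: 0 },
            FieldPlan::FillNull,
        ]
    );
}
```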
1 change: 1 addition & 0 deletions vortex-datafusion/Cargo.toml
@@ -47,6 +47,7 @@ insta = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["test-util", "rt-multi-thread", "fs"] }
url = { workspace = true }
walkdir = { workspace = true }

[lints]
74 changes: 21 additions & 53 deletions vortex-datafusion/src/convert/exprs.rs
@@ -9,7 +9,6 @@ use datafusion_expr::Operator as DFOperator;
use datafusion_functions::core::getfield::GetFieldFunc;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::ScalarFunctionExpr;
use datafusion_physical_expr_common::physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::physical_expr::is_dynamic_physical_expr;
use datafusion_physical_plan::expressions as df_expr;
use itertools::Itertools;
@@ -40,7 +39,7 @@ use crate::convert::TryFromDataFusion;

/// Tries to convert the expressions into a vortex conjunction. Returns Ok(None) iff the input slice is empty.
pub(crate) fn make_vortex_predicate(
predicate: &[&Arc<dyn PhysicalExpr>],
predicate: &[Arc<dyn PhysicalExpr>],
) -> VortexResult<Option<Expression>> {
let exprs = predicate
.iter()
@@ -89,6 +88,14 @@ impl TryFromDataFusion<dyn PhysicalExpr> for Expression {
return Ok(cast(child, cast_dtype));
}

if let Some(cast_col_expr) = df.as_any().downcast_ref::<df_expr::CastColumnExpr>() {
let target = cast_col_expr.target_field();

let target_dtype = DType::from_arrow((target.data_type(), target.is_nullable().into()));
let child = Expression::try_from_df(cast_col_expr.expr().as_ref())?;
return Ok(cast(child, target_dtype));
}

if let Some(is_null_expr) = df.as_any().downcast_ref::<df_expr::IsNullExpr>() {
let arg = Expression::try_from_df(is_null_expr.arg().as_ref())?;
return Ok(is_null(arg));
@@ -216,7 +223,7 @@ impl TryFromDataFusion<DFOperator> for Operator {
}
}

pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) -> bool {
pub(crate) fn can_be_pushed_down(df_expr: &Arc<dyn PhysicalExpr>, schema: &Schema) -> bool {
// We currently do not support pushdown of dynamic expressions in DF.
// See issue: https://github.com/vortex-data/vortex/issues/4034
if is_dynamic_physical_expr(df_expr) {
@@ -235,8 +242,10 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) ->
can_be_pushed_down(like.expr(), schema) && can_be_pushed_down(like.pattern(), schema)
} else if let Some(lit) = expr.downcast_ref::<df_expr::Literal>() {
supported_data_types(&lit.value().data_type())
} else if let Some(cast) = expr.downcast_ref::<df_expr::CastExpr>() {
supported_data_types(cast.cast_type()) && can_be_pushed_down(cast.expr(), schema)
} else if expr.downcast_ref::<df_expr::CastExpr>().is_some()
|| expr.downcast_ref::<df_expr::CastColumnExpr>().is_some()
{
true
} else if let Some(is_null) = expr.downcast_ref::<df_expr::IsNullExpr>() {
can_be_pushed_down(is_null.arg(), schema)
} else if let Some(is_not_null) = expr.downcast_ref::<df_expr::IsNotNullExpr>() {
Expand All @@ -245,7 +254,7 @@ pub(crate) fn can_be_pushed_down(df_expr: &PhysicalExprRef, schema: &Schema) ->
can_be_pushed_down(in_list.expr(), schema)
&& in_list.list().iter().all(|e| can_be_pushed_down(e, schema))
} else if let Some(scalar_fn) = expr.downcast_ref::<ScalarFunctionExpr>() {
can_scalar_fn_be_pushed_down(scalar_fn, schema)
can_scalar_fn_be_pushed_down(scalar_fn)
} else {
tracing::debug!(%df_expr, "DataFusion expression can't be pushed down");
false
@@ -293,50 +302,8 @@ fn supported_data_types(dt: &DataType) -> bool {
}

/// Checks if a GetField scalar function can be pushed down.
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr, schema: &Schema) -> bool {
let Some(get_field_fn) = ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn)
else {
// Only get_field pushdown is supported.
return false;
};

let args = get_field_fn.args();
if args.len() != 2 {
tracing::debug!(
"Expected 2 arguments for GetField, not pushing down {} arguments",
args.len()
);
return false;
}
let source_expr = &args[0];
let field_name_expr = &args[1];
let Some(field_name) = field_name_expr
.as_any()
.downcast_ref::<df_expr::Literal>()
.and_then(|lit| lit.value().try_as_str().flatten())
else {
return false;
};

let Ok(source_dt) = source_expr.data_type(schema) else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type for GetField, not pushing down"
);
return false;
};
let DataType::Struct(fields) = source_dt else {
tracing::debug!(
field_name = field_name,
schema = ?schema,
source_expr = ?source_expr,
"Failed to get source type as struct for GetField, not pushing down"
);
return false;
};
fields.find(field_name).is_some()
fn can_scalar_fn_be_pushed_down(scalar_fn: &ScalarFunctionExpr) -> bool {
ScalarFunctionExpr::try_downcast_func::<GetFieldFunc>(scalar_fn).is_some()
}

#[cfg(test)]
@@ -391,15 +358,15 @@ mod tests {
#[test]
fn test_make_vortex_predicate_single() {
let col_expr = Arc::new(df_expr::Column::new("test", 0)) as Arc<dyn PhysicalExpr>;
let result = make_vortex_predicate(&[&col_expr]).unwrap();
let result = make_vortex_predicate(&[col_expr]).unwrap();
assert!(result.is_some());
}

#[test]
fn test_make_vortex_predicate_multiple() {
let col1 = Arc::new(df_expr::Column::new("col1", 0)) as Arc<dyn PhysicalExpr>;
let col2 = Arc::new(df_expr::Column::new("col2", 1)) as Arc<dyn PhysicalExpr>;
let result = make_vortex_predicate(&[&col1, &col2]).unwrap();
let result = make_vortex_predicate(&[col1, col2]).unwrap();
assert!(result.is_some());
// Result should be an AND expression combining the two columns
}
@@ -532,7 +499,8 @@ mod tests {
DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
false
)]
#[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()), false)]
#[case::struct_type(DataType::Struct(vec![Field::new("field", DataType::Int32, true)].into()
), false)]
// Dictionary types - should be supported if value type is supported
#[case::dict_utf8(
DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
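
Both `TryFromDataFusion` and `can_be_pushed_down` follow the same shape: walk the `Arc<dyn PhysicalExpr>` tree and dispatch on the concrete node type through `as_any()` downcasts, recursing into children, with a conservative `false`/error fallback for anything unrecognized (which is how the new `CastColumnExpr` arm slots in). A self-contained sketch of that pattern with stand-in types, not the DataFusion ones:

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-in for DataFusion's PhysicalExpr trait object.
trait Expr: Any {
    fn as_any(&self) -> &dyn Any;
}

struct Column;
struct Not(Arc<dyn Expr>);

impl Expr for Column {
    fn as_any(&self) -> &dyn Any { self }
}
impl Expr for Not {
    fn as_any(&self) -> &dyn Any { self }
}

// Same shape as can_be_pushed_down: try each concrete node type in turn,
// recursing into children; anything unrecognized is conservatively rejected.
fn supported(expr: &Arc<dyn Expr>) -> bool {
    if expr.as_any().downcast_ref::<Column>().is_some() {
        true
    } else if let Some(not) = expr.as_any().downcast_ref::<Not>() {
        supported(&not.0)
    } else {
        false
    }
}

fn main() {
    let e: Arc<dyn Expr> = Arc::new(Not(Arc::new(Column)));
    assert!(supported(&e));
}
```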
25 changes: 21 additions & 4 deletions vortex-datafusion/src/persistent/format.rs
@@ -17,10 +17,12 @@ use datafusion_common::Result as DFResult;
use datafusion_common::Statistics;
use datafusion_common::config::ConfigField;
use datafusion_common::config_namespace;
use datafusion_common::internal_datafusion_err;
use datafusion_common::not_impl_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::FileFormat;
@@ -349,10 +351,16 @@ impl FileFormat for VortexFormat {
.transpose()
});

let uncompressed_size = stats_set.get_as::<usize>(
Stat::UncompressedSizeInBytes,
&DType::Primitive(PType::U64, Nullability::Nullable),
);

ColumnStatistics {
null_count: null_count.to_df(),
max_value: max.to_df(),
min_value: min.to_df(),
byte_size: uncompressed_size.to_df(),
sum_value: Precision::Absent,
distinct_count: stats_set
.get_as::<bool>(
@@ -386,12 +394,20 @@ impl FileFormat for VortexFormat {
_state: &dyn Session,
file_scan_config: FileScanConfig,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let source = VortexSource::new(self.session.clone(), self.file_cache.clone());
let source = Arc::new(source);
let mut source = file_scan_config
.file_source()
.as_any()
.downcast_ref::<VortexSource>()
.cloned()
.ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;

// Make sure the session and file cache are attached to the source.
source.session = self.session.clone();
source.file_cache = self.file_cache.clone();

Ok(DataSourceExec::from_data_source(
FileScanConfigBuilder::from(file_scan_config)
.with_source(source)
.with_source(Arc::new(source))
.build(),
))
}
@@ -413,8 +429,9 @@
Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
}

fn file_source(&self) -> Arc<dyn FileSource> {
fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
Arc::new(VortexSource::new(
table_schema,
self.session.clone(),
self.file_cache.clone(),
))
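
`create_physical_plan` now recovers the concrete `VortexSource` from the type-erased `Arc<dyn FileSource>` carried by the scan config, clones it, patches the session and file cache back in, and re-wraps it. The downcast-clone-mutate pattern in isolation, with toy types standing in for the DataFusion trait:

```rust
use std::any::Any;
use std::sync::Arc;

// Stand-in for datafusion_datasource::file::FileSource.
trait FileSource: Any {
    fn as_any(&self) -> &dyn Any;
}

#[derive(Clone)]
struct VortexSource {
    session: Option<String>, // stand-in for the real session handle
}

impl FileSource for VortexSource {
    fn as_any(&self) -> &dyn Any { self }
}

fn attach_session(
    src: Arc<dyn FileSource>,
    session: &str,
) -> Result<Arc<dyn FileSource>, String> {
    // Downcast the erased trait object back to its concrete type, then clone
    // it so the shared original stays untouched.
    let mut source = src
        .as_any()
        .downcast_ref::<VortexSource>()
        .cloned()
        .ok_or_else(|| "Expected VortexSource".to_string())?;
    source.session = Some(session.to_string());
    Ok(Arc::new(source))
}

fn main() {
    let src: Arc<dyn FileSource> = Arc::new(VortexSource { session: None });
    assert!(attach_session(src, "session-1").is_ok());
}
```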