Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/functions-nested/benches/arrays_zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn bench_arrays_zip(c: &mut Criterion, name: &str, null_density: f64) {
}

/// Criterion entry point: registers each `arrays_zip` benchmark configuration.
///
/// Every call forwards the shared `Criterion` handle together with a benchmark
/// name and the null density (`0.0` or `0.1`) handed to `bench_arrays_zip`.
// NOTE(review): the `_8192` suffix presumably refers to the row count used by
// `bench_arrays_zip` — confirm against that function's body.
fn criterion_benchmark(c: &mut Criterion) {
bench_arrays_zip(c, "arrays_zip_no_nulls_8192", 0.0);
bench_arrays_zip(c, "arrays_zip_perfect_zip_8192", 0.0);
bench_arrays_zip(c, "arrays_zip_10pct_nulls_8192", 0.1);
}

Expand Down
182 changes: 182 additions & 0 deletions datafusion/functions-nested/src/arrays_zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {

let num_rows = args[0].len();

if let Some(result) = try_perfect_list_zip(args)? {
return Ok(result);
}

// Build a type-erased ListColumnView for each argument.
// None means the argument is Null-typed (all nulls, no backing data).
let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
Expand Down Expand Up @@ -327,3 +331,181 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {

Ok(Arc::new(result))
}

/// Zero-copy fast path for `arrays_zip` over plain `List` arguments.
///
/// When every argument is a `List` array with identical offsets, the same row
/// count and the same child length, the zipped result can be assembled by
/// wrapping the existing child arrays in a `StructArray` and reusing the first
/// argument's offsets; no values are copied.
///
/// Returns `Ok(None)` (meaning "use the general implementation") when:
/// * any argument is not a `List`,
/// * the row counts, child lengths or offsets differ between arguments, or
/// * a null row spans a non-empty offset range, i.e. it hides child values
///   that would otherwise leak into the output.
///
/// An output row is null only if it is null in every argument. Struct fields
/// are named `"1"`, `"2"`, ... by argument position and are nullable.
///
/// `args` must contain at least one array; the first one is indexed
/// unconditionally.
fn try_perfect_list_zip(args: &[ArrayRef]) -> Result<Option<ArrayRef>> {
    let mut lists = Vec::with_capacity(args.len());
    let mut fields = Vec::with_capacity(args.len());

    for (position, arg) in args.iter().enumerate() {
        // Only regular `List` inputs qualify for buffer reuse.
        let List(element) = arg.data_type() else {
            return Ok(None);
        };
        fields.push(Field::new(
            (position + 1).to_string(),
            element.data_type().clone(),
            true,
        ));
        lists.push(as_list_array(arg)?);
    }

    let reference = lists[0];
    let row_count = reference.len();
    let shared_offsets = reference.offsets().clone();
    let child_len = reference.values().len();

    // Every argument must describe exactly the same list layout.
    let layouts_agree = lists.iter().all(|list| {
        list.len() == row_count
            && list.values().len() == child_len
            && list.offsets() == &shared_offsets
    });
    if !layouts_agree {
        return Ok(None);
    }

    let has_nulls = lists.iter().any(|list| list.null_count() != 0);
    let validity = if has_nulls {
        let mut builder = NullBufferBuilder::new(row_count);

        for row in 0..row_count {
            let mut any_valid = false;

            // Visit every argument (no early exit) so that each null row is
            // checked for hidden values.
            for list in &lists {
                if !list.is_null(row) {
                    any_valid = true;
                    continue;
                }
                // All offsets equal `shared_offsets` at this point, so it can
                // be consulted for any argument. A null row with a non-empty
                // range would expose child values if the buffers were reused.
                if shared_offsets[row] != shared_offsets[row + 1] {
                    return Ok(None);
                }
            }

            if any_valid {
                builder.append_non_null();
            } else {
                builder.append_null();
            }
        }

        builder.finish()
    } else {
        None
    };

    // Reuse each argument's child array as one struct column.
    let columns: Vec<_> = lists
        .iter()
        .map(|list| Arc::clone(list.values()))
        .collect();
    let zipped = StructArray::try_new(Fields::from(fields), columns, None)?;
    let item_field = Arc::new(Field::new_list_field(zipped.data_type().clone(), true));
    let output = ListArray::try_new(item_field, shared_offsets, Arc::new(zipped), validity)?;

    Ok(Some(Arc::new(output)))
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Int64Array;
    use arrow::buffer::NullBuffer;

    /// Builds a `List<Int64>` array without a validity buffer.
    fn list(values: Vec<i64>, offsets: Vec<i32>) -> Arc<ListArray> {
        list_with_validity(values, offsets, None)
    }

    /// Builds a `List<Int64>` array from flat child values, list offsets and
    /// an optional per-row validity mask (`true` = valid).
    fn list_with_validity(
        values: Vec<i64>,
        offsets: Vec<i32>,
        valid: Option<Vec<bool>>,
    ) -> Arc<ListArray> {
        let item_field = Arc::new(Field::new_list_field(DataType::Int64, true));
        let offset_buffer = OffsetBuffer::<i32>::new(offsets.into());
        let child: ArrayRef = Arc::new(Int64Array::from(values));
        let validity = valid.map(NullBuffer::from);

        let array = ListArray::try_new(item_field, offset_buffer, child, validity).unwrap();
        Arc::new(array)
    }

    /// Runs `arrays_zip_inner` on two list arrays and returns the raw result.
    fn zip_pair(left: &Arc<ListArray>, right: &Arc<ListArray>) -> ArrayRef {
        arrays_zip_inner(&[
            Arc::clone(left) as ArrayRef,
            Arc::clone(right) as ArrayRef,
        ])
        .unwrap()
    }

    /// Identical offsets and no nulls: offsets and child arrays are shared
    /// with the inputs rather than rebuilt.
    #[test]
    fn perfect_zip_reuses_input_values_and_offsets() {
        let left = list(vec![1, 2, 3, 4, 5, 6], vec![0, 2, 3, 6]);
        let right = list(vec![10, 20, 30, 40, 50, 60], vec![0, 2, 3, 6]);

        let zipped = zip_pair(&left, &right);
        let zipped = zipped.as_any().downcast_ref::<ListArray>().unwrap();
        let entries = zipped
            .values()
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();

        assert!(zipped.offsets().ptr_eq(left.offsets()));
        assert!(Arc::ptr_eq(entries.column(0), left.values()));
        assert!(Arc::ptr_eq(entries.column(1), right.values()));
    }

    /// Null rows that cover no child values still allow the offsets to be
    /// reused, and the row stays null in the output.
    #[test]
    fn perfect_zip_reuses_zero_length_null_rows() {
        let validity = vec![true, false, true];
        let left =
            list_with_validity(vec![1, 2, 3, 4], vec![0, 2, 2, 4], Some(validity.clone()));
        let right =
            list_with_validity(vec![10, 20, 30, 40], vec![0, 2, 2, 4], Some(validity));

        let zipped = zip_pair(&left, &right);
        let zipped = zipped.as_any().downcast_ref::<ListArray>().unwrap();

        assert!(zipped.offsets().ptr_eq(left.offsets()));
        assert!(zipped.is_null(1));
    }

    /// A null row whose offsets span child values must not reuse the input
    /// buffers: the output gets fresh offsets with an empty null row.
    #[test]
    fn null_row_with_hidden_values_uses_general_path() {
        let validity = vec![true, false];
        let left = list_with_validity(vec![1, 2, 3, 4], vec![0, 2, 4], Some(validity.clone()));
        let right = list_with_validity(vec![10, 20, 30, 40], vec![0, 2, 4], Some(validity));

        let zipped = zip_pair(&left, &right);
        let zipped = zipped.as_any().downcast_ref::<ListArray>().unwrap();

        assert!(!zipped.offsets().ptr_eq(left.offsets()));
        assert_eq!(zipped.value_offsets(), &[0, 2, 2]);
        assert!(zipped.is_null(1));
    }
}