Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 193 additions & 12 deletions datafusion/functions-nested/src/arrays_zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ use std::sync::Arc;
struct ListColumnView {
/// The flat values array backing this list column.
values: ArrayRef,

/// Pre-computed per-row start offsets (length = num_rows + 1).
offsets: Vec<usize>,

/// Null bitmap from the input array (None means no nulls).
nulls: Option<arrow::buffer::NullBuffer>,
}
Expand All @@ -53,6 +55,110 @@ impl ListColumnView {
}
}

/// Returns true if all arrays are perfectly aligned and require
/// no null padding.
///
/// Conditions:
/// - all offsets identical
/// - null rows contain zero elements
fn is_perfect_zip(views: &[Option<ListColumnView>], num_rows: usize) -> bool {
let first = match views.iter().flatten().next() {
Some(v) => v,
None => return true,
};

// All offsets must match exactly.
for view in views.iter().flatten() {
if view.offsets != first.offsets {
return false;
}
}

// Null rows must not contain hidden values.
for view in views.iter().flatten() {
for row_idx in 0..num_rows {
if view.is_null(row_idx) {
let start = view.offsets[row_idx];
let end = view.offsets[row_idx + 1];

if start != end {
return false;
}
}
}
}

true
}

/// Fast-path optimization for perfectly aligned arrays.
///
/// Reuses:
/// - child value arrays
/// - offsets buffer
///
/// Avoids:
/// - MutableArrayData
/// - row-by-row copying
/// - null padding
fn try_fast_path(
views: &[Option<ListColumnView>],
element_types: &[DataType],
num_rows: usize,
) -> Result<Option<ArrayRef>> {
if !is_perfect_zip(views, num_rows) {
return Ok(None);
}

let first_view = match views.iter().flatten().next() {
Some(v) => v,
None => return Ok(None),
};

let struct_fields: Fields = element_types
.iter()
.enumerate()
.map(|(i, dt)| Field::new(format!("{}", i + 1), dt.clone(), true))
.collect::<Vec<_>>()
.into();

let total_values = *first_view.offsets.last().unwrap();

// Reuse original child arrays directly.
let struct_columns: Vec<ArrayRef> = views
.iter()
.zip(element_types.iter())
.map(|(view, elem_type)| match view {
Some(v) => Arc::clone(&v.values),
None => new_null_array(
if elem_type.is_null() {
&Null
} else {
elem_type
},
total_values,
),
})
.collect();

let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?;

// Reuse offsets directly.
let offsets: Vec<i32> = first_view.offsets.iter().map(|&o| o as i32).collect();

let result = ListArray::try_new(
Arc::new(Field::new_list_field(
struct_array.data_type().clone(),
true,
)),
OffsetBuffer::new(offsets.into()),
Arc::new(struct_array),
first_view.nulls.clone(),
)?;

Ok(Some(Arc::new(result)))
}

make_udf_expr_and_func!(
ArraysZip,
arrays_zip,
Expand All @@ -71,11 +177,12 @@ make_udf_expr_and_func!(
+---------------------------------------------------+
| [{1: 1}, {1: 2}, {1: 3}] |
+---------------------------------------------------+

> select arrays_zip([1, 2], [3, 4, 5]);
+---------------------------------------------------+
| arrays_zip([1, 2], [3, 4, 5]) |
+---------------------------------------------------+
| [{1: 1, 2: 3}, {1: 2, 2: 4}, {1: NULL, 2: 5}] |
| [{1: 1, 2: 3}, {1: 2, 2: 4}, {1: NULL, 2: 5}] |
+---------------------------------------------------+
```"#,
argument(name = "array1", description = "First array expression."),
Expand Down Expand Up @@ -120,16 +227,20 @@ impl ScalarUDFImpl for ArraysZip {
}

let mut fields = Vec::with_capacity(arg_types.len());

for (i, arg_type) in arg_types.iter().enumerate() {
let element_type = match arg_type {
List(field) | LargeList(field) | FixedSizeList(field, _) => {
field.data_type().clone()
}

Null => Null,

dt => {
return exec_err!("arrays_zip expects array arguments, got {dt}");
}
};

fields.push(Field::new(format!("{}", i + 1), element_type, true));
}

Expand Down Expand Up @@ -157,67 +268,94 @@ impl ScalarUDFImpl for ArraysZip {
/// Takes N list arrays and produces a list of structs where each struct
/// has one field per input array. If arrays within a row have different
/// lengths, shorter arrays are padded with NULLs.
/// Supports List, LargeList, and Null input types.
///
/// Supports:
/// - List
/// - LargeList
/// - FixedSizeList
/// - Null
fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
if args.is_empty() {
return exec_err!("arrays_zip requires at least one argument");
}

let num_rows = args[0].len();

// Build a type-erased ListColumnView for each argument.
// None means the argument is Null-typed (all nulls, no backing data).
// Build type-erased views.
let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());

let mut element_types: Vec<DataType> = Vec::with_capacity(args.len());

for (i, arg) in args.iter().enumerate() {
match arg.data_type() {
List(field) => {
let arr = as_list_array(arg)?;

let raw_offsets = arr.value_offsets();

let offsets: Vec<usize> =
raw_offsets.iter().map(|&o| o as usize).collect();

element_types.push(field.data_type().clone());

views.push(Some(ListColumnView {
values: Arc::clone(arr.values()),
offsets,
nulls: arr.nulls().cloned(),
}));
}

LargeList(field) => {
let arr = as_large_list_array(arg)?;

let raw_offsets = arr.value_offsets();

let offsets: Vec<usize> =
raw_offsets.iter().map(|&o| o as usize).collect();

element_types.push(field.data_type().clone());

views.push(Some(ListColumnView {
values: Arc::clone(arr.values()),
offsets,
nulls: arr.nulls().cloned(),
}));
}

FixedSizeList(field, size) => {
let arr = as_fixed_size_list_array(arg)?;

let size = *size as usize;

let offsets: Vec<usize> = (0..=num_rows).map(|row| row * size).collect();

element_types.push(field.data_type().clone());

views.push(Some(ListColumnView {
values: Arc::clone(arr.values()),
offsets,
nulls: arr.nulls().cloned(),
}));
}

Null => {
element_types.push(Null);
views.push(None);
}

dt => {
return exec_err!("arrays_zip argument {i} expected list type, got {dt}");
}
}
}

// Collect per-column values data for MutableArrayData builders.
// FAST PATH
if let Some(result) = try_fast_path(&views, &element_types, num_rows)? {
return Ok(result);
}

// Existing slow path below.

let values_data: Vec<_> = views
.iter()
.map(|v| v.as_ref().map(|view| view.values.to_data()))
Expand All @@ -230,8 +368,6 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
.collect::<Vec<_>>()
.into();

// Create a MutableArrayData builder per column. For None (Null-typed)
// args we only need extend_nulls, so we track them separately.
let mut builders: Vec<Option<MutableArrayData>> = values_data
.iter()
.map(|vd| {
Expand All @@ -242,20 +378,23 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
.collect();

let mut offsets: Vec<i32> = Vec::with_capacity(num_rows + 1);

offsets.push(0);

let mut null_builder = NullBufferBuilder::new(num_rows);

let mut total_values: usize = 0;

// Process each row: compute per-array lengths, then copy values
// and pad shorter arrays with NULLs.
for row_idx in 0..num_rows {
let mut max_len: usize = 0;
let mut all_null = true;

for view in views.iter().flatten() {
if !view.is_null(row_idx) {
all_null = false;

let len = view.offsets[row_idx + 1] - view.offsets[row_idx];

max_len = max_len.max(len);
}
}
Expand All @@ -265,23 +404,28 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
offsets.push(*offsets.last().unwrap());
continue;
}

null_builder.append_non_null();

// Extend each column builder for this row.
for (col_idx, view) in views.iter().enumerate() {
match view {
Some(v) if !v.is_null(row_idx) => {
let start = v.offsets[row_idx];

let end = v.offsets[row_idx + 1];

let len = end - start;

let builder = builders[col_idx].as_mut().unwrap();

builder.extend(0, start, end);

if len < max_len {
builder.extend_nulls(max_len - len);
}
}

_ => {
// Null list entry or None (Null-typed) arg — all nulls.
if let Some(builder) = builders[col_idx].as_mut() {
builder.extend_nulls(max_len);
}
Expand All @@ -290,16 +434,18 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
}

total_values += max_len;

let last = *offsets.last().unwrap();

offsets.push(last + max_len as i32);
}

// Assemble struct columns from builders.
let struct_columns: Vec<ArrayRef> = builders
.into_iter()
.zip(element_types.iter())
.map(|(builder, elem_type)| match builder {
Some(b) => arrow::array::make_array(b.freeze()),

None => new_null_array(
if elem_type.is_null() {
&Null
Expand Down Expand Up @@ -327,3 +473,38 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {

Ok(Arc::new(result))
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{Int32Array, ListArray};
use arrow::buffer::OffsetBuffer;

#[test]
fn test_arrays_zip_fast_path_perfect_alignment() -> Result<()> {
let values1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;

let values2 = Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef;

let offsets = OffsetBuffer::new(vec![0, 2, 3].into());

let list1 = ListArray::try_new(
Arc::new(Field::new_list_field(DataType::Int32, true)),
offsets.clone(),
values1,
None,
)?;

let list2 = ListArray::try_new(
Arc::new(Field::new_list_field(DataType::Int32, true)),
offsets,
values2,
None,
)?;

let result = arrays_zip_inner(&[Arc::new(list1), Arc::new(list2)])?;

assert_eq!(result.len(), 2);

Ok(())
}
}
Loading