From 6574e17622ce39c9c79eebe3691b8bb5f4ba31e6 Mon Sep 17 00:00:00 2001 From: Bipasha Date: Sat, 16 May 2026 15:15:14 +0530 Subject: [PATCH 1/2] Optimize arrays_zip for perfectly aligned arrays --- datafusion/functions-nested/src/arrays_zip.rs | 210 +++++++++++++++++- 1 file changed, 198 insertions(+), 12 deletions(-) diff --git a/datafusion/functions-nested/src/arrays_zip.rs b/datafusion/functions-nested/src/arrays_zip.rs index 5f1cb9dedf408..2b00c1cf521ac 100644 --- a/datafusion/functions-nested/src/arrays_zip.rs +++ b/datafusion/functions-nested/src/arrays_zip.rs @@ -41,8 +41,10 @@ use std::sync::Arc; struct ListColumnView { /// The flat values array backing this list column. values: ArrayRef, + /// Pre-computed per-row start offsets (length = num_rows + 1). offsets: Vec, + /// Null bitmap from the input array (None means no nulls). nulls: Option, } @@ -53,6 +55,110 @@ impl ListColumnView { } } +/// Returns true if all arrays are perfectly aligned and require +/// no null padding. +/// +/// Conditions: +/// - all offsets identical +/// - null rows contain zero elements +fn is_perfect_zip(views: &[Option], num_rows: usize) -> bool { + let first = match views.iter().flatten().next() { + Some(v) => v, + None => return true, + }; + + // All offsets must match exactly. + for view in views.iter().flatten() { + if view.offsets != first.offsets { + return false; + } + } + + // Null rows must not contain hidden values. + for view in views.iter().flatten() { + for row_idx in 0..num_rows { + if view.is_null(row_idx) { + let start = view.offsets[row_idx]; + let end = view.offsets[row_idx + 1]; + + if start != end { + return false; + } + } + } + } + + true +} + +/// Fast-path optimization for perfectly aligned arrays. +/// +/// Reuses: +/// - child value arrays +/// - offsets buffer +/// +/// Avoids: +/// - MutableArrayData +/// - row-by-row copying +/// - null padding +fn try_fast_path( + views: &[Option], + element_types: &[DataType], + num_rows: usize, +) -> Result> { + if !is_perfect_zip(views, num_rows) { + return Ok(None); + } + + let first_view = match views.iter().flatten().next() { + Some(v) => v, + None => return Ok(None), + }; + + let struct_fields: Fields = element_types + .iter() + .enumerate() + .map(|(i, dt)| Field::new(format!("{}", i + 1), dt.clone(), true)) + .collect::>() + .into(); + + let total_values = *first_view.offsets.last().unwrap(); + + // Reuse original child arrays directly. + let struct_columns: Vec = views + .iter() + .zip(element_types.iter()) + .map(|(view, elem_type)| match view { + Some(v) => Arc::clone(&v.values), + None => new_null_array( + if elem_type.is_null() { + &Null + } else { + elem_type + }, + total_values, + ), + }) + .collect(); + + let struct_array = StructArray::try_new(struct_fields, struct_columns, None)?; + + // Reuse offsets directly. + let offsets: Vec = first_view.offsets.iter().map(|&o| o as i32).collect(); + + let result = ListArray::try_new( + Arc::new(Field::new_list_field( + struct_array.data_type().clone(), + true, + )), + OffsetBuffer::new(offsets.into()), + Arc::new(struct_array), + first_view.nulls.clone(), + )?; + + Ok(Some(Arc::new(result))) +} + make_udf_expr_and_func!( ArraysZip, arrays_zip, @@ -71,11 +177,12 @@ make_udf_expr_and_func!( +---------------------------------------------------+ | [{1: 1}, {1: 2}, {1: 3}] | +---------------------------------------------------+ + > select arrays_zip([1, 2], [3, 4, 5]); +---------------------------------------------------+ | arrays_zip([1, 2], [3, 4, 5]) | +---------------------------------------------------+ -| [{1: 1, 2: 3}, {1: 2, 2: 4}, {1: NULL, 2: 5}] | +| [{1: 1, 2: 3}, {1: 2, 2: 4}, {1: NULL, 2: 5}] | +---------------------------------------------------+ ```"#, argument(name = "array1", description = "First array expression."), @@ -120,16 +227,20 @@ impl ScalarUDFImpl for ArraysZip { } let mut fields = Vec::with_capacity(arg_types.len()); + for (i, arg_type) in arg_types.iter().enumerate() { let element_type = match arg_type { List(field) | LargeList(field) | FixedSizeList(field, _) => { field.data_type().clone() } + Null => Null, + dt => { return exec_err!("arrays_zip expects array arguments, got {dt}"); } }; + fields.push(Field::new(format!("{}", i + 1), element_type, true)); } @@ -157,7 +268,12 @@ impl ScalarUDFImpl for ArraysZip { /// Takes N list arrays and produces a list of structs where each struct /// has one field per input array. If arrays within a row have different /// lengths, shorter arrays are padded with NULLs. -/// Supports List, LargeList, and Null input types. +/// +/// Supports: +/// - List +/// - LargeList +/// - FixedSizeList +/// - Null fn arrays_zip_inner(args: &[ArrayRef]) -> Result { if args.is_empty() { return exec_err!("arrays_zip requires at least one argument"); @@ -165,59 +281,81 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { let num_rows = args[0].len(); - // Build a type-erased ListColumnView for each argument. - // None means the argument is Null-typed (all nulls, no backing data). + // Build type-erased views. let mut views: Vec> = Vec::with_capacity(args.len()); + let mut element_types: Vec = Vec::with_capacity(args.len()); for (i, arg) in args.iter().enumerate() { match arg.data_type() { List(field) => { let arr = as_list_array(arg)?; + let raw_offsets = arr.value_offsets(); + let offsets: Vec = raw_offsets.iter().map(|&o| o as usize).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { values: Arc::clone(arr.values()), offsets, nulls: arr.nulls().cloned(), })); } + LargeList(field) => { let arr = as_large_list_array(arg)?; + let raw_offsets = arr.value_offsets(); + let offsets: Vec = raw_offsets.iter().map(|&o| o as usize).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { values: Arc::clone(arr.values()), offsets, nulls: arr.nulls().cloned(), })); } + FixedSizeList(field, size) => { let arr = as_fixed_size_list_array(arg)?; + let size = *size as usize; + let offsets: Vec = (0..=num_rows).map(|row| row * size).collect(); + element_types.push(field.data_type().clone()); + views.push(Some(ListColumnView { values: Arc::clone(arr.values()), offsets, nulls: arr.nulls().cloned(), })); } + Null => { element_types.push(Null); views.push(None); } + dt => { return exec_err!("arrays_zip argument {i} expected list type, got {dt}"); } } } - // Collect per-column values data for MutableArrayData builders. + // FAST PATH + if let Some(result) = try_fast_path(&views, &element_types, num_rows)? { + return Ok(result); + } + + // Existing slow path below. + let values_data: Vec<_> = views .iter() .map(|v| v.as_ref().map(|view| view.values.to_data())) @@ -230,8 +368,6 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { .collect::>() .into(); - // Create a MutableArrayData builder per column. For None (Null-typed) - // args we only need extend_nulls, so we track them separately. let mut builders: Vec> = values_data .iter() .map(|vd| { @@ -242,12 +378,13 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { .collect(); let mut offsets: Vec = Vec::with_capacity(num_rows + 1); + offsets.push(0); + let mut null_builder = NullBufferBuilder::new(num_rows); + let mut total_values: usize = 0; - // Process each row: compute per-array lengths, then copy values - // and pad shorter arrays with NULLs. for row_idx in 0..num_rows { let mut max_len: usize = 0; let mut all_null = true; @@ -255,7 +392,9 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { for view in views.iter().flatten() { if !view.is_null(row_idx) { all_null = false; + let len = view.offsets[row_idx + 1] - view.offsets[row_idx]; + max_len = max_len.max(len); } } @@ -265,23 +404,28 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { offsets.push(*offsets.last().unwrap()); continue; } + null_builder.append_non_null(); - // Extend each column builder for this row. for (col_idx, view) in views.iter().enumerate() { match view { Some(v) if !v.is_null(row_idx) => { let start = v.offsets[row_idx]; + let end = v.offsets[row_idx + 1]; + let len = end - start; + let builder = builders[col_idx].as_mut().unwrap(); + builder.extend(0, start, end); + if len < max_len { builder.extend_nulls(max_len - len); } } + _ => { - // Null list entry or None (Null-typed) arg — all nulls. if let Some(builder) = builders[col_idx].as_mut() { builder.extend_nulls(max_len); } @@ -290,16 +434,18 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { } total_values += max_len; + let last = *offsets.last().unwrap(); + offsets.push(last + max_len as i32); } - // Assemble struct columns from builders. let struct_columns: Vec = builders .into_iter() .zip(element_types.iter()) .map(|(builder, elem_type)| match builder { Some(b) => arrow::array::make_array(b.freeze()), + None => new_null_array( if elem_type.is_null() { &Null @@ -327,3 +473,43 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { Ok(Arc::new(result)) } +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{Int32Array, ListArray}; + use arrow::buffer::OffsetBuffer; + + #[test] + fn test_arrays_zip_fast_path_perfect_alignment() -> Result<()> { + let values1 = + Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + + let values2 = + Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef; + + let offsets = OffsetBuffer::new(vec![0, 2, 3].into()); + + let list1 = ListArray::try_new( + Arc::new(Field::new_list_field(DataType::Int32, true)), + offsets.clone(), + values1, + None, + )?; + + let list2 = ListArray::try_new( + Arc::new(Field::new_list_field(DataType::Int32, true)), + offsets, + values2, + None, + )?; + + let result = arrays_zip_inner(&[ + Arc::new(list1), + Arc::new(list2), + ])?; + + assert_eq!(result.len(), 2); + + Ok(()) + } +} \ No newline at end of file From 9e63ba262601d0c7ab0b8a6980939dd6c8df7663 Mon Sep 17 00:00:00 2001 From: Bipasha Date: Sat, 16 May 2026 18:20:51 +0530 Subject: [PATCH 2/2] Fix formatting --- datafusion/functions-nested/src/arrays_zip.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/datafusion/functions-nested/src/arrays_zip.rs b/datafusion/functions-nested/src/arrays_zip.rs index 2b00c1cf521ac..cf0b6e2c1282e 100644 --- a/datafusion/functions-nested/src/arrays_zip.rs +++ b/datafusion/functions-nested/src/arrays_zip.rs @@ -481,11 +481,9 @@ mod tests { #[test] fn test_arrays_zip_fast_path_perfect_alignment() -> Result<()> { - let values1 = - Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let values1 = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; - let values2 = - Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef; + let values2 = Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef; let offsets = OffsetBuffer::new(vec![0, 2, 3].into()); @@ -503,13 +501,10 @@ mod tests { None, )?; - let result = arrays_zip_inner(&[ - Arc::new(list1), - Arc::new(list2), - ])?; + let result = arrays_zip_inner(&[Arc::new(list1), Arc::new(list2)])?; assert_eq!(result.len(), 2); Ok(()) } -} \ No newline at end of file +}