diff --git a/datafusion/functions-nested/benches/arrays_zip.rs b/datafusion/functions-nested/benches/arrays_zip.rs
index bc82b2978cc42..812e5e3dbec8a 100644
--- a/datafusion/functions-nested/benches/arrays_zip.rs
+++ b/datafusion/functions-nested/benches/arrays_zip.rs
@@ -109,7 +109,7 @@ fn bench_arrays_zip(c: &mut Criterion, name: &str, null_density: f64) {
 }
 
 fn criterion_benchmark(c: &mut Criterion) {
-    bench_arrays_zip(c, "arrays_zip_no_nulls_8192", 0.0);
+    bench_arrays_zip(c, "arrays_zip_perfect_zip_8192", 0.0);
     bench_arrays_zip(c, "arrays_zip_10pct_nulls_8192", 0.1);
 }
 
diff --git a/datafusion/functions-nested/src/arrays_zip.rs b/datafusion/functions-nested/src/arrays_zip.rs
index 5f1cb9dedf408..64900a196487b 100644
--- a/datafusion/functions-nested/src/arrays_zip.rs
+++ b/datafusion/functions-nested/src/arrays_zip.rs
@@ -165,6 +165,10 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
 
     let num_rows = args[0].len();
 
+    if let Some(result) = try_perfect_list_zip(args)? {
+        return Ok(result);
+    }
+
     // Build a type-erased ListColumnView for each argument.
     // None means the argument is Null-typed (all nulls, no backing data).
     let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
@@ -327,3 +331,181 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
 
     Ok(Arc::new(result))
 }
+
+/// Fast path for regular List inputs whose existing buffers already match the
+/// zipped output: all offsets and values lengths match, and null rows cover no
+/// values. This lets us reuse offsets and child values instead of rebuilding.
+fn try_perfect_list_zip(args: &[ArrayRef]) -> Result<Option<ArrayRef>> {
+    let mut list_arrays = Vec::with_capacity(args.len());
+    let mut struct_fields = Vec::with_capacity(args.len());
+
+    for (i, arg) in args.iter().enumerate() {
+        let arr = match arg.data_type() {
+            List(field) => {
+                struct_fields.push(Field::new(
+                    (i + 1).to_string(),
+                    field.data_type().clone(),
+                    true,
+                ));
+                as_list_array(arg)?
+            }
+            _ => return Ok(None),
+        };
+
+        list_arrays.push(arr);
+    }
+
+    let first = list_arrays[0];
+    let num_rows = first.len();
+    let offsets = first.offsets().clone();
+    let values_len = first.values().len();
+
+    for arr in &list_arrays {
+        if arr.len() != num_rows
+            || arr.values().len() != values_len
+            || arr.offsets() != &offsets
+        {
+            return Ok(None);
+        }
+    }
+
+    let nulls = if list_arrays.iter().any(|arr| arr.null_count() != 0) {
+        let mut null_builder = NullBufferBuilder::new(num_rows);
+        for row_idx in 0..num_rows {
+            let mut all_null = true;
+
+            for arr in &list_arrays {
+                if arr.is_null(row_idx) {
+                    if arr.offsets()[row_idx + 1] != arr.offsets()[row_idx] {
+                        return Ok(None);
+                    }
+                } else {
+                    all_null = false;
+                }
+            }
+
+            if all_null {
+                null_builder.append_null();
+            } else {
+                null_builder.append_non_null();
+            }
+        }
+
+        null_builder.finish()
+    } else {
+        None
+    };
+
+    let struct_columns = list_arrays
+        .iter()
+        .map(|arr| Arc::clone(arr.values()))
+        .collect::<Vec<_>>();
+    let struct_array =
+        StructArray::try_new(Fields::from(struct_fields), struct_columns, None)?;
+    let result = ListArray::try_new(
+        Arc::new(Field::new_list_field(
+            struct_array.data_type().clone(),
+            true,
+        )),
+        offsets,
+        Arc::new(struct_array),
+        nulls,
+    )?;
+
+    Ok(Some(Arc::new(result)))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::Int64Array;
+    use arrow::buffer::NullBuffer;
+
+    fn list(values: Vec<i64>, offsets: Vec<i32>) -> Arc<ListArray> {
+        list_with_validity(values, offsets, None)
+    }
+
+    fn list_with_validity(
+        values: Vec<i64>,
+        offsets: Vec<i32>,
+        valid: Option<Vec<bool>>,
+    ) -> Arc<ListArray> {
+        Arc::new(
+            ListArray::try_new(
+                Arc::new(Field::new_list_field(DataType::Int64, true)),
+                OffsetBuffer::new(offsets.into()),
+                Arc::new(Int64Array::from(values)),
+                valid.map(NullBuffer::from),
+            )
+            .unwrap(),
+        )
+    }
+
+    #[test]
+    fn perfect_zip_reuses_input_values_and_offsets() {
+        let left = list(vec![1, 2, 3, 4, 5, 6], vec![0, 2, 3, 6]);
+        let right = list(vec![10, 20, 30, 40, 50, 60], vec![0, 2, 3, 6]);
+
+        let result = arrays_zip_inner(&[
+            Arc::clone(&left) as ArrayRef,
+            Arc::clone(&right) as ArrayRef,
+        ])
+        .unwrap();
+        let result = result.as_any().downcast_ref::<ListArray>().unwrap();
+        let values = result
+            .values()
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+
+        assert!(result.offsets().ptr_eq(left.offsets()));
+        assert!(Arc::ptr_eq(values.column(0), left.values()));
+        assert!(Arc::ptr_eq(values.column(1), right.values()));
+    }
+
+    #[test]
+    fn perfect_zip_reuses_zero_length_null_rows() {
+        let left = list_with_validity(
+            vec![1, 2, 3, 4],
+            vec![0, 2, 2, 4],
+            Some(vec![true, false, true]),
+        );
+        let right = list_with_validity(
+            vec![10, 20, 30, 40],
+            vec![0, 2, 2, 4],
+            Some(vec![true, false, true]),
+        );
+
+        let result = arrays_zip_inner(&[
+            Arc::clone(&left) as ArrayRef,
+            Arc::clone(&right) as ArrayRef,
+        ])
+        .unwrap();
+        let result = result.as_any().downcast_ref::<ListArray>().unwrap();
+
+        assert!(result.offsets().ptr_eq(left.offsets()));
+        assert!(result.is_null(1));
+    }
+
+    #[test]
+    fn null_row_with_hidden_values_uses_general_path() {
+        let left =
+            list_with_validity(vec![1, 2, 3, 4], vec![0, 2, 4], Some(vec![true, false]));
+        let right = list_with_validity(
+            vec![10, 20, 30, 40],
+            vec![0, 2, 4],
+            Some(vec![true, false]),
+        );
+
+        let result = arrays_zip_inner(&[
+            Arc::clone(&left) as ArrayRef,
+            Arc::clone(&right) as ArrayRef,
+        ])
+        .unwrap();
+        let result = result.as_any().downcast_ref::<ListArray>().unwrap();
+
+        assert!(!result.offsets().ptr_eq(left.offsets()));
+        assert_eq!(result.value_offsets(), &[0, 2, 2]);
+        assert!(result.is_null(1));
+    }
+}