diff --git a/datafusion/functions-nested/benches/arrays_zip.rs b/datafusion/functions-nested/benches/arrays_zip.rs index bc82b2978cc42..812e5e3dbec8a 100644 --- a/datafusion/functions-nested/benches/arrays_zip.rs +++ b/datafusion/functions-nested/benches/arrays_zip.rs @@ -109,7 +109,7 @@ fn bench_arrays_zip(c: &mut Criterion, name: &str, null_density: f64) { } fn criterion_benchmark(c: &mut Criterion) { - bench_arrays_zip(c, "arrays_zip_no_nulls_8192", 0.0); + bench_arrays_zip(c, "arrays_zip_perfect_zip_8192", 0.0); bench_arrays_zip(c, "arrays_zip_10pct_nulls_8192", 0.1); } diff --git a/datafusion/functions-nested/src/arrays_zip.rs b/datafusion/functions-nested/src/arrays_zip.rs index 5f1cb9dedf408..76b1b589f42f5 100644 --- a/datafusion/functions-nested/src/arrays_zip.rs +++ b/datafusion/functions-nested/src/arrays_zip.rs @@ -22,7 +22,7 @@ use arrow::array::{ Array, ArrayRef, Capacities, ListArray, MutableArrayData, NullBufferBuilder, StructArray, new_null_array, }; -use arrow::buffer::OffsetBuffer; +use arrow::buffer::{NullBuffer, OffsetBuffer}; use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null}; use arrow::datatypes::{DataType, Field, Fields}; use datafusion_common::cast::{ @@ -44,7 +44,7 @@ struct ListColumnView { /// Pre-computed per-row start offsets (length = num_rows + 1). offsets: Vec, /// Null bitmap from the input array (None means no nulls). 
- nulls: Option, + nulls: Option, } impl ListColumnView { @@ -130,7 +130,7 @@ impl ScalarUDFImpl for ArraysZip { return exec_err!("arrays_zip expects array arguments, got {dt}"); } }; - fields.push(Field::new(format!("{}", i + 1), element_type, true)); + fields.push(Field::new(arrays_zip_field_name(i), element_type, true)); } Ok(List(Arc::new(Field::new_list_field( @@ -163,8 +163,13 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { return exec_err!("arrays_zip requires at least one argument"); } + let field_names = arrays_zip_field_names(args.len()); let num_rows = args[0].len(); + if let Some(result) = try_perfect_list_zip(args, &field_names)? { + return Ok(result); + } + // Build a type-erased ListColumnView for each argument. // None means the argument is Null-typed (all nulls, no backing data). let mut views: Vec> = Vec::with_capacity(args.len()); @@ -225,8 +230,8 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { let struct_fields: Fields = element_types .iter() - .enumerate() - .map(|(i, dt)| Field::new(format!("{}", i + 1), dt.clone(), true)) + .zip(field_names.iter()) + .map(|(dt, name)| Field::new(name.clone(), dt.clone(), true)) .collect::>() .into(); @@ -327,3 +332,282 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result { Ok(Arc::new(result)) } + +fn arrays_zip_field_name(index: usize) -> String { + (index + 1).to_string() +} + +fn arrays_zip_field_names(len: usize) -> Vec { + (0..len).map(arrays_zip_field_name).collect() +} + +/// Fast path for regular List inputs whose buffers already match the zipped +/// output: offsets and values lengths match, and null bitmaps are identical or +/// null rows are empty. Offsets and child values are reused, not rebuilt. 
+fn try_perfect_list_zip( + args: &[ArrayRef], + field_names: &[String], +) -> Result> { + debug_assert_eq!(args.len(), field_names.len()); + + let mut list_arrays = Vec::with_capacity(args.len()); + let mut struct_fields = Vec::with_capacity(args.len()); + + for (arg, field_name) in args.iter().zip(field_names) { + let arr = match arg.data_type() { + List(field) => { + struct_fields.push(Field::new( + field_name.clone(), + field.data_type().clone(), + true, + )); + as_list_array(arg)? + } + _ => return Ok(None), + }; + + list_arrays.push(arr); + } + + let first = list_arrays[0]; + let num_rows = first.len(); + let offsets = first.offsets().clone(); + let values_len = first.values().len(); + + // Reusing the child arrays is only valid when every list uses the exact + // same row boundaries and exposes the same total number of child values. + for arr in &list_arrays { + if arr.values().len() != values_len || arr.offsets() != &offsets { + return Ok(None); + } + } + + let nulls = if list_arrays.iter().any(|arr| arr.null_count() != 0) { + let first_nulls = first.nulls(); + if list_arrays.iter().all(|arr| arr.nulls() == first_nulls) { + first_nulls.cloned() + } else { + // Match the general path: arrays_zip only marks an output row null + // when every concrete input list is null. Mixed null and non-null + // empty lists still produce a non-null empty list, but mixed null + // rows with values must fall back to preserve field-level nulls. 
+ let mut null_builder = NullBufferBuilder::new(num_rows); + for row_idx in 0..num_rows { + let mut all_null = true; + + for arr in &list_arrays { + if arr.is_null(row_idx) { + if arr.offsets()[row_idx + 1] != arr.offsets()[row_idx] { + return Ok(None); + } + } else { + all_null = false; + } + } + + if all_null { + null_builder.append_null(); + } else { + null_builder.append_non_null(); + } + } + + null_builder.finish() + } + } else { + None + }; + + let struct_columns = list_arrays + .iter() + .map(|arr| Arc::clone(arr.values())) + .collect::>(); + let struct_array = + StructArray::try_new(Fields::from(struct_fields), struct_columns, None)?; + let result = ListArray::try_new( + Arc::new(Field::new_list_field( + struct_array.data_type().clone(), + true, + )), + offsets, + Arc::new(struct_array), + nulls, + )?; + + Ok(Some(Arc::new(result))) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int64Array; + use arrow::buffer::NullBuffer; + + fn list(values: Vec, offsets: Vec) -> Arc { + list_with_validity(values, offsets, None) + } + + fn list_with_validity( + values: Vec, + offsets: Vec, + valid: Option>, + ) -> Arc { + Arc::new( + ListArray::try_new( + Arc::new(Field::new_list_field(DataType::Int64, true)), + OffsetBuffer::new(offsets.into()), + Arc::new(Int64Array::from(values)), + valid.map(NullBuffer::from), + ) + .unwrap(), + ) + } + + #[test] + fn perfect_zip_reuses_input_values_and_offsets() { + let left = list(vec![1, 2, 3, 4, 5, 6], vec![0, 2, 3, 6]); + let right = list(vec![10, 20, 30, 40, 50, 60], vec![0, 2, 3, 6]); + + let result = arrays_zip_inner(&[ + Arc::clone(&left) as ArrayRef, + Arc::clone(&right) as ArrayRef, + ]) + .unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + let values = result + .values() + .as_any() + .downcast_ref::() + .unwrap(); + + assert!(result.offsets().ptr_eq(left.offsets())); + assert!(Arc::ptr_eq(values.column(0), left.values())); + assert!(Arc::ptr_eq(values.column(1), right.values())); + } 
+ + #[test] + fn perfect_zip_uses_supplied_field_names() { + let left = list(vec![1, 2, 3], vec![0, 1, 3]); + let right = list(vec![10, 20, 30], vec![0, 1, 3]); + let field_names = vec!["left".to_string(), "right".to_string()]; + + let result = try_perfect_list_zip( + &[ + Arc::clone(&left) as ArrayRef, + Arc::clone(&right) as ArrayRef, + ], + &field_names, + ) + .unwrap() + .unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + let values = result + .values() + .as_any() + .downcast_ref::() + .unwrap(); + let names = values + .fields() + .iter() + .map(|field| field.name().as_str()) + .collect::>(); + + assert_eq!(names, vec!["left", "right"]); + } + + #[test] + fn perfect_zip_reuses_zero_length_null_rows() { + let left = list_with_validity( + vec![1, 2, 3, 4], + vec![0, 2, 2, 4], + Some(vec![true, false, true]), + ); + let right = list_with_validity( + vec![10, 20, 30, 40], + vec![0, 2, 2, 4], + Some(vec![true, false, true]), + ); + + let result = arrays_zip_inner(&[ + Arc::clone(&left) as ArrayRef, + Arc::clone(&right) as ArrayRef, + ]) + .unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + assert!(result.offsets().ptr_eq(left.offsets())); + assert!(result.is_null(1)); + } + + #[test] + fn perfect_zip_preserves_mixed_null_empty_rows() { + let left = + list_with_validity(vec![], vec![0, 0, 0, 0], Some(vec![false, true, false])); + let right = + list_with_validity(vec![], vec![0, 0, 0, 0], Some(vec![true, false, false])); + + let result = arrays_zip_inner(&[ + Arc::clone(&left) as ArrayRef, + Arc::clone(&right) as ArrayRef, + ]) + .unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + assert!(result.offsets().ptr_eq(left.offsets())); + assert!(!result.is_null(0)); + assert!(!result.is_null(1)); + assert!(result.is_null(2)); + } + + #[test] + fn perfect_zip_reuses_null_rows_with_hidden_values() { + let left = + list_with_validity(vec![1, 2, 3, 4], vec![0, 2, 4], Some(vec![true, false])); + let right = 
list_with_validity( + vec![10, 20, 30, 40], + vec![0, 2, 4], + Some(vec![true, false]), + ); + + let result = arrays_zip_inner(&[ + Arc::clone(&left) as ArrayRef, + Arc::clone(&right) as ArrayRef, + ]) + .unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + + assert!(result.offsets().ptr_eq(left.offsets())); + assert_eq!(result.value_offsets(), &[0, 2, 4]); + assert!(result.is_null(1)); + } + + #[test] + fn mixed_null_row_with_hidden_values_uses_general_path() { + let left = + list_with_validity(vec![1, 2, 3, 4], vec![0, 2, 4], Some(vec![true, false])); + let right = list_with_validity( + vec![10, 20, 30, 40], + vec![0, 2, 4], + Some(vec![true, true]), + ); + + let result = arrays_zip_inner(&[ + Arc::clone(&left) as ArrayRef, + Arc::clone(&right) as ArrayRef, + ]) + .unwrap(); + let result = result.as_any().downcast_ref::().unwrap(); + let values = result + .values() + .as_any() + .downcast_ref::() + .unwrap(); + + assert!(!result.offsets().ptr_eq(left.offsets())); + assert_eq!(result.value_offsets(), &[0, 2, 4]); + assert!(values.column(0).is_null(2)); + assert!(values.column(0).is_null(3)); + assert!(!values.column(1).is_null(2)); + assert!(!values.column(1).is_null(3)); + } +}