Skip to content
Open
2 changes: 1 addition & 1 deletion datafusion/functions-nested/benches/arrays_zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ fn bench_arrays_zip(c: &mut Criterion, name: &str, null_density: f64) {
}

fn criterion_benchmark(c: &mut Criterion) {
bench_arrays_zip(c, "arrays_zip_no_nulls_8192", 0.0);
bench_arrays_zip(c, "arrays_zip_perfect_zip_8192", 0.0);
bench_arrays_zip(c, "arrays_zip_10pct_nulls_8192", 0.1);
}

Expand Down
294 changes: 289 additions & 5 deletions datafusion/functions-nested/src/arrays_zip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow::array::{
Array, ArrayRef, Capacities, ListArray, MutableArrayData, NullBufferBuilder,
StructArray, new_null_array,
};
use arrow::buffer::OffsetBuffer;
use arrow::buffer::{NullBuffer, OffsetBuffer};
use arrow::datatypes::DataType::{FixedSizeList, LargeList, List, Null};
use arrow::datatypes::{DataType, Field, Fields};
use datafusion_common::cast::{
Expand All @@ -44,7 +44,7 @@ struct ListColumnView {
/// Pre-computed per-row start offsets (length = num_rows + 1).
offsets: Vec<usize>,
/// Null bitmap from the input array (None means no nulls).
nulls: Option<arrow::buffer::NullBuffer>,
nulls: Option<NullBuffer>,
}

impl ListColumnView {
Expand Down Expand Up @@ -130,7 +130,7 @@ impl ScalarUDFImpl for ArraysZip {
return exec_err!("arrays_zip expects array arguments, got {dt}");
}
};
fields.push(Field::new(format!("{}", i + 1), element_type, true));
fields.push(Field::new(arrays_zip_field_name(i), element_type, true));
}

Ok(List(Arc::new(Field::new_list_field(
Expand Down Expand Up @@ -163,8 +163,13 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
return exec_err!("arrays_zip requires at least one argument");
}

let field_names = arrays_zip_field_names(args.len());
let num_rows = args[0].len();

if let Some(result) = try_perfect_list_zip(args, &field_names)? {
return Ok(result);
}

// Build a type-erased ListColumnView for each argument.
// None means the argument is Null-typed (all nulls, no backing data).
let mut views: Vec<Option<ListColumnView>> = Vec::with_capacity(args.len());
Expand Down Expand Up @@ -225,8 +230,8 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {

let struct_fields: Fields = element_types
.iter()
.enumerate()
.map(|(i, dt)| Field::new(format!("{}", i + 1), dt.clone(), true))
.zip(field_names.iter())
.map(|(dt, name)| Field::new(name.clone(), dt.clone(), true))
.collect::<Vec<_>>()
.into();

Expand Down Expand Up @@ -327,3 +332,282 @@ fn arrays_zip_inner(args: &[ArrayRef]) -> Result<ArrayRef> {

Ok(Arc::new(result))
}

fn arrays_zip_field_name(index: usize) -> String {
(index + 1).to_string()
}

fn arrays_zip_field_names(len: usize) -> Vec<String> {
(0..len).map(arrays_zip_field_name).collect()
}

/// Fast path for regular List inputs whose existing buffers already match the
/// zipped output: all offsets and values lengths match, and null rows cover no
/// values. This lets us reuse offsets and child values instead of rebuilding.
fn try_perfect_list_zip(
args: &[ArrayRef],
field_names: &[String],
) -> Result<Option<ArrayRef>> {
debug_assert_eq!(args.len(), field_names.len());

let mut list_arrays = Vec::with_capacity(args.len());
let mut struct_fields = Vec::with_capacity(args.len());

for (arg, field_name) in args.iter().zip(field_names) {
let arr = match arg.data_type() {
List(field) => {
struct_fields.push(Field::new(
field_name.clone(),
field.data_type().clone(),
true,
));
as_list_array(arg)?
}
_ => return Ok(None),
};

list_arrays.push(arr);
}

let first = list_arrays[0];
let num_rows = first.len();
let offsets = first.offsets().clone();
let values_len = first.values().len();

// Reusing the child arrays is only valid when every list uses the exact
// same row boundaries and exposes the same total number of child values.
for arr in &list_arrays {
if arr.values().len() != values_len || arr.offsets() != &offsets {
return Ok(None);
}
}

let nulls = if list_arrays.iter().any(|arr| arr.null_count() != 0) {
Comment thread
puneetdixit200 marked this conversation as resolved.
let first_nulls = first.nulls();
if list_arrays.iter().all(|arr| arr.nulls() == first_nulls) {
first_nulls.cloned()
} else {
// Match the general path: arrays_zip only marks an output row null
// when every concrete input list is null. Mixed null and non-null
// empty lists still produce a non-null empty list, but mixed null
// rows with values must fall back to preserve field-level nulls.
let mut null_builder = NullBufferBuilder::new(num_rows);
for row_idx in 0..num_rows {
let mut all_null = true;

for arr in &list_arrays {
if arr.is_null(row_idx) {
if arr.offsets()[row_idx + 1] != arr.offsets()[row_idx] {
return Ok(None);
}
} else {
all_null = false;
}
}

if all_null {
null_builder.append_null();
} else {
null_builder.append_non_null();
}
}

null_builder.finish()
}
} else {
None
};

let struct_columns = list_arrays
.iter()
.map(|arr| Arc::clone(arr.values()))
.collect::<Vec<_>>();
let struct_array =
StructArray::try_new(Fields::from(struct_fields), struct_columns, None)?;
let result = ListArray::try_new(
Arc::new(Field::new_list_field(
struct_array.data_type().clone(),
true,
)),
offsets,
Arc::new(struct_array),
nulls,
)?;

Ok(Some(Arc::new(result)))
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::Int64Array;
use arrow::buffer::NullBuffer;

fn list(values: Vec<i64>, offsets: Vec<i32>) -> Arc<ListArray> {
list_with_validity(values, offsets, None)
}

fn list_with_validity(
values: Vec<i64>,
offsets: Vec<i32>,
valid: Option<Vec<bool>>,
) -> Arc<ListArray> {
Arc::new(
ListArray::try_new(
Arc::new(Field::new_list_field(DataType::Int64, true)),
OffsetBuffer::new(offsets.into()),
Arc::new(Int64Array::from(values)),
valid.map(NullBuffer::from),
)
.unwrap(),
)
}

#[test]
fn perfect_zip_reuses_input_values_and_offsets() {
let left = list(vec![1, 2, 3, 4, 5, 6], vec![0, 2, 3, 6]);
let right = list(vec![10, 20, 30, 40, 50, 60], vec![0, 2, 3, 6]);

let result = arrays_zip_inner(&[
Arc::clone(&left) as ArrayRef,
Arc::clone(&right) as ArrayRef,
])
.unwrap();
let result = result.as_any().downcast_ref::<ListArray>().unwrap();
let values = result
.values()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();

assert!(result.offsets().ptr_eq(left.offsets()));
assert!(Arc::ptr_eq(values.column(0), left.values()));
assert!(Arc::ptr_eq(values.column(1), right.values()));
}

#[test]
fn perfect_zip_uses_supplied_field_names() {
let left = list(vec![1, 2, 3], vec![0, 1, 3]);
let right = list(vec![10, 20, 30], vec![0, 1, 3]);
let field_names = vec!["left".to_string(), "right".to_string()];

let result = try_perfect_list_zip(
&[
Arc::clone(&left) as ArrayRef,
Arc::clone(&right) as ArrayRef,
],
&field_names,
)
.unwrap()
.unwrap();
let result = result.as_any().downcast_ref::<ListArray>().unwrap();
let values = result
.values()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
let names = values
.fields()
.iter()
.map(|field| field.name().as_str())
.collect::<Vec<_>>();

assert_eq!(names, vec!["left", "right"]);
}

#[test]
fn perfect_zip_reuses_zero_length_null_rows() {
let left = list_with_validity(
vec![1, 2, 3, 4],
vec![0, 2, 2, 4],
Some(vec![true, false, true]),
);
let right = list_with_validity(
vec![10, 20, 30, 40],
vec![0, 2, 2, 4],
Some(vec![true, false, true]),
);

let result = arrays_zip_inner(&[
Arc::clone(&left) as ArrayRef,
Arc::clone(&right) as ArrayRef,
])
.unwrap();
let result = result.as_any().downcast_ref::<ListArray>().unwrap();

assert!(result.offsets().ptr_eq(left.offsets()));
assert!(result.is_null(1));
}

#[test]
fn perfect_zip_preserves_mixed_null_empty_rows() {
let left =
list_with_validity(vec![], vec![0, 0, 0, 0], Some(vec![false, true, false]));
let right =
list_with_validity(vec![], vec![0, 0, 0, 0], Some(vec![true, false, false]));

let result = arrays_zip_inner(&[
Arc::clone(&left) as ArrayRef,
Arc::clone(&right) as ArrayRef,
])
.unwrap();
let result = result.as_any().downcast_ref::<ListArray>().unwrap();

assert!(result.offsets().ptr_eq(left.offsets()));
assert!(!result.is_null(0));
assert!(!result.is_null(1));
assert!(result.is_null(2));
}

#[test]
fn perfect_zip_reuses_null_rows_with_hidden_values() {
let left =
list_with_validity(vec![1, 2, 3, 4], vec![0, 2, 4], Some(vec![true, false]));
let right = list_with_validity(
vec![10, 20, 30, 40],
vec![0, 2, 4],
Some(vec![true, false]),
);

let result = arrays_zip_inner(&[
Arc::clone(&left) as ArrayRef,
Arc::clone(&right) as ArrayRef,
])
.unwrap();
let result = result.as_any().downcast_ref::<ListArray>().unwrap();

assert!(result.offsets().ptr_eq(left.offsets()));
assert_eq!(result.value_offsets(), &[0, 2, 4]);
assert!(result.is_null(1));
}

#[test]
fn mixed_null_row_with_hidden_values_uses_general_path() {
let left =
list_with_validity(vec![1, 2, 3, 4], vec![0, 2, 4], Some(vec![true, false]));
let right = list_with_validity(
vec![10, 20, 30, 40],
vec![0, 2, 4],
Some(vec![true, true]),
);

let result = arrays_zip_inner(&[
Arc::clone(&left) as ArrayRef,
Arc::clone(&right) as ArrayRef,
])
.unwrap();
let result = result.as_any().downcast_ref::<ListArray>().unwrap();
let values = result
.values()
.as_any()
.downcast_ref::<StructArray>()
.unwrap();

assert!(!result.offsets().ptr_eq(left.offsets()));
assert_eq!(result.value_offsets(), &[0, 2, 4]);
assert!(values.column(0).is_null(2));
assert!(values.column(0).is_null(3));
assert!(!values.column(1).is_null(2));
assert!(!values.column(1).is_null(3));
}
}
Loading