Skip to content

Commit

Permalink
fix: properly write nest-nulled values in Parquet
Browse files Browse the repository at this point in the history
Fixes #17805.

This fixes an issue on the Parquet writer where values that would be valid in
the primitive array but invalid at a higher nesting level would still be
written. This could for example be true when do `x = (x == "") ? [ x ] : None`.
In this case, the empty string might still be valid but the above list is not
valid anymore.

This is solved by walking through the structure and propagating the nulls to
the lower levels in the parquet writer.
  • Loading branch information
coastalwhite committed Jul 24, 2024
1 parent 954538e commit 57ae3b3
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 41 deletions.
181 changes: 140 additions & 41 deletions crates/polars-parquet/src/arrow/write/pages.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::VecDeque;
use std::fmt::Debug;

use arrow::array::{Array, FixedSizeListArray, ListArray, MapArray, StructArray};
use arrow::bitmap::Bitmap;
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::datatypes::PhysicalType;
use arrow::offset::{Offset, OffsetsBuffer};
use polars_error::{polars_bail, PolarsResult};
Expand Down Expand Up @@ -268,42 +269,133 @@ fn to_nested_recursive(
Ok(())
}

/// Convert [`Array`] to `Vec<&dyn Array>` leaves in DFS order.
pub fn to_leaves(array: &dyn Array) -> Vec<&dyn Array> {
let mut leaves = vec![];
to_leaves_recursive(array, &mut leaves);
leaves
fn expand_list_validity<'a, O: Offset>(
array: &'a ListArray<O>,
validity: Option<Bitmap>,
arrays: &mut VecDeque<(&'a dyn Array, Option<Bitmap>)>,
) {
let Some(list_validity) = validity else {
arrays.push_back((array.values().as_ref(), validity));
return;
};

let offsets = array.offsets().buffer();
let mut validity = MutableBitmap::with_capacity(array.values().len());
let mut list_validity_iter = list_validity.iter();

let mut idx = 0;
while list_validity_iter.num_remaining() > 0 {
let num_ones = list_validity_iter.take_leading_ones();
let num_elements = offsets[idx + num_ones] - offsets[idx];
validity.extend_constant(num_elements.to_usize(), true);

idx += num_ones;

let num_zeros = list_validity_iter.take_leading_zeros();
let num_elements = offsets[idx + num_zeros] - offsets[idx];
validity.extend_constant(num_elements.to_usize(), false);

idx += num_zeros;
}

debug_assert_eq!(idx, array.len());

let validity = validity.freeze();

arrays.push_back((array.values().as_ref(), Some(validity)));
}

fn to_leaves_recursive<'a>(array: &'a dyn Array, leaves: &mut Vec<&'a dyn Array>) {
use PhysicalType::*;
match array.data_type().to_physical_type() {
Struct => {
let array = array.as_any().downcast_ref::<StructArray>().unwrap();
array
.values()
.iter()
.for_each(|a| to_leaves_recursive(a.as_ref(), leaves));
},
List => {
let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
to_leaves_recursive(array.values().as_ref(), leaves);
},
LargeList => {
let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
to_leaves_recursive(array.values().as_ref(), leaves);
},
FixedSizeList => {
let array = array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();
to_leaves_recursive(array.values().as_ref(), leaves);
},
Map => {
let array = array.as_any().downcast_ref::<MapArray>().unwrap();
to_leaves_recursive(array.field().as_ref(), leaves);
},
Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
| LargeUtf8 | Dictionary(_) | BinaryView | Utf8View => leaves.push(array),
other => todo!("Writing {:?} to parquet not yet implemented", other),
/// Convert [`Array`] to a `Vec<Box<dyn Array>>` leaves in DFS order.
///
/// Each leaf array has the validity propagated from the nesting levels above.
pub fn to_leaves(array: &dyn Array, leaves: &mut Vec<Box<dyn Array>>) {
use PhysicalType as P;

leaves.clear();
let mut array_stack: VecDeque<(&dyn Array, Option<Bitmap>)> = VecDeque::new();

array_stack.push_back((array, None));

while let Some((array, parent_validity)) = array_stack.pop_front() {
let child_validity = array.validity().cloned();
let validity = match (parent_validity, child_validity) {
(Some(parent), Some(child)) => Some((&parent) & (&child)),
(Some(v), None) | (None, Some(v)) => Some(v),
(None, None) => None,
};

match array.data_type().to_physical_type() {
P::Struct => {
let array = array.as_any().downcast_ref::<StructArray>().unwrap();

leaves.reserve(array.len().saturating_sub(1));
array_stack.extend(
array
.values()
.iter()
.map(|field| (field.as_ref(), validity.clone())),
);
},
P::List => {
let array = array.as_any().downcast_ref::<ListArray<i32>>().unwrap();
expand_list_validity(array, validity, &mut array_stack);
},
P::LargeList => {
let array = array.as_any().downcast_ref::<ListArray<i64>>().unwrap();
expand_list_validity(array, validity, &mut array_stack);
},
P::FixedSizeList => {
let array = array.as_any().downcast_ref::<FixedSizeListArray>().unwrap();

let num_values = array.values().len();
let size = array.size();

let validity = validity.map(|fsl_validity| {
let mut validity = MutableBitmap::with_capacity(num_values);
let mut fsl_validity_iter = fsl_validity.iter();

let mut idx = 0;
while fsl_validity_iter.num_remaining() > 0 {
let num_ones = fsl_validity_iter.take_leading_ones();
let num_elements = num_ones * size;
validity.extend_constant(num_elements, true);

idx += num_ones;

let num_zeros = fsl_validity_iter.take_leading_zeros();
let num_elements = num_zeros * size;
validity.extend_constant(num_elements, false);

idx += num_zeros;
}

debug_assert_eq!(idx, array.len());

validity.freeze()
});

array_stack.push_back((array.values().as_ref(), validity));
},
P::Map => {
let array = array.as_any().downcast_ref::<MapArray>().unwrap();
array_stack.push_back((array.field().as_ref(), validity));
},
P::Null
| P::Boolean
| P::Primitive(_)
| P::Binary
| P::FixedSizeBinary
| P::LargeBinary
| P::Utf8
| P::LargeUtf8
| P::Dictionary(_)
| P::BinaryView
| P::Utf8View => {
leaves.push(array.with_validity(validity));
},

other => todo!("Writing {:?} to parquet not yet implemented", other),
}
}
}

Expand Down Expand Up @@ -333,11 +425,13 @@ pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
encoding: &[Encoding],
) -> PolarsResult<Vec<DynIter<'static, PolarsResult<Page>>>> {
let array = array.as_ref();

let nested = to_nested(array, &type_)?;

let types = to_parquet_leaves(type_);

let values = to_leaves(array);
let mut values = Vec::new();
to_leaves(array, &mut values);

assert_eq!(encoding.len(), types.len());

Expand All @@ -347,7 +441,7 @@ pub fn array_to_columns<A: AsRef<dyn Array> + Send + Sync>(
.zip(types)
.zip(encoding.iter())
.map(|(((values, nested), type_), encoding)| {
array_to_pages(*values, type_, &nested, options, *encoding)
array_to_pages(values.as_ref(), type_, &nested, options, *encoding)
})
.collect()
}
Expand All @@ -370,9 +464,8 @@ pub fn arrays_to_columns<A: AsRef<dyn Array> + Send + Sync>(
// Ensure we transpose the leaves. So that all the leaves from the same columns are at the same level vec.
let mut scratch = vec![];
for arr in arrays {
scratch.clear();
to_leaves_recursive(arr.as_ref(), &mut scratch);
for (i, leave) in scratch.iter().copied().enumerate() {
to_leaves(arr.as_ref(), &mut scratch);
for (i, leave) in std::mem::take(&mut scratch).into_iter().enumerate() {
while i < leaves.len() {
leaves.push(vec![]);
}
Expand All @@ -387,7 +480,13 @@ pub fn arrays_to_columns<A: AsRef<dyn Array> + Send + Sync>(
.zip(encoding.iter())
.map(move |(((values, nested), type_), encoding)| {
let iter = values.into_iter().map(|leave_values| {
array_to_pages(leave_values, type_.clone(), &nested, options, *encoding)
array_to_pages(
leave_values.as_ref(),
type_.clone(),
&nested,
options,
*encoding,
)
});

// Need a scratch to bubble up the error :/
Expand Down
23 changes: 23 additions & 0 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1270,3 +1270,26 @@ def test_parquet_list_element_field_name(tmp_path: Path) -> None:
schema_str = str(pq.read_schema(filename))
assert "<element: int64>" in schema_str
assert "child 0, element: int64" in schema_str


@pytest.mark.parametrize(
("s", "elem"),
[
(pl.Series(["", "hello", "hi", ""], dtype=pl.String), ""),
(pl.Series([0, 1, 2, 0], dtype=pl.Int64), 0),
(pl.Series([[0], [1], [2], [0]], dtype=pl.Array(pl.Int64, 1)), [0]),
(
pl.Series([[0, 1], [1, 2], [2, 3], [0, 1]], dtype=pl.Array(pl.Int64, 2)),
[0, 1],
),
],
)
def test_parquet_high_nested_null_17805(s, elem) -> None:
test_round_trip(
pl.DataFrame({"a": s}).select(
pl.when(pl.col("a") == elem)
.then(pl.lit(None))
.otherwise(pl.concat_list(pl.col("a").alias("b")))
.alias("c")
)
)

0 comments on commit 57ae3b3

Please sign in to comment.