Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Several parquet reader/writer regressions #17941

Merged
merged 8 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/fixed_size_binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ impl FixedSizeBinaryArray {
polars_ensure!(*size != 0, ComputeError: "FixedSizeBinaryArray expects a positive size");
Ok(*size)
},
_ => {
polars_bail!(ComputeError: "FixedSizeBinaryArray expects DataType::FixedSizeBinary")
other => {
polars_bail!(ComputeError: "FixedSizeBinaryArray expects DataType::FixedSizeBinary. found {other:?}")
},
}
}
Expand Down
10 changes: 3 additions & 7 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,17 @@ use arrow::types::Offset;
pub(crate) use basic::BinaryDecoder;

use self::utils::Binary;
use super::utils::freeze_validity;
use super::ParquetResult;

fn finalize<O: Offset>(
data_type: ArrowDataType,
mut values: Binary<O>,
mut validity: MutableBitmap,
validity: MutableBitmap,
) -> ParquetResult<Box<dyn Array>> {
values.offsets.shrink_to_fit();
values.values.shrink_to_fit();
let validity = if validity.is_empty() {
None
} else {
validity.shrink_to_fit();
Some(validity.freeze())
};
let validity = freeze_validity(validity);

match data_type.to_physical_type() {
PhysicalType::Binary | PhysicalType::LargeBinary => unsafe {
Expand Down
9 changes: 4 additions & 5 deletions crates/polars-parquet/src/arrow/read/deserialize/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use arrow::array::{
Array, BinaryViewArray, DictionaryArray, DictionaryKey, MutableBinaryViewArray, PrimitiveArray,
Utf8ViewArray, View,
};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::{ArrowDataType, PhysicalType};
use polars_error::PolarsResult;

use super::binary::decoders::*;
use super::utils::freeze_validity;
use crate::parquet::encoding::hybrid_rle::{self, DictionaryTranslator};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{DataPage, DictPage};
Expand Down Expand Up @@ -235,11 +236,9 @@ impl utils::Decoder for BinViewDecoder {
(values, validity): Self::DecodedState,
) -> ParquetResult<Box<dyn Array>> {
let mut array: BinaryViewArray = values.freeze();
let validity: Bitmap = validity.freeze();

if validity.unset_bits() != validity.len() {
array = array.with_validity(Some(validity))
}
let validity = freeze_validity(validity);
array = array.with_validity(validity);

match data_type.to_physical_type() {
PhysicalType::BinaryView => Ok(array.boxed()),
Expand Down
8 changes: 4 additions & 4 deletions crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;

use super::utils;
use super::utils::{extend_from_decoder, Decoder, ExactSize};
use super::utils::{self, extend_from_decoder, freeze_validity, Decoder, ExactSize};
use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer;
use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::encoding::Encoding;
Expand Down Expand Up @@ -50,7 +49,7 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> {
},
Encoding::Rle => {
// @NOTE: For a nullable list, we might very well overestimate the amount of
// values, but we never collect those items. We don't really have a way to now the
// values, but we never collect those items. We don't really have a way to know the
// number of valid items in the V1 specification.

// For RLE boolean values the length in bytes is pre-pended.
Expand Down Expand Up @@ -232,7 +231,8 @@ impl Decoder for BooleanDecoder {
_dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<Self::Output> {
Ok(BooleanArray::new(data_type, values.into(), validity.into()))
let validity = freeze_validity(validity);
Ok(BooleanArray::new(data_type, values.into(), validity))
}
}

Expand Down
11 changes: 3 additions & 8 deletions crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;

use super::utils::{
self, dict_indices_decoder, extend_from_decoder, BatchableCollector, Decoder, DictDecodable,
ExactSize, PageValidity, StateTranslation,
self, dict_indices_decoder, extend_from_decoder, freeze_validity, BatchableCollector, Decoder,
DictDecodable, ExactSize, PageValidity, StateTranslation,
};
use super::ParquetError;
use crate::parquet::encoding::hybrid_rle::{self, HybridRleDecoder, Translator};
Expand Down Expand Up @@ -118,12 +118,7 @@ impl<K: DictionaryKey, D: utils::DictDecodable> utils::Decoder for DictionaryDec
dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<DictionaryArray<K>> {
let validity = if validity.is_empty() || validity.unset_bits() == 0 {
None
} else {
Some(validity.freeze())
};

let validity = freeze_validity(validity);
let dict = dict.unwrap();
let keys = PrimitiveArray::new(K::PRIMITIVE.into(), values.into(), validity);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;

use super::utils::{dict_indices_decoder, extend_from_decoder, not_implemented, Decoder};
use super::utils::{
dict_indices_decoder, extend_from_decoder, freeze_validity, not_implemented, Decoder,
};
use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer;
use crate::parquet::encoding::{hybrid_rle, Encoding};
use crate::parquet::error::{ParquetError, ParquetResult};
Expand Down Expand Up @@ -224,6 +226,12 @@ impl Decoder for BinaryDecoder {
}

/// Appends a single gathered value to `target`.
///
/// An empty `value` is treated as the null placeholder: instead of copying bytes, `target` is
/// grown by `self.size` zero bytes. Any non-empty `value` is appended verbatim.
///
/// NOTE(review): presumably `Self::Target` is a byte vector (it is used through `len`,
/// `resize` and `extend_from_slice`) — confirm against the trait implementation.
fn gather_one(&self, target: &mut Self::Target, value: &'a [u8]) -> ParquetResult<()> {
    if value.is_empty() {
        // The null value is given length 0 by the caller, so an empty slice can be used to
        // signal "pad with one zeroed element".
        let padded_len = target.len() + self.size;
        target.resize(padded_len, 0);
    } else {
        target.extend_from_slice(value);
    }

    Ok(())
}
Expand All @@ -234,9 +242,17 @@ impl Decoder for BinaryDecoder {
value: &'a [u8],
n: usize,
) -> ParquetResult<()> {
// We make the null value length 0, which allows us to do this.
if value.is_empty() {
target.resize(target.len() + n * self.size, 0);
return Ok(());
}

debug_assert_eq!(value.len(), self.size);
for _ in 0..n {
target.extend(value);
}

Ok(())
}
}
Expand All @@ -246,7 +262,10 @@ impl Decoder for BinaryDecoder {
size: self.size,
};

let null_value = &dict[..self.size];
// @NOTE:
// This is a special case in our gatherer. If the length of the value is 0, then we just
// resize with the appropriate size. Importantly, this also works for FSL with size=0.
let null_value = &[];

match page_validity {
None => {
Expand Down Expand Up @@ -274,10 +293,11 @@ impl Decoder for BinaryDecoder {
_dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<Self::Output> {
let validity = freeze_validity(validity);
Ok(FixedSizeBinaryArray::new(
data_type,
values.values.into(),
validity.into(),
validity,
))
}
}
Expand Down
16 changes: 8 additions & 8 deletions crates/polars-parquet/src/arrow/read/deserialize/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use simple::page_iter_to_array;

pub use self::nested_utils::{init_nested, InitNested, NestedState};
pub use self::utils::filter::Filter;
use self::utils::freeze_validity;
use super::*;
use crate::parquet::read::get_page_iterator as _get_page_iterator;
use crate::parquet::schema::types::PrimitiveType;
Expand Down Expand Up @@ -48,6 +49,7 @@ pub fn create_list(
values: Box<dyn Array>,
) -> Box<dyn Array> {
let (mut offsets, validity) = nested.pop().unwrap();
let validity = validity.and_then(freeze_validity);
match data_type.to_logical_type() {
ArrowDataType::List(_) => {
offsets.push(values.len() as i64);
Expand All @@ -62,7 +64,7 @@ pub fn create_list(
data_type,
offsets.into(),
values,
validity.and_then(|x| x.into()),
validity,
))
},
ArrowDataType::LargeList(_) => {
Expand All @@ -72,14 +74,12 @@ pub fn create_list(
data_type,
offsets.try_into().expect("List too large"),
values,
validity.and_then(|x| x.into()),
validity,
))
},
ArrowDataType::FixedSizeList(_, _) => Box::new(FixedSizeListArray::new(
data_type,
values,
validity.and_then(|x| x.into()),
)),
ArrowDataType::FixedSizeList(_, _) => {
Box::new(FixedSizeListArray::new(data_type, values, validity))
},
_ => unreachable!(),
}
}
Expand All @@ -104,7 +104,7 @@ pub fn create_map(
data_type,
offsets.into(),
values,
validity.and_then(|x| x.into()),
validity.and_then(freeze_validity),
))
},
_ => unreachable!(),
Expand Down
Loading
Loading