Skip to content

Commit

Permalink
refactor: Add general filters in Parquet (#17910)
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite authored Jul 29, 2024
1 parent 6e4b481 commit 82b6388
Show file tree
Hide file tree
Showing 32 changed files with 1,105 additions and 1,546 deletions.
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/dictionary/typed_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl<O: Offset> DictValue for Utf8Array<O> {
arr.null_count(),
0,
"null values in values not supported in iteration"
)
);
})
}
}
Expand All @@ -69,7 +69,7 @@ impl DictValue for Utf8ViewArray {
arr.null_count(),
0,
"null values in values not supported in iteration"
)
);
})
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-expr/src/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,10 +435,10 @@ mod stats {
// Default: read the file
_ => Ok(true),
};
out.inspect(|read| {
if state.verbose() && *read {
out.inspect(|&read| {
if state.verbose() && read {
eprintln!("parquet file must be read, statistics not sufficient for predicate.")
} else if state.verbose() && !*read {
} else if state.verbose() && !read {
eprintln!("parquet file can be skipped, the statistics were sufficient to apply the predicate.")
}
})
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/read/mmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_core::datatypes::PlHashMap;
use polars_error::PolarsResult;
use polars_parquet::read::{
column_iter_to_arrays, get_field_columns, ArrayIter, BasicDecompressor, ColumnChunkMetaData,
PageReader,
Filter, PageReader,
};
use polars_utils::mmap::{MemReader, MemSlice};

Expand Down Expand Up @@ -87,5 +87,5 @@ pub(super) fn to_deserializer<'a>(
})
.unzip();

column_iter_to_arrays(columns, types, field, num_rows)
column_iter_to_arrays(columns, types, field, Some(Filter::new_limited(num_rows)))
}
15 changes: 8 additions & 7 deletions crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicBool, Ordering};

use arrow::array::specification::try_check_utf8;
use arrow::array::{Array, BinaryArray, DictionaryArray, DictionaryKey, PrimitiveArray, Utf8Array};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::{ArrowDataType, PhysicalType};
use arrow::offset::Offset;
use polars_error::PolarsResult;
Expand Down Expand Up @@ -33,14 +33,13 @@ impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder<O>> for BinaryStateTransl
page: &'a DataPage,
dict: Option<&'a <BinaryDecoder<O> as utils::Decoder>::Dict>,
page_validity: Option<&utils::PageValidity<'a>>,
filter: Option<&utils::filter::Filter<'a>>,
) -> PolarsResult<Self> {
let is_string = matches!(
page.descriptor.primitive_type.logical_type,
Some(PrimitiveLogicalType::String)
);
decoder.check_utf8.store(is_string, Ordering::Relaxed);
BinaryStateTranslation::new(page, dict, page_validity, filter, is_string)
BinaryStateTranslation::new(page, dict, page_validity, is_string)
}

fn len_when_not_nullable(&self) -> usize {
Expand Down Expand Up @@ -154,6 +153,7 @@ impl<O: Offset> utils::Decoder for BinaryDecoder<O> {
type Translation<'a> = BinaryStateTranslation<'a>;
type Dict = BinaryDict;
type DecodedState = (Binary<O>, MutableBitmap);
type Output = Box<dyn Array>;

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Expand Down Expand Up @@ -272,16 +272,19 @@ impl<O: Offset> utils::Decoder for BinaryDecoder<O> {
fn finalize(
&self,
data_type: ArrowDataType,
_dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<Box<dyn Array>> {
super::finalize(data_type, values, validity)
}
}

impl<O: Offset> utils::DictDecodable for BinaryDecoder<O> {
fn finalize_dict_array<K: DictionaryKey>(
&self,
data_type: ArrowDataType,
dict: Self::Dict,
(values, validity): (Vec<K>, Option<Bitmap>),
keys: PrimitiveArray<K>,
) -> ParquetResult<DictionaryArray<K>> {
let value_data_type = match data_type.clone() {
ArrowDataType::Dictionary(_, values, _) => *values,
Expand All @@ -299,10 +302,8 @@ impl<O: Offset> utils::Decoder for BinaryDecoder<O> {
_ => unreachable!(),
};

let indices = PrimitiveArray::new(K::PRIMITIVE.into(), values.into(), validity);

// @TODO: Is this datatype correct?
Ok(DictionaryArray::try_new(data_type, indices, dict).unwrap())
Ok(DictionaryArray::try_new(data_type, keys, dict).unwrap())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use super::utils::*;
use crate::parquet::encoding::{delta_bitpacked, delta_length_byte_array, hybrid_rle, Encoding};
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{split_buffer, DataPage};
use crate::read::deserialize::utils::filter::Filter;
use crate::read::deserialize::utils::PageValidity;

pub(crate) type BinaryDict = BinaryArray<i64>;
Expand Down Expand Up @@ -150,7 +149,6 @@ impl<'a> BinaryStateTranslation<'a> {
page: &'a DataPage,
dict: Option<&'a BinaryDict>,
_page_validity: Option<&PageValidity<'a>>,
_filter: Option<&Filter<'a>>,
is_string: bool,
) -> PolarsResult<Self> {
match (page.encoding(), dict) {
Expand Down
13 changes: 7 additions & 6 deletions crates/polars-parquet/src/arrow/read/deserialize/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::parquet::encoding::hybrid_rle::{self, DictionaryTranslator};
use crate::parquet::error::{ParquetError, ParquetResult};
use crate::parquet::page::{DataPage, DictPage};
use crate::read::deserialize::binary::utils::BinaryIter;
use crate::read::deserialize::utils::filter::Filter;
use crate::read::deserialize::utils::{
self, binary_views_dict, extend_from_decoder, Decoder, PageValidity, StateTranslation,
TranslatedHybridRle,
Expand All @@ -30,14 +29,13 @@ impl<'a> StateTranslation<'a, BinViewDecoder> for BinaryStateTranslation<'a> {
page: &'a DataPage,
dict: Option<&'a <BinViewDecoder as utils::Decoder>::Dict>,
page_validity: Option<&PageValidity<'a>>,
filter: Option<&Filter<'a>>,
) -> PolarsResult<Self> {
let is_string = matches!(
page.descriptor.primitive_type.logical_type,
Some(PrimitiveLogicalType::String)
);
decoder.check_utf8.store(is_string, Ordering::Relaxed);
Self::new(page, dict, page_validity, filter, is_string)
Self::new(page, dict, page_validity, is_string)
}

fn len_when_not_nullable(&self) -> usize {
Expand Down Expand Up @@ -149,6 +147,7 @@ impl utils::Decoder for BinViewDecoder {
type Translation<'a> = BinaryStateTranslation<'a>;
type Dict = BinaryDict;
type DecodedState = DecodedStateTuple;
type Output = Box<dyn Array>;

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Expand Down Expand Up @@ -232,6 +231,7 @@ impl utils::Decoder for BinViewDecoder {
fn finalize(
&self,
data_type: ArrowDataType,
_dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<Box<dyn Array>> {
let mut array: BinaryViewArray = values.freeze();
Expand Down Expand Up @@ -260,12 +260,14 @@ impl utils::Decoder for BinViewDecoder {
_ => unreachable!(),
}
}
}

impl utils::DictDecodable for BinViewDecoder {
fn finalize_dict_array<K: DictionaryKey>(
&self,
data_type: ArrowDataType,
dict: Self::Dict,
(values, validity): (Vec<K>, Option<Bitmap>),
keys: PrimitiveArray<K>,
) -> ParquetResult<DictionaryArray<K>> {
let value_data_type = match &data_type {
ArrowDataType::Dictionary(_, values, _) => values.as_ref().clone(),
Expand All @@ -278,14 +280,13 @@ impl utils::Decoder for BinViewDecoder {
}
let view_dict = view_dict.freeze();

let array = PrimitiveArray::<K>::new(K::PRIMITIVE.into(), values.into(), validity);
let dict = match value_data_type.to_physical_type() {
PhysicalType::Utf8View => view_dict.to_utf8view().unwrap().boxed(),
PhysicalType::BinaryView => view_dict.boxed(),
_ => unreachable!(),
};

Ok(DictionaryArray::try_new(data_type, array, dict).unwrap())
Ok(DictionaryArray::try_new(data_type, keys, dict).unwrap())
}
}

Expand Down
21 changes: 4 additions & 17 deletions crates/polars-parquet/src/arrow/read/deserialize/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::parquet::encoding::hybrid_rle::HybridRleDecoder;
use crate::parquet::encoding::Encoding;
use crate::parquet::error::ParquetResult;
use crate::parquet::page::{split_buffer, DataPage, DictPage};
use crate::read::deserialize::utils::filter::Filter;
use crate::read::deserialize::utils::{BatchableCollector, PageValidity};

#[allow(clippy::large_enum_variant)]
Expand All @@ -29,7 +28,6 @@ impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> {
page: &'a DataPage,
_dict: Option<&'a <BooleanDecoder as Decoder>::Dict>,
page_validity: Option<&PageValidity<'a>>,
_filter: Option<&Filter<'a>>,
) -> PolarsResult<Self> {
let values = split_buffer(page)?.values;

Expand Down Expand Up @@ -189,6 +187,7 @@ impl Decoder for BooleanDecoder {
type Translation<'a> = StateTranslation<'a>;
type Dict = ();
type DecodedState = (MutableBitmap, MutableBitmap);
type Output = BooleanArray;

fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
(
Expand Down Expand Up @@ -230,22 +229,10 @@ impl Decoder for BooleanDecoder {
fn finalize(
&self,
data_type: ArrowDataType,
_dict: Option<Self::Dict>,
(values, validity): Self::DecodedState,
) -> ParquetResult<Box<dyn arrow::array::Array>> {
Ok(Box::new(BooleanArray::new(
data_type,
values.into(),
validity.into(),
)))
}

fn finalize_dict_array<K: arrow::array::DictionaryKey>(
&self,
_data_type: ArrowDataType,
_dict: Self::Dict,
_decoded: (Vec<K>, Option<arrow::bitmap::Bitmap>),
) -> ParquetResult<arrow::array::DictionaryArray<K>> {
unimplemented!()
) -> ParquetResult<Self::Output> {
Ok(BooleanArray::new(data_type, values.into(), validity.into()))
}
}

Expand Down
Loading

0 comments on commit 82b6388

Please sign in to comment.