Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Integer fast path Parquet dict encoding #18030

Merged
merged 2 commits into from
Aug 4, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 124 additions & 1 deletion crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use arrow::array::{Array, BinaryViewArray, DictionaryArray, DictionaryKey, Utf8ViewArray};
use arrow::array::{
Array, BinaryViewArray, DictionaryArray, DictionaryKey, PrimitiveArray, Utf8ViewArray,
};
use arrow::bitmap::{Bitmap, MutableBitmap};
use arrow::buffer::Buffer;
use arrow::datatypes::{ArrowDataType, IntegerType};
use arrow::types::NativeType;
use polars_compute::min_max::MinMaxKernel;
use polars_error::{polars_bail, PolarsResult};
use polars_utils::unwrap::UnwrapUncheckedRelease;

use super::binary::{
build_statistics as binary_build_statistics, encode_plain as binary_encode_plain,
Expand All @@ -24,12 +30,129 @@ use crate::parquet::statistics::ParquetStatistics;
use crate::parquet::CowBuffer;
use crate::write::DynIter;

trait MinMaxThreshold {
const DELTA_THRESHOLD: Self;
}

macro_rules! minmaxthreshold_impls {
($($t:ty => $threshold:literal,)+) => {
$(
impl MinMaxThreshold for $t {
const DELTA_THRESHOLD: Self = $threshold;
}
)+
};
}

minmaxthreshold_impls! {
i8 => 16,
i16 => 256,
i32 => 512,
i64 => 2048,
u8 => 16,
u16 => 256,
u32 => 512,
u64 => 2048,
}

fn min_max_integer_encode_as_dictionary_optional<'a, E, T>(
array: &'a dyn Array,
) -> Option<DictionaryArray<u32>>
where
E: std::fmt::Debug,
T: NativeType
+ MinMaxThreshold
+ std::cmp::Ord
+ TryInto<u32, Error = E>
+ std::ops::Sub<T, Output = T>
+ num_traits::CheckedSub,
std::ops::RangeInclusive<T>: Iterator<Item = T>,
PrimitiveArray<T>: MinMaxKernel<Scalar<'a> = T>,
{
use ArrowDataType as DT;
let (min, max): (T, T) = <PrimitiveArray<T> as MinMaxKernel>::min_max_ignore_nan_kernel(
array.as_any().downcast_ref().unwrap(),
)?;

debug_assert!(max >= min, "{max} >= {min}");
if !max
.checked_sub(&min)
.is_some_and(|v| v <= T::DELTA_THRESHOLD)
{
return None;
}

// @TODO: This currently overestimates the values, it might be interesting to use the unique
// kernel here.
let values = PrimitiveArray::new(DT::from(T::PRIMITIVE), (min..=max).collect(), None);
let values = Box::new(values);

let keys: Buffer<u32> = array
.as_any()
.downcast_ref::<PrimitiveArray<T>>()
.unwrap()
.values()
.iter()
.map(|v| unsafe {
// @NOTE:
// Since the values might contain nulls which have a undefined value. We just
// clamp the values to between the min and max value. This way, they will still
// be valid dictionary keys. This is mostly to make the
// unwrap_unchecked_release not produce any unsafety.
(*v.clamp(&min, &max) - min)
.try_into()
.unwrap_unchecked_release()
})
.collect();

let keys = PrimitiveArray::new(DT::UInt32, keys, array.validity().cloned());
Some(
DictionaryArray::<u32>::try_new(
ArrowDataType::Dictionary(
IntegerType::UInt32,
Box::new(DT::from(T::PRIMITIVE)),
false, // @TODO: This might be able to be set to true?
),
keys,
values,
)
.unwrap(),
)
}

pub(crate) fn encode_as_dictionary_optional(
array: &dyn Array,
nested: &[Nested],
type_: PrimitiveType,
options: WriteOptions,
) -> Option<PolarsResult<DynIter<'static, PolarsResult<Page>>>> {
use ArrowDataType as DT;
let fast_dictionary = match array.data_type() {
DT::Int8 => min_max_integer_encode_as_dictionary_optional::<_, i8>(array),
DT::Int16 => min_max_integer_encode_as_dictionary_optional::<_, i16>(array),
DT::Int32 | DT::Date32 | DT::Time32(_) => {
min_max_integer_encode_as_dictionary_optional::<_, i32>(array)
},
DT::Int64 | DT::Date64 | DT::Time64(_) | DT::Timestamp(_, _) | DT::Duration(_) => {
min_max_integer_encode_as_dictionary_optional::<_, i64>(array)
},
DT::UInt8 => min_max_integer_encode_as_dictionary_optional::<_, u8>(array),
DT::UInt16 => min_max_integer_encode_as_dictionary_optional::<_, u16>(array),
DT::UInt32 => min_max_integer_encode_as_dictionary_optional::<_, u32>(array),
DT::UInt64 => min_max_integer_encode_as_dictionary_optional::<_, u64>(array),
_ => None,
};

if let Some(fast_dictionary) = fast_dictionary {
return Some(array_to_pages(
&fast_dictionary,
type_,
nested,
options,
Encoding::RleDictionary,
));
}

let dtype = Box::new(array.data_type().clone());

let len_before = array.len();
Expand Down