Skip to content

Commit

Permalink
feat(rust): read metadata for parquet
Browse files Browse the repository at this point in the history
  • Loading branch information
coastalwhite committed May 23, 2024
1 parent 83b4c17 commit 2370526
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 4 deletions.
20 changes: 19 additions & 1 deletion crates/polars-core/src/series/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use num_traits::NumCast;
use rayon::prelude::*;
pub use series_trait::{IsSorted, *};

use crate::chunked_array::Settings;
use crate::chunked_array::{Metadata, Settings};
#[cfg(feature = "zip_with")]
use crate::series::arithmetic::coerce_lhs_rhs;
use crate::utils::{
Expand Down Expand Up @@ -238,6 +238,24 @@ impl Series {
self
}

/// Try to set the [`Metadata`] for the underlying [`ChunkedArray`]
///
/// This does not guarantee that the [`Metadata`] is always set. It returns whether it was
/// successful.
pub fn try_set_metadata<T: PolarsDataType + 'static>(&mut self, metadata: Metadata<T>) -> bool {
let inner = self._get_inner_mut();

// @NOTE: These types are not the same if they are logical for example. For now, we just
// say: do not set the metadata when you get into this situation. This can be a @TODO for
// later.
if &T::get_dtype() != inner.dtype() {
return false;
}

inner.as_mut().md = Some(Arc::new(metadata));
true
}

pub fn from_arrow(name: &str, array: ArrayRef) -> PolarsResult<Series> {
Self::try_from((name, array))
}
Expand Down
1 change: 1 addition & 0 deletions crates/polars-io/src/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod options;
mod predicates;
mod read_impl;
mod reader;
mod to_metadata;
mod utils;

pub use options::{ParallelStrategy, ParquetOptions};
Expand Down
58 changes: 55 additions & 3 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ use arrow::datatypes::ArrowSchemaRef;
use polars_core::prelude::*;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use polars_parquet::read;
use polars_parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
use polars_parquet::parquet::statistics::{
BinaryStatistics, BooleanStatistics, PrimitiveStatistics,
};
use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData};
use rayon::prelude::*;

#[cfg(feature = "cloud")]
use super::async_impl::FetchRowGroupsFromObjectStore;
use super::mmap::{mmap_columns, ColumnStore};
use super::predicates::read_this_row_group;
use super::to_metadata::ToMetadata;
use super::utils::materialize_empty_df;
use super::{mmap, ParallelStrategy};
use crate::mmap::{MmapBytesReader, ReaderBytes};
Expand Down Expand Up @@ -67,11 +70,60 @@ fn column_idx_to_series(
let columns = mmap_columns(store, md.columns(), &field.name);
let iter = mmap::to_deserializer(columns, field.clone(), remaining_rows, Some(chunk_size))?;

if remaining_rows < md.num_rows() {
let mut series = if remaining_rows < md.num_rows() {
array_iter_to_series(iter, field, Some(remaining_rows))
} else {
array_iter_to_series(iter, field, None)
}?;

// See if we can find some statistics for this series. If we cannot find anything just return
// the series as is.
let Some(Ok(stats)) = md.columns()[column_i].statistics() else {
return Ok(series);
};

let series_trait = series.as_ref();
let stats = stats.as_ref();

macro_rules! match_dtypes_into_metadata {
($(($dtype:pat, $phystype:pat) => ($stats:ty, $pldtype:ty),)+) => {
match (series_trait.dtype(), stats.physical_type()) {
$(
($dtype, $phystype) => {
let stats = stats.as_any().downcast_ref::<$stats>().expect(concat!(
"Failed to cast Statistics to ",
stringify!($stats),
" for ",
stringify!($pldtype),
));
let md = ToMetadata::<$pldtype>::to_metadata(stats);
series.try_set_metadata(md);
})+
_ => {},
}
};
}

// Match the data types used by the Series and by the Statistics. If we find a match, set some
// Metadata for the underlying ChunkedArray.
use {DataType as D, PhysicalType as P};
match_dtypes_into_metadata! {
(D::Boolean, P::Boolean ) => (BooleanStatistics, BooleanType),
(D::UInt8, P::Int32 ) => (PrimitiveStatistics<i32>, UInt8Type ),
(D::UInt16, P::Int32 ) => (PrimitiveStatistics<i32>, UInt16Type ),
(D::UInt32, P::Int32 ) => (PrimitiveStatistics<i32>, UInt32Type ),
(D::UInt64, P::Int64 ) => (PrimitiveStatistics<i64>, UInt64Type ),
(D::Int8, P::Int32 ) => (PrimitiveStatistics<i32>, Int8Type ),
(D::Int16, P::Int32 ) => (PrimitiveStatistics<i32>, Int16Type ),
(D::Int32, P::Int32 ) => (PrimitiveStatistics<i32>, Int32Type ),
(D::Int64, P::Int64 ) => (PrimitiveStatistics<i64>, Int64Type ),
(D::Float32, P::Float ) => (PrimitiveStatistics<f32>, Float32Type),
(D::Float64, P::Double ) => (PrimitiveStatistics<f64>, Float64Type),
(D::String, P::ByteArray) => (BinaryStatistics, StringType ),
(D::Binary, P::ByteArray) => (BinaryStatistics, BinaryType ),
}

Ok(series)
}

pub(super) fn array_iter_to_series(
Expand Down
114 changes: 114 additions & 0 deletions crates/polars-io/src/parquet/read/to_metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use polars_core::chunked_array::Metadata;
use polars_core::datatypes::{
BinaryType, BooleanType, Float32Type, Float64Type, Int16Type, Int32Type, Int64Type, Int8Type,
PolarsDataType, StringType, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
};
use polars_parquet::parquet::statistics::{
BinaryStatistics, BooleanStatistics, PrimitiveStatistics, Statistics,
};

pub trait ToMetadata<D: PolarsDataType + 'static>: Statistics + Sized + 'static {
fn to_metadata(&self) -> Metadata<D>;
}

impl ToMetadata<BooleanType> for BooleanStatistics {
fn to_metadata(&self) -> Metadata<BooleanType> {
let mut md = Metadata::default();

if let Some(distinct_count) = self.distinct_count.and_then(|v| v.try_into().ok()) {
md.set_distinct_count(distinct_count);
}
if let Some(min_value) = self.min_value {
md.set_min_value(min_value);
}
if let Some(max_value) = self.max_value {
md.set_max_value(max_value);
}

md
}
}

impl ToMetadata<BinaryType> for BinaryStatistics {
fn to_metadata(&self) -> Metadata<BinaryType> {
let mut md = Metadata::default();

if let Some(distinct_count) = self.distinct_count.and_then(|v| v.try_into().ok()) {
md.set_distinct_count(distinct_count);
}
if let Some(min_value) = self.min_value.as_ref() {
md.set_min_value(min_value.clone().into_boxed_slice());
}
if let Some(max_value) = self.max_value.as_ref() {
md.set_max_value(max_value.clone().into_boxed_slice());
}

md
}
}

impl ToMetadata<StringType> for BinaryStatistics {
fn to_metadata(&self) -> Metadata<StringType> {
let mut md = Metadata::default();

if let Some(distinct_count) = self.distinct_count.and_then(|v| v.try_into().ok()) {
md.set_distinct_count(distinct_count);
}
if let Some(min_value) = self
.min_value
.as_ref()
.and_then(|s| String::from_utf8(s.clone()).ok())
{
md.set_min_value(min_value);
}
if let Some(max_value) = self
.max_value
.as_ref()
.and_then(|s| String::from_utf8(s.clone()).ok())
{
md.set_max_value(max_value);
}

md
}
}

macro_rules! prim_statistics {
($(($bstore:ty, $pltype:ty),)+) => {
$(
impl ToMetadata<$pltype> for PrimitiveStatistics<$bstore> {
fn to_metadata(&self) -> Metadata<$pltype> {
let mut md = Metadata::default();

if let Some(distinct_count) = self.distinct_count.and_then(|v| v.try_into().ok())
{
md.set_distinct_count(distinct_count);
}
if let Some(min_value) = self.min_value {
md.set_min_value(min_value as <$pltype as PolarsDataType>::OwnedPhysical);
}
if let Some(max_value) = self.max_value {
md.set_max_value(max_value as <$pltype as PolarsDataType>::OwnedPhysical);
}

md
}
}
)+
}
}

prim_statistics! {
(i32, Int8Type),
(i32, Int16Type),
(i32, Int32Type),
(i64, Int64Type),

(i32, UInt8Type),
(i32, UInt16Type),
(i32, UInt32Type),
(i64, UInt64Type),

(f32, Float32Type),
(f64, Float64Type),
}

0 comments on commit 2370526

Please sign in to comment.