diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index 87a26a685331..17179f89cbdd 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -613,6 +613,10 @@ impl PhysicalIoExpr for PhysicalIoHelper { self.expr.evaluate(df, &state) } + fn live_variables(&self) -> Option>> { + Some(expr_to_leaf_column_names(self.expr.as_expression()?)) + } + #[cfg(feature = "parquet")] fn as_stats_evaluator(&self) -> Option<&dyn polars_io::predicates::StatsEvaluator> { self.expr.as_stats_evaluator() diff --git a/crates/polars-io/src/parquet/read/mmap.rs b/crates/polars-io/src/parquet/read/mmap.rs index 429de4d95350..69ba42ac4c29 100644 --- a/crates/polars-io/src/parquet/read/mmap.rs +++ b/crates/polars-io/src/parquet/read/mmap.rs @@ -65,7 +65,7 @@ fn _mmap_single_column<'a>( pub(super) fn to_deserializer<'a>( columns: Vec<(&ColumnChunkMetaData, MemSlice)>, field: Field, - num_rows: usize, + filter: Option, ) -> PolarsResult> { let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() @@ -87,5 +87,5 @@ pub(super) fn to_deserializer<'a>( }) .unzip(); - column_iter_to_arrays(columns, types, field, Some(Filter::new_limited(num_rows))) + column_iter_to_arrays(columns, types, field, filter) } diff --git a/crates/polars-io/src/parquet/read/options.rs b/crates/polars-io/src/parquet/read/options.rs index 26a042b42871..37357af89d67 100644 --- a/crates/polars-io/src/parquet/read/options.rs +++ b/crates/polars-io/src/parquet/read/options.rs @@ -18,6 +18,14 @@ pub enum ParallelStrategy { Columns, /// Parallelize over the row groups RowGroups, + /// First evaluates the pushed-down predicates in parallel and determines a mask of which rows + /// to read. Then, it parallelizes over both the columns and the row groups while filtering out + /// rows that do not need to be read. This can provide significant speedups for large files + /// (i.e. 
many row-groups) with a predicate that filters clustered rows or filters heavily. In + /// other cases, this may slow down the scan compared to other strategies. + /// + /// If no predicate is given, this falls back to [`ParallelStrategy::Auto`]. + Prefiltered, /// Automatically determine over which unit to parallelize /// This will choose the most occurring unit. #[default] diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index b41db039bad6..0fbb760ffb30 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -3,12 +3,14 @@ use std::collections::VecDeque; use std::ops::{Deref, Range}; use arrow::array::new_empty_array; +use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::ArrowSchemaRef; use polars_core::prelude::*; use polars_core::utils::{accumulate_dataframes_vertical, split_df}; use polars_core::POOL; -use polars_parquet::read::{self, ArrayIter, FileMetaData, PhysicalType, RowGroupMetaData}; +use polars_parquet::read::{self, ArrayIter, FileMetaData, Filter, PhysicalType, RowGroupMetaData}; use polars_utils::mmap::MemSlice; +use polars_utils::vec::inplace_zip_filtermap; use rayon::prelude::*; #[cfg(feature = "cloud")] @@ -55,7 +57,7 @@ fn assert_dtypes(data_type: &ArrowDataType) { fn column_idx_to_series( column_i: usize, md: &RowGroupMetaData, - remaining_rows: usize, + filter: Option, file_schema: &ArrowSchema, store: &mmap::ColumnStore, ) -> PolarsResult { @@ -67,13 +69,9 @@ fn column_idx_to_series( } let columns = mmap_columns(store, md.columns(), &field.name); - let iter = mmap::to_deserializer(columns, field.clone(), remaining_rows)?; + let iter = mmap::to_deserializer(columns, field.clone(), filter)?; - let mut series = if remaining_rows < md.num_rows() { - array_iter_to_series(iter, field, Some(remaining_rows)) - } else { - array_iter_to_series(iter, field, None) - }?; + let mut series = array_iter_to_series(iter, 
field, None)?; // See if we can find some statistics for this series. If we cannot find anything just return // the series as is. @@ -167,8 +165,31 @@ fn rg_to_dfs( use_statistics: bool, hive_partition_columns: Option<&[Series]>, ) -> PolarsResult> { - if let ParallelStrategy::Columns | ParallelStrategy::None = parallel { - rg_to_dfs_optionally_par_over_columns( + use ParallelStrategy as S; + + if parallel == S::Prefiltered { + if let Some(predicate) = predicate { + if let Some(live_variables) = predicate.live_variables() { + return rg_to_dfs_prefiltered( + store, + previous_row_count, + row_group_start, + row_group_end, + file_metadata, + schema, + live_variables, + predicate, + row_index, + projection, + use_statistics, + hive_partition_columns, + ); + } + } + } + + match parallel { + S::Columns | S::None => rg_to_dfs_optionally_par_over_columns( store, previous_row_count, row_group_start, @@ -182,9 +203,8 @@ fn rg_to_dfs( projection, use_statistics, hive_partition_columns, - ) - } else { - rg_to_dfs_par_over_rg( + ), + _ => rg_to_dfs_par_over_rg( store, row_group_start, row_group_end, @@ -197,8 +217,230 @@ fn rg_to_dfs( projection, use_statistics, hive_partition_columns, - ) + ), + } +} + +#[allow(clippy::too_many_arguments)] +fn rg_to_dfs_prefiltered( + store: &mmap::ColumnStore, + previous_row_count: &mut IdxSize, + row_group_start: usize, + row_group_end: usize, + file_metadata: &FileMetaData, + schema: &ArrowSchemaRef, + live_variables: Vec>, + predicate: &dyn PhysicalIoExpr, + row_index: Option, + projection: &[usize], + use_statistics: bool, + hive_partition_columns: Option<&[Series]>, +) -> PolarsResult> { + struct RowGroupInfo { + index: u32, + row_offset: IdxSize, + } + + if row_group_end > u32::MAX as usize { + polars_bail!(ComputeError: "Parquet file contains too many row groups (> {})", u32::MAX); + } + + let mut row_offset = *previous_row_count; + let mut row_groups: Vec = (row_group_start..row_group_end) + .filter_map(|index| { + let md = 
&file_metadata.row_groups[index]; + + let current_offset = row_offset; + let current_row_count = md.num_rows() as IdxSize; + row_offset += current_row_count; + + if use_statistics { + match read_this_row_group(Some(predicate), &file_metadata.row_groups[index], schema) + { + Ok(false) => return None, + Ok(true) => {}, + Err(e) => return Some(Err(e)), + } + } + + Some(Ok(RowGroupInfo { + index: index as u32, + row_offset: current_offset, + })) + }) + .collect::>>()?; + + let num_live_columns = live_variables.len(); + let num_dead_columns = projection.len() - num_live_columns; + + let live_variables = live_variables + .iter() + .map(Deref::deref) + .collect::>(); + + // We create two look-up tables that map index offsets into the live- and dead-set onto + // column indexes of the schema. + let mut live_idx_to_col_idx = Vec::with_capacity(num_live_columns); + let mut dead_idx_to_col_idx = Vec::with_capacity(num_dead_columns); + for (i, col) in file_metadata.schema().columns().iter().enumerate() { + if live_variables.contains(col.path_in_schema[0].deref()) { + live_idx_to_col_idx.push(i); + } else { + dead_idx_to_col_idx.push(i); + } } + debug_assert_eq!(live_variables.len(), num_live_columns); + debug_assert_eq!(dead_idx_to_col_idx.len(), num_dead_columns); + + POOL.install(|| { + // Collect the data for the live columns + let mut live_columns = (0..row_groups.len() * num_live_columns) + .into_par_iter() + .map(|i| { + let col_idx = live_idx_to_col_idx[i % num_live_columns]; + let rg_idx = row_groups[i / num_live_columns].index as usize; + + let md = &file_metadata.row_groups[rg_idx]; + column_idx_to_series(col_idx, md, None, schema, store) + }) + .collect::>>()?; + + // Apply the predicate to the live columns and save the dataframe and the bitmask + let mut dfs = live_columns + .par_chunks_exact_mut(num_live_columns) + .enumerate() + .map(|(i, columns)| { + let rg = &row_groups[i]; + let rg_idx = rg.index as usize; + + let columns = 
columns.iter_mut().map(std::mem::take).collect::>(); + + let md = &file_metadata.row_groups[rg_idx]; + let mut df = unsafe { DataFrame::new_no_checks(columns) }; + + materialize_hive_partitions( + &mut df, + schema.as_ref(), + hive_partition_columns, + md.num_rows(), + ); + let s = predicate.evaluate_io(&df)?; + let mask = s.bool().expect("filter predicates was not of type boolean"); + + if let Some(rc) = &row_index { + df.with_row_index_mut(&rc.name, Some(rg.row_offset + rc.offset)); + } + df = df.filter(mask)?; + + let mut bitmap = MutableBitmap::with_capacity(mask.len()); + + for chunk in mask.downcast_iter() { + bitmap.extend_from_bitmap(chunk.values()); + } + + let bitmap = bitmap.freeze(); + + debug_assert_eq!(md.num_rows(), bitmap.len()); + debug_assert_eq!(df.height(), bitmap.set_bits()); + + Ok((bitmap, df)) + }) + .collect::>>()?; + + // Filter out the row-groups that do not include any rows that match the predicate. + inplace_zip_filtermap(&mut dfs, &mut row_groups, |(mask, df), rg| { + (mask.set_bits() > 0).then_some(((mask, df), rg)) + }); + + for (_, df) in &dfs { + *previous_row_count += df.height() as IdxSize; + } + + // @TODO: Incorporate this if we know how we can properly use it. The problem here is that + // different columns really have a different cost when it comes to collecting them. We + // would need a cost model to properly estimate this. + // + // // For bitmasks that are seemingly random (i.e. not clustered or biased towards 0 or 1), + // // filtering with a bitmask in the Parquet reader is actually around 1.5 - 2.2 times slower + // // than collecting everything and filtering afterwards. This is because stopping and + // // starting decoding is not free. + // // + // // To combat this we try to detect here how biased our data is. We do this with a bithack + // // that estimates the amount of switches from 0 to 1 and from 1 to 0. This can be SIMD-ed + // // very well and gives us quite a good estimate of how random our bitmask is. 
Then, we select + // // the filter if the bitmask is not that random. + // let do_filter_rg = dfs + // .par_iter() + // .map(|(mask, _)| { + // let iter = mask.fast_iter_u64(); + // + // // The iter is TrustedLen so the size_hint is exact. + // let num_items = iter.size_hint().0; + // let num_switches = iter + // .map(|v| (v ^ v.rotate_right(1)).count_ones() as u64) + // .sum::(); + // + // // We ignore the iter remainder since we only really care about the average. + // let avg_num_switches_per_element = num_switches / num_items as u64; + // + // // We select the filter if the average amount of switches per 64 elements is less + // // than or equal to 2. + // avg_num_switches_per_element <= 2 + // }) + // .collect::>(); + + let mut rg_columns = (0..dfs.len() * num_dead_columns) + .into_par_iter() + .map(|i| { + let col_idx = dead_idx_to_col_idx[i % num_dead_columns]; + let rg_idx = row_groups[i / num_dead_columns].index as usize; + + let (mask, _) = &dfs[i / num_dead_columns]; + + let md = &file_metadata.row_groups[rg_idx]; + debug_assert_eq!(md.num_rows(), mask.len()); + column_idx_to_series( + col_idx, + md, + Some(Filter::new_masked(mask.clone())), + schema, + store, + ) + }) + .collect::>>()?; + + let mut rearranged_schema: Schema = Schema::new(); + if let Some(rc) = &row_index { + rearranged_schema.insert_at_index( + 0, + SmartString::from(rc.name.deref()), + IdxType::get_dtype(), + )?; + } + for i in live_idx_to_col_idx.iter().copied() { + rearranged_schema.insert_at_index( + rearranged_schema.len(), + schema.fields[i].name.clone().into(), + schema.fields[i].data_type().into(), + )?; + } + rearranged_schema.merge(Schema::from(schema.as_ref())); + + rg_columns + .par_chunks_exact_mut(num_dead_columns) + .zip(dfs) + .map(|(rg_cols, (_, mut df))| { + let rg_cols = rg_cols.iter_mut().map(std::mem::take).collect::>(); + + // We first add the columns with the live columns at the start. Then, we do a + // projection that puts the columns at the right spot. 
+ df._add_columns(rg_cols, &rearranged_schema)?; + let df = df.select(schema.get_names())?; + + PolarsResult::Ok(df) + }) + .collect::>>() + }) } #[allow(clippy::too_many_arguments)] @@ -243,7 +485,6 @@ fn rg_to_dfs_optionally_par_over_columns( assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err()) } - let idx_to_series_projection_height = rg_slice.0 + rg_slice.1; let columns = if let ParallelStrategy::Columns = parallel { POOL.install(|| { projection @@ -252,11 +493,10 @@ fn rg_to_dfs_optionally_par_over_columns( column_idx_to_series( *column_i, md, - idx_to_series_projection_height, + Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)), schema, store, ) - .map(|s| s.slice(rg_slice.0 as i64, rg_slice.1)) }) .collect::>>() })? @@ -267,11 +507,10 @@ fn rg_to_dfs_optionally_par_over_columns( column_idx_to_series( *column_i, md, - idx_to_series_projection_height, + Some(Filter::new_ranged(rg_slice.0, rg_slice.0 + rg_slice.1)), schema, store, ) - .map(|s| s.slice(rg_slice.0 as i64, rg_slice.1)) }) .collect::>>()? }; @@ -356,8 +595,13 @@ fn rg_to_dfs_par_over_rg( let columns = projection .iter() .map(|column_i| { - column_idx_to_series(*column_i, md, slice.0 + slice.1, schema, store) - .map(|x| x.slice(slice.0 as i64, slice.1)) + column_idx_to_series( + *column_i, + md, + Some(Filter::new_ranged(slice.0, slice.0 + slice.1)), + schema, + store, + ) }) .collect::>>()?; diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs index be3bf0ebff03..08ad7685461c 100644 --- a/crates/polars-io/src/predicates.rs +++ b/crates/polars-io/src/predicates.rs @@ -7,6 +7,9 @@ pub trait PhysicalIoExpr: Send + Sync { /// as a predicate mask fn evaluate_io(&self, df: &DataFrame) -> PolarsResult; + /// Get the variables that are used in the expression i.e. live variables. 
+ fn live_variables(&self) -> Option>>; + /// Can take &dyn Statistics and determine of a file should be /// read -> `true` /// or not -> `false` diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 19ce681d0523..2a1afc353183 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -26,6 +26,10 @@ impl PhysicalIoExpr for Wrap { }; h.evaluate_io(df) } + fn live_variables(&self) -> Option>> { + // @TODO: This should not unwrap + Some(expr_to_leaf_column_names(self.0.as_expression()?)) + } fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.0.as_stats_evaluator() } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs index 4fa516aa763f..4c17b7bd2982 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/basic.rs @@ -6,7 +6,6 @@ use arrow::array::{Array, BinaryArray, DictionaryArray, DictionaryKey, Primitive use arrow::bitmap::MutableBitmap; use arrow::datatypes::{ArrowDataType, PhysicalType}; use arrow::offset::Offset; -use polars_error::PolarsResult; use super::super::utils; use super::super::utils::extend_from_decoder; @@ -33,7 +32,7 @@ impl<'a, O: Offset> StateTranslation<'a, BinaryDecoder> for BinaryStateTransl page: &'a DataPage, dict: Option<&'a as utils::Decoder>::Dict>, page_validity: Option<&utils::PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { let is_string = matches!( page.descriptor.primitive_type.logical_type, Some(PrimitiveLogicalType::String) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs index 646eac58d2ec..27ee2a9a251c 100644 --- 
a/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binary/decoders.rs @@ -150,7 +150,7 @@ impl<'a> BinaryStateTranslation<'a> { dict: Option<&'a BinaryDict>, _page_validity: Option<&PageValidity<'a>>, is_string: bool, - ) -> PolarsResult { + ) -> ParquetResult { match (page.encoding(), dict) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => { if is_string { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs index 0e1e1df8f681..c9d2f6486017 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/binview.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/binview.rs @@ -6,7 +6,6 @@ use arrow::array::{ }; use arrow::bitmap::MutableBitmap; use arrow::datatypes::{ArrowDataType, PhysicalType}; -use polars_error::PolarsResult; use super::binary::decoders::*; use super::utils::freeze_validity; @@ -30,7 +29,7 @@ impl<'a> StateTranslation<'a, BinViewDecoder> for BinaryStateTranslation<'a> { page: &'a DataPage, dict: Option<&'a ::Dict>, page_validity: Option<&PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { let is_string = matches!( page.descriptor.primitive_type.logical_type, Some(PrimitiveLogicalType::String) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs index 97af4e3aac3d..1f33da0678d6 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/boolean.rs @@ -2,7 +2,6 @@ use arrow::array::BooleanArray; use arrow::bitmap::utils::BitmapIter; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; -use polars_error::PolarsResult; use super::utils::{self, extend_from_decoder, freeze_validity, Decoder, ExactSize}; use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer; @@ -27,7 +26,7 @@ 
impl<'a> utils::StateTranslation<'a, BooleanDecoder> for StateTranslation<'a> { page: &'a DataPage, _dict: Option<&'a ::Dict>, page_validity: Option<&PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { let values = split_buffer(page)?.values; match page.encoding() { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs b/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs index bef857cd8291..4a7b8f740063 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/dictionary.rs @@ -3,7 +3,6 @@ use std::sync::atomic::AtomicUsize; use arrow::array::{DictionaryArray, DictionaryKey, PrimitiveArray}; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; -use polars_error::PolarsResult; use super::utils::{ self, dict_indices_decoder, extend_from_decoder, freeze_validity, BatchableCollector, Decoder, @@ -25,7 +24,7 @@ impl<'a, K: DictionaryKey, D: utils::DictDecodable> StateTranslation<'a, Diction page: &'a DataPage, _dict: Option<&'a as Decoder>::Dict>, _page_validity: Option<&PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { if !matches!( page.encoding(), Encoding::PlainDictionary | Encoding::RleDictionary @@ -33,7 +32,7 @@ impl<'a, K: DictionaryKey, D: utils::DictDecodable> StateTranslation<'a, Diction return Err(utils::not_implemented(page)); } - Ok(dict_indices_decoder(page)?) 
+ dict_indices_decoder(page) } fn len_when_not_nullable(&self) -> usize { diff --git a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs index 6e004f56e741..747243ce26ef 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/fixed_size_binary.rs @@ -1,11 +1,8 @@ use arrow::array::{DictionaryArray, DictionaryKey, FixedSizeBinaryArray, PrimitiveArray}; use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; -use polars_error::PolarsResult; -use super::utils::{ - dict_indices_decoder, extend_from_decoder, freeze_validity, not_implemented, Decoder, -}; +use super::utils::{dict_indices_decoder, extend_from_decoder, freeze_validity, Decoder}; use crate::parquet::encoding::hybrid_rle::gatherer::HybridRleGatherer; use crate::parquet::encoding::{hybrid_rle, Encoding}; use crate::parquet::error::{ParquetError, ParquetResult}; @@ -32,7 +29,7 @@ impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> { page: &'a DataPage, dict: Option<&'a ::Dict>, _page_validity: Option<&PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { match (page.encoding(), dict) { (Encoding::Plain, _) => { let values = split_buffer(page)?.values; @@ -41,8 +38,7 @@ impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> { "Fixed size binary data length {} is not divisible by size {}", values.len(), decoder.size - )) - .into()); + ))); } Ok(Self::Plain(values, decoder.size)) }, @@ -50,7 +46,7 @@ impl<'a> utils::StateTranslation<'a, BinaryDecoder> for StateTranslation<'a> { let values = dict_indices_decoder(page)?; Ok(Self::Dictionary(values, dict)) }, - _ => Err(not_implemented(page)), + _ => Err(utils::not_implemented(page)), } } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/null.rs b/crates/polars-parquet/src/arrow/read/deserialize/null.rs index 
4ca4a573c0b5..74defc1d3b74 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/null.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/null.rs @@ -4,7 +4,6 @@ use arrow::array::{Array, NullArray}; use arrow::datatypes::ArrowDataType; -use polars_error::PolarsResult; use super::utils; use super::utils::filter::Filter; @@ -31,7 +30,7 @@ impl<'a> utils::StateTranslation<'a, NullDecoder> for () { _page: &'a DataPage, _dict: Option<&'a ::Dict>, _page_validity: Option<&utils::PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { Ok(()) } diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs index 62932aad583a..ce658b764412 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/basic.rs @@ -171,7 +171,7 @@ where page: &'a DataPage, dict: Option<&'a as utils::Decoder>::Dict>, _page_validity: Option<&PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { match (page.encoding(), dict) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => { Ok(Self::Dictionary(ValuesDictionary::try_new(page, dict)?)) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs index a69f3e4adaf3..10aeb5b9f640 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/primitive/integer.rs @@ -3,7 +3,6 @@ use arrow::bitmap::MutableBitmap; use arrow::datatypes::ArrowDataType; use arrow::types::NativeType; use num_traits::AsPrimitive; -use polars_error::PolarsResult; use super::super::utils; use super::basic::{ @@ -43,7 +42,7 @@ where page: &'a DataPage, dict: Option<&'a as utils::Decoder>::Dict>, _page_validity: Option<&PageValidity<'a>>, - ) -> PolarsResult { + ) -> ParquetResult { match 
(page.encoding(), dict) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict)) => { Ok(Self::Dictionary(ValuesDictionary::try_new(page, dict)?)) diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs index 1111cd467369..ff4cf72023d2 100644 --- a/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs +++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/mod.rs @@ -8,7 +8,6 @@ use arrow::bitmap::{Bitmap, MutableBitmap}; use arrow::datatypes::ArrowDataType; use arrow::pushable::Pushable; use arrow::types::Offset; -use polars_error::{polars_err, PolarsError, PolarsResult}; use self::filter::Filter; use super::binary::utils::Binary; @@ -17,7 +16,7 @@ use crate::parquet::encoding::hybrid_rle::gatherer::{ HybridRleGatherer, ZeroCount, ZeroCountGatherer, }; use crate::parquet::encoding::hybrid_rle::{self, HybridRleDecoder, Translator}; -use crate::parquet::error::ParquetResult; +use crate::parquet::error::{ParquetError, ParquetResult}; use crate::parquet::page::{split_buffer, DataPage, DictPage}; use crate::parquet::schema::Repetition; @@ -35,7 +34,7 @@ pub(crate) trait StateTranslation<'a, D: Decoder>: Sized { page: &'a DataPage, dict: Option<&'a D::Dict>, page_validity: Option<&PageValidity<'a>>, - ) -> PolarsResult; + ) -> ParquetResult; fn len_when_not_nullable(&self) -> usize; fn skip_in_place(&mut self, n: usize) -> ParquetResult<()>; @@ -51,7 +50,7 @@ pub(crate) trait StateTranslation<'a, D: Decoder>: Sized { } impl<'a, D: Decoder> State<'a, D> { - pub fn new(decoder: &D, page: &'a DataPage, dict: Option<&'a D::Dict>) -> PolarsResult { + pub fn new(decoder: &D, page: &'a DataPage, dict: Option<&'a D::Dict>) -> ParquetResult { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; @@ -71,7 +70,7 @@ impl<'a, D: Decoder> State<'a, D> { decoder: &D, page: &'a DataPage, dict: Option<&'a D::Dict>, - ) -> PolarsResult { + ) 
-> ParquetResult { let translation = D::Translation::new(decoder, page, dict, None)?; Ok(Self { @@ -150,7 +149,7 @@ impl<'a, D: Decoder> State<'a, D> { num_ones, )?; - if self.len() == 0 { + if iter.num_remaining() == 0 || self.len() == 0 { break; } @@ -170,18 +169,16 @@ impl<'a, D: Decoder> State<'a, D> { } } -pub fn not_implemented(page: &DataPage) -> PolarsError { +pub fn not_implemented(page: &DataPage) -> ParquetError { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; let is_filtered = page.selected_rows().is_some(); let required = if is_optional { "optional" } else { "required" }; let is_filtered = if is_filtered { ", index-filtered" } else { "" }; - polars_err!(ComputeError: - "Decoding {:?} \"{:?}\"-encoded {} {} parquet pages not yet implemented", + ParquetError::not_supported(format!( + "Decoding {:?} \"{:?}\"-encoded {required}{is_filtered} parquet pages not yet supported", page.descriptor.primitive_type.physical_type, page.encoding(), - required, - is_filtered, - ) + )) } pub trait BatchableCollector { diff --git a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs index a85616a6c300..ce7fa301a7b4 100644 --- a/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs +++ b/crates/polars-parquet/src/parquet/encoding/bitpacked/decode.rs @@ -63,6 +63,10 @@ impl<'a, T: Unpackable> Decoder<'a, T> { DecoderIter { buffer, idx: 0 } } + pub fn num_bits(&self) -> usize { + self.num_bits + } + /// Returns a [`Decoder`] with `T` encoded in `packed` with `num_bits`. 
pub fn try_new(packed: &'a [u8], num_bits: usize, length: usize) -> Result { let block_size = std::mem::size_of::() * num_bits; @@ -113,12 +117,8 @@ impl<'a, 'b, T: Unpackable> Iterator for ChunkedDecoder<'a, 'b, T> { } fn size_hint(&self) -> (usize, Option) { - let is_exact = self.decoder.len() % T::Unpacked::LENGTH == 0; - let (low, high) = self.decoder.packed.size_hint(); - - let delta = usize::from(!is_exact); - - (low - delta, high.map(|h| h - delta)) + let len = self.decoder.len() / T::Unpacked::LENGTH; + (len, Some(len)) } } @@ -160,7 +160,10 @@ impl<'a, T: Unpackable> Decoder<'a, T> { } pub fn skip_chunks(&mut self, n: usize) { + debug_assert!(n * T::Unpacked::LENGTH <= self.length); + for _ in (&mut self.packed).take(n) {} + self.length -= n * T::Unpacked::LENGTH; } pub fn take(&mut self) -> Self { @@ -169,8 +172,6 @@ impl<'a, T: Unpackable> Decoder<'a, T> { let length = self.length; self.length = 0; - debug_assert_eq!(self.len(), 0); - Self { packed, num_bits: self.num_bits, diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/buffered.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/buffered.rs index 3908b5b35af5..824638d253ad 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/buffered.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/buffered.rs @@ -163,6 +163,7 @@ impl<'a> BufferedBitpacked<'a> { let unpacked_offset = n % ::Unpacked::LENGTH; self.decoder.skip_chunks(num_chunks); let (unpacked, unpacked_length) = self.decoder.chunked().next_inexact().unwrap(); + debug_assert!(unpacked_offset < unpacked_length); self.unpacked = unpacked; self.unpacked_start = unpacked_offset; @@ -171,7 +172,12 @@ impl<'a> BufferedBitpacked<'a> { return unpacked_num_elements + n; } - self.decoder.len() + unpacked_num_elements + // We skip the entire decoder. Essentially, just zero it out. 
+ let decoder = self.decoder.take(); + self.unpacked_start = 0; + self.unpacked_end = 0; + + decoder.len() + unpacked_num_elements } } @@ -262,7 +268,12 @@ impl<'a> HybridRleBuffered<'a> { }; debug_assert!(num_skipped <= n); - debug_assert_eq!(num_skipped, start_length - self.len()); + debug_assert_eq!( + num_skipped, + start_length - self.len(), + "{self:?}: {num_skipped} != {start_length} - {}", + self.len() + ); num_skipped } diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs index 23983104d1f2..f4a980fb5062 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/fuzz.rs @@ -329,3 +329,62 @@ fn small_fuzz() -> ParquetResult<()> { fn large_fuzz() -> ParquetResult<()> { fuzz_loops(1_000_000) } + +#[test] +#[ignore = "Large fuzz test. Too slow"] +fn skip_fuzz() -> ParquetResult<()> { + let mut rng = rand::thread_rng(); + + const MAX_LENGTH: usize = 10_000; + + let mut encoded = Vec::with_capacity(10000); + + let mut bs: Vec = Vec::with_capacity(MAX_LENGTH); + let mut skips: VecDeque = VecDeque::with_capacity(2000); + + let num_loops = 100_000; + + for _ in 0..num_loops { + skips.clear(); + bs.clear(); + + let num_bits = rng.gen_range(0..=32); + let mask = 1u32.wrapping_shl(num_bits).wrapping_sub(1); + + let length = rng.gen_range(1..=MAX_LENGTH); + + unsafe { bs.set_len(length) }; + rng.fill(&mut bs[..]); + + let mut filled = 0; + while filled < bs.len() { + if rng.gen() { + let num_repeats = rng.gen_range(0..=(bs.len() - filled)); + let value = bs[filled] & mask; + for j in 0..num_repeats { + bs[filled + j] = value; + } + filled += num_repeats; + } else { + bs[filled] &= mask; + filled += 1; + } + } + + let mut num_done = 0; + while num_done < filled { + let num_skip = rng.gen_range(1..=filled - num_done); + num_done += num_skip; + skips.push_back(num_skip); + } + + encoder::encode(&mut encoded, 
bs.iter().copied(), num_bits).unwrap(); + let mut decoder = HybridRleDecoder::new(&encoded, num_bits, filled); + + for s in &skips { + decoder.skip_in_place(*s).unwrap(); + } + } + + Ok(()) +} diff --git a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs index 4ee48ef18e81..c71b6455ddf4 100644 --- a/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs +++ b/crates/polars-parquet/src/parquet/encoding/hybrid_rle/mod.rs @@ -399,7 +399,7 @@ impl<'a> HybridRleDecoder<'a> { debug_assert_eq!(num_skipped, start_num_values - self.num_values); debug_assert!(num_skipped <= n, "{num_skipped} <= {n}"); - debug_assert!(num_skipped > 0, "{num_skipped} > 0"); + debug_assert!(indicator >> 1 == 0 || num_skipped > 0); n -= num_skipped; } diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs index 1c967d9e3044..1603de3729fa 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs @@ -30,6 +30,10 @@ impl PhysicalIoExpr for Len { fn evaluate_io(&self, _df: &DataFrame) -> PolarsResult { unimplemented!() } + + fn live_variables(&self) -> Option>> { + Some(vec![]) + } } impl PhysicalPipedExpr for Len { fn evaluate(&self, chunk: &DataChunk, _lazy_state: &ExecutionState) -> PolarsResult { diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index b5f32b185311..799e956c1378 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -130,6 +130,11 @@ where fn evaluate_io(&self, df: &DataFrame) -> PolarsResult { self.p.evaluate_io(df) } + + fn live_variables(&self) -> Option>> { + None + } + fn as_stats_evaluator(&self) -> Option<&dyn StatsEvaluator> { self.p.as_stats_evaluator() } diff --git 
a/crates/polars-plan/src/plans/optimizer/cluster_with_columns.rs b/crates/polars-plan/src/plans/optimizer/cluster_with_columns.rs index 2a7548468d2b..160a9cbdbc0f 100644 --- a/crates/polars-plan/src/plans/optimizer/cluster_with_columns.rs +++ b/crates/polars-plan/src/plans/optimizer/cluster_with_columns.rs @@ -4,6 +4,7 @@ use arrow::bitmap::MutableBitmap; use polars_core::schema::Schema; use polars_utils::aliases::{InitHashMaps, PlHashMap}; use polars_utils::arena::{Arena, Node}; +use polars_utils::vec::inplace_zip_filtermap; use super::aexpr::AExpr; use super::ir::IR; @@ -299,84 +300,3 @@ pub fn optimize(root: Node, lp_arena: &mut Arena, expr_arena: &Arena) } } } - -/// Perform a inplace `filtermap` over two vectors at the same time. -fn inplace_zip_filtermap( - x: &mut Vec, - y: &mut Vec, - mut f: impl FnMut(T, U) -> Option<(T, U)>, -) { - assert_eq!(x.len(), y.len()); - - let length = x.len(); - - struct OwnedBuffer { - end: *mut T, - length: usize, - } - - impl Drop for OwnedBuffer { - fn drop(&mut self) { - for i in 0..self.length { - unsafe { self.end.wrapping_sub(i + 1).read() }; - } - } - } - - let x_ptr = x.as_mut_ptr(); - let y_ptr = y.as_mut_ptr(); - - let mut x_buf = OwnedBuffer { - end: x_ptr.wrapping_add(length), - length, - }; - let mut y_buf = OwnedBuffer { - end: y_ptr.wrapping_add(length), - length, - }; - - // SAFETY: All items are now owned by `x_buf` and `y_buf`. Since we know that `x_buf` and - // `y_buf` will be dropped before the vecs representing `x` and `y`, this is safe. - unsafe { - x.set_len(0); - y.set_len(0); - } - - // SAFETY: - // - // We know we have a exclusive reference to x and y. - // - // We know that `i` is always smaller than `x.len()` and `y.len()`. Furthermore, we also know - // that `i - num_deleted > 0`. - // - // Items are dropped exactly once, even if `f` panics. 
- for i in 0..length { - let xi = unsafe { x_ptr.wrapping_add(i).read() }; - let yi = unsafe { y_ptr.wrapping_add(i).read() }; - - x_buf.length -= 1; - y_buf.length -= 1; - - // We hold the invariant here that all items that are not yet deleted are either in - // - `xi` or `yi` - // - `x_buf` or `y_buf` - // ` `x` or `y` - // - // This way if `f` ever panics, we are sure that all items are dropped exactly once. - // Deleted items will be dropped when they are deleted. - let result = f(xi, yi); - - if let Some((xi, yi)) = result { - x.push(xi); - y.push(yi); - } - } - - debug_assert_eq!(x_buf.length, 0); - debug_assert_eq!(y_buf.length, 0); - - // We are safe to forget `x_buf` and `y_buf` here since they will not deallocate anything - // anymore. - std::mem::forget(x_buf); - std::mem::forget(y_buf); -} diff --git a/crates/polars-utils/src/vec.rs b/crates/polars-utils/src/vec.rs index 54eb8c1277e0..108e7d573d1c 100644 --- a/crates/polars-utils/src/vec.rs +++ b/crates/polars-utils/src/vec.rs @@ -97,3 +97,84 @@ impl ConvertVec for Vec { self.iter().map(f).collect() } } + +/// Perform an in-place `Iterator::filter_map` over two vectors at the same time. +pub fn inplace_zip_filtermap( + x: &mut Vec, + y: &mut Vec, + mut f: impl FnMut(T, U) -> Option<(T, U)>, +) { + assert_eq!(x.len(), y.len()); + + let length = x.len(); + + struct OwnedBuffer { + end: *mut T, + length: usize, + } + + impl Drop for OwnedBuffer { + fn drop(&mut self) { + for i in 0..self.length { + unsafe { self.end.wrapping_sub(i + 1).read() }; + } + } + } + + let x_ptr = x.as_mut_ptr(); + let y_ptr = y.as_mut_ptr(); + + let mut x_buf = OwnedBuffer { + end: x_ptr.wrapping_add(length), + length, + }; + let mut y_buf = OwnedBuffer { + end: y_ptr.wrapping_add(length), + length, + }; + + // SAFETY: All items are now owned by `x_buf` and `y_buf`. Since we know that `x_buf` and + // `y_buf` will be dropped before the vecs representing `x` and `y`, this is safe. 
+ unsafe { + x.set_len(0); + y.set_len(0); + } + + // SAFETY: + // + // We know we have a exclusive reference to x and y. + // + // We know that `i` is always smaller than `x.len()` and `y.len()`. Furthermore, we also know + // that `i - num_deleted > 0`. + // + // Items are dropped exactly once, even if `f` panics. + for i in 0..length { + let xi = unsafe { x_ptr.wrapping_add(i).read() }; + let yi = unsafe { y_ptr.wrapping_add(i).read() }; + + x_buf.length -= 1; + y_buf.length -= 1; + + // We hold the invariant here that all items that are not yet deleted are either in + // - `xi` or `yi` + // - `x_buf` or `y_buf` + // ` `x` or `y` + // + // This way if `f` ever panics, we are sure that all items are dropped exactly once. + // Deleted items will be dropped when they are deleted. + let result = f(xi, yi); + + if let Some((xi, yi)) = result { + x.push(xi); + y.push(yi); + } + } + + debug_assert_eq!(x_buf.length, 0); + debug_assert_eq!(y_buf.length, 0); + + // We are safe to forget `x_buf` and `y_buf` here since they will not deallocate anything + // anymore. 
+ std::mem::forget(x_buf); + std::mem::forget(y_buf); +} diff --git a/py-polars/polars/_typing.py b/py-polars/polars/_typing.py index 2cd9c3955085..428c13da0e96 100644 --- a/py-polars/polars/_typing.py +++ b/py-polars/polars/_typing.py @@ -113,7 +113,9 @@ def __arrow_c_stream__(self, requested_schema: object | None = None) -> object: Label: TypeAlias = Literal["left", "right", "datapoint"] NonExistent: TypeAlias = Literal["raise", "null"] NullBehavior: TypeAlias = Literal["ignore", "drop"] -ParallelStrategy: TypeAlias = Literal["auto", "columns", "row_groups", "none"] +ParallelStrategy: TypeAlias = Literal[ + "auto", "columns", "row_groups", "prefiltered", "none" +] ParquetCompression: TypeAlias = Literal[ "lz4", "uncompressed", "snappy", "gzip", "lzo", "brotli", "zstd" ] diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py index 99084afa38d0..90b6137c4924 100644 --- a/py-polars/polars/io/parquet/functions.py +++ b/py-polars/polars/io/parquet/functions.py @@ -332,9 +332,25 @@ def scan_parquet( DataFrame row_index_offset Offset to start the row index column (only used if the name is set) - parallel : {'auto', 'columns', 'row_groups', 'none'} - This determines the direction of parallelism. 'auto' will try to determine the - optimal direction. + parallel : {'auto', 'columns', 'row_groups', 'prefiltered', 'none'} + This determines the direction and strategy of parallelism. 'auto' will + try to determine the optimal direction. + + The `prefiltered` strategy first evaluates the pushed-down predicates in + parallel and determines a mask of which rows to read. Then, it + parallelizes over both the columns and the row groups while filtering + out rows that do not need to be read. This can provide significant + speedups for large files (i.e. many row-groups) with a predicate that + filters clustered rows or filters heavily. In other cases, + `prefiltered` may slow down the scan compared to other strategies. 
+ + The `prefiltered` setting falls back to `auto` if no predicate is + given. + + .. warning:: + The `prefiltered` strategy is considered **unstable**. It may be + changed at any point without it being considered a breaking change. + use_statistics Use statistics in the parquet to determine if pages can be skipped from reading. diff --git a/py-polars/src/conversion/mod.rs b/py-polars/src/conversion/mod.rs index 6056103b3251..c179729514ab 100644 --- a/py-polars/src/conversion/mod.rs +++ b/py-polars/src/conversion/mod.rs @@ -871,10 +871,11 @@ impl<'py> FromPyObject<'py> for Wrap { "auto" => ParallelStrategy::Auto, "columns" => ParallelStrategy::Columns, "row_groups" => ParallelStrategy::RowGroups, + "prefiltered" => ParallelStrategy::Prefiltered, "none" => ParallelStrategy::None, v => { return Err(PyValueError::new_err(format!( - "`parallel` must be one of {{'auto', 'columns', 'row_groups', 'none'}}, got {v}", + "`parallel` must be one of {{'auto', 'columns', 'row_groups', 'prefiltered', 'none'}}, got {v}", ))) }, };