Commit 1716ad7

fully working with parquet and multifile

[skip ci]

coastalwhite committed Jan 22, 2025
1 parent 8deaea1 commit 1716ad7
Showing 21 changed files with 518 additions and 228 deletions.
6 changes: 3 additions & 3 deletions crates/polars-io/src/parquet/read/async_impl.rs
@@ -19,7 +19,7 @@ use crate::cloud::{
 };
 use crate::parquet::metadata::FileMetadataRef;
 use crate::pl_async::get_runtime;
-use crate::predicates::PhysicalIoExpr;
+use crate::predicates::IOPredicate;
 
 type DownloadedRowGroup = PlHashMap<u64, Bytes>;
 type QueuePayload = (usize, DownloadedRowGroup);
@@ -231,7 +231,7 @@ impl FetchRowGroupsFromObjectStore {
         reader: ParquetObjectStore,
         schema: ArrowSchemaRef,
         projection: Option<&[usize]>,
-        predicate: Option<Arc<dyn PhysicalIoExpr>>,
+        predicate: Option<IOPredicate>,
         row_group_range: Range<usize>,
         row_groups: &[RowGroupMetadata],
     ) -> PolarsResult<Self> {
@@ -244,7 +244,7 @@
 
         let mut prefetched: PlHashMap<usize, DownloadedRowGroup> = PlHashMap::new();
 
-        let mut row_groups = if let Some(pred) = predicate.as_deref() {
+        let mut row_groups = if let Some(pred) = predicate.as_ref() {
             row_group_range
                 .filter_map(|i| {
                     let rg = &row_groups[i];
25 changes: 20 additions & 5 deletions crates/polars-io/src/parquet/read/predicates.rs
@@ -3,7 +3,7 @@ use polars_core::prelude::*;
 use polars_parquet::read::statistics::{deserialize, Statistics};
 use polars_parquet::read::RowGroupMetadata;
 
-use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};
+use crate::predicates::{BatchStats, ColumnStats, IOPredicate};
 
 /// Collect the statistics in a row-group
 pub(crate) fn collect_statistics(
@@ -58,7 +58,7 @@
 }
 
 pub fn read_this_row_group(
-    predicate: Option<&dyn PhysicalIoExpr>,
+    predicate: Option<&IOPredicate>,
     md: &RowGroupMetadata,
     schema: &ArrowSchema,
 ) -> PolarsResult<bool> {
@@ -69,16 +69,31 @@
     let mut should_read = true;
 
     if let Some(pred) = predicate {
-        if let Some(pred) = pred.as_stats_evaluator() {
+        if let Some(pred) = &pred.skip_batch_predicate {
             if let Some(stats) = collect_statistics(md, schema)? {
-                let pred_result = pred.should_read(&stats);
+                let stats = PlIndexMap::from_iter(stats.column_stats().iter().map(|col| {
+                    (
+                        col.field_name().clone(),
+                        crate::predicates::ColumnStatistics {
+                            dtype: stats.schema().get(col.field_name()).unwrap().clone(),
+                            min: col
+                                .to_min()
+                                .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()),
+                            max: col
+                                .to_max()
+                                .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()),
+                            null_count: col.null_count().map(|nc| nc as IdxSize),
+                        },
+                    )
+                }));
+                let pred_result = pred.can_skip_batch(md.num_rows() as IdxSize, stats);
 
                 // a parquet file may not have statistics of all columns
                 match pred_result {
                     Err(PolarsError::ColumnNotFound(errstr)) => {
                         return Err(PolarsError::ColumnNotFound(errstr))
                     },
-                    Ok(false) => should_read = false,
+                    Ok(true) => should_read = false,
                     _ => {},
                 }
             }
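Note the polarity flip in the hunk above: the old stats evaluator answered "should we read this row group?" (so Ok(false) disabled the read), while the new SkipBatchPredicate answers "can we skip this batch?" (so Ok(true) disables it). The statistics now travel as a plain per-column map. A minimal sketch of that payload, assuming the ColumnStatistics type added in crates/polars-io/src/predicates.rs further down; the column name and values are hypothetical:

use polars_core::prelude::*;
use polars_io::predicates::ColumnStatistics;

// Build the per-column statistics map that read_this_row_group now feeds
// into can_skip_batch: one Int64 column "a" with min 0, max 9, no nulls.
fn example_stats() -> PlIndexMap<PlSmallStr, ColumnStatistics> {
    PlIndexMap::from_iter([(
        PlSmallStr::from_static("a"),
        ColumnStatistics {
            dtype: DataType::Int64,
            min: AnyValue::Int64(0),
            max: AnyValue::Int64(9),
            null_count: Some(0),
        },
    )])
}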
38 changes: 16 additions & 22 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -23,7 +23,7 @@ use crate::hive::{self, materialize_hive_partitions};
 use crate::mmap::{MmapBytesReader, ReaderBytes};
 use crate::parquet::metadata::FileMetadataRef;
 use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR;
-use crate::predicates::{apply_predicate, PhysicalIoExpr};
+use crate::predicates::{apply_predicate, IOPredicate};
 use crate::utils::get_reader_bytes;
 use crate::utils::slice::split_slice_at_file;
 use crate::RowIndex;
@@ -137,7 +137,7 @@ fn rg_to_dfs(
     slice: (usize, usize),
     file_metadata: &FileMetadata,
     schema: &ArrowSchemaRef,
-    predicate: Option<&dyn PhysicalIoExpr>,
+    predicate: Option<&IOPredicate>,
     row_index: Option<RowIndex>,
     parallel: ParallelStrategy,
     projection: &[usize],
@@ -168,17 +168,14 @@
 
     if parallel == S::Prefiltered {
         if let Some(predicate) = predicate {
-            let mut live_columns = PlIndexSet::new();
-            predicate.collect_live_columns(&mut live_columns);
-            if !live_columns.is_empty() {
+            if !predicate.live_columns.is_empty() {
                 return rg_to_dfs_prefiltered(
                     store,
                     previous_row_count,
                     row_group_start,
                     row_group_end,
                     file_metadata,
                     schema,
-                    live_columns,
                     predicate,
                     row_index,
                     projection,
@@ -242,8 +239,7 @@ fn rg_to_dfs_prefiltered(
     row_group_end: usize,
     file_metadata: &FileMetadata,
     schema: &ArrowSchemaRef,
-    live_columns: PlIndexSet<PlSmallStr>,
-    predicate: &dyn PhysicalIoExpr,
+    predicate: &IOPredicate,
     row_index: Option<RowIndex>,
     projection: &[usize],
     use_statistics: bool,
@@ -270,7 +266,7 @@
     };
 
     // Get the number of live columns
-    let num_live_columns = live_columns.len();
+    let num_live_columns = predicate.live_columns.len();
     let num_dead_columns =
         projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns;
 
@@ -286,7 +282,7 @@
     for &i in projection.iter() {
         let name = schema.get_at_index(i).unwrap().0.as_str();
 
-        if live_columns.contains(name) {
+        if predicate.live_columns.contains(name) {
             live_idx_to_col_idx.push(i);
         } else {
             dead_idx_to_col_idx.push(i);
@@ -353,7 +349,7 @@
                 hive_partition_columns,
                 md.num_rows(),
             );
-            let s = predicate.evaluate_io(&df)?;
+            let s = predicate.expr.evaluate_io(&df)?;
             let mask = s.bool().expect("filter predicates was not of type boolean");
 
             // Create without hive columns - the first merge phase does not handle hive partitions. This also saves
@@ -540,7 +536,7 @@ fn rg_to_dfs_optionally_par_over_columns(
     slice: (usize, usize),
     file_metadata: &FileMetadata,
     schema: &ArrowSchemaRef,
-    predicate: Option<&dyn PhysicalIoExpr>,
+    predicate: Option<&IOPredicate>,
     row_index: Option<RowIndex>,
     parallel: ParallelStrategy,
     projection: &[usize],
@@ -646,7 +642,7 @@
         }
 
         materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1);
-        apply_predicate(&mut df, predicate, true)?;
+        apply_predicate(&mut df, predicate.as_ref().map(|p| p.expr.as_ref()), true)?;
 
         *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(||
             polars_err!(
@@ -675,7 +671,7 @@ fn rg_to_dfs_par_over_rg(
     slice: (usize, usize),
     file_metadata: &FileMetadata,
     schema: &ArrowSchemaRef,
-    predicate: Option<&dyn PhysicalIoExpr>,
+    predicate: Option<&IOPredicate>,
     row_index: Option<RowIndex>,
     projection: &[usize],
     use_statistics: bool,
@@ -785,7 +781,7 @@
                     hive_partition_columns,
                     slice.1,
                 );
-                apply_predicate(&mut df, predicate, false)?;
+                apply_predicate(&mut df, predicate.as_ref().map(|p| p.expr.as_ref()), false)?;
 
                 Ok(Some(df))
             })
@@ -801,7 +797,7 @@ pub fn read_parquet<R: MmapBytesReader>(
     projection: Option<&[usize]>,
     reader_schema: &ArrowSchemaRef,
     metadata: Option<FileMetadataRef>,
-    predicate: Option<&dyn PhysicalIoExpr>,
+    predicate: Option<&IOPredicate>,
     mut parallel: ParallelStrategy,
     row_index: Option<RowIndex>,
     use_statistics: bool,
@@ -846,9 +842,7 @@
         let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER");
         let prefilter_env = prefilter_env.as_deref();
 
-        let mut live_columns = PlIndexSet::new();
-        predicate.collect_live_columns(&mut live_columns);
-        let num_live_variables = live_columns.len();
+        let num_live_variables = predicate.live_columns.len();
         let mut do_prefilter = false;
 
         do_prefilter |= prefilter_env == Ok("1"); // Force enable
@@ -1012,7 +1006,7 @@ pub struct BatchedParquetReader {
    projection: Arc<[usize]>,
    schema: ArrowSchemaRef,
    metadata: FileMetadataRef,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<IOPredicate>,
    row_index: Option<RowIndex>,
    rows_read: IdxSize,
    row_group_offset: usize,
@@ -1035,7 +1029,7 @@ impl BatchedParquetReader {
         schema: ArrowSchemaRef,
         slice: (usize, usize),
         projection: Option<Vec<usize>>,
-        predicate: Option<Arc<dyn PhysicalIoExpr>>,
+        predicate: Option<IOPredicate>,
         row_index: Option<RowIndex>,
         chunk_size: usize,
         use_statistics: bool,
Expand Down Expand Up @@ -1153,7 +1147,7 @@ impl BatchedParquetReader {
slice,
&metadata,
&schema,
predicate.as_deref(),
predicate.as_ref(),
row_index,
parallel,
&projection,
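The thread running through this file: every rg_to_dfs* entry point now takes Option<&IOPredicate> instead of Option<&dyn PhysicalIoExpr>, and the live-column set is read from the predicate rather than recollected at each call site. A hypothetical helper sketching that construction, assuming the collect_live_columns method the removed lines were calling:

use std::sync::Arc;
use polars_core::prelude::*;
use polars_io::predicates::{IOPredicate, PhysicalIoExpr};

// Collect the live columns once, when the predicate is wrapped, so the
// per-row-group readers can simply inspect predicate.live_columns.
fn to_io_predicate(expr: Arc<dyn PhysicalIoExpr>) -> IOPredicate {
    let mut live_columns = PlIndexSet::new();
    expr.collect_live_columns(&mut live_columns);
    IOPredicate {
        expr,
        live_columns: Arc::new(live_columns),
        // No skip-batch predicate in this sketch: row groups are then
        // never pruned via statistics.
        skip_batch_predicate: None,
    }
}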
12 changes: 6 additions & 6 deletions crates/polars-io/src/parquet/read/reader.rs
@@ -20,7 +20,7 @@ use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_pr
 use crate::cloud::CloudOptions;
 use crate::mmap::MmapBytesReader;
 use crate::parquet::metadata::FileMetadataRef;
-use crate::predicates::PhysicalIoExpr;
+use crate::predicates::IOPredicate;
 use crate::prelude::*;
 use crate::RowIndex;
 
@@ -37,7 +37,7 @@ pub struct ParquetReader<R: Read + Seek> {
     row_index: Option<RowIndex>,
     low_memory: bool,
     metadata: Option<FileMetadataRef>,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<IOPredicate>,
     hive_partition_columns: Option<Vec<Series>>,
     include_file_path: Option<(PlSmallStr, Arc<str>)>,
     use_statistics: bool,
@@ -189,7 +189,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
         Ok(self.metadata.as_ref().unwrap())
     }
 
-    pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
+    pub fn with_predicate(mut self, predicate: Option<IOPredicate>) -> Self {
         self.predicate = predicate;
         self
     }
@@ -261,7 +261,7 @@
             self.projection.as_deref(),
             &schema,
             Some(metadata),
-            self.predicate.as_deref(),
+            self.predicate.as_ref(),
             self.parallel,
             self.row_index,
             self.use_statistics,
@@ -297,7 +297,7 @@ pub struct ParquetAsyncReader {
     slice: (usize, usize),
     rechunk: bool,
     projection: Option<Vec<usize>>,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<IOPredicate>,
     row_index: Option<RowIndex>,
     use_statistics: bool,
     hive_partition_columns: Option<Vec<Series>>,
@@ -423,7 +423,7 @@ impl ParquetAsyncReader {
         self
     }
 
-    pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
+    pub fn with_predicate(mut self, predicate: Option<IOPredicate>) -> Self {
        self.predicate = predicate;
        self
    }
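For callers of the reader builders, only the predicate's type changes; the rest of the flow is untouched. A usage sketch, assuming SerReader is reachable through polars_io::prelude; the file path and the predicate value are hypothetical:

use polars_core::prelude::*;
use polars_io::prelude::*;
use polars_io::predicates::IOPredicate;

// Read a parquet file through the sync reader, pushing an IOPredicate down
// so row groups can be pruned before they are materialized.
fn read_filtered(pred: IOPredicate) -> PolarsResult<DataFrame> {
    ParquetReader::new(std::fs::File::open("data.parquet")?)
        .with_predicate(Some(pred))
        .finish()
}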
22 changes: 22 additions & 0 deletions crates/polars-io/src/predicates.rs
@@ -204,6 +204,28 @@ fn use_min_max(dtype: &DataType) -> bool {
     )
 }
 
+pub struct ColumnStatistics {
+    pub dtype: DataType,
+    pub min: AnyValue<'static>,
+    pub max: AnyValue<'static>,
+    pub null_count: Option<IdxSize>,
+}
+
+pub trait SkipBatchPredicate: Send + Sync {
+    fn can_skip_batch(
+        &self,
+        batch_size: IdxSize,
+        statistics: PlIndexMap<PlSmallStr, ColumnStatistics>,
+    ) -> PolarsResult<bool>;
+}
+
+#[derive(Clone)]
+pub struct IOPredicate {
+    pub expr: Arc<dyn PhysicalIoExpr>,
+    pub live_columns: Arc<PlIndexSet<PlSmallStr>>,
+    pub skip_batch_predicate: Option<Arc<dyn SkipBatchPredicate>>,
+}
+
 /// A collection of column stats with a known schema.
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 #[derive(Debug, Clone)]
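A minimal sketch of implementing the new trait; the skipper and its tracked column are hypothetical, not part of this commit. It skips a batch when the column is entirely null, i.e. when its null count equals the batch size:

use polars_core::prelude::*;
use polars_io::predicates::{ColumnStatistics, SkipBatchPredicate};

struct AllNullSkipper {
    column: PlSmallStr,
}

impl SkipBatchPredicate for AllNullSkipper {
    fn can_skip_batch(
        &self,
        batch_size: IdxSize,
        statistics: PlIndexMap<PlSmallStr, ColumnStatistics>,
    ) -> PolarsResult<bool> {
        // Ok(true) means the whole batch can be skipped. Missing statistics
        // (or an unknown null count) conservatively keep the batch.
        Ok(statistics
            .get(&self.column)
            .and_then(|s| s.null_count)
            .is_some_and(|nc| nc == batch_size))
    }
}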
4 changes: 2 additions & 2 deletions crates/polars-lazy/src/frame/mod.rs
@@ -686,7 +686,7 @@ impl LazyFrame {
         } else {
             true
         };
-        let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &expr_arena)?;
+        let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;
 
         let state = ExecutionState::new();
         Ok((state, physical_plan, no_file_sink))
@@ -738,7 +738,7 @@
         let mut physical_plan = create_physical_plan(
             alp_plan.lp_top,
             &mut alp_plan.lp_arena,
-            &alp_plan.expr_arena,
+            &mut alp_plan.expr_arena,
         )?;
         let mut state = ExecutionState::new();
         physical_plan.execute(&mut state)