diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs
index bf35337a313d..d1c50fa9010d 100644
--- a/crates/polars-io/src/parquet/read/async_impl.rs
+++ b/crates/polars-io/src/parquet/read/async_impl.rs
@@ -19,7 +19,7 @@ use crate::cloud::{
 };
 use crate::parquet::metadata::FileMetadataRef;
 use crate::pl_async::get_runtime;
-use crate::predicates::PhysicalIoExpr;
+use crate::predicates::IOPredicate;

 type DownloadedRowGroup = PlHashMap;
 type QueuePayload = (usize, DownloadedRowGroup);
@@ -231,7 +231,7 @@ impl FetchRowGroupsFromObjectStore {
         reader: ParquetObjectStore,
         schema: ArrowSchemaRef,
         projection: Option<&[usize]>,
-        predicate: Option<Arc<dyn PhysicalIoExpr>>,
+        predicate: Option<IOPredicate>,
         row_group_range: Range,
         row_groups: &[RowGroupMetadata],
     ) -> PolarsResult {
@@ -244,7 +244,7 @@ impl FetchRowGroupsFromObjectStore {
         let mut prefetched: PlHashMap = PlHashMap::new();

-        let mut row_groups = if let Some(pred) = predicate.as_deref() {
+        let mut row_groups = if let Some(pred) = predicate.as_ref() {
             row_group_range
                 .filter_map(|i| {
                     let rg = &row_groups[i];
diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs
index ea3bc7efb3e4..3861047a6686 100644
--- a/crates/polars-io/src/parquet/read/predicates.rs
+++ b/crates/polars-io/src/parquet/read/predicates.rs
@@ -3,7 +3,7 @@ use polars_core::prelude::*;
 use polars_parquet::read::statistics::{deserialize, Statistics};
 use polars_parquet::read::RowGroupMetadata;

-use crate::predicates::{BatchStats, ColumnStats, PhysicalIoExpr};
+use crate::predicates::{BatchStats, ColumnStats, IOPredicate};

 /// Collect the statistics in a row-group
 pub(crate) fn collect_statistics(
@@ -58,7 +58,7 @@
 }

 pub fn read_this_row_group(
-    predicate: Option<&dyn PhysicalIoExpr>,
+    predicate: Option<&IOPredicate>,
     md: &RowGroupMetadata,
     schema: &ArrowSchema,
 ) -> PolarsResult<bool> {
@@ -69,16 +69,31 @@
     let mut should_read = true;

     if let Some(pred) = predicate {
-        if let Some(pred) = pred.as_stats_evaluator() {
+        if let Some(pred) = &pred.skip_batch_predicate {
             if let Some(stats) = collect_statistics(md, schema)?
{ - let pred_result = pred.should_read(&stats); + let stats = PlIndexMap::from_iter(stats.column_stats().iter().map(|col| { + ( + col.field_name().clone(), + crate::predicates::ColumnStatistics { + dtype: stats.schema().get(col.field_name()).unwrap().clone(), + min: col + .to_min() + .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()), + max: col + .to_max() + .map_or(AnyValue::Null, |s| s.get(0).unwrap().into_static()), + null_count: col.null_count().map(|nc| nc as IdxSize), + }, + ) + })); + let pred_result = pred.can_skip_batch(md.num_rows() as IdxSize, stats); // a parquet file may not have statistics of all columns match pred_result { Err(PolarsError::ColumnNotFound(errstr)) => { return Err(PolarsError::ColumnNotFound(errstr)) }, - Ok(false) => should_read = false, + Ok(true) => should_read = false, _ => {}, } } diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs index e80ef5595a72..3889fd713235 100644 --- a/crates/polars-io/src/parquet/read/read_impl.rs +++ b/crates/polars-io/src/parquet/read/read_impl.rs @@ -23,7 +23,7 @@ use crate::hive::{self, materialize_hive_partitions}; use crate::mmap::{MmapBytesReader, ReaderBytes}; use crate::parquet::metadata::FileMetadataRef; use crate::parquet::read::ROW_COUNT_OVERFLOW_ERR; -use crate::predicates::{apply_predicate, PhysicalIoExpr}; +use crate::predicates::{apply_predicate, IOPredicate}; use crate::utils::get_reader_bytes; use crate::utils::slice::split_slice_at_file; use crate::RowIndex; @@ -137,7 +137,7 @@ fn rg_to_dfs( slice: (usize, usize), file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&IOPredicate>, row_index: Option, parallel: ParallelStrategy, projection: &[usize], @@ -168,9 +168,7 @@ fn rg_to_dfs( if parallel == S::Prefiltered { if let Some(predicate) = predicate { - let mut live_columns = PlIndexSet::new(); - predicate.collect_live_columns(&mut live_columns); - if !live_columns.is_empty() { + if !predicate.live_columns.is_empty() { return rg_to_dfs_prefiltered( store, previous_row_count, @@ -178,7 +176,6 @@ fn rg_to_dfs( row_group_end, file_metadata, schema, - live_columns, predicate, row_index, projection, @@ -242,8 +239,7 @@ fn rg_to_dfs_prefiltered( row_group_end: usize, file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - live_columns: PlIndexSet, - predicate: &dyn PhysicalIoExpr, + predicate: &IOPredicate, row_index: Option, projection: &[usize], use_statistics: bool, @@ -270,7 +266,7 @@ fn rg_to_dfs_prefiltered( }; // Get the number of live columns - let num_live_columns = live_columns.len(); + let num_live_columns = predicate.live_columns.len(); let num_dead_columns = projection.len() + hive_partition_columns.map_or(0, |x| x.len()) - num_live_columns; @@ -286,7 +282,7 @@ fn rg_to_dfs_prefiltered( for &i in projection.iter() { let name = schema.get_at_index(i).unwrap().0.as_str(); - if live_columns.contains(name) { + if predicate.live_columns.contains(name) { live_idx_to_col_idx.push(i); } else { dead_idx_to_col_idx.push(i); @@ -353,7 +349,7 @@ fn rg_to_dfs_prefiltered( hive_partition_columns, md.num_rows(), ); - let s = predicate.evaluate_io(&df)?; + let s = predicate.expr.evaluate_io(&df)?; let mask = s.bool().expect("filter predicates was not of type boolean"); // Create without hive columns - the first merge phase does not handle hive partitions. 
This also saves @@ -540,7 +536,7 @@ fn rg_to_dfs_optionally_par_over_columns( slice: (usize, usize), file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&IOPredicate>, row_index: Option, parallel: ParallelStrategy, projection: &[usize], @@ -646,7 +642,7 @@ fn rg_to_dfs_optionally_par_over_columns( } materialize_hive_partitions(&mut df, schema.as_ref(), hive_partition_columns, rg_slice.1); - apply_predicate(&mut df, predicate, true)?; + apply_predicate(&mut df, predicate.as_ref().map(|p| p.expr.as_ref()), true)?; *previous_row_count = previous_row_count.checked_add(current_row_count).ok_or_else(|| polars_err!( @@ -675,7 +671,7 @@ fn rg_to_dfs_par_over_rg( slice: (usize, usize), file_metadata: &FileMetadata, schema: &ArrowSchemaRef, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&IOPredicate>, row_index: Option, projection: &[usize], use_statistics: bool, @@ -785,7 +781,7 @@ fn rg_to_dfs_par_over_rg( hive_partition_columns, slice.1, ); - apply_predicate(&mut df, predicate, false)?; + apply_predicate(&mut df, predicate.as_ref().map(|p| p.expr.as_ref()), false)?; Ok(Some(df)) }) @@ -801,7 +797,7 @@ pub fn read_parquet( projection: Option<&[usize]>, reader_schema: &ArrowSchemaRef, metadata: Option, - predicate: Option<&dyn PhysicalIoExpr>, + predicate: Option<&IOPredicate>, mut parallel: ParallelStrategy, row_index: Option, use_statistics: bool, @@ -846,9 +842,7 @@ pub fn read_parquet( let prefilter_env = std::env::var("POLARS_PARQUET_PREFILTER"); let prefilter_env = prefilter_env.as_deref(); - let mut live_columns = PlIndexSet::new(); - predicate.collect_live_columns(&mut live_columns); - let num_live_variables = live_columns.len(); + let num_live_variables = predicate.live_columns.len(); let mut do_prefilter = false; do_prefilter |= prefilter_env == Ok("1"); // Force enable @@ -1012,7 +1006,7 @@ pub struct BatchedParquetReader { projection: Arc<[usize]>, schema: ArrowSchemaRef, metadata: FileMetadataRef, - predicate: Option>, + predicate: Option, row_index: Option, rows_read: IdxSize, row_group_offset: usize, @@ -1035,7 +1029,7 @@ impl BatchedParquetReader { schema: ArrowSchemaRef, slice: (usize, usize), projection: Option>, - predicate: Option>, + predicate: Option, row_index: Option, chunk_size: usize, use_statistics: bool, @@ -1153,7 +1147,7 @@ impl BatchedParquetReader { slice, &metadata, &schema, - predicate.as_deref(), + predicate.as_ref(), row_index, parallel, &projection, diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs index 25d1f51b098b..5b38d34fa897 100644 --- a/crates/polars-io/src/parquet/read/reader.rs +++ b/crates/polars-io/src/parquet/read/reader.rs @@ -20,7 +20,7 @@ use super::utils::{ensure_matching_dtypes_if_found, projected_arrow_schema_to_pr use crate::cloud::CloudOptions; use crate::mmap::MmapBytesReader; use crate::parquet::metadata::FileMetadataRef; -use crate::predicates::PhysicalIoExpr; +use crate::predicates::IOPredicate; use crate::prelude::*; use crate::RowIndex; @@ -37,7 +37,7 @@ pub struct ParquetReader { row_index: Option, low_memory: bool, metadata: Option, - predicate: Option>, + predicate: Option, hive_partition_columns: Option>, include_file_path: Option<(PlSmallStr, Arc)>, use_statistics: bool, @@ -189,7 +189,7 @@ impl ParquetReader { Ok(self.metadata.as_ref().unwrap()) } - pub fn with_predicate(mut self, predicate: Option>) -> Self { + pub fn with_predicate(mut self, predicate: Option) -> Self { self.predicate = predicate; 
         self
     }
@@ -261,7 +261,7 @@ impl SerReader for ParquetReader {
             self.projection.as_deref(),
             &schema,
             Some(metadata),
-            self.predicate.as_deref(),
+            self.predicate.as_ref(),
             self.parallel,
             self.row_index,
             self.use_statistics,
@@ -297,7 +297,7 @@ pub struct ParquetAsyncReader {
     slice: (usize, usize),
     rechunk: bool,
     projection: Option>,
-    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    predicate: Option<IOPredicate>,
     row_index: Option,
     use_statistics: bool,
     hive_partition_columns: Option>,
@@ -423,7 +423,7 @@ impl ParquetAsyncReader {
         self
     }

-    pub fn with_predicate(mut self, predicate: Option<Arc<dyn PhysicalIoExpr>>) -> Self {
+    pub fn with_predicate(mut self, predicate: Option<IOPredicate>) -> Self {
         self.predicate = predicate;
         self
     }
diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs
index c455ae0966c8..53a702fd2d12 100644
--- a/crates/polars-io/src/predicates.rs
+++ b/crates/polars-io/src/predicates.rs
@@ -204,6 +204,28 @@ fn use_min_max(dtype: &DataType) -> bool {
     )
 }

+pub struct ColumnStatistics {
+    pub dtype: DataType,
+    pub min: AnyValue<'static>,
+    pub max: AnyValue<'static>,
+    pub null_count: Option<IdxSize>,
+}
+
+pub trait SkipBatchPredicate: Send + Sync {
+    fn can_skip_batch(
+        &self,
+        batch_size: IdxSize,
+        statistics: PlIndexMap<PlSmallStr, ColumnStatistics>,
+    ) -> PolarsResult<bool>;
+}
+
+#[derive(Clone)]
+pub struct IOPredicate {
+    pub expr: Arc<dyn PhysicalIoExpr>,
+    pub live_columns: Arc<PlIndexSet<PlSmallStr>>,
+    pub skip_batch_predicate: Option<Arc<dyn SkipBatchPredicate>>,
+}
+
 /// A collection of column stats with a known schema.
 #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
 #[derive(Debug, Clone)]
diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs
index b365e7284566..c3673bf470b9 100644
--- a/crates/polars-lazy/src/frame/mod.rs
+++ b/crates/polars-lazy/src/frame/mod.rs
@@ -686,7 +686,7 @@ impl LazyFrame {
         } else {
             true
         };
-        let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &expr_arena)?;
+        let physical_plan = create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;

         let state = ExecutionState::new();
         Ok((state, physical_plan, no_file_sink))
@@ -738,7 +738,7 @@ impl LazyFrame {
         let mut physical_plan = create_physical_plan(
             alp_plan.lp_top,
             &mut alp_plan.lp_arena,
-            &alp_plan.expr_arena,
+            &mut alp_plan.expr_arena,
         )?;
         let mut state = ExecutionState::new();
         physical_plan.execute(&mut state)
diff --git a/crates/polars-mem-engine/src/executors/multi_file_scan.rs b/crates/polars-mem-engine/src/executors/multi_file_scan.rs
index 873b8cbbbaae..99b9023046e3 100644
--- a/crates/polars-mem-engine/src/executors/multi_file_scan.rs
+++ b/crates/polars-mem-engine/src/executors/multi_file_scan.rs
@@ -6,7 +6,7 @@ use polars_core::frame::column::ScalarColumn;
 use polars_core::utils::{
     accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked,
 };
-use polars_io::predicates::BatchStats;
+use polars_io::predicates::SkipBatchPredicate;
 use polars_io::RowIndex;

 use super::Executor;
@@ -19,55 +19,7 @@ use crate::executors::JsonExec;
 #[cfg(feature = "parquet")]
 use crate::executors::ParquetExec;
 use crate::prelude::*;
-
-pub struct PhysicalExprWithConstCols {
-    constants: Vec<(PlSmallStr, Scalar)>,
-    child: Arc<dyn PhysicalExpr>,
-}
-
-impl PhysicalExpr for PhysicalExprWithConstCols {
-    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
-        let mut df = df.clone();
-        for (name, scalar) in &self.constants {
-            df.with_column(Column::new_scalar(
-                name.clone(),
-                scalar.clone(),
-                df.height(),
-            ))?;
-        }
-
-        self.child.evaluate(&df, state)
-    }
-
-    fn evaluate_on_groups<'a>(
-        &self,
-        df: &DataFrame,
-        groups: &'a GroupPositions,
-        state: &ExecutionState,
-
) -> PolarsResult> { - let mut df = df.clone(); - for (name, scalar) in &self.constants { - df.with_column(Column::new_scalar( - name.clone(), - scalar.clone(), - df.height(), - ))?; - } - - self.child.evaluate_on_groups(&df, groups, state) - } - - fn to_field(&self, input_schema: &Schema) -> PolarsResult { - self.child.to_field(input_schema) - } - - fn collect_live_columns(&self, lv: &mut PlIndexSet) { - self.child.collect_live_columns(lv) - } - fn is_scalar(&self) -> bool { - self.child.is_scalar() - } -} +use crate::FilePredicate; /// An [`Executor`] that scans over some IO. pub trait ScanExec { @@ -76,7 +28,8 @@ pub trait ScanExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult; @@ -206,7 +159,7 @@ pub struct MultiScanExec { sources: ScanSources, file_info: FileInfo, hive_parts: Option>>, - predicate: Option>, + predicate: Option, file_options: FileScanOptions, scan_type: FileScan, } @@ -216,7 +169,7 @@ impl MultiScanExec { sources: ScanSources, file_info: FileInfo, hive_parts: Option>>, - predicate: Option>, + predicate: Option, file_options: FileScanOptions, scan_type: FileScan, ) -> Self { @@ -284,11 +237,8 @@ impl MultiScanExec { // Look through the predicate and assess whether hive columns are being used in it. let mut has_live_hive_columns = false; if let Some(predicate) = &predicate { - let mut live_columns = PlIndexSet::new(); - predicate.collect_live_columns(&mut live_columns); - for hive_column in &hive_column_set { - has_live_hive_columns |= live_columns.contains(hive_column); + has_live_hive_columns |= predicate.live_columns.contains(hive_column); } } @@ -388,43 +338,36 @@ impl MultiScanExec { // used. if has_live_hive_columns { let hive_part = hive_part.unwrap(); - let child = file_predicate.unwrap(); - - file_predicate = Some(Arc::new(PhysicalExprWithConstCols { - constants: hive_column_set - .iter() - .enumerate() - .map(|(idx, column)| { - let series = hive_part.get_statistics().column_stats()[idx] - .to_min() - .unwrap(); - ( - column.clone(), - Scalar::new( - series.dtype().clone(), - series.get(0).unwrap().into_static(), - ), - ) - }) - .collect(), - child, - })); + file_predicate = Some(file_predicate.unwrap().with_constant_columns( + hive_column_set.iter().enumerate().map(|(idx, column)| { + let series = hive_part.get_statistics().column_stats()[idx] + .to_min() + .unwrap(); + ( + column.clone(), + Scalar::new( + series.dtype().clone(), + series.get(0).unwrap().into_static(), + ), + ) + }), + )); } - let stats_evaluator = file_predicate.as_ref().and_then(|p| p.as_stats_evaluator()); - let stats_evaluator = stats_evaluator.filter(|_| use_statistics); - - if let Some(stats_evaluator) = stats_evaluator { - let allow_predicate_skip = !stats_evaluator - .should_read(&BatchStats::default()) - .unwrap_or(true); - if allow_predicate_skip && verbose { + let skip_batch_predicate = file_predicate + .as_ref() + .take_if(|_| use_statistics) + .and_then(|p| p.to_dyn_skip_batch_predicate(self.file_info.schema.clone())); + if let Some(skip_batch_predicate) = &skip_batch_predicate { + let can_skip_batch = skip_batch_predicate + .can_skip_batch(exec_source.num_unfiltered_rows()?, PlIndexMap::default())?; + if can_skip_batch && verbose { eprintln!( "File statistics allows skipping of '{}'", source.to_include_path_name() ); } - do_skip_file |= allow_predicate_skip; + do_skip_file |= can_skip_batch; } if do_skip_file { @@ -494,6 +437,7 @@ impl 
MultiScanExec { current_source_with_columns.into_owned(), slice, file_predicate, + skip_batch_predicate, row_index.clone(), )?; diff --git a/crates/polars-mem-engine/src/executors/scan/csv.rs b/crates/polars-mem-engine/src/executors/scan/csv.rs index 4dc700878592..7908f687c433 100644 --- a/crates/polars-mem-engine/src/executors/scan/csv.rs +++ b/crates/polars-mem-engine/src/executors/scan/csv.rs @@ -4,16 +4,18 @@ use polars_core::config; use polars_core::utils::{ accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked, }; +use polars_io::predicates::SkipBatchPredicate; use polars_io::utils::compression::maybe_decompress_bytes; use super::*; +use crate::FilePredicate; pub struct CsvExec { pub sources: ScanSources, pub file_info: FileInfo, pub options: CsvReadOptions, pub file_options: FileScanOptions, - pub predicate: Option>, + pub predicate: Option, } impl CsvExec { @@ -29,7 +31,10 @@ impl CsvExec { assert_eq!(x.0, 0); x.1 })); - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + let predicate = self + .predicate + .as_ref() + .map(|p| phys_expr_to_io_expr(p.predicate.clone())); let options_base = self .options .clone() @@ -214,7 +219,8 @@ impl ScanExec for CsvExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + _skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index 9c305949f060..0d763d6dd79f 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -4,17 +4,18 @@ use polars_core::utils::accumulate_dataframes_vertical; use polars_error::feature_gated; use polars_io::cloud::CloudOptions; use polars_io::path_utils::is_cloud_url; -use polars_io::predicates::apply_predicate; +use polars_io::predicates::{apply_predicate, SkipBatchPredicate}; use polars_utils::mmap::MemSlice; use polars_utils::open_file; use rayon::prelude::*; use super::*; +use crate::FilePredicate; pub struct IpcExec { pub(crate) sources: ScanSources, pub(crate) file_info: FileInfo, - pub(crate) predicate: Option>, + pub(crate) predicate: Option, #[allow(dead_code)] pub(crate) options: IpcScanOptions, pub(crate) file_options: FileScanOptions, @@ -148,7 +149,7 @@ impl IpcExec { }; let dfs = if let Some(predicate) = self.predicate.clone() { - let predicate = phys_expr_to_io_expr(predicate); + let predicate = phys_expr_to_io_expr(predicate.predicate); let predicate = Some(predicate.as_ref()); POOL.install(|| { @@ -198,7 +199,8 @@ impl ScanExec for IpcExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + _skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; diff --git a/crates/polars-mem-engine/src/executors/scan/mod.rs b/crates/polars-mem-engine/src/executors/scan/mod.rs index ff37e2f588d3..6a645a0ecb71 100644 --- a/crates/polars-mem-engine/src/executors/scan/mod.rs +++ b/crates/polars-mem-engine/src/executors/scan/mod.rs @@ -29,6 +29,7 @@ use polars_plan::global::_set_n_rows_for_scan; pub(crate) use self::python_scan::*; use super::*; use crate::prelude::*; +use crate::FilePredicate; /// Producer of an in memory DataFrame pub struct DataFrameExec { @@ -58,7 +59,7 @@ pub(crate) struct AnonymousScanExec { pub(crate) function: Arc, pub(crate) file_options: 
FileScanOptions, pub(crate) file_info: FileInfo, - pub(crate) predicate: Option>, + pub(crate) predicate: Option, pub(crate) output_schema: Option, pub(crate) predicate_has_windows: bool, } @@ -82,7 +83,7 @@ impl Executor for AnonymousScanExec { match (self.function.allows_predicate_pushdown(), &self.predicate) { (true, Some(predicate)) => state.record( || { - args.predicate = predicate.as_expression().cloned(); + args.predicate = predicate.predicate.as_expression().cloned(); self.function.scan(args) }, "anonymous_scan".into(), @@ -90,7 +91,7 @@ impl Executor for AnonymousScanExec { (false, Some(predicate)) => state.record( || { let mut df = self.function.scan(args)?; - let s = predicate.evaluate(&df, state)?; + let s = predicate.predicate.evaluate(&df, state)?; if self.predicate_has_windows { state.clear_window_expr_cache() } diff --git a/crates/polars-mem-engine/src/executors/scan/ndjson.rs b/crates/polars-mem-engine/src/executors/scan/ndjson.rs index 7cbcb89810a6..804dad7d5ae9 100644 --- a/crates/polars-mem-engine/src/executors/scan/ndjson.rs +++ b/crates/polars-mem-engine/src/executors/scan/ndjson.rs @@ -1,16 +1,18 @@ use polars_core::config; use polars_core::utils::accumulate_dataframes_vertical; +use polars_io::predicates::SkipBatchPredicate; use polars_io::prelude::{JsonLineReader, SerReader}; use polars_io::utils::compression::maybe_decompress_bytes; use super::*; +use crate::FilePredicate; pub struct JsonExec { sources: ScanSources, options: NDJsonReadOptions, file_options: FileScanOptions, file_info: FileInfo, - predicate: Option>, + predicate: Option, } impl JsonExec { @@ -19,7 +21,7 @@ impl JsonExec { options: NDJsonReadOptions, file_options: FileScanOptions, file_info: FileInfo, - predicate: Option>, + predicate: Option, ) -> Self { Self { sources, @@ -93,7 +95,11 @@ impl JsonExec { .with_rechunk(self.file_options.rechunk) .with_chunk_size(Some(self.options.chunk_size)) .with_row_index(row_index) - .with_predicate(self.predicate.clone().map(phys_expr_to_io_expr)) + .with_predicate( + self.predicate + .as_ref() + .map(|p| phys_expr_to_io_expr(p.predicate.clone())), + ) .with_projection(self.file_options.with_columns.clone()) .low_memory(self.options.low_memory) .with_n_rows(n_rows) @@ -133,7 +139,8 @@ impl ScanExec for JsonExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + _skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 5a6efdaa608f..6ec1708f051d 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -6,10 +6,12 @@ use polars_core::utils::accumulate_dataframes_vertical; use polars_error::feature_gated; use polars_io::cloud::CloudOptions; use polars_io::parquet::metadata::FileMetadataRef; +use polars_io::predicates::{IOPredicate, SkipBatchPredicate}; use polars_io::utils::slice::split_slice_at_file; use polars_io::RowIndex; use super::*; +use crate::FilePredicate; pub struct ParquetExec { sources: ScanSources, @@ -17,7 +19,9 @@ pub struct ParquetExec { hive_parts: Option>>, - predicate: Option>, + predicate: Option, + skip_batch_predicate: Option>, + pub(crate) options: ParquetOptions, #[allow(dead_code)] cloud_options: Option, @@ -32,7 +36,7 @@ impl ParquetExec { sources: ScanSources, file_info: FileInfo, hive_parts: Option>>, - predicate: 
Option>, + predicate: Option, options: ParquetOptions, cloud_options: Option, file_options: FileScanOptions, @@ -45,6 +49,8 @@ impl ParquetExec { hive_parts, predicate, + skip_batch_predicate: None, + options, cloud_options, file_options, @@ -75,7 +81,14 @@ impl ParquetExec { None } }; - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + let predicate = self.predicate.as_ref().map(|p| IOPredicate { + expr: phys_expr_to_io_expr(p.predicate.clone()), + live_columns: p.live_columns.clone(), + skip_batch_predicate: self + .skip_batch_predicate + .clone() + .or_else(|| p.to_dyn_skip_batch_predicate(self.file_info.schema.clone())), + }); let mut base_row_index = self.file_options.row_index.take(); // (offset, end) @@ -279,7 +292,14 @@ impl ParquetExec { None } }; - let predicate = self.predicate.clone().map(phys_expr_to_io_expr); + let predicate = self.predicate.as_ref().map(|p| IOPredicate { + expr: phys_expr_to_io_expr(p.predicate.clone()), + live_columns: p.live_columns.clone(), + skip_batch_predicate: self + .skip_batch_predicate + .clone() + .or_else(|| p.to_dyn_skip_batch_predicate(self.file_info.schema.clone())), + }); let mut base_row_index = self.file_options.row_index.take(); // Modified if we have a negative slice @@ -487,7 +507,7 @@ impl ParquetExec { .row_index .as_ref() .and_then(|_| self.predicate.take()) - .map(phys_expr_to_io_expr); + .map(|p| phys_expr_to_io_expr(p.predicate)); let is_cloud = self.sources.is_cloud_url(); let force_async = config::force_async(); @@ -560,12 +580,14 @@ impl ScanExec for ParquetExec { &mut self, with_columns: Option>, slice: Option<(usize, usize)>, - predicate: Option>, + predicate: Option, + skip_batch_predicate: Option>, row_index: Option, ) -> PolarsResult { self.file_options.with_columns = with_columns; self.file_options.slice = slice.map(|(o, l)| (o as i64, l)); self.predicate = predicate; + self.skip_batch_predicate = skip_batch_predicate; self.file_options.row_index = row_index; if self.file_info.reader_schema.is_none() { diff --git a/crates/polars-mem-engine/src/lib.rs b/crates/polars-mem-engine/src/lib.rs index 7129cbaee660..f8c0843414d1 100644 --- a/crates/polars-mem-engine/src/lib.rs +++ b/crates/polars-mem-engine/src/lib.rs @@ -1,7 +1,9 @@ mod executors; mod planner; +mod predicate; mod prelude; mod utils; pub use executors::Executor; pub use planner::create_physical_plan; +pub use predicate::FilePredicate; diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 5310296f54c4..9bd65d7ae962 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -1,12 +1,16 @@ use polars_core::prelude::*; use polars_core::POOL; use polars_expr::state::ExecutionState; +use polars_io::prelude::ParquetOptions; use polars_plan::global::_set_n_rows_for_scan; use polars_plan::plans::expr_ir::ExprIR; +use polars_utils::format_pl_smallstr; +use self::predicates::aexpr_to_skip_batch_predicate; use super::super::executors::{self, Executor}; use super::*; use crate::utils::*; +use crate::FilePredicate; fn partitionable_gb( keys: &[ExprIR], @@ -56,7 +60,7 @@ impl ConversionState { pub fn create_physical_plan( root: Node, lp_arena: &mut Arena, - expr_arena: &Arena, + expr_arena: &mut Arena, ) -> PolarsResult> { let state = ConversionState::new()?; create_physical_plan_impl(root, lp_arena, expr_arena, &state) @@ -65,7 +69,7 @@ pub fn create_physical_plan( fn create_physical_plan_impl( root: Node, lp_arena: &mut Arena, - expr_arena: &Arena, + 
expr_arena: &mut Arena, state: &ConversionState, ) -> PolarsResult> { use IR::*; @@ -203,22 +207,91 @@ fn create_physical_plan_impl( }; let mut state = ExpressionConversionState::new(true, state.expr_depth); + let do_new_multifile = (sources.len() > 1 || hive_parts.is_some()) + && !matches!(scan_type, FileScan::Anonymous { .. }) + && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1"); + let predicate = predicate .map(|pred| { - create_physical_expr( + let predicate = create_physical_expr( &pred, Context::Default, expr_arena, output_schema.as_ref().unwrap_or(&file_info.schema), &mut state, - ) + )?; + let live_columns = Arc::new(PlIndexSet::from_iter(aexpr_to_leaf_names_iter( + pred.node(), + expr_arena, + ))); + + let mut skip_batch_predicate = None; + + let mut create_skip_batch_predicate = false; + + create_skip_batch_predicate |= do_new_multifile; + create_skip_batch_predicate |= matches!( + scan_type, + FileScan::Parquet { + options: ParquetOptions { + use_statistics: true, + .. + }, + .. + } + ); + + if create_skip_batch_predicate { + if let Some(node) = aexpr_to_skip_batch_predicate( + pred.node(), + expr_arena, + &file_info.schema, + ) { + let expr = ExprIR::new(node, pred.output_name_inner().clone()); + + if std::env::var("POLARS_OUTPUT_SKIP_BATCH_PRED").as_deref() == Ok("1") + { + eprintln!("predicate: {}", pred.display(expr_arena)); + eprintln!("skip_batch_predicate: {}", expr.display(expr_arena)); + } + + let schema = output_schema.as_ref().unwrap_or(&file_info.schema); + let mut skip_batch_schema = + Schema::with_capacity(1 + live_columns.len()); + + skip_batch_schema.insert(PlSmallStr::from_static("len"), IDX_DTYPE); + for (col, dtype) in schema.iter() { + if !live_columns.contains(col) { + continue; + } + + skip_batch_schema + .insert(format_pl_smallstr!("{col}_min"), dtype.clone()); + skip_batch_schema + .insert(format_pl_smallstr!("{col}_max"), dtype.clone()); + skip_batch_schema + .insert(format_pl_smallstr!("{col}_nc"), IDX_DTYPE); + } + + skip_batch_predicate = Some(create_physical_expr( + &expr, + Context::Default, + expr_arena, + &Arc::new(skip_batch_schema), + &mut state, + )?); + } + } + + PolarsResult::Ok(FilePredicate { + predicate, + live_columns, + skip_batch_predicate, + }) }) .map_or(Ok(None), |v| v.map(Some))?; - if sources.len() > 1 - && std::env::var("POLARS_NEW_MULTIFILE").as_deref() == Ok("1") - && !matches!(scan_type, FileScan::Anonymous { .. 
})
-            {
+            if do_new_multifile {
                 return Ok(Box::new(executors::MultiScanExec::new(
                     sources,
                     file_info,
diff --git a/crates/polars-mem-engine/src/predicate.rs b/crates/polars-mem-engine/src/predicate.rs
new file mode 100644
index 000000000000..5bf98a2ad3f4
--- /dev/null
+++ b/crates/polars-mem-engine/src/predicate.rs
@@ -0,0 +1,230 @@
+use std::sync::Arc;
+
+use polars_core::frame::DataFrame;
+use polars_core::prelude::{
+    AnyValue, Column, Field, GroupPositions, PlIndexMap, PlIndexSet, IDX_DTYPE,
+};
+use polars_core::scalar::Scalar;
+use polars_core::schema::{Schema, SchemaRef};
+use polars_error::PolarsResult;
+use polars_expr::prelude::{AggregationContext, PhysicalExpr};
+use polars_expr::state::ExecutionState;
+use polars_io::predicates::{ColumnStatistics, SkipBatchPredicate};
+use polars_utils::pl_str::PlSmallStr;
+use polars_utils::{format_pl_smallstr, IdxSize};
+
+pub struct PhysicalExprWithConstCols {
+    constants: Vec<(PlSmallStr, Scalar)>,
+    child: Arc<dyn PhysicalExpr>,
+}
+
+impl PhysicalExpr for PhysicalExprWithConstCols {
+    fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Column> {
+        let mut df = df.clone();
+        for (name, scalar) in &self.constants {
+            df.with_column(Column::new_scalar(
+                name.clone(),
+                scalar.clone(),
+                df.height(),
+            ))?;
+        }
+
+        self.child.evaluate(&df, state)
+    }
+
+    fn evaluate_on_groups<'a>(
+        &self,
+        df: &DataFrame,
+        groups: &'a GroupPositions,
+        state: &ExecutionState,
+    ) -> PolarsResult<AggregationContext<'a>> {
+        let mut df = df.clone();
+        for (name, scalar) in &self.constants {
+            df.with_column(Column::new_scalar(
+                name.clone(),
+                scalar.clone(),
+                df.height(),
+            ))?;
+        }
+
+        self.child.evaluate_on_groups(&df, groups, state)
+    }
+
+    fn to_field(&self, input_schema: &Schema) -> PolarsResult<Field> {
+        self.child.to_field(input_schema)
+    }
+
+    fn collect_live_columns(&self, lv: &mut PlIndexSet<PlSmallStr>) {
+        self.child.collect_live_columns(lv)
+    }
+    fn is_scalar(&self) -> bool {
+        self.child.is_scalar()
+    }
+}
+
+/// All the expressions and metadata used to filter out rows using predicates.
+#[derive(Clone)]
+pub struct FilePredicate {
+    pub predicate: Arc<dyn PhysicalExpr>,
+
+    /// Column names that are used in the predicate.
+    pub live_columns: Arc<PlIndexSet<PlSmallStr>>,
+
+    /// A predicate expression used to skip record batches based on their statistics.
+    ///
+    /// This expression is given a `min`, `max` and `null count` for each live column (set to
+    /// `null` when the value is not known), and it evaluates to `true` if the whole batch can
+    /// definitely be skipped. This may be conservative and evaluate to `false` even when the
+    /// batch could theoretically be skipped.
+    pub skip_batch_predicate: Option<Arc<dyn PhysicalExpr>>,
+}
+
+struct SkipBatchPredicateHelper {
+    skip_batch_predicate: Arc<dyn PhysicalExpr>,
+    live_columns: Arc<PlIndexSet<PlSmallStr>>,
+    schema: SchemaRef,
+}
+
+impl FilePredicate {
+    pub fn with_constant_columns(
+        &self,
+        constant_columns: impl IntoIterator<Item = (PlSmallStr, Scalar)>,
+    ) -> Self {
+        let constant_columns = constant_columns.into_iter();
+
+        let mut live_columns = self.live_columns.as_ref().clone();
+        let mut skip_batch_predicate_constants = Vec::with_capacity(
+            self.skip_batch_predicate
+                .is_some()
+                .then_some(1 + constant_columns.size_hint().0 * 3)
+                .unwrap_or_default(),
+        );
+
+        let predicate_constants = constant_columns
+            .filter_map(|(name, scalar): (PlSmallStr, Scalar)| {
+                if !live_columns.swap_remove(&name) {
+                    return None;
+                }
+
+                if self.skip_batch_predicate.is_some() {
+                    let mut null_count: Scalar = (0 as IdxSize).into();
+
+                    // If the constant value is Null, we don't know how many nulls there are
+                    // because the length of the batch may vary.
+ if scalar.is_null() { + null_count.update(AnyValue::Null); + } + + skip_batch_predicate_constants.extend([ + (format_pl_smallstr!("{name}_min"), scalar.clone()), + (format_pl_smallstr!("{name}_max"), scalar.clone()), + (format_pl_smallstr!("{name}_nc"), null_count), + ]); + } + + Some((name, scalar)) + }) + .collect(); + + let predicate = Arc::new(PhysicalExprWithConstCols { + constants: predicate_constants, + child: self.predicate.clone(), + }); + let skip_batch_predicate = self.skip_batch_predicate.as_ref().map(|skp| { + Arc::new(PhysicalExprWithConstCols { + constants: skip_batch_predicate_constants, + child: skp.clone(), + }) as _ + }); + + Self { + predicate, + live_columns: Arc::new(live_columns), + skip_batch_predicate, + } + } + + pub(crate) fn to_dyn_skip_batch_predicate( + &self, + schema: SchemaRef, + ) -> Option> { + let skip_batch_predicate = self.skip_batch_predicate.as_ref()?; + + Some(Arc::new(SkipBatchPredicateHelper { + skip_batch_predicate: skip_batch_predicate.clone(), + live_columns: self.live_columns.clone(), + schema, + })) + } +} + +impl SkipBatchPredicate for SkipBatchPredicateHelper { + fn can_skip_batch( + &self, + batch_size: IdxSize, + statistics: PlIndexMap, + ) -> PolarsResult { + let mut columns = Vec::with_capacity(1 + self.live_columns.len() * 3); + columns.push(Column::new_scalar( + PlSmallStr::from_static("len"), + batch_size.into(), + 1, + )); + + for col in self.live_columns.as_ref() { + if statistics.contains_key(col) { + continue; + } + + let dtype = self.schema.get(col).unwrap(); + columns.extend([ + Column::new_scalar( + format_pl_smallstr!("{col}_min"), + Scalar::new(dtype.clone(), AnyValue::Null), + 1, + ), + Column::new_scalar( + format_pl_smallstr!("{col}_max"), + Scalar::new(dtype.clone(), AnyValue::Null), + 1, + ), + Column::new_scalar( + format_pl_smallstr!("{col}_nc"), + Scalar::new(IDX_DTYPE, AnyValue::Null), + 1, + ), + ]); + } + + for (col, stat) in statistics { + columns.extend([ + Column::new_scalar( + format_pl_smallstr!("{col}_min"), + Scalar::new(stat.dtype.clone(), stat.min), + 1, + ), + Column::new_scalar( + format_pl_smallstr!("{col}_max"), + Scalar::new(stat.dtype, stat.max), + 1, + ), + Column::new_scalar( + format_pl_smallstr!("{col}_nc"), + Scalar::new( + IDX_DTYPE, + stat.null_count.map_or(AnyValue::Null, |nc| nc.into()), + ), + 1, + ), + ]); + } + + let df = DataFrame::new(columns).unwrap(); + Ok(self + .skip_batch_predicate + .evaluate(&df, &Default::default())? + .bool()? + .first() + .unwrap()) + } +} diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs index 058563b91b61..62e064d2bf79 100644 --- a/crates/polars-pipe/src/executors/sources/parquet.rs +++ b/crates/polars-pipe/src/executors/sources/parquet.rs @@ -140,7 +140,7 @@ impl ParquetSource { ri.offset += self.processed_rows.load(Ordering::Relaxed) as IdxSize; ri })) - .with_predicate(predicate.clone()) + .with_predicate(None) // @TODO!!! Fix!!! .use_statistics(options.use_statistics) .with_hive_partition_columns(hive_partitions) .with_include_file_path( @@ -209,7 +209,7 @@ impl ParquetSource { self.file_options.allow_missing_columns, ) .await? - .with_predicate(predicate.clone()) + .with_predicate(None) // @TODO!!! Fix!!! 
             .use_statistics(options.use_statistics)
             .with_hive_partition_columns(hive_partitions)
             .with_include_file_path(
diff --git a/crates/polars-plan/src/plans/aexpr/predicates.rs b/crates/polars-plan/src/plans/aexpr/predicates.rs
index 78c92c60e608..2e2445dd5ef9 100644
--- a/crates/polars-plan/src/plans/aexpr/predicates.rs
+++ b/crates/polars-plan/src/plans/aexpr/predicates.rs
@@ -12,24 +12,24 @@ use crate::dsl::FunctionExpr;
 use crate::plans::{ExprIR, LiteralValue};
 use crate::prelude::FunctionOptions;

-/// Return a new boolean expression determines whether a batch can be skipped based on min and max
-/// statistics.
+/// Return a new boolean expression that determines whether a batch can be skipped based on min,
+/// max and null count statistics.
 ///
 /// This is conversative and may return `None` or `false` when an expression is not yet supported.
 ///
 /// To evaluate, the expression it is given all the original column appended with `_min` and
 /// `_max`. The `min` or `max` cannot be null and when they are null it is assumed they are not
 /// known.
-pub fn aexpr_to_skip_batch_expr(
+pub fn aexpr_to_skip_batch_predicate(
     e: Node,
     expr_arena: &mut Arena<AExpr>,
     schema: &Schema,
 ) -> Option<Node> {
-    aexpr_to_skip_batch_expr_rec(e, expr_arena, schema, 0)
+    aexpr_to_skip_batch_predicate_rec(e, expr_arena, schema, 0)
 }

 #[recursive::recursive]
-fn aexpr_to_skip_batch_expr_rec(
+fn aexpr_to_skip_batch_predicate_rec(
     e: Node,
     expr_arena: &mut Arena<AExpr>,
     schema: &Schema,
@@ -39,7 +39,7 @@ fn aexpr_to_skip_batch_expr_rec(

     macro_rules! rec {
         ($node:expr) => {{
-            aexpr_to_skip_batch_expr_rec($node, expr_arena, schema, depth + 1)
+            aexpr_to_skip_batch_predicate_rec($node, expr_arena, schema, depth + 1)
         }};
     }
     macro_rules! and {
@@ -102,11 +102,6 @@ fn aexpr_to_skip_batch_expr_rec(
             binexpr!(EqValidity, $l, $r)
         }};
     }
-    macro_rules! ne_missing {
-        ($l:expr, $r:expr) => {{
-            binexpr!(NeValidity, $l, $r)
-        }};
-    }
     macro_rules! all {
         ($i:expr) => {{
             expr_arena.add(AExpr::Function {
@@ -117,14 +112,23 @@ fn aexpr_to_skip_batch_expr_rec(
         }};
     }
     macro_rules! is_stat_defined {
-        ($i:expr) => {{
-            let nc = expr_arena.add(AExpr::Function {
+        ($i:expr, $dtype:expr) => {{
+            let mut expr = expr_arena.add(AExpr::Function {
                 input: vec![ExprIR::new($i, OutputName::Alias(PlSmallStr::EMPTY))],
-                function: FunctionExpr::NullCount,
+                function: FunctionExpr::Boolean(BooleanFunction::IsNotNull),
                 options: FunctionOptions::default(),
             });
-            let idx_zero = lv!(idx: 0);
-            ne_missing!(nc, idx_zero)
+
+            if $dtype.is_float() {
+                let is_not_nan = expr_arena.add(AExpr::Function {
+                    input: vec![ExprIR::new($i, OutputName::Alias(PlSmallStr::EMPTY))],
+                    function: FunctionExpr::Boolean(BooleanFunction::IsNotNan),
+                    options: FunctionOptions::default(),
+                });
+                expr = and!(is_not_nan, expr);
+            }
+
+            expr
         }};
     }
     macro_rules!
col { @@ -195,8 +199,9 @@ fn aexpr_to_skip_batch_expr_rec( let col_min = col!(min: col); let col_max = col!(max: col); - let min_is_defined = is_stat_defined!(col_min); - let max_is_defined = is_stat_defined!(col_max); + let dtype = schema.get(&col)?; + let min_is_defined = is_stat_defined!(col_min, dtype); + let max_is_defined = is_stat_defined!(col_max, dtype); let min_gt = gt!(col_min, lv_node); let min_gt = and!(min_is_defined, min_gt); @@ -242,16 +247,17 @@ fn aexpr_to_skip_batch_expr_rec( let is_col_less_than_lv = matches!(op, O::Lt) == (col_node == left); let col = col.clone(); + let dtype = schema.get(&col)?; if is_col_less_than_lv { // col(A) < B --> min(A) >= B let col_min = col!(min: col); - let min_is_defined = is_stat_defined!(col_min); + let min_is_defined = is_stat_defined!(col_min, dtype); let min_ge = ge!(col_min, lv_node); Some(and!(min_is_defined, min_ge)) } else { // col(A) >= B --> max(A) < B let col_max = col!(max: col); - let max_is_defined = is_stat_defined!(col_max); + let max_is_defined = is_stat_defined!(col_max, dtype); let max_lt = lt!(col_max, lv_node); Some(and!(max_is_defined, max_lt)) } @@ -267,16 +273,17 @@ fn aexpr_to_skip_batch_expr_rec( let is_col_greater_than_lv = matches!(op, O::Gt) == (col_node == left); let col = col.clone(); + let dtype = schema.get(&col)?; if is_col_greater_than_lv { // col(A) > B --> max(A) <= B let col_max = col!(max: col); - let max_is_defined = is_stat_defined!(col_max); + let max_is_defined = is_stat_defined!(col_max, dtype); let max_le = le!(col_max, lv_node); Some(and!(max_is_defined, max_le)) } else { // col(A) <= B --> min(A) > B let col_min = col!(min: col); - let min_is_defined = is_stat_defined!(col_min); + let min_is_defined = is_stat_defined!(col_min, dtype); let min_gt = gt!(col_min, lv_node); Some(and!(min_is_defined, min_gt)) } @@ -303,11 +310,11 @@ fn aexpr_to_skip_batch_expr_rec( | O::Xor => None, } }, - AExpr::Cast { .. } => todo!(), - AExpr::Sort { .. } => todo!(), - AExpr::Gather { .. } => todo!(), - AExpr::SortBy { .. } => todo!(), - AExpr::Filter { .. } => todo!(), + AExpr::Cast { .. } => None, + AExpr::Sort { .. } => None, + AExpr::Gather { .. } => None, + AExpr::SortBy { .. } => None, + AExpr::Filter { .. } => None, AExpr::Agg(..) => None, AExpr::Ternary { .. } => None, AExpr::AnonymousFunction { .. 
} => None, @@ -323,13 +330,14 @@ fn aexpr_to_skip_batch_expr_rec( (Some(col), Some(lv)) => match lv.as_ref() { LiteralValue::Series(s) => { let col = col.clone(); + let dtype = schema.get(&col)?; let has_nulls = s.has_nulls(); let col_min = col!(min: col); let col_max = col!(max: col); - let min_is_defined = is_stat_defined!(col_min); - let max_is_defined = is_stat_defined!(col_max); + let min_is_defined = is_stat_defined!(col_min, dtype); + let max_is_defined = is_stat_defined!(col_max, dtype); let min_gt = gt!(col_min, lv_node); let min_gt = all!(min_gt); @@ -364,6 +372,7 @@ fn aexpr_to_skip_batch_expr_rec( } } +#[allow(clippy::type_complexity)] fn get_binary_expr_col_and_lv<'a>( left: Node, right: Node, diff --git a/crates/polars-plan/src/plans/optimizer/mod.rs b/crates/polars-plan/src/plans/optimizer/mod.rs index 603984c5719b..19ea4e9c5317 100644 --- a/crates/polars-plan/src/plans/optimizer/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/mod.rs @@ -20,7 +20,6 @@ mod predicate_pushdown; mod projection_pushdown; mod set_order; mod simplify_expr; -mod skip_batches; mod slice_pushdown_expr; mod slice_pushdown_lp; mod stack_opt; @@ -222,10 +221,6 @@ pub fn optimize( })?; } - if opt_state.predicate_pushdown() { - skip_batches::optimize(lp_top, lp_arena, expr_arena); - } - // During debug we check if the optimizations have not modified the final schema. #[cfg(debug_assertions)] { diff --git a/crates/polars-plan/src/plans/optimizer/skip_batches.rs b/crates/polars-plan/src/plans/optimizer/skip_batches.rs deleted file mode 100644 index 636c1e19470a..000000000000 --- a/crates/polars-plan/src/plans/optimizer/skip_batches.rs +++ /dev/null @@ -1,35 +0,0 @@ -use polars_utils::arena::{Arena, Node}; -use polars_utils::pl_str::PlSmallStr; - -use super::predicates::aexpr_to_skip_batch_expr; -use super::{AExpr, FileScan, IR}; -use crate::plans::{ExprIR, OutputName}; - -pub fn optimize(root: Node, lp_arena: &mut Arena, expr_arena: &mut Arena) { - let mut ir_stack = Vec::with_capacity(16); - ir_stack.push(root); - - while let Some(current) = ir_stack.pop() { - let current_ir = lp_arena.get(current); - current_ir.copy_inputs(&mut ir_stack); - let IR::Scan { - scan_type: FileScan::Parquet { .. }, - predicate: Some(expr), - file_info, - .. - } = current_ir - else { - continue; - }; - - let skip_batches_expr = - aexpr_to_skip_batch_expr(expr.node(), expr_arena, &file_info.schema); - if let Some(skip_batches_expr) = skip_batches_expr { - eprintln!( - "{}", - ExprIR::new(skip_batches_expr, OutputName::Alias(PlSmallStr::EMPTY)) - .display(expr_arena) - ); - } - } -} diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index 372e260b1d1b..a0e4dfc536cf 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -40,9 +40,12 @@ pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec, py: Python) -> PyResult