Skip to content

Commit

Permalink
move pyarrow conversion to planner and ensure we only accept streamab…
Browse files Browse the repository at this point in the history
…le predicates
  • Loading branch information
ritchie46 committed Jul 28, 2024
1 parent 0170bfe commit 5e45a04
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 58 deletions.
32 changes: 23 additions & 9 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,16 +158,30 @@ fn create_physical_plan_impl(
let logical_plan = lp_arena.take(root);
match logical_plan {
#[cfg(feature = "python")]
PythonScan { options } => {
PythonScan { mut options } => {
let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
let mut state = ExpressionConversionState::new(true, state.expr_depth);
Some(create_physical_expr(
e,
Context::Default,
expr_arena,
Some(&options.schema),
&mut state,
)?)
// Convert to a pyarrow eval string.
if options.is_pyarrow {
if let Some(eval_str) =
pyarrow::predicate_to_pa(e.node(), expr_arena, Default::default())
{
options.predicate = PythonPredicate::PyArrow(eval_str)
}

// We don't have to use a physical expression as pyarrow deals with the filter.
None
}
// Convert to physical expression for the case the reader cannot consume the predicate.
else {
let mut state = ExpressionConversionState::new(true, state.expr_depth);
Some(create_physical_expr(
e,
Context::Default,
expr_arena,
Some(&options.schema),
&mut state,
)?)
}
} else {
None
};
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod lit;
pub(crate) mod optimizer;
pub(crate) mod options;
#[cfg(feature = "python")]
mod pyarrow;
pub mod pyarrow;
mod schema;
pub mod visitor;

Expand Down
57 changes: 12 additions & 45 deletions crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,54 +659,21 @@ impl<'a> PredicatePushDown<'a> {
#[cfg(feature = "python")]
PythonScan { mut options } => {
let predicate = predicate_at_scan(acc_predicates, None, expr_arena);

if options.is_pyarrow {
if let Some(predicate) = predicate {
// Simplify expressions before we translate them to pyarrow
options.predicate = PythonPredicate::Polars(predicate);
let lp = PythonScan {
options: options.clone(),
};
let lp_top = lp_arena.add(lp);
let stack_opt = StackOptimizer {};
let lp_top = stack_opt
.optimize_loop(
&mut [Box::new(SimplifyExprRule {})],
expr_arena,
lp_arena,
lp_top,
)
.unwrap();
let PythonScan { mut options } = lp_arena.take(lp_top) else {
unreachable!()
};
let PythonPredicate::Polars(predicate) = &options.predicate else {
unreachable!()
};

match super::super::pyarrow::predicate_to_pa(
predicate.node(),
if let Some(predicate) = predicate {
// Only accept streamable expressions as we want to apply the predicates to the batches.
if !is_streamable(predicate.node(), expr_arena, Context::Default) {
let lp = PythonScan { options };
return Ok(self.optional_apply_predicate(
lp,
vec![predicate],
lp_arena,
expr_arena,
Default::default(),
) {
// We were able to create a pyarrow string, mutate the options.
Some(eval_str) => {
options.predicate = PythonPredicate::PyArrow(eval_str)
},
// We were not able to translate the predicate apply on the rust side in the scan.
None => {
let lp = PythonScan { options };
return Ok(lp);
},
}
));
}
Ok(PythonScan { options })
} else {
if let Some(predicate) = predicate {
options.predicate = PythonPredicate::Polars(predicate);
}
Ok(PythonScan { options })

options.predicate = PythonPredicate::Polars(predicate);
}
Ok(PythonScan { options })
},
Invalid => unreachable!(),
}
Expand Down
6 changes: 3 additions & 3 deletions crates/polars-plan/src/plans/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_core::prelude::{TimeUnit, TimeZone};
use crate::prelude::*;

#[derive(Default, Copy, Clone)]
pub(super) struct Args {
pub struct PyarrowArgs {
// pyarrow doesn't allow `filter([True, False])`
// but does allow `filter(field("a").isin([True, False]))`
allow_literal_series: bool,
Expand All @@ -22,10 +22,10 @@ fn to_py_datetime(v: i64, tu: &TimeUnit, tz: Option<&TimeZone>) -> String {
}

// convert to a pyarrow expression that can be evaluated with pythons eval
pub(super) fn predicate_to_pa(
pub fn predicate_to_pa(
predicate: Node,
expr_arena: &Arena<AExpr>,
args: Args,
args: PyarrowArgs,
) -> Option<String> {
match expr_arena.get(predicate) {
AExpr::BinaryExpr { left, right, op } => {
Expand Down

0 comments on commit 5e45a04

Please sign in to comment.