Skip to content

Commit

Permalink
working source
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 30, 2024
1 parent 1b54dd9 commit db894ec
Show file tree
Hide file tree
Showing 13 changed files with 284 additions and 88 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 35 additions & 9 deletions crates/polars-mem-engine/src/executors/scan/python_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use polars_core::error::to_compute_err;
use polars_core::utils::accumulate_dataframes_vertical;
use pyo3::exceptions::PyStopIteration;
use pyo3::prelude::*;
use pyo3::types::PyBytes;
use pyo3::{intern, PyTypeInfo};

use super::*;

pub(crate) struct PythonScanExec {
pub(crate) options: PythonOptions,
pub(crate) predicate: Option<Arc<dyn PhysicalExpr>>,
pub(crate) predicate_serialized: Option<Vec<u8>>,
}

fn python_df_to_rust(py: Python, df: Bound<PyAny>) -> PolarsResult<DataFrame> {
Expand Down Expand Up @@ -51,22 +53,36 @@ impl Executor for PythonScanExec {

let predicate = match &self.options.predicate {
PythonPredicate::PyArrow(s) => s.into_py(py),
PythonPredicate::None => (None::<()>).into_py(py),
// Still todo, currently we apply the predicate on this side.
PythonPredicate::None => None::<()>.into_py(py),
PythonPredicate::Polars(_) => {
assert!(self.predicate.is_some(), "should be set");

(None::<()>).into_py(py)
match &self.predicate_serialized {
None => None::<()>.into_py(py),
Some(buf) => PyBytes::new_bound(py, buf).to_object(py),
}
},
};

let generator = callable
.call1((python_scan_function, with_columns, predicate, n_rows))
let batch_size = if self.options.is_pyarrow {
None
} else {
Some(100_000usize)
};

let generator_init = callable
.call1((
python_scan_function,
with_columns,
predicate,
n_rows,
batch_size,
))
.map_err(to_compute_err)?;

// This isn't a generator, but a `DataFrame`.
if generator.getattr(intern!(py, "_df")).is_ok() {
let df = python_df_to_rust(py, generator)?;
if generator_init.getattr(intern!(py, "_df")).is_ok() {
let df = python_df_to_rust(py, generator_init)?;
return if let Some(pred) = &self.predicate {
let mask = pred.evaluate(&df, state)?;
df.filter(mask.bool()?)
Expand All @@ -75,20 +91,30 @@ impl Executor for PythonScanExec {
};
}

let generator = generator_init
.get_item(0)
.map_err(|_| polars_err!(ComputeError: "expected tuple got {}", generator_init))?;
let can_parse_predicate = generator_init
.get_item(1)
.map_err(|_| polars_err!(ComputeError: "expected tuple got {}", generator))?;
let can_parse_predicate = can_parse_predicate.extract::<bool>().map_err(
|_| polars_err!(ComputeError: "expected bool got {}", can_parse_predicate),
)?;

let mut chunks = vec![];
loop {
match generator.call_method0(intern!(py, "__next__")) {
Ok(out) => {
let mut df = python_df_to_rust(py, out)?;
if let Some(pred) = &self.predicate {
if let (Some(pred), false) = (&self.predicate, can_parse_predicate) {
let mask = pred.evaluate(&df, state)?;
df = df.filter(mask.bool()?)?;
}
chunks.push(df)
},
Err(err) if err.matches(py, PyStopIteration::type_object_bound(py)) => break,
Err(err) => {
polars_bail!(ComputeError: "catched exception during execution of a Python source, exception: {}", err)
polars_bail!(ComputeError: "caught exception during execution of a Python source, exception: {}", err)
},
}
}
Expand Down
19 changes: 15 additions & 4 deletions crates/polars-mem-engine/src/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,15 @@ fn create_physical_plan_impl(
match logical_plan {
#[cfg(feature = "python")]
PythonScan { mut options } => {
let mut predicate_serialized = None;
let predicate = if let PythonPredicate::Polars(e) = &options.predicate {
// Convert to a pyarrow eval string.
if options.is_pyarrow {
if let Some(eval_str) =
pyarrow::predicate_to_pa(e.node(), expr_arena, Default::default())
{
if let Some(eval_str) = polars_plan::plans::python::pyarrow::predicate_to_pa(
e.node(),
expr_arena,
Default::default(),
) {
options.predicate = PythonPredicate::PyArrow(eval_str)
}

Expand All @@ -173,6 +176,10 @@ fn create_physical_plan_impl(
}
// Convert to physical expression for the case the reader cannot consume the predicate.
else {
let dsl_expr = e.to_expr(expr_arena);
predicate_serialized =
polars_plan::plans::python::predicate::serialize(&dsl_expr)?;

let mut state = ExpressionConversionState::new(true, state.expr_depth);
Some(create_physical_expr(
e,
Expand All @@ -185,7 +192,11 @@ fn create_physical_plan_impl(
} else {
None
};
Ok(Box::new(executors::PythonScanExec { options, predicate }))
Ok(Box::new(executors::PythonScanExec {
options,
predicate,
predicate_serialized,
}))
},
Sink { payload, .. } => match payload {
SinkType::Memory => {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ rayon = { workspace = true }
recursive = { workspace = true }
regex = { workspace = true, optional = true }
serde = { workspace = true, features = ["rc"], optional = true }
serde_json = { workspace = true, optional = true }
smartstring = { workspace = true }
strum_macros = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod lit;
pub(crate) mod optimizer;
pub(crate) mod options;
#[cfg(feature = "python")]
pub mod pyarrow;
pub mod python;
mod schema;
pub mod visitor;

Expand Down
7 changes: 7 additions & 0 deletions crates/polars-plan/src/plans/optimizer/fused.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ impl OptimizationRule for FusedArithmetic {
lp_arena: &Arena<IR>,
lp_node: Node,
) -> PolarsResult<Option<AExpr>> {
// We don't want to fuse arithmetic that we send to pyarrow.
#[cfg(feature = "python")]
if let IR::PythonScan { options } = lp_arena.get(lp_node) {
if options.is_pyarrow {
return Ok(None);
}
};
let expr = expr_arena.get(expr_node);

use AExpr::*;
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-plan/src/plans/python/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod predicate;
pub mod pyarrow;
69 changes: 69 additions & 0 deletions crates/polars-plan/src/plans/python/predicate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use polars_core::error::polars_err;
use polars_core::prelude::PolarsResult;

use crate::prelude::*;

fn accept_as_io_predicate(e: &Expr) -> bool {
const LIMIT: usize = 1 << 16;
match e {
Expr::Literal(lv) => match lv {
LiteralValue::Binary(v) => v.len() <= LIMIT,
LiteralValue::String(v) => v.len() <= LIMIT,
LiteralValue::Series(s) => s.estimated_size() < LIMIT,
// Don't accept dynamic types
LiteralValue::Int(_) => false,
LiteralValue::Float(_) => false,
_ => true,
},
Expr::Wildcard | Expr::Column(_) => true,
Expr::BinaryExpr { left, right, .. } => {
accept_as_io_predicate(left) && accept_as_io_predicate(right)
},
Expr::Ternary {
truthy,
falsy,
predicate,
} => {
accept_as_io_predicate(truthy)
&& accept_as_io_predicate(falsy)
&& accept_as_io_predicate(predicate)
},
Expr::Alias(_, _) => true,
Expr::Function {
function, input, ..
} => {
match function {
// we already checked if streaming, so we can all functions
FunctionExpr::Boolean(_) | FunctionExpr::BinaryExpr(_) | FunctionExpr::Coalesce => {
},
#[cfg(feature = "log")]
FunctionExpr::Entropy { .. }
| FunctionExpr::Log { .. }
| FunctionExpr::Log1p { .. }
| FunctionExpr::Exp { .. } => {},
#[cfg(feature = "abs")]
FunctionExpr::Abs => {},
#[cfg(feature = "trigonometry")]
FunctionExpr::Atan2 => {},
#[cfg(feature = "round_series")]
FunctionExpr::Clip { .. } => {},
#[cfg(feature = "fused")]
FunctionExpr::Fused(_) => {},
_ => return false,
}
input.iter().all(accept_as_io_predicate)
},
_ => false,
}
}

pub fn serialize(expr: &Expr) -> PolarsResult<Option<Vec<u8>>> {
if !accept_as_io_predicate(expr) {
return Ok(None);
}
let mut buf = vec![];
ciborium::into_writer(expr, &mut buf)
.map_err(|_| polars_err!(ComputeError: "could not serialize: {}", expr))?;

Ok(Some(buf))
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ pub fn predicate_to_pa(
}
},
AExpr::Column(name) => Some(format!("pa.compute.field('{}')", name.as_ref())),
AExpr::Alias(input, _) => predicate_to_pa(*input, expr_arena, args),
AExpr::Literal(LiteralValue::Series(s)) => {
if !args.allow_literal_series || s.is_empty() || s.len() > 100 {
None
Expand Down Expand Up @@ -115,33 +114,6 @@ pub fn predicate_to_pa(
},
}
},
AExpr::Function {
function: FunctionExpr::Boolean(BooleanFunction::Not),
input,
..
} => {
let input = input.first().unwrap().node();
let input = predicate_to_pa(input, expr_arena, args)?;
Some(format!("~({input})"))
},
AExpr::Function {
function: FunctionExpr::Boolean(BooleanFunction::IsNull),
input,
..
} => {
let input = input.first().unwrap().node();
let input = predicate_to_pa(input, expr_arena, args)?;
Some(format!("({input}).is_null()"))
},
AExpr::Function {
function: FunctionExpr::Boolean(BooleanFunction::IsNotNull),
input,
..
} => {
let input = input.first().unwrap().node();
let input = predicate_to_pa(input, expr_arena, args)?;
Some(format!("~({input}).is_null()"))
},
#[cfg(feature = "is_in")]
AExpr::Function {
function: FunctionExpr::Boolean(BooleanFunction::IsIn),
Expand Down Expand Up @@ -182,6 +154,23 @@ pub fn predicate_to_pa(
))
}
},
AExpr::Function {
function, input, ..
} => {
let input = input.first().unwrap().node();
let input = predicate_to_pa(input, expr_arena, args)?;

match function {
FunctionExpr::Boolean(BooleanFunction::Not) => Some(format!("~({input})")),
FunctionExpr::Boolean(BooleanFunction::IsNull) => {
Some(format!("({input}).is_null()"))
},
FunctionExpr::Boolean(BooleanFunction::IsNotNull) => {
Some(format!("~({input}).is_null()"))
},
_ => None,
}
},
_ => None,
}
}
7 changes: 6 additions & 1 deletion py-polars/polars/expr/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,10 @@ def function(s: Series) -> Series: # pragma: no cover

@classmethod
def deserialize(
cls, source: str | Path | IOBase, *, format: SerializationFormat = "binary"
cls,
source: str | Path | IOBase | bytes,
*,
format: SerializationFormat = "binary",
) -> Expr:
"""
Read a serialized expression from a file.
Expand Down Expand Up @@ -385,6 +388,8 @@ def deserialize(
source = BytesIO(source.getvalue().encode())
elif isinstance(source, (str, Path)):
source = normalize_filepath(source)
elif isinstance(source, bytes):
source = BytesIO(source)

if format == "binary":
deserializer = PyExpr.deserialize_binary
Expand Down
45 changes: 0 additions & 45 deletions py-polars/polars/io/plugin.py

This file was deleted.

Loading

0 comments on commit db894ec

Please sign in to comment.