Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(rust): Add elementwise select and with_columns to new streaming engine #17185

Merged
merged 5 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions crates/polars-core/src/frame/from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,24 +28,3 @@ impl TryFrom<StructArray> for DataFrame {
DataFrame::new(columns)
}
}

impl From<&Schema> for DataFrame {
fn from(schema: &Schema) -> Self {
let cols = schema
.iter()
.map(|(name, dtype)| Series::new_empty(name, dtype))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}
}

impl From<&ArrowSchema> for DataFrame {
fn from(schema: &ArrowSchema) -> Self {
let cols = schema
.fields
.iter()
.map(|fld| Series::new_empty(fld.name.as_str(), &(fld.data_type().into())))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}
}
19 changes: 19 additions & 0 deletions crates/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,25 @@ impl DataFrame {
unsafe { DataFrame::new_no_checks(Vec::new()) }
}

/// Create an empty `DataFrame` with empty columns as per the `schema`.
pub fn empty_with_schema(schema: &Schema) -> Self {
let cols = schema
.iter()
.map(|(name, dtype)| Series::new_empty(name, dtype))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}

/// Create an empty `DataFrame` with empty columns as per the `schema`.
pub fn empty_with_arrow_schema(schema: &ArrowSchema) -> Self {
let cols = schema
.fields
.iter()
.map(|fld| Series::new_empty(fld.name.as_str(), &(fld.data_type().into())))
.collect();
unsafe { DataFrame::new_no_checks(cols) }
}

/// Removes the last `Series` from the `DataFrame` and returns it, or [`None`] if it is empty.
///
/// # Example
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-io/src/csv/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,9 @@ impl<'a> CoreReader<'a> {
// An empty file with a schema should return an empty DataFrame with that schema
if bytes.is_empty() {
let mut df = if projection.len() == self.schema.len() {
DataFrame::from(self.schema.as_ref())
DataFrame::empty_with_schema(self.schema.as_ref())
} else {
DataFrame::from(
DataFrame::empty_with_schema(
&projection
.iter()
.map(|&i| self.schema.get_at_index(i).unwrap())
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn materialize_empty_df(
} else {
Cow::Borrowed(reader_schema)
};
let mut df = DataFrame::from(schema.as_ref());
let mut df = DataFrame::empty_with_arrow_schema(&schema);

if let Some(row_index) = row_index {
df.insert_column(0, Series::new_empty(&row_index.name, &IDX_DTYPE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl<const FIXED: bool> AggHashTable<FIXED> {
let (skip_len, take_len) = if let Some((offset, slice_len)) = slice {
if *offset as usize >= local_len {
*offset -= local_len as i64;
return DataFrame::from(self.output_schema.as_ref());
return DataFrame::empty_with_schema(&self.output_schema);
} else {
let out = (*offset as usize, *slice_len);
*offset = 0;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sinks/group_by/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub(super) fn finalize_group_by(
ooc_payload: Option<(IOThread, Box<dyn Sink>)>,
) -> PolarsResult<FinalizedSink> {
let df = if dfs.is_empty() {
DataFrame::from(output_schema)
DataFrame::empty_with_schema(output_schema)
} else {
let mut df = accumulate_dataframes_vertical_unchecked(dfs);
// re init to check duplicates
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ impl Sink for OrderedSink {
}
fn finalize(&mut self, _context: &PExecutionContext) -> PolarsResult<FinalizedSink> {
if self.chunks.is_empty() {
return Ok(FinalizedSink::Finished(DataFrame::from(
self.schema.as_ref(),
return Ok(FinalizedSink::Finished(DataFrame::empty_with_schema(
&self.schema,
)));
}
self.sort();
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sinks/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ impl Sink for SliceSink {
let mut chunks = chunks.lock().unwrap();
let chunks: Vec<DataChunk> = std::mem::take(chunks.as_mut());
if chunks.is_empty() {
return Ok(FinalizedSink::Finished(DataFrame::from(
self.schema.as_ref(),
return Ok(FinalizedSink::Finished(DataFrame::empty_with_schema(
&self.schema,
)));
}

Expand Down
3 changes: 3 additions & 0 deletions crates/polars-plan/src/plans/ir/schema.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use recursive::recursive;

use super::*;

impl IR {
Expand Down Expand Up @@ -61,6 +63,7 @@ impl IR {
}

/// Get the schema of the logical plan node.
#[recursive]
pub fn schema<'a>(&'a self, arena: &'a Arena<IR>) -> Cow<'a, SchemaRef> {
use IR::*;
let schema = match self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ impl<'a> PredicatePushDown<'a> {
}
if new_paths.is_empty() {
let schema = output_schema.as_ref().unwrap_or(&file_info.schema);
let df = DataFrame::from(schema.as_ref());
let df = DataFrame::empty_with_schema(schema);

return Ok(DataFrameScan {
df: Arc::new(df),
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ impl SQLContext {
None => {
let tbl = table_name.to_string();
if let Some(lf) = self.table_map.get_mut(&tbl) {
*lf = DataFrame::from(
*lf = DataFrame::empty_with_schema(
lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)
.unwrap()
.as_ref(),
Expand Down
1 change: 1 addition & 0 deletions crates/polars-stream/src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct GraphNode {
}

/// A pipe sends data between nodes.
#[allow(unused)] // TODO: remove.
pub struct LogicalPipe {
// Node that we send data to.
sender: GraphNodeKey,
Expand Down
1 change: 0 additions & 1 deletion crates/polars-stream/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#![allow(unused)] // TODO: remove.

#[allow(unused)] // TODO: remove.
mod async_executor;
#[allow(unused)] // TODO: remove.
mod async_primitives;
Expand Down
28 changes: 19 additions & 9 deletions crates/polars-stream/src/nodes/in_memory_sink.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
use std::cmp::Reverse;
use std::collections::{BinaryHeap, VecDeque};
use std::sync::Arc;

use parking_lot::Mutex;
use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_core::utils::accumulate_dataframes_vertical_unchecked;
use polars_core::utils::rayon::iter::{IntoParallelIterator, ParallelIterator};
use polars_core::POOL;
use polars_error::PolarsResult;
use polars_expr::state::ExecutionState;
use polars_utils::priority::Priority;
use polars_utils::sync::SyncPtr;

use super::ComputeNode;
use crate::async_executor::{JoinHandle, TaskScope};
use crate::async_primitives::pipe::{Receiver, Sender};
use crate::morsel::Morsel;
use crate::utils::in_memory_linearize::linearize;

#[derive(Default)]
pub struct InMemorySink {
morsels_per_pipe: Mutex<Vec<Vec<Morsel>>>,
schema: Arc<Schema>,
}

impl InMemorySink {
pub fn new(schema: Arc<Schema>) -> Self {
Self {
morsels_per_pipe: Mutex::default(),
schema,
}
}
}

impl ComputeNode for InMemorySink {
Expand Down Expand Up @@ -51,8 +57,12 @@ impl ComputeNode for InMemorySink {
}

fn finalize(&mut self) -> PolarsResult<Option<DataFrame>> {
let mut morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
let morsels_per_pipe = core::mem::take(&mut *self.morsels_per_pipe.get_mut());
let dataframes = linearize(morsels_per_pipe);
Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
if dataframes.is_empty() {
Ok(Some(DataFrame::empty_with_schema(&self.schema)))
} else {
Ok(Some(accumulate_dataframes_vertical_unchecked(dataframes)))
}
}
}
1 change: 1 addition & 0 deletions crates/polars-stream/src/nodes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::morsel::Morsel;
pub mod filter;
pub mod in_memory_sink;
pub mod in_memory_source;
pub mod select;
pub mod simple_projection;

pub trait ComputeNode {
Expand Down
90 changes: 90 additions & 0 deletions crates/polars-stream/src/nodes/select.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use std::sync::Arc;

use polars_core::frame::DataFrame;
use polars_core::schema::Schema;
use polars_core::series::Series;
use polars_error::PolarsResult;
use polars_expr::prelude::PhysicalExpr;
use polars_expr::state::ExecutionState;

use super::ComputeNode;
use crate::async_executor::{JoinHandle, TaskScope};
use crate::async_primitives::pipe::{Receiver, Sender};
use crate::morsel::Morsel;

pub struct SelectNode {
selectors: Vec<Arc<dyn PhysicalExpr>>,
schema: Arc<Schema>,
extend_original: bool,
}

impl SelectNode {
pub fn new(
selectors: Vec<Arc<dyn PhysicalExpr>>,
schema: Arc<Schema>,
extend_original: bool,
) -> Self {
Self {
selectors,
schema,
extend_original,
}
}
}

impl ComputeNode for SelectNode {
fn spawn<'env, 's>(
&'env self,
scope: &'s TaskScope<'s, 'env>,
_pipeline: usize,
recv: Vec<Receiver<Morsel>>,
send: Vec<Sender<Morsel>>,
state: &'s ExecutionState,
) -> JoinHandle<PolarsResult<()>> {
let [mut recv] = <[_; 1]>::try_from(recv).ok().unwrap();
let [mut send] = <[_; 1]>::try_from(send).ok().unwrap();

scope.spawn_task(true, async move {
while let Ok(morsel) = recv.recv().await {
let morsel = morsel.try_map(|df| {
// Select columns.
let mut selected: Vec<Series> = self
.selectors
.iter()
.map(|s| s.evaluate(&df, state))
.collect::<PolarsResult<_>>()?;

// Extend or create new dataframe.
let ret = if self.extend_original {
let mut out = df.clone();
out._add_columns(selected, &self.schema)?;
out
} else {
// Broadcast scalars.
let max_non_unit_length = selected
.iter()
.map(|s| s.len())
.filter(|l| *l != 1)
.max()
.unwrap_or(1);
for s in &mut selected {
if s.len() != max_non_unit_length {
assert!(s.len() == 1, "got series of incompatible lengths");
*s = s.new_from_index(0, max_non_unit_length);
}
}
unsafe { DataFrame::new_no_checks(selected) }
};

PolarsResult::Ok(ret)
})?;

if send.send(morsel).await.is_err() {
break;
}
}

Ok(())
})
}
}
48 changes: 46 additions & 2 deletions crates/polars-stream/src/physical_plan/lower_ir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,50 @@ pub fn lower_ir(
expr_arena: &mut Arena<AExpr>,
phys_sm: &mut SlotMap<PhysNodeKey, PhysNode>,
) -> PolarsResult<PhysNodeKey> {
match ir_arena.get(node) {
let ir_node = ir_arena.get(node);
match ir_node {
IR::SimpleProjection { input, columns } => {
let schema = columns.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::SimpleProjection { input, schema }))
},

// TODO: split partially streamable selections to avoid fallback as much as possible.
IR::Select {
input,
expr,
schema,
..
} if expr.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
let selectors = expr.clone();
let schema = schema.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::Select {
input,
selectors,
schema,
extend_original: false,
}))
},

// TODO: split partially streamable selections to avoid fallback as much as possible.
IR::HStack {
input,
exprs,
schema,
..
} if exprs.iter().all(|e| is_streamable(e.node(), expr_arena)) => {
let selectors = exprs.clone();
let schema = schema.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Ok(phys_sm.insert(PhysNode::Select {
input,
selectors,
schema,
extend_original: true,
}))
},

orlp marked this conversation as resolved.
Show resolved Hide resolved
IR::Filter { input, predicate } if is_streamable(predicate.node(), expr_arena) => {
let predicate = predicate.clone();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
Expand Down Expand Up @@ -57,8 +100,9 @@ pub fn lower_ir(

IR::Sink { input, payload } => {
if *payload == SinkType::Memory {
let schema = ir_node.schema(ir_arena).into_owned();
let input = lower_ir(*input, ir_arena, expr_arena, phys_sm)?;
return Ok(phys_sm.insert(PhysNode::InMemorySink { input }));
return Ok(phys_sm.insert(PhysNode::InMemorySink { input, schema }));
}

todo!()
Expand Down
Loading
Loading