diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index ab74c7c1d2d1..8cb0f27647d4 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -1407,12 +1407,19 @@ impl DataFrame { self } + // Note: Schema can be both input or output_schema fn add_column_by_schema(&mut self, c: Column, schema: &Schema) -> PolarsResult<()> { let name = c.name(); if let Some((idx, _, _)) = schema.get_full(name.as_str()) { - // schema is incorrect fallback to search if self.columns.get(idx).map(|s| s.name()) != Some(name) { - self.add_column_by_search(c)?; + // Given schema is output_schema and we can push. + if idx == self.columns.len() { + self.columns.push(c); + } + // Schema is incorrect fallback to search + else { + self.add_column_by_search(c)?; + } } else { self.replace_column(idx, c)?; } @@ -1426,6 +1433,7 @@ impl DataFrame { Ok(()) } + // Note: Schema can be both input or output_schema pub fn _add_series(&mut self, series: Vec, schema: &Schema) -> PolarsResult<()> { for (i, s) in series.into_iter().enumerate() { // we need to branch here @@ -1455,6 +1463,8 @@ impl DataFrame { /// Add a new column to this [`DataFrame`] or replace an existing one. /// Uses an existing schema to amortize lookups. /// If the schema is incorrect, we will fallback to linear search. + /// + /// Note: Schema can be both input or output_schema pub fn with_column_and_schema( &mut self, column: C, diff --git a/crates/polars-mem-engine/src/executors/stack.rs b/crates/polars-mem-engine/src/executors/stack.rs index ba6fa8111402..a93d4fc72d89 100644 --- a/crates/polars-mem-engine/src/executors/stack.rs +++ b/crates/polars-mem-engine/src/executors/stack.rs @@ -8,6 +8,7 @@ pub struct StackExec { pub(crate) has_windows: bool, pub(crate) exprs: Vec>, pub(crate) input_schema: SchemaRef, + pub(crate) output_schema: SchemaRef, pub(crate) options: ProjectionOptions, // Can run all operations elementwise pub(crate) streamable: bool, @@ -19,7 +20,7 @@ impl StackExec { state: &ExecutionState, mut df: DataFrame, ) -> PolarsResult { - let schema = &*self.input_schema; + let schema = &*self.output_schema; // Vertical and horizontal parallelism. let df = if self.streamable diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 3a5e525867fb..0d438b5f5bd1 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -629,7 +629,7 @@ fn create_physical_plan_impl( HStack { input, exprs, - schema: _schema, + schema: output_schema, options, } => { let input_schema = lp_arena.get(input).schema(lp_arena).into_owned(); @@ -659,6 +659,7 @@ fn create_physical_plan_impl( has_windows: state.has_windows, exprs: phys_exprs, input_schema, + output_schema, options, streamable, })) diff --git a/py-polars/tests/benchmark/test_with_columns.py b/py-polars/tests/benchmark/test_with_columns.py new file mode 100644 index 000000000000..30f8b5474ec3 --- /dev/null +++ b/py-polars/tests/benchmark/test_with_columns.py @@ -0,0 +1,17 @@ +import time + +import polars as pl + + +def test_with_columns_quadratic_19503() -> None: + num_columns = 2000 + data1 = {f"col_{i}": [0] for i in range(num_columns)} + df1 = pl.DataFrame(data1) + + data2 = {f"feature_{i}": [0] for i in range(num_columns)} + df2 = pl.DataFrame(data2) + + t0 = time.time() + df1.with_columns(df2) + t1 = time.time() + assert t1 - t0 < 0.2