From 5e0fc1abbd46989957e03c2ff534fcb185339a5e Mon Sep 17 00:00:00 2001 From: ritchie Date: Fri, 8 Nov 2024 17:05:58 +0100 Subject: [PATCH 1/5] perf: Fix quadratic 'with_columns' behavior --- crates/polars-core/src/frame/mod.rs | 14 ++++++++++++-- crates/polars-mem-engine/src/executors/stack.rs | 3 ++- crates/polars-mem-engine/src/planner/lp.rs | 3 ++- py-polars/tests/benchmark/test_with_columns.py | 17 +++++++++++++++++ 4 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 py-polars/tests/benchmark/test_with_columns.py diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index aa434fb07df7..5ae542d29a3e 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -1382,12 +1382,19 @@ impl DataFrame { self } + // Note: Schema can be both input or output_schema fn add_column_by_schema(&mut self, c: Column, schema: &Schema) -> PolarsResult<()> { let name = c.name(); if let Some((idx, _, _)) = schema.get_full(name.as_str()) { - // schema is incorrect fallback to search if self.columns.get(idx).map(|s| s.name()) != Some(name) { - self.add_column_by_search(c)?; + // Given schema is output_schema and we can push. + if idx == self.columns.len() { + self.columns.push(c); + } + // Schema is incorrect fallback to search + else { + self.add_column_by_search(c)?; + } } else { self.replace_column(idx, c)?; } @@ -1401,6 +1408,7 @@ impl DataFrame { Ok(()) } + // Note: Schema can be both input or output_schema pub fn _add_series(&mut self, series: Vec, schema: &Schema) -> PolarsResult<()> { for (i, s) in series.into_iter().enumerate() { // we need to branch here @@ -1430,6 +1438,8 @@ impl DataFrame { /// Add a new column to this [`DataFrame`] or replace an existing one. /// Uses an existing schema to amortize lookups. /// If the schema is incorrect, we will fallback to linear search. + /// + /// Note: Schema can be both input or output_schema pub fn with_column_and_schema( &mut self, column: C, diff --git a/crates/polars-mem-engine/src/executors/stack.rs b/crates/polars-mem-engine/src/executors/stack.rs index ba6fa8111402..a93d4fc72d89 100644 --- a/crates/polars-mem-engine/src/executors/stack.rs +++ b/crates/polars-mem-engine/src/executors/stack.rs @@ -8,6 +8,7 @@ pub struct StackExec { pub(crate) has_windows: bool, pub(crate) exprs: Vec>, pub(crate) input_schema: SchemaRef, + pub(crate) output_schema: SchemaRef, pub(crate) options: ProjectionOptions, // Can run all operations elementwise pub(crate) streamable: bool, @@ -19,7 +20,7 @@ impl StackExec { state: &ExecutionState, mut df: DataFrame, ) -> PolarsResult { - let schema = &*self.input_schema; + let schema = &*self.output_schema; // Vertical and horizontal parallelism. let df = if self.streamable diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index 3a5e525867fb..0d438b5f5bd1 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -629,7 +629,7 @@ fn create_physical_plan_impl( HStack { input, exprs, - schema: _schema, + schema: output_schema, options, } => { let input_schema = lp_arena.get(input).schema(lp_arena).into_owned(); @@ -659,6 +659,7 @@ fn create_physical_plan_impl( has_windows: state.has_windows, exprs: phys_exprs, input_schema, + output_schema, options, streamable, })) diff --git a/py-polars/tests/benchmark/test_with_columns.py b/py-polars/tests/benchmark/test_with_columns.py new file mode 100644 index 000000000000..30f8b5474ec3 --- /dev/null +++ b/py-polars/tests/benchmark/test_with_columns.py @@ -0,0 +1,17 @@ +import time + +import polars as pl + + +def test_with_columns_quadratic_19503() -> None: + num_columns = 2000 + data1 = {f"col_{i}": [0] for i in range(num_columns)} + df1 = pl.DataFrame(data1) + + data2 = {f"feature_{i}": [0] for i in range(num_columns)} + df2 = pl.DataFrame(data2) + + t0 = time.time() + df1.with_columns(df2) + t1 = time.time() + assert t1 - t0 < 0.2 From 8dab5175d7a71004e7150a27f9523753a998c5ea Mon Sep 17 00:00:00 2001 From: ritchie Date: Fri, 8 Nov 2024 18:05:46 +0100 Subject: [PATCH 2/5] fix empty case --- crates/polars-core/src/frame/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 5ae542d29a3e..cf5b8cc2943e 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -1389,6 +1389,10 @@ impl DataFrame { if self.columns.get(idx).map(|s| s.name()) != Some(name) { // Given schema is output_schema and we can push. if idx == self.columns.len() { + if self.width() == 0 { + self.height = c.len(); + } + self.columns.push(c); } // Schema is incorrect fallback to search From 1be3aa12da32636ba42a271483156a5e1b04512c Mon Sep 17 00:00:00 2001 From: ritchie Date: Fri, 8 Nov 2024 18:50:07 +0100 Subject: [PATCH 3/5] print --- py-polars/tests/benchmark/test_with_columns.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py-polars/tests/benchmark/test_with_columns.py b/py-polars/tests/benchmark/test_with_columns.py index 30f8b5474ec3..f4db29addeab 100644 --- a/py-polars/tests/benchmark/test_with_columns.py +++ b/py-polars/tests/benchmark/test_with_columns.py @@ -14,4 +14,5 @@ def test_with_columns_quadratic_19503() -> None: t0 = time.time() df1.with_columns(df2) t1 = time.time() + print(t1 - t0) assert t1 - t0 < 0.2 From 939e0e6115beb9cf5389024d550ad30d7b7c6564 Mon Sep 17 00:00:00 2001 From: ritchie Date: Fri, 8 Nov 2024 20:11:34 +0100 Subject: [PATCH 4/5] skip streaming --- py-polars/tests/benchmark/test_with_columns.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/py-polars/tests/benchmark/test_with_columns.py b/py-polars/tests/benchmark/test_with_columns.py index f4db29addeab..0180c29b6874 100644 --- a/py-polars/tests/benchmark/test_with_columns.py +++ b/py-polars/tests/benchmark/test_with_columns.py @@ -1,8 +1,12 @@ import time +import pytest + import polars as pl +# TODO: this is slow in streaming +@pytest.mark.may_fail_auto_streaming def test_with_columns_quadratic_19503() -> None: num_columns = 2000 data1 = {f"col_{i}": [0] for i in range(num_columns)} From 4050961d34bb362d9afb8a42ac95e3ad7cf33dcb Mon Sep 17 00:00:00 2001 From: ritchie Date: Fri, 8 Nov 2024 20:27:48 +0100 Subject: [PATCH 5/5] add a debug assert --- crates/polars-core/src/frame/mod.rs | 1 + py-polars/tests/benchmark/test_with_columns.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index cf5b8cc2943e..e3b969a81756 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -1397,6 +1397,7 @@ impl DataFrame { } // Schema is incorrect fallback to search else { + debug_assert!(false); self.add_column_by_search(c)?; } } else { diff --git a/py-polars/tests/benchmark/test_with_columns.py b/py-polars/tests/benchmark/test_with_columns.py index 0180c29b6874..8ea3402ac696 100644 --- a/py-polars/tests/benchmark/test_with_columns.py +++ b/py-polars/tests/benchmark/test_with_columns.py @@ -18,5 +18,4 @@ def test_with_columns_quadratic_19503() -> None: t0 = time.time() df1.with_columns(df2) t1 = time.time() - print(t1 - t0) assert t1 - t0 < 0.2