diff --git a/crates/polars-python/src/cloud.rs b/crates/polars-python/src/cloud.rs index 39410a6fa7a1..19d4f6dfda07 100644 --- a/crates/polars-python/src/cloud.rs +++ b/crates/polars-python/src/cloud.rs @@ -49,9 +49,7 @@ pub fn _execute_ir_plan_with_gpu(ir_plan_ser: Vec, py: Python) -> PyResult

>) -> PyResult { - let df = interop::arrow::to_rust::to_rust_df(&rb)?; + pub fn from_arrow_record_batches(py: Python, rb: Vec>) -> PyResult { + let df = interop::arrow::to_rust::to_rust_df(py, &rb)?; Ok(Self::from(df)) } } diff --git a/crates/polars-python/src/dataframe/export.rs b/crates/polars-python/src/dataframe/export.rs index b32ad3d6afb3..36037865feb2 100644 --- a/crates/polars-python/src/dataframe/export.rs +++ b/crates/polars-python/src/dataframe/export.rs @@ -79,19 +79,17 @@ impl PyDataFrame { } #[allow(clippy::wrong_self_convention)] - pub fn to_arrow(&mut self, compat_level: PyCompatLevel) -> PyResult> { - self.df.align_chunks_par(); - Python::with_gil(|py| { - let pyarrow = py.import_bound("pyarrow")?; - let names = self.df.get_column_names_str(); + pub fn to_arrow(&mut self, py: Python, compat_level: PyCompatLevel) -> PyResult> { + py.allow_threads(|| self.df.align_chunks_par()); + let pyarrow = py.import_bound("pyarrow")?; + let names = self.df.get_column_names_str(); - let rbs = self - .df - .iter_chunks(compat_level.0, true) - .map(|rb| interop::arrow::to_py::to_py_rb(&rb, &names, py, &pyarrow)) - .collect::>()?; - Ok(rbs) - }) + let rbs = self + .df + .iter_chunks(compat_level.0, true) + .map(|rb| interop::arrow::to_py::to_py_rb(&rb, &names, py, &pyarrow)) + .collect::>()?; + Ok(rbs) } /// Create a `Vec` of PyArrow RecordBatch instances. @@ -100,8 +98,8 @@ impl PyDataFrame { /// since those can't be converted correctly via PyArrow. The calling Python /// code should make sure these are not included. #[allow(clippy::wrong_self_convention)] - pub fn to_pandas(&mut self) -> PyResult> { - self.df.as_single_chunk_par(); + pub fn to_pandas(&mut self, py: Python) -> PyResult> { + py.allow_threads(|| self.df.as_single_chunk_par()); Python::with_gil(|py| { let pyarrow = py.import_bound("pyarrow")?; let names = self.df.get_column_names_str(); @@ -154,7 +152,7 @@ impl PyDataFrame { py: Python<'py>, requested_schema: Option, ) -> PyResult> { - self.df.align_chunks_par(); + py.allow_threads(|| self.df.align_chunks_par()); dataframe_to_stream(&self.df, py) } } diff --git a/crates/polars-python/src/dataframe/general.rs b/crates/polars-python/src/dataframe/general.rs index 8f2321d103fd..e866e7db1004 100644 --- a/crates/polars-python/src/dataframe/general.rs +++ b/crates/polars-python/src/dataframe/general.rs @@ -45,67 +45,87 @@ impl PyDataFrame { .collect() } - pub fn add(&self, s: &PySeries) -> PyResult { - let df = (&self.df + &s.series).map_err(PyPolarsErr::from)?; + pub fn add(&self, py: Python, s: &PySeries) -> PyResult { + let df = py + .allow_threads(|| &self.df + &s.series) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn sub(&self, s: &PySeries) -> PyResult { - let df = (&self.df - &s.series).map_err(PyPolarsErr::from)?; + pub fn sub(&self, py: Python, s: &PySeries) -> PyResult { + let df = py + .allow_threads(|| &self.df - &s.series) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn div(&self, s: &PySeries) -> PyResult { - let df = (&self.df / &s.series).map_err(PyPolarsErr::from)?; + pub fn div(&self, py: Python, s: &PySeries) -> PyResult { + let df = py + .allow_threads(|| &self.df / &s.series) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn mul(&self, s: &PySeries) -> PyResult { - let df = (&self.df * &s.series).map_err(PyPolarsErr::from)?; + pub fn mul(&self, py: Python, s: &PySeries) -> PyResult { + let df = py + .allow_threads(|| &self.df * &s.series) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn rem(&self, s: &PySeries) -> PyResult { - let df = (&self.df % &s.series).map_err(PyPolarsErr::from)?; + pub fn rem(&self, py: Python, s: &PySeries) -> PyResult { + let df = py + .allow_threads(|| &self.df % &s.series) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn add_df(&self, s: &Self) -> PyResult { - let df = (&self.df + &s.df).map_err(PyPolarsErr::from)?; + pub fn add_df(&self, py: Python, s: &Self) -> PyResult { + let df = py + .allow_threads(|| &self.df + &s.df) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn sub_df(&self, s: &Self) -> PyResult { - let df = (&self.df - &s.df).map_err(PyPolarsErr::from)?; + pub fn sub_df(&self, py: Python, s: &Self) -> PyResult { + let df = py + .allow_threads(|| &self.df - &s.df) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn div_df(&self, s: &Self) -> PyResult { - let df = (&self.df / &s.df).map_err(PyPolarsErr::from)?; + pub fn div_df(&self, py: Python, s: &Self) -> PyResult { + let df = py + .allow_threads(|| &self.df / &s.df) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn mul_df(&self, s: &Self) -> PyResult { - let df = (&self.df * &s.df).map_err(PyPolarsErr::from)?; + pub fn mul_df(&self, py: Python, s: &Self) -> PyResult { + let df = py + .allow_threads(|| &self.df * &s.df) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn rem_df(&self, s: &Self) -> PyResult { - let df = (&self.df % &s.df).map_err(PyPolarsErr::from)?; + pub fn rem_df(&self, py: Python, s: &Self) -> PyResult { + let df = py + .allow_threads(|| &self.df % &s.df) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } #[pyo3(signature = (n, with_replacement, shuffle, seed=None))] pub fn sample_n( &self, + py: Python, n: &PySeries, with_replacement: bool, shuffle: bool, seed: Option, ) -> PyResult { - let df = self - .df - .sample_n(&n.series, with_replacement, shuffle, seed) + let df = py + .allow_threads(|| self.df.sample_n(&n.series, with_replacement, shuffle, seed)) .map_err(PyPolarsErr::from)?; Ok(df.into()) } @@ -113,14 +133,17 @@ impl PyDataFrame { #[pyo3(signature = (frac, with_replacement, shuffle, seed=None))] pub fn sample_frac( &self, + py: Python, frac: &PySeries, with_replacement: bool, shuffle: bool, seed: Option, ) -> PyResult { - let df = self - .df - .sample_frac(&frac.series, with_replacement, shuffle, seed) + let df = py + .allow_threads(|| { + self.df + .sample_frac(&frac.series, with_replacement, shuffle, seed) + }) .map_err(PyPolarsErr::from)?; Ok(df.into()) } @@ -183,34 +206,41 @@ impl PyDataFrame { self.df.is_empty() } - pub fn hstack(&self, columns: Vec) -> PyResult { + pub fn hstack(&self, py: Python, columns: Vec) -> PyResult { let columns = columns.to_series(); // @scalar-opt let columns = columns.into_iter().map(Into::into).collect::>(); - let df = self.df.hstack(&columns).map_err(PyPolarsErr::from)?; + let df = py + .allow_threads(|| self.df.hstack(&columns)) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn hstack_mut(&mut self, columns: Vec) -> PyResult<()> { + pub fn hstack_mut(&mut self, py: Python, columns: Vec) -> PyResult<()> { let columns = columns.to_series(); // @scalar-opt let columns = columns.into_iter().map(Into::into).collect::>(); - self.df.hstack_mut(&columns).map_err(PyPolarsErr::from)?; + py.allow_threads(|| self.df.hstack_mut(&columns)) + .map_err(PyPolarsErr::from)?; Ok(()) } - pub fn vstack(&self, other: &PyDataFrame) -> PyResult { - let df = self.df.vstack(&other.df).map_err(PyPolarsErr::from)?; + pub fn vstack(&self, py: Python, other: &PyDataFrame) -> PyResult { + let df = py + .allow_threads(|| self.df.vstack(&other.df)) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn vstack_mut(&mut self, other: &PyDataFrame) -> PyResult<()> { - self.df.vstack_mut(&other.df).map_err(PyPolarsErr::from)?; + pub fn vstack_mut(&mut self, py: Python, other: &PyDataFrame) -> PyResult<()> { + py.allow_threads(|| self.df.vstack_mut(&other.df)) + .map_err(PyPolarsErr::from)?; Ok(()) } - pub fn extend(&mut self, other: &PyDataFrame) -> PyResult<()> { - self.df.extend(&other.df).map_err(PyPolarsErr::from)?; + pub fn extend(&mut self, py: Python, other: &PyDataFrame) -> PyResult<()> { + py.allow_threads(|| self.df.extend(&other.df)) + .map_err(PyPolarsErr::from)?; Ok(()) } @@ -254,10 +284,9 @@ impl PyDataFrame { Ok(series) } - pub fn select(&self, columns: Vec) -> PyResult { - let df = self - .df - .select(columns.iter().map(|x| &**x)) + pub fn select(&self, py: Python, columns: Vec) -> PyResult { + let df = py + .allow_threads(|| self.df.select(columns.iter().map(|x| &**x))) .map_err(PyPolarsErr::from)?; Ok(PyDataFrame::new(df)) } @@ -297,46 +326,55 @@ impl PyDataFrame { } #[pyo3(signature = (offset, length=None))] - pub fn slice(&self, offset: i64, length: Option) -> Self { - let df = self - .df - .slice(offset, length.unwrap_or_else(|| self.df.height())); + pub fn slice(&self, py: Python, offset: i64, length: Option) -> Self { + let df = py.allow_threads(|| { + self.df + .slice(offset, length.unwrap_or_else(|| self.df.height())) + }); df.into() } - pub fn head(&self, n: usize) -> Self { - let df = self.df.head(Some(n)); + pub fn head(&self, py: Python, n: usize) -> Self { + let df = py.allow_threads(|| self.df.head(Some(n))); PyDataFrame::new(df) } - pub fn tail(&self, n: usize) -> Self { - let df = self.df.tail(Some(n)); + pub fn tail(&self, py: Python, n: usize) -> Self { + let df = py.allow_threads(|| self.df.tail(Some(n))); PyDataFrame::new(df) } - pub fn is_unique(&self) -> PyResult { - let mask = self.df.is_unique().map_err(PyPolarsErr::from)?; + pub fn is_unique(&self, py: Python) -> PyResult { + let mask = py + .allow_threads(|| self.df.is_unique()) + .map_err(PyPolarsErr::from)?; Ok(mask.into_series().into()) } - pub fn is_duplicated(&self) -> PyResult { - let mask = self.df.is_duplicated().map_err(PyPolarsErr::from)?; + pub fn is_duplicated(&self, py: Python) -> PyResult { + let mask = py + .allow_threads(|| self.df.is_duplicated()) + .map_err(PyPolarsErr::from)?; Ok(mask.into_series().into()) } - pub fn equals(&self, other: &PyDataFrame, null_equal: bool) -> bool { + pub fn equals(&self, py: Python, other: &PyDataFrame, null_equal: bool) -> bool { if null_equal { - self.df.equals_missing(&other.df) + py.allow_threads(|| self.df.equals_missing(&other.df)) } else { - self.df.equals(&other.df) + py.allow_threads(|| self.df.equals(&other.df)) } } #[pyo3(signature = (name, offset=None))] - pub fn with_row_index(&self, name: &str, offset: Option) -> PyResult { - let df = self - .df - .with_row_index(name.into(), offset) + pub fn with_row_index( + &self, + py: Python, + name: &str, + offset: Option, + ) -> PyResult { + let df = py + .allow_threads(|| self.df.with_row_index(name.into(), offset)) .map_err(PyPolarsErr::from)?; Ok(df.into()) } @@ -398,6 +436,7 @@ impl PyDataFrame { #[pyo3(signature = (on, index, value_name=None, variable_name=None))] pub fn unpivot( &self, + py: Python, on: Vec, index: Vec, value_name: Option<&str>, @@ -411,7 +450,9 @@ impl PyDataFrame { variable_name: variable_name.map(|s| s.into()), }; - let df = self.df.unpivot2(args).map_err(PyPolarsErr::from)?; + let df = py + .allow_threads(|| self.df.unpivot2(args)) + .map_err(PyPolarsErr::from)?; Ok(PyDataFrame::new(df)) } @@ -419,6 +460,7 @@ impl PyDataFrame { #[pyo3(signature = (on, index, values, maintain_order, sort_columns, aggregate_expr, separator))] pub fn pivot_expr( &self, + py: Python, on: Vec, index: Option>, values: Option>, @@ -429,31 +471,38 @@ impl PyDataFrame { ) -> PyResult { let fun = if maintain_order { pivot_stable } else { pivot }; let agg_expr = aggregate_expr.map(|expr| expr.inner); - let df = fun( - &self.df, - on, - index, - values, - sort_columns, - agg_expr, - separator, - ) - .map_err(PyPolarsErr::from)?; + let df = py + .allow_threads(|| { + fun( + &self.df, + on, + index, + values, + sort_columns, + agg_expr, + separator, + ) + }) + .map_err(PyPolarsErr::from)?; Ok(PyDataFrame::new(df)) } pub fn partition_by( &self, + py: Python, by: Vec, maintain_order: bool, include_key: bool, ) -> PyResult> { - let out = if maintain_order { - self.df.partition_by_stable(by, include_key) - } else { - self.df.partition_by(by, include_key) - } - .map_err(PyPolarsErr::from)?; + let out = py + .allow_threads(|| { + if maintain_order { + self.df.partition_by_stable(by, include_key) + } else { + self.df.partition_by(by, include_key) + } + }) + .map_err(PyPolarsErr::from)?; // SAFETY: PyDataFrame is a repr(transparent) DataFrame. Ok(unsafe { std::mem::transmute::, Vec>(out) }) @@ -463,38 +512,40 @@ impl PyDataFrame { self.df.clone().lazy().into() } - pub fn max_horizontal(&self) -> PyResult> { - let s = self.df.max_horizontal().map_err(PyPolarsErr::from)?; + pub fn max_horizontal(&self, py: Python) -> PyResult> { + let s = py + .allow_threads(|| self.df.max_horizontal()) + .map_err(PyPolarsErr::from)?; Ok(s.map(|s| s.take_materialized_series().into())) } - pub fn min_horizontal(&self) -> PyResult> { - let s = self.df.min_horizontal().map_err(PyPolarsErr::from)?; + pub fn min_horizontal(&self, py: Python) -> PyResult> { + let s = py + .allow_threads(|| self.df.min_horizontal()) + .map_err(PyPolarsErr::from)?; Ok(s.map(|s| s.take_materialized_series().into())) } - pub fn sum_horizontal(&self, ignore_nulls: bool) -> PyResult> { + pub fn sum_horizontal(&self, py: Python, ignore_nulls: bool) -> PyResult> { let null_strategy = if ignore_nulls { NullStrategy::Ignore } else { NullStrategy::Propagate }; - let s = self - .df - .sum_horizontal(null_strategy) + let s = py + .allow_threads(|| self.df.sum_horizontal(null_strategy)) .map_err(PyPolarsErr::from)?; Ok(s.map(|s| s.into())) } - pub fn mean_horizontal(&self, ignore_nulls: bool) -> PyResult> { + pub fn mean_horizontal(&self, py: Python, ignore_nulls: bool) -> PyResult> { let null_strategy = if ignore_nulls { NullStrategy::Ignore } else { NullStrategy::Propagate }; - let s = self - .df - .mean_horizontal(null_strategy) + let s = py + .allow_threads(|| self.df.mean_horizontal(null_strategy)) .map_err(PyPolarsErr::from)?; Ok(s.map(|s| s.into())) } @@ -502,24 +553,26 @@ impl PyDataFrame { #[pyo3(signature = (columns, separator, drop_first=false))] pub fn to_dummies( &self, + py: Python, columns: Option>, separator: Option<&str>, drop_first: bool, ) -> PyResult { - let df = match columns { - Some(cols) => self.df.columns_to_dummies( - cols.iter().map(|x| x as &str).collect(), - separator, - drop_first, - ), - None => self.df.to_dummies(separator, drop_first), - } - .map_err(PyPolarsErr::from)?; + let df = py + .allow_threads(|| match columns { + Some(cols) => self.df.columns_to_dummies( + cols.iter().map(|x| x as &str).collect(), + separator, + drop_first, + ), + None => self.df.to_dummies(separator, drop_first), + }) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn null_count(&self) -> Self { - let df = self.df.null_count(); + pub fn null_count(&self, py: Python) -> Self { + let df = py.allow_threads(|| self.df.null_count()); df.into() } @@ -555,19 +608,29 @@ impl PyDataFrame { }) } - pub fn shrink_to_fit(&mut self) { - self.df.shrink_to_fit(); + pub fn shrink_to_fit(&mut self, py: Python) { + py.allow_threads(|| self.df.shrink_to_fit()); } - pub fn hash_rows(&mut self, k0: u64, k1: u64, k2: u64, k3: u64) -> PyResult { + pub fn hash_rows( + &mut self, + py: Python, + k0: u64, + k1: u64, + k2: u64, + k3: u64, + ) -> PyResult { let hb = PlRandomState::with_seeds(k0, k1, k2, k3); - let hash = self.df.hash_rows(Some(hb)).map_err(PyPolarsErr::from)?; + let hash = py + .allow_threads(|| self.df.hash_rows(Some(hb))) + .map_err(PyPolarsErr::from)?; Ok(hash.into_series().into()) } #[pyo3(signature = (keep_names_as, column_names))] pub fn transpose( &mut self, + py: Python, keep_names_as: Option<&str>, column_names: &Bound, ) -> PyResult { @@ -578,54 +641,61 @@ impl PyDataFrame { } else { None }; - Ok(self - .df - .transpose(keep_names_as, new_col_names) + Ok(py + .allow_threads(|| self.df.transpose(keep_names_as, new_col_names)) .map_err(PyPolarsErr::from)? .into()) } + pub fn upsample( &self, + py: Python, by: Vec, index_column: &str, every: &str, stable: bool, ) -> PyResult { let every = Duration::try_parse(every).map_err(PyPolarsErr::from)?; - let out = if stable { - self.df.upsample_stable(by, index_column, every) - } else { - self.df.upsample(by, index_column, every) - }; + let out = py.allow_threads(|| { + if stable { + self.df.upsample_stable(by, index_column, every) + } else { + self.df.upsample(by, index_column, every) + } + }); let out = out.map_err(PyPolarsErr::from)?; Ok(out.into()) } - pub fn to_struct(&self, name: &str, invalid_indices: Vec) -> PySeries { - let ca = self.df.clone().into_struct(name.into()); - - if !invalid_indices.is_empty() { - let mut validity = MutableBitmap::with_capacity(ca.len()); - validity.extend_constant(ca.len(), true); - for i in invalid_indices { - validity.set(i, false); + pub fn to_struct(&self, py: Python, name: &str, invalid_indices: Vec) -> PySeries { + py.allow_threads(|| { + let ca = self.df.clone().into_struct(name.into()); + + if !invalid_indices.is_empty() { + let mut validity = MutableBitmap::with_capacity(ca.len()); + validity.extend_constant(ca.len(), true); + for i in invalid_indices { + validity.set(i, false); + } + let ca = ca.rechunk(); + ca.with_outer_validity(Some(validity.freeze())) + .into_series() + .into() + } else { + ca.into_series().into() } - let ca = ca.rechunk(); - ca.with_outer_validity(Some(validity.freeze())) - .into_series() - .into() - } else { - ca.into_series().into() - } + }) } - pub fn unnest(&self, columns: Vec) -> PyResult { - let df = self.df.unnest(columns).map_err(PyPolarsErr::from)?; + pub fn unnest(&self, py: Python, columns: Vec) -> PyResult { + let df = py + .allow_threads(|| self.df.unnest(columns)) + .map_err(PyPolarsErr::from)?; Ok(df.into()) } - pub fn clear(&self) -> Self { - self.df.clear().into() + pub fn clear(&self, py: Python) -> Self { + py.allow_threads(|| self.df.clear()).into() } #[allow(clippy::wrong_self_convention)] diff --git a/crates/polars-python/src/functions/range.rs b/crates/polars-python/src/functions/range.rs index b6eae4400dd8..11ff3864fdfa 100644 --- a/crates/polars-python/src/functions/range.rs +++ b/crates/polars-python/src/functions/range.rs @@ -17,6 +17,7 @@ pub fn int_range(start: PyExpr, end: PyExpr, step: i64, dtype: Wrap) - /// Eager version of `int_range` to avoid overhead from the expression engine. #[pyfunction] pub fn eager_int_range( + py: Python, lower: &Bound<'_, PyAny>, upper: &Bound<'_, PyAny>, step: &Bound<'_, PyAny>, @@ -34,7 +35,7 @@ pub fn eager_int_range( let start_v: <$T as PolarsNumericType>::Native = lower.extract()?; let end_v: <$T as PolarsNumericType>::Native = upper.extract()?; let step: i64 = step.extract()?; - new_int_range::<$T>(start_v, end_v, step, PlSmallStr::from_static("literal")) + py.allow_threads(|| new_int_range::<$T>(start_v, end_v, step, PlSmallStr::from_static("literal"))) }); let s = ret.map_err(PyPolarsErr::from)?; diff --git a/crates/polars-python/src/interop/arrow/to_rust.rs b/crates/polars-python/src/interop/arrow/to_rust.rs index 1add88c96fd8..ee741c4279cc 100644 --- a/crates/polars-python/src/interop/arrow/to_rust.rs +++ b/crates/polars-python/src/interop/arrow/to_rust.rs @@ -46,7 +46,7 @@ pub fn array_to_rust(obj: &Bound) -> PyResult { } } -pub fn to_rust_df(rb: &[Bound]) -> PyResult { +pub fn to_rust_df(py: Python, rb: &[Bound]) -> PyResult { let schema = rb .first() .ok_or_else(|| PyPolarsErr::Other("empty table".into()))? @@ -79,17 +79,19 @@ pub fn to_rust_df(rb: &[Bound]) -> PyResult { // for instance string -> large-utf8 // dict encoded to categorical let columns = if run_parallel { - POOL.install(|| { - columns - .into_par_iter() - .enumerate() - .map(|(i, arr)| { - let s = Series::try_from((names[i].clone(), arr)) - .map_err(PyPolarsErr::from)? - .into_column(); - Ok(s) - }) - .collect::>>() + py.allow_threads(|| { + POOL.install(|| { + columns + .into_par_iter() + .enumerate() + .map(|(i, arr)| { + let s = Series::try_from((names[i].clone(), arr)) + .map_err(PyPolarsErr::from)? + .into_column(); + Ok(s) + }) + .collect::>>() + }) }) } else { columns diff --git a/crates/polars-python/src/interop/numpy/to_numpy_df.rs b/crates/polars-python/src/interop/numpy/to_numpy_df.rs index c14753bdc7a3..887d218f5fe0 100644 --- a/crates/polars-python/src/interop/numpy/to_numpy_df.rs +++ b/crates/polars-python/src/interop/numpy/to_numpy_df.rs @@ -251,6 +251,7 @@ fn try_df_to_numpy_numeric_supertype( }; Some(np_array) } + fn df_columns_to_numpy( py: Python, df: &DataFrame, diff --git a/crates/polars-python/src/interop/numpy/to_numpy_series.rs b/crates/polars-python/src/interop/numpy/to_numpy_series.rs index 12f71e2a551d..e2a6c439caad 100644 --- a/crates/polars-python/src/interop/numpy/to_numpy_series.rs +++ b/crates/polars-python/src/interop/numpy/to_numpy_series.rs @@ -85,20 +85,21 @@ fn try_series_to_numpy_view( if !allow_nulls && series_contains_null(s) { return None; } - let (s_owned, writable_flag) = handle_chunks(s, allow_rechunk)?; + let (s_owned, writable_flag) = handle_chunks(py, s, allow_rechunk)?; let array = series_to_numpy_view_recursive(py, s_owned, writable_flag); Some((array, writable_flag)) } + /// Rechunk the Series if required. /// /// NumPy arrays are always contiguous, so we may have to rechunk before creating a view. /// If we do so, we can flag the resulting array as writable. -fn handle_chunks(s: &Series, allow_rechunk: bool) -> Option<(Series, bool)> { +fn handle_chunks(py: Python, s: &Series, allow_rechunk: bool) -> Option<(Series, bool)> { let is_chunked = s.n_chunks() > 1; match (is_chunked, allow_rechunk) { (true, false) => None, - (true, true) => Some((s.rechunk(), true)), + (true, true) => Some((py.allow_threads(|| s.rechunk()), true)), (false, _) => Some((s.clone(), false)), } } diff --git a/crates/polars-python/src/map/mod.rs b/crates/polars-python/src/map/mod.rs index 3bf96f91e631..9ffc74961302 100644 --- a/crates/polars-python/src/map/mod.rs +++ b/crates/polars-python/src/map/mod.rs @@ -32,6 +32,7 @@ impl PyArrowPrimitiveType for Float32Type {} impl PyArrowPrimitiveType for Float64Type {} fn iterator_to_struct<'a>( + py: Python, it: impl Iterator>>, init_null_count: usize, first_value: AnyValue<'a>, @@ -115,11 +116,13 @@ fn iterator_to_struct<'a>( } } - let fields = POOL.install(|| { - field_names_ordered - .par_iter() - .map(|name| Series::new(name.clone(), struct_fields.get(name).unwrap())) - .collect::>() + let fields = py.allow_threads(|| { + POOL.install(|| { + field_names_ordered + .par_iter() + .map(|name| Series::new(name.clone(), struct_fields.get(name).unwrap())) + .collect::>() + }) }); Ok( diff --git a/crates/polars-python/src/map/series.rs b/crates/polars-python/src/map/series.rs index cb731e7c03f8..16d6212b8d1e 100644 --- a/crates/polars-python/src/map/series.rs +++ b/crates/polars-python/src/map/series.rs @@ -271,6 +271,7 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { .skip(init_null_count + skip) .map(|val| call_lambda(py, lambda, val).ok()); iterator_to_struct( + py, it, init_null_count, first_value, @@ -283,6 +284,7 @@ impl<'a> ApplyLambda<'a> for BooleanChunked { .skip(init_null_count + skip) .map(|opt_val| opt_val.and_then(|val| call_lambda(py, lambda, val).ok())); iterator_to_struct( + py, it, init_null_count, first_value, @@ -576,6 +578,7 @@ where .skip(init_null_count + skip) .map(|val| call_lambda(py, lambda, val).ok()); iterator_to_struct( + py, it, init_null_count, first_value, @@ -588,6 +591,7 @@ where .skip(init_null_count + skip) .map(|opt_val| opt_val.and_then(|val| call_lambda(py, lambda, val).ok())); iterator_to_struct( + py, it, init_null_count, first_value, @@ -874,6 +878,7 @@ impl<'a> ApplyLambda<'a> for StringChunked { .skip(init_null_count + skip) .map(|val| call_lambda(py, lambda, val).ok()); iterator_to_struct( + py, it, init_null_count, first_value, @@ -886,6 +891,7 @@ impl<'a> ApplyLambda<'a> for StringChunked { .skip(init_null_count + skip) .map(|opt_val| opt_val.and_then(|val| call_lambda(py, lambda, val).ok())); iterator_to_struct( + py, it, init_null_count, first_value, @@ -1221,6 +1227,7 @@ impl<'a> ApplyLambda<'a> for ListChunked { call_lambda(py, lambda, python_series_wrapper).ok() }); iterator_to_struct( + py, it, init_null_count, first_value, @@ -1245,6 +1252,7 @@ impl<'a> ApplyLambda<'a> for ListChunked { }) }); iterator_to_struct( + py, it, init_null_count, first_value, @@ -1648,6 +1656,7 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { call_lambda(py, lambda, python_series_wrapper).ok() }); iterator_to_struct( + py, it, init_null_count, first_value, @@ -1672,6 +1681,7 @@ impl<'a> ApplyLambda<'a> for ArrayChunked { }) }); iterator_to_struct( + py, it, init_null_count, first_value, @@ -2042,7 +2052,7 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { fn apply_into_struct( &'a self, - _py: Python, + py: Python, lambda: &Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, @@ -2056,6 +2066,7 @@ impl<'a> ApplyLambda<'a> for ObjectChunked { Some(out) }); iterator_to_struct( + py, it, init_null_count, first_value, @@ -2329,7 +2340,7 @@ impl<'a> ApplyLambda<'a> for StructChunked { fn apply_into_struct( &'a self, - _py: Python, + py: Python, lambda: &Bound<'a, PyAny>, init_null_count: usize, first_value: AnyValue<'a>, @@ -2340,6 +2351,7 @@ impl<'a> ApplyLambda<'a> for StructChunked { Some(out) }); iterator_to_struct( + py, it, init_null_count, first_value,