From 058491f60bb7cc4b51dc0429abc724512ff78878 Mon Sep 17 00:00:00 2001 From: Gijs Burghoorn Date: Thu, 14 Nov 2024 13:45:09 +0100 Subject: [PATCH] refactor: Migrate polars-expr AggregationContext to use `Column` (#19736) --- crates/polars-core/src/frame/column/mod.rs | 154 +++++++++++++++--- .../src/frame/column/partitioned.rs | 2 +- crates/polars-core/src/frame/column/scalar.rs | 5 + crates/polars-core/src/frame/column/series.rs | 71 ++++++++ crates/polars-core/src/frame/mod.rs | 14 +- crates/polars-core/src/scalar/mod.rs | 9 + .../src/expressions/aggregation.rs | 116 +++++++------ crates/polars-expr/src/expressions/alias.rs | 12 +- crates/polars-expr/src/expressions/apply.rs | 76 ++++----- crates/polars-expr/src/expressions/binary.rs | 30 ++-- crates/polars-expr/src/expressions/cast.rs | 10 +- crates/polars-expr/src/expressions/column.rs | 45 ++--- crates/polars-expr/src/expressions/count.rs | 4 +- crates/polars-expr/src/expressions/filter.rs | 2 +- crates/polars-expr/src/expressions/gather.rs | 28 ++-- .../polars-expr/src/expressions/group_iter.rs | 28 ++-- crates/polars-expr/src/expressions/literal.rs | 5 +- crates/polars-expr/src/expressions/mod.rs | 130 +++++++-------- crates/polars-expr/src/expressions/slice.rs | 16 +- crates/polars-expr/src/expressions/sort.rs | 2 +- crates/polars-expr/src/expressions/sortby.rs | 20 ++- crates/polars-expr/src/expressions/ternary.rs | 36 ++-- crates/polars-expr/src/expressions/window.rs | 22 +-- crates/polars-lazy/src/dsl/list.rs | 2 +- crates/polars-lazy/src/frame/pivot.rs | 2 +- .../polars-mem-engine/src/executors/filter.rs | 20 ++- .../src/executors/group_by.rs | 2 +- .../polars-mem-engine/src/executors/stack.rs | 14 +- crates/polars-ops/src/series/ops/index.rs | 11 +- 29 files changed, 531 insertions(+), 357 deletions(-) create mode 100644 crates/polars-core/src/frame/column/series.rs diff --git a/crates/polars-core/src/frame/column/mod.rs b/crates/polars-core/src/frame/column/mod.rs index d2eec86c1b15..cea56a2e87b7 100644 --- a/crates/polars-core/src/frame/column/mod.rs +++ b/crates/polars-core/src/frame/column/mod.rs @@ -1,5 +1,7 @@ use std::borrow::Cow; +use arrow::bitmap::MutableBitmap; +use arrow::trusted_len::TrustMyLength; use num_traits::{Num, NumCast}; use polars_error::PolarsResult; use polars_utils::index::check_bounds; @@ -8,6 +10,7 @@ pub use scalar::ScalarColumn; use self::gather::check_bounds_ca; use self::partitioned::PartitionedColumn; +use self::series::SeriesColumn; use crate::chunked_array::cast::CastOptions; use crate::chunked_array::metadata::{MetadataFlags, MetadataTrait}; use crate::datatypes::ReshapeDimension; @@ -20,6 +23,7 @@ mod arithmetic; mod compare; mod partitioned; mod scalar; +mod series; /// A column within a [`DataFrame`]. 
/// @@ -35,7 +39,7 @@ mod scalar; #[cfg_attr(feature = "serde", serde(from = "Series"))] #[cfg_attr(feature = "serde", serde(into = "_SerdeSeries"))] pub enum Column { - Series(Series), + Series(SeriesColumn), Partitioned(PartitionedColumn), Scalar(ScalarColumn), } @@ -47,12 +51,13 @@ pub trait IntoColumn: Sized { impl Column { #[inline] + #[track_caller] pub fn new(name: PlSmallStr, values: T) -> Self where Phantom: ?Sized, Series: NamedFrom, { - Self::Series(NamedFrom::new(name, values)) + Self::Series(SeriesColumn::new(NamedFrom::new(name, values))) } #[inline] @@ -95,7 +100,7 @@ impl Column { PartitionedColumn::new_empty(PlSmallStr::EMPTY, DataType::Null), ) .take_materialized_series(); - *self = Column::Series(series); + *self = Column::Series(series.into()); let Column::Series(s) = self else { unreachable!(); }; @@ -107,7 +112,7 @@ impl Column { ScalarColumn::new_empty(PlSmallStr::EMPTY, DataType::Null), ) .take_materialized_series(); - *self = Column::Series(series); + *self = Column::Series(series.into()); let Column::Series(s) = self else { unreachable!(); }; @@ -121,7 +126,7 @@ impl Column { #[inline] pub fn take_materialized_series(self) -> Series { match self { - Column::Series(s) => s, + Column::Series(s) => s.take(), Column::Partitioned(s) => s.take_materialized_series(), Column::Scalar(s) => s.take_materialized_series(), } @@ -586,13 +591,86 @@ impl Column { } } + /// General implementation for aggregation where a non-missing scalar would map to itself. + #[inline(always)] + #[cfg(any(feature = "algorithm_group_by", feature = "bitwise"))] + fn agg_with_unit_scalar( + &self, + groups: &GroupsProxy, + series_agg: impl Fn(&Series, &GroupsProxy) -> Series, + ) -> Column { + match self { + Column::Series(s) => series_agg(s, groups).into_column(), + // @partition-opt + Column::Partitioned(s) => series_agg(s.as_materialized_series(), groups).into_column(), + Column::Scalar(s) => { + if s.is_empty() { + return self.clone(); + } + + // We utilize the aggregation on Series to see: + // 1. the output datatype of the aggregation + // 2. whether this aggregation is even defined + let series_aggregation = series_agg( + &s.as_single_value_series(), + &GroupsProxy::Slice { + // @NOTE: this group is always valid since s is non-empty. + groups: vec![[0, 1]], + rolling: false, + }, + ); + + // If the aggregation is not defined, just return all nulls. + if series_aggregation.has_nulls() { + return Self::new_scalar( + series_aggregation.name().clone(), + Scalar::new(series_aggregation.dtype().clone(), AnyValue::Null), + groups.len(), + ); + } + + let mut scalar_col = s.resize(groups.len()); + // The aggregation might change the type (e.g. mean changes int -> float), so we do + // a cast here to the output type. + if series_aggregation.dtype() != s.dtype() { + scalar_col = scalar_col.cast(series_aggregation.dtype()).unwrap(); + } + + let Some(first_empty_idx) = groups.iter().position(|g| g.is_empty()) else { + // Fast path: no empty groups. keep the scalar intact. + return scalar_col.into_column(); + }; + + // All empty groups produce a *missing* or `null` value. + let mut validity = MutableBitmap::with_capacity(groups.len()); + validity.extend_constant(first_empty_idx, true); + // SAFETY: We trust the length of this iterator. 
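+                // It yields one validity bit per remaining group, i.e. exactly
+                // `groups.len() - first_empty_idx` items, matching the length
+                // passed to `TrustMyLength::new`.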
+ let iter = unsafe { + TrustMyLength::new( + groups.iter().skip(first_empty_idx).map(|g| !g.is_empty()), + groups.len() - first_empty_idx, + ) + }; + validity.extend_from_trusted_len_iter(iter); + let validity = validity.freeze(); + + let mut s = scalar_col.take_materialized_series().rechunk(); + // SAFETY: We perform a compute_len afterwards. + let chunks = unsafe { s.chunks_mut() }; + chunks[0].with_validity(Some(validity)); + s.compute_len(); + + s.into_column() + }, + } + } + /// # Safety /// /// Does no bounds checks, groups must be correct. #[cfg(feature = "algorithm_group_by")] pub unsafe fn agg_min(&self, groups: &GroupsProxy) -> Self { - // @scalar-opt - unsafe { self.as_materialized_series().agg_min(groups) }.into() + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_min(g) }) } /// # Safety @@ -600,8 +678,7 @@ impl Column { /// Does no bounds checks, groups must be correct. #[cfg(feature = "algorithm_group_by")] pub unsafe fn agg_max(&self, groups: &GroupsProxy) -> Self { - // @scalar-opt - unsafe { self.as_materialized_series().agg_max(groups) }.into() + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_max(g) }) } /// # Safety @@ -609,8 +686,7 @@ impl Column { /// Does no bounds checks, groups must be correct. #[cfg(feature = "algorithm_group_by")] pub unsafe fn agg_mean(&self, groups: &GroupsProxy) -> Self { - // @scalar-opt - unsafe { self.as_materialized_series().agg_mean(groups) }.into() + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_mean(g) }) } /// # Safety @@ -627,8 +703,7 @@ impl Column { /// Does no bounds checks, groups must be correct. #[cfg(feature = "algorithm_group_by")] pub unsafe fn agg_first(&self, groups: &GroupsProxy) -> Self { - // @scalar-opt - unsafe { self.as_materialized_series().agg_first(groups) }.into() + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_first(g) }) } /// # Safety @@ -636,8 +711,7 @@ impl Column { /// Does no bounds checks, groups must be correct. #[cfg(feature = "algorithm_group_by")] pub unsafe fn agg_last(&self, groups: &GroupsProxy) -> Self { - // @scalar-opt - unsafe { self.as_materialized_series().agg_last(groups) }.into() + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_last(g) }) } /// # Safety @@ -672,8 +746,7 @@ impl Column { /// Does no bounds checks, groups must be correct. #[cfg(feature = "algorithm_group_by")] pub unsafe fn agg_median(&self, groups: &GroupsProxy) -> Self { - // @scalar-opt - unsafe { self.as_materialized_series().agg_median(groups) }.into() + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_median(g) }) } /// # Safety @@ -689,7 +762,7 @@ impl Column { /// /// Does no bounds checks, groups must be correct. #[cfg(feature = "algorithm_group_by")] - pub(crate) unsafe fn agg_std(&self, groups: &GroupsProxy, ddof: u8) -> Self { + pub unsafe fn agg_std(&self, groups: &GroupsProxy, ddof: u8) -> Self { // @scalar-opt unsafe { self.as_materialized_series().agg_std(groups, ddof) }.into() } @@ -713,6 +786,30 @@ impl Column { unsafe { self.as_materialized_series().agg_valid_count(groups) }.into() } + /// # Safety + /// + /// Does no bounds checks, groups must be correct. + #[cfg(feature = "bitwise")] + pub fn agg_and(&self, groups: &GroupsProxy) -> Self { + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_and(g) }) + } + /// # Safety + /// + /// Does no bounds checks, groups must be correct. 
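+    ///
+    /// Like the aggregations above, this goes through
+    /// [`Self::agg_with_unit_scalar`]: a `Column::Scalar` input is aggregated
+    /// once on a single-value `Series` and broadcast over the groups instead
+    /// of being materialized to full length first.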
+ #[cfg(feature = "bitwise")] + pub fn agg_or(&self, groups: &GroupsProxy) -> Self { + self.agg_with_unit_scalar(groups, |s, g| unsafe { s.agg_or(g) }) + } + /// # Safety + /// + /// Does no bounds checks, groups must be correct. + #[cfg(feature = "bitwise")] + pub fn agg_xor(&self, groups: &GroupsProxy) -> Self { + // @partition-opt + // @scalar-opt + unsafe { self.as_materialized_series().agg_xor(groups) }.into() + } + pub fn full_null(name: PlSmallStr, size: usize, dtype: &DataType) -> Self { Self::new_scalar(name, Scalar::new(dtype.clone(), AnyValue::Null), size) } @@ -877,6 +974,13 @@ impl Column { } } + /// Packs every element into a list. + pub fn as_list(&self) -> ListChunked { + // @scalar-opt + // @partition-opt + self.as_materialized_series().as_list() + } + pub fn is_sorted_flag(&self) -> IsSorted { // @scalar-opt self.as_materialized_series().is_sorted_flag() @@ -1105,19 +1209,25 @@ impl Column { pub fn try_add_owned(self, other: Self) -> PolarsResult { match (self, other) { - (Column::Series(lhs), Column::Series(rhs)) => lhs.try_add_owned(rhs).map(Column::from), + (Column::Series(lhs), Column::Series(rhs)) => { + lhs.take().try_add_owned(rhs.take()).map(Column::from) + }, (lhs, rhs) => lhs + rhs, } } pub fn try_sub_owned(self, other: Self) -> PolarsResult { match (self, other) { - (Column::Series(lhs), Column::Series(rhs)) => lhs.try_sub_owned(rhs).map(Column::from), + (Column::Series(lhs), Column::Series(rhs)) => { + lhs.take().try_sub_owned(rhs.take()).map(Column::from) + }, (lhs, rhs) => lhs - rhs, } } pub fn try_mul_owned(self, other: Self) -> PolarsResult { match (self, other) { - (Column::Series(lhs), Column::Series(rhs)) => lhs.try_mul_owned(rhs).map(Column::from), + (Column::Series(lhs), Column::Series(rhs)) => { + lhs.take().try_mul_owned(rhs.take()).map(Column::from) + }, (lhs, rhs) => lhs * rhs, } } @@ -1443,7 +1553,7 @@ impl From for Column { return Self::Scalar(ScalarColumn::unit_scalar_from_series(series)); } - Self::Series(series) + Self::Series(SeriesColumn::new(series)) } } diff --git a/crates/polars-core/src/frame/column/partitioned.rs b/crates/polars-core/src/frame/column/partitioned.rs index 16d4e9538634..93471c662d72 100644 --- a/crates/polars-core/src/frame/column/partitioned.rs +++ b/crates/polars-core/src/frame/column/partitioned.rs @@ -124,7 +124,7 @@ impl PartitionedColumn { fn _to_series(name: PlSmallStr, values: &Series, ends: &[IdxSize]) -> Series { let dtype = values.dtype(); - let mut column = Column::Series(Series::new_empty(name, dtype)); + let mut column = Column::Series(Series::new_empty(name, dtype).into()); let mut prev_offset = 0; for (i, &offset) in ends.iter().enumerate() { diff --git a/crates/polars-core/src/frame/column/scalar.rs b/crates/polars-core/src/frame/column/scalar.rs index e3d8105362c4..c08a9e3cfee0 100644 --- a/crates/polars-core/src/frame/column/scalar.rs +++ b/crates/polars-core/src/frame/column/scalar.rs @@ -284,6 +284,11 @@ impl ScalarColumn { self.scalar.update(AnyValue::Null); self } + + pub fn map_scalar(&mut self, map_scalar: impl Fn(Scalar) -> Scalar) { + self.scalar = map_scalar(std::mem::take(&mut self.scalar)); + self.materialized.take(); + } } impl IntoColumn for ScalarColumn { diff --git a/crates/polars-core/src/frame/column/series.rs b/crates/polars-core/src/frame/column/series.rs new file mode 100644 index 000000000000..c7f79906ea0d --- /dev/null +++ b/crates/polars-core/src/frame/column/series.rs @@ -0,0 +1,71 @@ +use std::ops::{Deref, DerefMut}; + +use super::Series; + +/// A very thin wrapper around 
[`Series`] that represents a [`Column`]ized version of [`Series`]. +/// +/// At the moment this just conditionally tracks where it was created so that materialization +/// problems can be tracked down. +#[derive(Debug, Clone)] +pub struct SeriesColumn { + inner: Series, + + #[cfg(debug_assertions)] + materialized_at: Option>, +} + +impl SeriesColumn { + #[track_caller] + pub fn new(series: Series) -> Self { + Self { + inner: series, + + #[cfg(debug_assertions)] + materialized_at: if std::env::var("POLARS_TRACK_SERIES_MATERIALIZATION").as_deref() + == Ok("1") + { + Some(std::sync::Arc::new( + std::backtrace::Backtrace::force_capture(), + )) + } else { + None + }, + } + } + + pub fn materialized_at(&self) -> Option<&std::backtrace::Backtrace> { + #[cfg(debug_assertions)] + { + self.materialized_at.as_ref().map(|v| v.as_ref()) + } + + #[cfg(not(debug_assertions))] + None + } + + pub fn take(self) -> Series { + self.inner + } +} + +impl From for SeriesColumn { + #[track_caller] + #[inline(always)] + fn from(value: Series) -> Self { + Self::new(value) + } +} + +impl Deref for SeriesColumn { + type Target = Series; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for SeriesColumn { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} diff --git a/crates/polars-core/src/frame/mod.rs b/crates/polars-core/src/frame/mod.rs index 0d8fef7f4c4a..6fed5c25071c 100644 --- a/crates/polars-core/src/frame/mod.rs +++ b/crates/polars-core/src/frame/mod.rs @@ -538,7 +538,7 @@ impl DataFrame { // Don't parallelize this. Memory overhead for s in &mut self.columns { if let Column::Series(s) = s { - *s = s.rechunk(); + *s = s.rechunk().into(); } } self @@ -2085,6 +2085,8 @@ impl DataFrame { let mut max_value_ca = StringChunkedBuilder::new(PlSmallStr::from_static("max_value"), num_columns); let mut distinct_count_ca: Vec> = Vec::with_capacity(num_columns); + let mut materialized_at_ca = + StringChunkedBuilder::new(PlSmallStr::from_static("materialized_at"), num_columns); for col in &self.columns { let metadata = col.get_metadata(); @@ -2099,10 +2101,10 @@ impl DataFrame { ) }); - let repr = match col { - Column::Series(_) => "series", - Column::Partitioned(_) => "partitioned", - Column::Scalar(_) => "scalar", + let (repr, materialized_at) = match col { + Column::Series(s) => ("series", s.materialized_at()), + Column::Partitioned(_) => ("partitioned", None), + Column::Scalar(_) => ("scalar", None), }; let sorted_asc = flags.contains(MetadataFlags::SORTED_ASC); let sorted_dsc = flags.contains(MetadataFlags::SORTED_DSC); @@ -2116,6 +2118,7 @@ impl DataFrame { min_value_ca.append_option(min_value.map(|v| v.as_any_value().to_string())); max_value_ca.append_option(max_value.map(|v| v.as_any_value().to_string())); distinct_count_ca.push(distinct_count); + materialized_at_ca.append_option(materialized_at.map(|v| format!("{v:#?}"))); } unsafe { @@ -2134,6 +2137,7 @@ impl DataFrame { &distinct_count_ca[..], ) .into_column(), + materialized_at_ca.finish().into_column(), ], ) } diff --git a/crates/polars-core/src/scalar/mod.rs b/crates/polars-core/src/scalar/mod.rs index 3e456837e534..7487603ff998 100644 --- a/crates/polars-core/src/scalar/mod.rs +++ b/crates/polars-core/src/scalar/mod.rs @@ -15,6 +15,15 @@ pub struct Scalar { value: AnyValue<'static>, } +impl Default for Scalar { + fn default() -> Self { + Self { + dtype: DataType::Null, + value: AnyValue::Null, + } + } +} + impl Scalar { #[inline(always)] pub fn new(dtype: DataType, value: AnyValue<'static>) -> Self { diff 
--git a/crates/polars-expr/src/expressions/aggregation.rs b/crates/polars-expr/src/expressions/aggregation.rs index fb691d746715..883598789622 100644 --- a/crates/polars-expr/src/expressions/aggregation.rs +++ b/crates/polars-expr/src/expressions/aggregation.rs @@ -206,7 +206,7 @@ impl PhysicalExpr for AggregationExpr { ) -> PolarsResult> { let mut ac = self.input.evaluate_on_groups(df, groups, state)?; // don't change names by aggregations as is done in polars-core - let keep_name = ac.series().name().clone(); + let keep_name = ac.get_values().name().clone(); polars_ensure!(!matches!(ac.agg_state(), AggState::Literal(_)), ComputeError: "cannot aggregate a literal"); if let AggregatedScalar(_) = ac.agg_state() { @@ -223,37 +223,37 @@ impl PhysicalExpr for AggregationExpr { let out = unsafe { match self.agg_type.groupby { GroupByMethod::Min => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = s.agg_min(&groups); - AggregatedScalar(agg_s.with_name(keep_name)) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = c.agg_min(&groups); + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::Max => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = s.agg_max(&groups); - AggregatedScalar(agg_s.with_name(keep_name)) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = c.agg_max(&groups); + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::Median => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = s.agg_median(&groups); - AggregatedScalar(agg_s.with_name(keep_name)) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = c.agg_median(&groups); + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::Mean => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = s.agg_mean(&groups); - AggregatedScalar(agg_s.with_name(keep_name)) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = c.agg_mean(&groups); + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::Sum => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = s.agg_sum(&groups); - AggregatedScalar(agg_s.with_name(keep_name)) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = c.agg_sum(&groups); + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::Count { include_nulls } => { - if include_nulls || ac.series().null_count() == 0 { + if include_nulls || ac.get_values().null_count() == 0 { // a few fast paths that prevent materializing new groups match ac.update_groups { UpdateGroups::WithSeriesLen => { let list = ac - .series() + .get_values() .list() .expect("impl error, should be a list at this point"); @@ -288,7 +288,7 @@ impl PhysicalExpr for AggregationExpr { }, }; s.rename(keep_name); - AggregatedScalar(s.into_series()) + AggregatedScalar(s.into_column()) }, UpdateGroups::WithGroupsLen => { // no need to update the groups @@ -296,20 +296,20 @@ impl PhysicalExpr for AggregationExpr { // not the correct order let mut ca = ac.groups.group_count(); ca.rename(keep_name); - AggregatedScalar(ca.into_series()) + AggregatedScalar(ca.into_column()) }, // materialize groups _ => { let mut ca = ac.groups().group_count(); ca.rename(keep_name); - AggregatedScalar(ca.into_series()) + AggregatedScalar(ca.into_column()) }, } } else { // TODO: optimize this/and write somewhere else. 
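+                        // Fallback: count only the non-null values
+                        // (`len - null_count`), handling each agg state
+                        // separately below.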
match ac.agg_state() { AggState::Literal(s) | AggState::AggregatedScalar(s) => { - AggregatedScalar(Series::new( + AggregatedScalar(Column::new( keep_name, [(s.len() as IdxSize - s.null_count() as IdxSize)], )) @@ -323,7 +323,7 @@ impl PhysicalExpr for AggregationExpr { .map(|s| s.len() as IdxSize - s.null_count() as IdxSize) }) .collect(); - AggregatedScalar(out.into_series().with_name(keep_name)) + AggregatedScalar(out.into_column().with_name(keep_name)) }, AggState::NotAggregated(s) => { let s = s.clone(); @@ -334,7 +334,9 @@ impl PhysicalExpr for AggregationExpr { match groups.as_ref() { GroupsProxy::Idx(idx) => { let s = s.rechunk(); - let array = &s.chunks()[0]; + // @scalar-opt + // @partition-opt + let array = &s.as_materialized_series().chunks()[0]; let validity = array.validity().unwrap(); idx.iter() .map(|(_, g)| { @@ -365,7 +367,7 @@ impl PhysicalExpr for AggregationExpr { }, } }; - AggregatedScalar(out.into_series()) + AggregatedScalar(out.into_column()) }, } } @@ -392,10 +394,10 @@ impl PhysicalExpr for AggregationExpr { // // if it is not, we traverse the groups and create // a list per group. - let s = match ac.agg_state() { + let c = match ac.agg_state() { // mean agg: // -> f64 -> list - AggState::AggregatedScalar(s) => s + AggState::AggregatedScalar(c) => c .reshape_list(&[ ReshapeDimension::Infer, ReshapeDimension::new_dimension(1), @@ -403,25 +405,25 @@ impl PhysicalExpr for AggregationExpr { .unwrap(), _ => { let agg = ac.aggregated(); - agg.as_list().into_series() + agg.as_list().into_column() }, }; - AggregatedList(s.with_name(keep_name)) + AggregatedList(c.with_name(keep_name)) }, GroupByMethod::Groups => { let mut column: ListChunked = ac.groups().as_list_chunked(); column.rename(keep_name); - AggregatedScalar(column.into_series()) + AggregatedScalar(column.into_column()) }, GroupByMethod::Std(ddof) => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = s.agg_std(&groups, ddof); - AggregatedScalar(agg_s.with_name(keep_name)) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = c.agg_std(&groups, ddof); + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::Var(ddof) => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = s.agg_var(&groups, ddof); - AggregatedScalar(agg_s.with_name(keep_name)) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = c.agg_var(&groups, ddof); + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::Quantile(_, _) => { // implemented explicitly in AggQuantile struct @@ -429,24 +431,28 @@ impl PhysicalExpr for AggregationExpr { }, #[cfg(feature = "bitwise")] GroupByMethod::Bitwise(f) => { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = match f { - GroupByBitwiseMethod::And => s.agg_and(&groups), - GroupByBitwiseMethod::Or => s.agg_or(&groups), - GroupByBitwiseMethod::Xor => s.agg_xor(&groups), + let (c, groups) = ac.get_final_aggregation(); + let agg_c = match f { + GroupByBitwiseMethod::And => c.agg_and(&groups), + GroupByBitwiseMethod::Or => c.agg_or(&groups), + GroupByBitwiseMethod::Xor => c.agg_xor(&groups), }; - AggregatedScalar(agg_s.with_name(keep_name)) + AggregatedScalar(agg_c.with_name(keep_name)) }, GroupByMethod::NanMin => { #[cfg(feature = "propagate_nans")] { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = if s.dtype().is_float() { - nan_propagating_aggregate::group_agg_nan_min_s(&s, &groups) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = if c.dtype().is_float() { + nan_propagating_aggregate::group_agg_nan_min_s( + 
c.as_materialized_series(), + &groups, + ) + .into_column() } else { - s.agg_min(&groups) + c.agg_min(&groups) }; - AggregatedScalar(agg_s.with_name(keep_name)) + AggregatedScalar(agg_c.with_name(keep_name)) } #[cfg(not(feature = "propagate_nans"))] { @@ -456,13 +462,17 @@ impl PhysicalExpr for AggregationExpr { GroupByMethod::NanMax => { #[cfg(feature = "propagate_nans")] { - let (s, groups) = ac.get_final_aggregation(); - let agg_s = if s.dtype().is_float() { - nan_propagating_aggregate::group_agg_nan_max_s(&s, &groups) + let (c, groups) = ac.get_final_aggregation(); + let agg_c = if c.dtype().is_float() { + nan_propagating_aggregate::group_agg_nan_max_s( + c.as_materialized_series(), + &groups, + ) + .into_column() } else { - s.agg_max(&groups) + c.agg_max(&groups) }; - AggregatedScalar(agg_s.with_name(keep_name)) + AggregatedScalar(agg_c.with_name(keep_name)) } #[cfg(not(feature = "propagate_nans"))] { @@ -757,7 +767,7 @@ impl PhysicalExpr for AggQuantileExpr { ) -> PolarsResult> { let mut ac = self.input.evaluate_on_groups(df, groups, state)?; // don't change names by aggregations as is done in polars-core - let keep_name = ac.series().name().clone(); + let keep_name = ac.get_values().name().clone(); let quantile = self.get_quantile(df, state)?; diff --git a/crates/polars-expr/src/expressions/alias.rs b/crates/polars-expr/src/expressions/alias.rs index f2065289e1ae..131d2ca2f16c 100644 --- a/crates/polars-expr/src/expressions/alias.rs +++ b/crates/polars-expr/src/expressions/alias.rs @@ -48,17 +48,13 @@ impl PhysicalExpr for AliasExpr { state: &ExecutionState, ) -> PolarsResult> { let mut ac = self.physical_expr.evaluate_on_groups(df, groups, state)?; - let s = ac.take(); - let s = self.finish(s.into()); + let c = ac.take(); + let c = self.finish(c); if ac.is_literal() { - ac.with_literal(s.take_materialized_series()); + ac.with_literal(c); } else { - ac.with_series( - s.take_materialized_series(), - ac.is_aggregated(), - Some(&self.expr), - )?; + ac.with_values(c, ac.is_aggregated(), Some(&self.expr))?; } Ok(ac) } diff --git a/crates/polars-expr/src/expressions/apply.rs b/crates/polars-expr/src/expressions/apply.rs index f8e2619c4153..c03511a64734 100644 --- a/crates/polars-expr/src/expressions/apply.rs +++ b/crates/polars-expr/src/expressions/apply.rs @@ -92,11 +92,11 @@ impl ApplyExpr { let all_unit_len = all_unit_length(&ca); if all_unit_len && self.function_returns_scalar { ac.with_agg_state(AggState::AggregatedScalar( - ca.explode().unwrap().into_series(), + ca.explode().unwrap().into_column(), )); ac.with_update_groups(UpdateGroups::No); } else { - ac.with_series(ca.into_series(), true, Some(&self.expr))?; + ac.with_values(ca.into_column(), true, Some(&self.expr))?; ac.with_update_groups(UpdateGroups::WithSeriesLen); } @@ -120,7 +120,7 @@ impl ApplyExpr { &self, mut ac: AggregationContext<'a>, ) -> PolarsResult> { - let s = ac.series(); + let s = ac.get_values(); polars_ensure!( !matches!(ac.agg_state(), AggState::AggregatedScalar(_)), @@ -131,7 +131,7 @@ impl ApplyExpr { let name = s.name().clone(); let agg = ac.aggregated(); // Collection of empty list leads to a null dtype. See: #3687. - if agg.len() == 0 { + if agg.is_empty() { // Create input for the function to determine the output dtype, see #3946. 
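+            // Evaluating the UDF once on an empty Series of the list's inner
+            // dtype recovers the proper output dtype rather than defaulting
+            // to `Null`.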
let agg = agg.list().unwrap(); let input_dtype = agg.inner_dtype(); @@ -199,35 +199,28 @@ impl ApplyExpr { &self, mut ac: AggregationContext<'a>, ) -> PolarsResult> { - let (s, aggregated) = match ac.agg_state() { - AggState::AggregatedList(s) => { - let ca = s.list().unwrap(); + let (c, aggregated) = match ac.agg_state() { + AggState::AggregatedList(c) => { + let ca = c.list().unwrap(); let out = ca.apply_to_inner(&|s| { - self.eval_and_flatten(&mut [s.into()]) - .map(|c| c.as_materialized_series().clone()) + Ok(self + .eval_and_flatten(&mut [s.into_column()])? + .take_materialized_series()) })?; - (out.into_series(), true) + (out.into_column(), true) }, - AggState::NotAggregated(s) => { - let (out, aggregated) = ( - self.eval_and_flatten(&mut [s.clone().into()])? - .as_materialized_series() - .clone(), - false, - ); - check_map_output_len(s.len(), out.len(), &self.expr)?; + AggState::NotAggregated(c) => { + let (out, aggregated) = (self.eval_and_flatten(&mut [c.clone()])?, false); + check_map_output_len(c.len(), out.len(), &self.expr)?; (out, aggregated) }, agg_state => { - ac.with_agg_state(agg_state.try_map(|s| { - self.eval_and_flatten(&mut [s.clone().into()]) - .map(|c| c.as_materialized_series().clone()) - })?); + ac.with_agg_state(agg_state.try_map(|s| self.eval_and_flatten(&mut [s.clone()]))?); return Ok(ac); }, }; - ac.with_series_and_args(s, aggregated, Some(&self.expr), true)?; + ac.with_values_and_args(c, aggregated, Some(&self.expr), true)?; Ok(ac) } fn apply_multiple_group_aware<'a>( @@ -385,11 +378,8 @@ impl PhysicalExpr for ApplyExpr { match self.collect_groups { ApplyOptions::ApplyList => { - let s = self - .eval_and_flatten(&mut [ac.aggregated().into()])? - .as_materialized_series() - .clone(); - ac.with_series(s, true, Some(&self.expr))?; + let c = self.eval_and_flatten(&mut [ac.aggregated()])?; + ac.with_values(c, true, Some(&self.expr))?; Ok(ac) }, ApplyOptions::GroupWise => self.apply_single_group_aware(ac), @@ -400,18 +390,12 @@ impl PhysicalExpr for ApplyExpr { match self.collect_groups { ApplyOptions::ApplyList => { - let mut s = acs - .iter_mut() - .map(|ac| ac.aggregated().into()) - .collect::>(); - let s = self - .eval_and_flatten(&mut s)? - .as_materialized_series() - .clone(); + let mut c = acs.iter_mut().map(|ac| ac.aggregated()).collect::>(); + let c = self.eval_and_flatten(&mut c)?; // take the first aggregation context that as that is the input series let mut ac = acs.swap_remove(0); ac.with_update_groups(UpdateGroups::WithGroupsLen); - ac.with_series(s, true, Some(&self.expr))?; + ac.with_values(c, true, Some(&self.expr))?; Ok(ac) }, ApplyOptions::GroupWise => self.apply_multiple_group_aware(acs, df), @@ -487,7 +471,7 @@ fn apply_multiple_elementwise<'a>( let other = acs[1..] 
.iter() - .map(|ac| ac.flat_naive().into_owned().into()) + .map(|ac| ac.flat_naive().into_owned()) .collect::>(); let out = ca.apply_to_inner(&|s| { @@ -501,14 +485,14 @@ fn apply_multiple_elementwise<'a>( .clone()) })?; let mut ac = acs.swap_remove(0); - ac.with_series(out.into_series(), true, None)?; + ac.with_values(out.into_column(), true, None)?; Ok(ac) }, first_as => { let check_lengths = check_lengths && !matches!(first_as, AggState::Literal(_)); let aggregated = acs.iter().all(|ac| ac.is_aggregated() | ac.is_literal()) && acs.iter().any(|ac| ac.is_aggregated()); - let mut s = acs + let mut c = acs .iter_mut() .enumerate() .map(|(i, ac)| { @@ -523,19 +507,15 @@ fn apply_multiple_elementwise<'a>( .map(Column::from) .collect::>(); - let input_len = s[0].len(); - let s = function - .call_udf(&mut s)? - .unwrap() - .as_materialized_series() - .clone(); + let input_len = c[0].len(); + let c = function.call_udf(&mut c)?.unwrap(); if check_lengths { - check_map_output_len(input_len, s.len(), expr)?; + check_map_output_len(input_len, c.len(), expr)?; } // Take the first aggregation context that as that is the input series. let mut ac = acs.swap_remove(0); - ac.with_series_and_args(s, aggregated, None, true)?; + ac.with_values_and_args(c, aggregated, None, true)?; Ok(ac) }, } diff --git a/crates/polars-expr/src/expressions/binary.rs b/crates/polars-expr/src/expressions/binary.rs index 10f217844ab1..0976afc5e608 100644 --- a/crates/polars-expr/src/expressions/binary.rs +++ b/crates/polars-expr/src/expressions/binary.rs @@ -121,14 +121,14 @@ impl BinaryExpr { aggregated: bool, ) -> PolarsResult> { // We want to be able to mutate in place, so we take the lhs to make sure that we drop. - let lhs = ac_l.series().clone(); - let rhs = ac_r.series().clone(); + let lhs = ac_l.get_values().clone(); + let rhs = ac_r.get_values().clone(); // Drop lhs so that we might operate in place. 
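+        // `apply_operator_owned` can only reuse the input buffers if it holds
+        // the sole reference, so the copy still held by `ac_l` is dropped
+        // first.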
drop(ac_l.take()); - let out = apply_operator_owned(lhs.into_column(), rhs.into_column(), self.op)?; - ac_l.with_series(out.take_materialized_series(), aggregated, Some(&self.expr))?; + let out = apply_operator_owned(lhs, rhs, self.op)?; + ac_l.with_values(out, aggregated, Some(&self.expr))?; Ok(ac_l) } @@ -137,20 +137,20 @@ impl BinaryExpr { mut ac_l: AggregationContext<'a>, mut ac_r: AggregationContext<'a>, ) -> PolarsResult> { - let name = ac_l.series().name().clone(); + let name = ac_l.get_values().name().clone(); ac_l.groups(); ac_r.groups(); polars_ensure!(ac_l.groups.len() == ac_r.groups.len(), ComputeError: "lhs and rhs should have same group length"); - let left_s = ac_l.series().rechunk().into_column(); - let right_s = ac_r.series().rechunk().into_column(); - let res_s = apply_operator(&left_s, &right_s, self.op)?; + let left_c = ac_l.get_values().rechunk().into_column(); + let right_c = ac_r.get_values().rechunk().into_column(); + let res_c = apply_operator(&left_c, &right_c, self.op)?; ac_l.with_update_groups(UpdateGroups::WithSeriesLen); - let res_s = if res_s.len() == 1 { - res_s.new_from_index(0, ac_l.groups.len()) + let res_s = if res_c.len() == 1 { + res_c.new_from_index(0, ac_l.groups.len()) } else { - ListChunked::full(name, res_s.as_materialized_series(), ac_l.groups.len()).into_column() + ListChunked::full(name, res_c.as_materialized_series(), ac_l.groups.len()).into_column() }; - ac_l.with_series(res_s.take_materialized_series(), true, Some(&self.expr))?; + ac_l.with_values(res_s, true, Some(&self.expr))?; Ok(ac_l) } @@ -159,7 +159,7 @@ impl BinaryExpr { mut ac_l: AggregationContext<'a>, mut ac_r: AggregationContext<'a>, ) -> PolarsResult> { - let name = ac_l.series().name().clone(); + let name = ac_l.get_values().name().clone(); let ca = ac_l .iter_groups(false) .zip(ac_r.iter_groups(false)) @@ -175,7 +175,7 @@ impl BinaryExpr { .with_name(name); ac_l.with_update_groups(UpdateGroups::WithSeriesLen); - ac_l.with_agg_state(AggState::AggregatedList(ca.into_series())); + ac_l.with_agg_state(AggState::AggregatedList(ca.into_column())); Ok(ac_l) } } @@ -260,7 +260,7 @@ impl PhysicalExpr for BinaryExpr { apply_operator(&lhs.into_column(), &rhs.get_inner().into_column(), self.op) .map(|c| c.take_materialized_series()) })?; - ac_l.with_series(out.into_series(), true, Some(&self.expr))?; + ac_l.with_values(out.into_column(), true, Some(&self.expr))?; Ok(ac_l) }, _ => self.apply_group_aware(ac_l, ac_r), diff --git a/crates/polars-expr/src/expressions/cast.rs b/crates/polars-expr/src/expressions/cast.rs index 95f0c9eebee5..1bc230ceab8f 100644 --- a/crates/polars-expr/src/expressions/cast.rs +++ b/crates/polars-expr/src/expressions/cast.rs @@ -59,14 +59,14 @@ impl PhysicalExpr for CastExpr { self.finish(&s.into_column()) .map(|c| c.take_materialized_series()) })?; - ac.with_series(casted.into_series(), true, None)?; + ac.with_values(casted.into_column(), true, None)?; }, AggState::AggregatedScalar(s) => { let s = self.finish(&s.clone().into_column())?; if ac.is_literal() { - ac.with_literal(s.take_materialized_series()); + ac.with_literal(s); } else { - ac.with_series(s.take_materialized_series(), true, None)?; + ac.with_values(s, true, None)?; } }, _ => { @@ -77,9 +77,9 @@ impl PhysicalExpr for CastExpr { let s = self.finish(&s.as_ref().clone().into_column())?; if ac.is_literal() { - ac.with_literal(s.take_materialized_series()); + ac.with_literal(s); } else { - ac.with_series(s.take_materialized_series(), false, None)?; + ac.with_values(s, false, None)?; } }, } diff --git 
a/crates/polars-expr/src/expressions/column.rs b/crates/polars-expr/src/expressions/column.rs index 2142d22df6d9..99b5ba9fe262 100644 --- a/crates/polars-expr/src/expressions/column.rs +++ b/crates/polars-expr/src/expressions/column.rs @@ -21,9 +21,9 @@ impl ColumnExpr { impl ColumnExpr { fn check_external_context( &self, - out: PolarsResult, + out: PolarsResult, state: &ExecutionState, - ) -> PolarsResult { + ) -> PolarsResult { match out { Ok(col) => Ok(col), Err(e) => { @@ -33,7 +33,7 @@ impl ColumnExpr { for df in state.ext_contexts.as_ref() { let out = df.column(&self.name); if out.is_ok() { - return out.map(Column::as_materialized_series).cloned(); + return out.cloned(); } } Err(e) @@ -44,12 +44,12 @@ impl ColumnExpr { fn process_by_idx( &self, - out: &Series, + out: &Column, _state: &ExecutionState, _schema: &Schema, df: &DataFrame, check_state_schema: bool, - ) -> PolarsResult { + ) -> PolarsResult { if out.name() != &*self.name { if check_state_schema { if let Some(schema) = _state.get_schema() { @@ -75,9 +75,7 @@ impl ColumnExpr { // in release we fallback to linear search #[allow(unreachable_code)] { - df.column(&self.name) - .map(Column::as_materialized_series) - .cloned() + df.column(&self.name).cloned() } } else { Ok(out.clone()) @@ -88,7 +86,7 @@ impl ColumnExpr { df: &DataFrame, _state: &ExecutionState, _panic_during_test: bool, - ) -> PolarsResult { + ) -> PolarsResult { #[cfg(feature = "panic_on_schema")] { if _panic_during_test @@ -100,9 +98,7 @@ impl ColumnExpr { } // in release we fallback to linear search #[allow(unreachable_code)] - df.column(&self.name) - .map(Column::as_materialized_series) - .cloned() + df.column(&self.name).cloned() } fn process_from_state_schema( @@ -110,19 +106,17 @@ impl ColumnExpr { df: &DataFrame, state: &ExecutionState, schema: &Schema, - ) -> PolarsResult { + ) -> PolarsResult { match schema.get_full(&self.name) { None => self.process_by_linear_search(df, state, true), Some((idx, _, _)) => match df.get_columns().get(idx) { - Some(out) => { - self.process_by_idx(out.as_materialized_series(), state, schema, df, false) - }, + Some(out) => self.process_by_idx(out, state, schema, df, false), None => self.process_by_linear_search(df, state, true), }, } } - fn process_cse(&self, df: &DataFrame, schema: &Schema) -> PolarsResult { + fn process_cse(&self, df: &DataFrame, schema: &Schema) -> PolarsResult { // The CSE columns are added on the rhs. 
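+        // Everything up to `schema.len()` is the user-facing schema; the CSE
+        // temporaries are the columns appended after it.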
let offset = schema.len(); let columns = &df.get_columns()[offset..]; @@ -131,7 +125,6 @@ impl ColumnExpr { .iter() .find(|s| s.name() == &self.name) .unwrap() - .as_materialized_series() .clone()) } } @@ -146,13 +139,7 @@ impl PhysicalExpr for ColumnExpr { // check if the schema was correct // if not do O(n) search match df.get_columns().get(idx) { - Some(out) => self.process_by_idx( - out.as_materialized_series(), - state, - &self.schema, - df, - true, - ), + Some(out) => self.process_by_idx(out, state, &self.schema, df, true), None => { // partitioned group_by special case if let Some(schema) = state.get_schema() { @@ -183,12 +170,8 @@ impl PhysicalExpr for ColumnExpr { groups: &'a GroupsProxy, state: &ExecutionState, ) -> PolarsResult> { - let s = self.evaluate(df, state)?; - Ok(AggregationContext::new( - s.take_materialized_series(), - Cow::Borrowed(groups), - false, - )) + let c = self.evaluate(df, state)?; + Ok(AggregationContext::new(c, Cow::Borrowed(groups), false)) } fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> { diff --git a/crates/polars-expr/src/expressions/count.rs b/crates/polars-expr/src/expressions/count.rs index 6102caf5a354..db25f0d9e73b 100644 --- a/crates/polars-expr/src/expressions/count.rs +++ b/crates/polars-expr/src/expressions/count.rs @@ -32,8 +32,8 @@ impl PhysicalExpr for CountExpr { _state: &ExecutionState, ) -> PolarsResult> { let ca = groups.group_count().with_name(PlSmallStr::from_static(LEN)); - let s = ca.into_series(); - Ok(AggregationContext::new(s, Cow::Borrowed(groups), true)) + let c = ca.into_column(); + Ok(AggregationContext::new(c, Cow::Borrowed(groups), true)) } fn to_field(&self, _input_schema: &Schema) -> PolarsResult { diff --git a/crates/polars-expr/src/expressions/filter.rs b/crates/polars-expr/src/expressions/filter.rs index 6f847a7fa8ed..f2b1383059ee 100644 --- a/crates/polars-expr/src/expressions/filter.rs +++ b/crates/polars-expr/src/expressions/filter.rs @@ -73,7 +73,7 @@ impl PhysicalExpr for FilterExpr { .with_name(s.name().clone()) } }; - ac_s.with_series(out.into_series(), true, Some(&self.expr))?; + ac_s.with_values(out.into_column(), true, Some(&self.expr))?; ac_s.update_groups = WithSeriesLen; Ok(ac_s) } else { diff --git a/crates/polars-expr/src/expressions/gather.rs b/crates/polars-expr/src/expressions/gather.rs index 19a0e35ff315..5c0ccae4f2bc 100644 --- a/crates/polars-expr/src/expressions/gather.rs +++ b/crates/polars-expr/src/expressions/gather.rs @@ -2,7 +2,7 @@ use arrow::legacy::utils::CustomIterTools; use polars_core::chunked_array::builder::get_list_builder; use polars_core::prelude::*; use polars_core::utils::NoNull; -use polars_ops::prelude::{convert_to_unsigned_index, is_positive_idx_uncertain}; +use polars_ops::prelude::{convert_to_unsigned_index, is_positive_idx_uncertain_col}; use super::*; use crate::expressions::{AggState, AggregationContext, PhysicalExpr, UpdateGroups}; @@ -33,14 +33,14 @@ impl PhysicalExpr for GatherExpr { let mut ac = self.phys_expr.evaluate_on_groups(df, groups, state)?; let mut idx = self.idx.evaluate_on_groups(df, groups, state)?; - let s_idx = idx.series(); - match s_idx.dtype() { + let c_idx = idx.get_values(); + match c_idx.dtype() { DataType::List(inner) => { polars_ensure!(inner.is_integer(), InvalidOperation: "expected numeric dtype as index, got {:?}", inner) }, dt if dt.is_integer() => { // Unsigned integers will fall through and will use faster paths. 
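+                // Indices that may be negative must first be resolved against
+                // each group's length, which takes the dedicated slow path.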
- if !is_positive_idx_uncertain(s_idx) { + if !is_positive_idx_uncertain_col(c_idx) { return self.process_negative_indices_agg(ac, idx, groups); } }, @@ -80,10 +80,10 @@ impl PhysicalExpr for GatherExpr { .map(|(s, idx)| Some(s?.as_ref().take(idx?.as_ref().idx().unwrap()))) .map(|opt_res| opt_res.transpose()) .collect::>()? - .with_name(ac.series().name().clone()) + .with_name(ac.get_values().name().clone()) }; - ac.with_series(taken.into_series(), true, Some(&self.expr))?; + ac.with_values(taken.into_column(), true, Some(&self.expr))?; ac.with_update_groups(UpdateGroups::WithSeriesLen); Ok(ac) } @@ -162,10 +162,10 @@ impl GatherExpr { let taken = if self.returns_scalar { taken } else { - taken.as_list().into_series() + taken.as_list().into_column() }; - ac.with_series(taken, true, Some(&self.expr))?; + ac.with_values(taken, true, Some(&self.expr))?; Ok(ac) } else { self.gather_aggregated_expensive(ac, idx) @@ -183,7 +183,7 @@ impl GatherExpr { .unwrap() .try_apply_amortized(|s| s.as_ref().take(idx))?; - ac.with_series(out.into_series(), true, Some(&self.expr))?; + ac.with_values(out.into_column(), true, Some(&self.expr))?; ac.with_update_groups(UpdateGroups::WithGroupsLen); Ok(ac) } @@ -228,10 +228,10 @@ impl GatherExpr { let taken = if self.returns_scalar { taken } else { - taken.as_list().into_series() + taken.as_list().into_column() }; - ac.with_series(taken, true, Some(&self.expr))?; + ac.with_values(taken, true, Some(&self.expr))?; ac.with_update_groups(UpdateGroups::WithGroupsLen); Ok(ac) }, @@ -249,9 +249,9 @@ impl GatherExpr { ) -> PolarsResult> { let mut builder = get_list_builder( &ac.dtype(), - idx.series().len(), + idx.get_values().len(), groups.len(), - ac.series().name().clone(), + ac.get_values().name().clone(), ); let iter = ac.iter_groups(false).zip(idx.iter_groups(false)); @@ -265,7 +265,7 @@ impl GatherExpr { _ => builder.append_null(), }; } - let out = builder.finish().into_series(); + let out = builder.finish().into_column(); ac.with_agg_state(AggState::AggregatedList(out)); Ok(ac) } diff --git a/crates/polars-expr/src/expressions/group_iter.rs b/crates/polars-expr/src/expressions/group_iter.rs index b42851e49d2a..31a694fe4a86 100644 --- a/crates/polars-expr/src/expressions/group_iter.rs +++ b/crates/polars-expr/src/expressions/group_iter.rs @@ -12,45 +12,45 @@ impl AggregationContext<'_> { match self.agg_state() { AggState::Literal(_) => { self.groups(); - let s = self.series().rechunk(); + let c = self.get_values().rechunk(); let name = if keep_names { - s.name().clone() + c.name().clone() } else { PlSmallStr::EMPTY }; // SAFETY: dtype is correct unsafe { Box::new(LitIter::new( - s.array_ref(0).clone(), + c.as_materialized_series().array_ref(0).clone(), self.groups.len(), - s._dtype(), + c.dtype(), name, )) } }, AggState::AggregatedScalar(_) => { self.groups(); - let s = self.series(); + let c = self.get_values(); let name = if keep_names { - s.name().clone() + c.name().clone() } else { PlSmallStr::EMPTY }; // SAFETY: dtype is correct unsafe { Box::new(FlatIter::new( - s.chunks(), + c.as_materialized_series().chunks(), self.groups.len(), - s.dtype(), + c.dtype(), name, )) } }, AggState::AggregatedList(_) => { - let s = self.series(); - let list = s.list().unwrap(); + let c = self.get_values(); + let list = c.list().unwrap(); let name = if keep_names { - s.name().clone() + c.name().clone() } else { PlSmallStr::EMPTY }; @@ -59,10 +59,10 @@ impl AggregationContext<'_> { AggState::NotAggregated(_) => { // we don't take the owned series as we want a reference let _ = 
self.aggregated(); - let s = self.series(); - let list = s.list().unwrap(); + let c = self.get_values(); + let list = c.list().unwrap(); let name = if keep_names { - s.name().clone() + c.name().clone() } else { PlSmallStr::EMPTY }; diff --git a/crates/polars-expr/src/expressions/literal.rs b/crates/polars-expr/src/expressions/literal.rs index 0ab9ad9872b3..e2ea2f6d0f90 100644 --- a/crates/polars-expr/src/expressions/literal.rs +++ b/crates/polars-expr/src/expressions/literal.rs @@ -139,10 +139,7 @@ impl PhysicalExpr for LiteralExpr { state: &ExecutionState, ) -> PolarsResult> { let s = self.evaluate(df, state)?; - Ok(AggregationContext::from_literal( - s.take_materialized_series(), - Cow::Borrowed(groups), - )) + Ok(AggregationContext::from_literal(s, Cow::Borrowed(groups))) } fn as_partitioned_aggregator(&self) -> Option<&dyn PartitionedAggregation> { diff --git a/crates/polars-expr/src/expressions/mod.rs b/crates/polars-expr/src/expressions/mod.rs index 277afddb41f2..70963dde7eec 100644 --- a/crates/polars-expr/src/expressions/mod.rs +++ b/crates/polars-expr/src/expressions/mod.rs @@ -48,28 +48,28 @@ use crate::state::ExecutionState; #[derive(Clone, Debug)] pub enum AggState { - /// Already aggregated: `.agg_list(group_tuples`) is called + /// Already aggregated: `.agg_list(group_tuples)` is called /// and produced a `Series` of dtype `List` - AggregatedList(Series), + AggregatedList(Column), /// Already aggregated: `.agg` is called on an aggregation /// that produces a scalar. /// think of `sum`, `mean`, `variance` like aggregations. - AggregatedScalar(Series), + AggregatedScalar(Column), /// Not yet aggregated: `agg_list` still has to be called. - NotAggregated(Series), - Literal(Series), + NotAggregated(Column), + Literal(Column), } impl AggState { fn try_map(&self, func: F) -> PolarsResult where - F: FnOnce(&Series) -> PolarsResult, + F: FnOnce(&Column) -> PolarsResult, { Ok(match self { - AggState::AggregatedList(s) => AggState::AggregatedList(func(s)?), - AggState::AggregatedScalar(s) => AggState::AggregatedScalar(func(s)?), - AggState::Literal(s) => AggState::Literal(func(s)?), - AggState::NotAggregated(s) => AggState::NotAggregated(func(s)?), + AggState::AggregatedList(c) => AggState::AggregatedList(func(c)?), + AggState::AggregatedScalar(c) => AggState::AggregatedScalar(func(c)?), + AggState::Literal(c) => AggState::Literal(func(c)?), + AggState::NotAggregated(c) => AggState::NotAggregated(func(c)?), }) } } @@ -152,14 +152,14 @@ impl<'a> AggregationContext<'a> { self.update_groups = UpdateGroups::No; }, UpdateGroups::WithSeriesLen => { - let s = self.series().clone(); - self.det_groups_from_list(&s); + let s = self.get_values().clone(); + self.det_groups_from_list(s.as_materialized_series()); }, } &self.groups } - pub(crate) fn series(&self) -> &Series { + pub(crate) fn get_values(&self) -> &Column { match &self.state { AggState::NotAggregated(s) | AggState::AggregatedScalar(s) @@ -191,20 +191,20 @@ impl<'a> AggregationContext<'a> { /// - `aggregated` sets if the Series is a list due to aggregation (could also be a list because its /// the columns dtype) fn new( - series: Series, + column: Column, groups: Cow<'a, GroupsProxy>, aggregated: bool, ) -> AggregationContext<'a> { - let series = match (aggregated, series.dtype()) { + let series = match (aggregated, column.dtype()) { (true, &DataType::List(_)) => { - assert_eq!(series.len(), groups.len()); - AggState::AggregatedList(series) + assert_eq!(column.len(), groups.len()); + AggState::AggregatedList(column) }, (true, _) => { - 
assert_eq!(series.len(), groups.len()); - AggState::AggregatedScalar(series) + assert_eq!(column.len(), groups.len()); + AggState::AggregatedScalar(column) }, - _ => AggState::NotAggregated(series), + _ => AggState::NotAggregated(column), }; Self { @@ -230,7 +230,7 @@ impl<'a> AggregationContext<'a> { } } - fn from_literal(lit: Series, groups: Cow<'a, GroupsProxy>) -> AggregationContext<'a> { + fn from_literal(lit: Column, groups: Cow<'a, GroupsProxy>) -> AggregationContext<'a> { Self { state: AggState::Literal(lit), groups, @@ -283,7 +283,7 @@ impl<'a> AggregationContext<'a> { }, _ => { let groups = { - self.series() + self.get_values() .list() .expect("impl error, should be a list at this point") .amortized_iter() @@ -312,27 +312,27 @@ impl<'a> AggregationContext<'a> { /// # Arguments /// - `aggregated` sets if the Series is a list due to aggregation (could also be a list because its /// the columns dtype) - pub(crate) fn with_series( + pub(crate) fn with_values( &mut self, - series: Series, + column: Column, aggregated: bool, expr: Option<&Expr>, ) -> PolarsResult<&mut Self> { - self.with_series_and_args(series, aggregated, expr, false) + self.with_values_and_args(column, aggregated, expr, false) } - pub(crate) fn with_series_and_args( + pub(crate) fn with_values_and_args( &mut self, - series: Series, + column: Column, aggregated: bool, expr: Option<&Expr>, // if the applied function was a `map` instead of an `apply` // this will keep functions applied over literals as literals: F(lit) = lit mapped: bool, ) -> PolarsResult<&mut Self> { - self.state = match (aggregated, series.dtype()) { + self.state = match (aggregated, column.dtype()) { (true, &DataType::List(_)) => { - if series.len() != self.groups.len() { + if column.len() != self.groups.len() { let fmt_expr = if let Some(e) = expr { format!("'{e:?}' ") } else { @@ -342,30 +342,30 @@ impl<'a> AggregationContext<'a> { ComputeError: "aggregation expression '{}' produced a different number of elements: {} \ than the number of groups: {} (this is likely invalid)", - fmt_expr, series.len(), self.groups.len(), + fmt_expr, column.len(), self.groups.len(), ); } - AggState::AggregatedList(series) + AggState::AggregatedList(column) }, - (true, _) => AggState::AggregatedScalar(series), + (true, _) => AggState::AggregatedScalar(column), _ => { match self.state { // already aggregated to sum, min even this series was flattened it never could // retrieve the length before grouping, so it stays in this state. 
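+                    // (a scalar aggregate cannot recover the pre-aggregation
+                    // group lengths, so the new values stay `AggregatedScalar`)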
- AggState::AggregatedScalar(_) => AggState::AggregatedScalar(series), + AggState::AggregatedScalar(_) => AggState::AggregatedScalar(column), // applying a function on a literal, keeps the literal state - AggState::Literal(_) if series.len() == 1 && mapped => { - AggState::Literal(series) + AggState::Literal(_) if column.len() == 1 && mapped => { + AggState::Literal(column) }, - _ => AggState::NotAggregated(series), + _ => AggState::NotAggregated(column.into_column()), } }, }; Ok(self) } - pub(crate) fn with_literal(&mut self, series: Series) -> &mut Self { - self.state = AggState::Literal(series); + pub(crate) fn with_literal(&mut self, column: Column) -> &mut Self { + self.state = AggState::Literal(column); self } @@ -373,7 +373,7 @@ impl<'a> AggregationContext<'a> { pub(crate) fn with_groups(&mut self, groups: GroupsProxy) -> &mut Self { if let AggState::AggregatedList(_) = self.agg_state() { // In case of new groups, a series always needs to be flattened - self.with_series(self.flat_naive().into_owned(), false, None) + self.with_values(self.flat_naive().into_owned(), false, None) .unwrap(); } self.groups = Cow::Owned(groups); @@ -383,7 +383,7 @@ impl<'a> AggregationContext<'a> { } /// Get the aggregated version of the series. - pub fn aggregated(&mut self) -> Series { + pub fn aggregated(&mut self) -> Column { // we clone, because we only want to call `self.groups()` if needed. // self groups may instantiate new groups and thus can be expensive. match self.state.clone() { @@ -409,7 +409,7 @@ impl<'a> AggregationContext<'a> { self.update_groups = UpdateGroups::WithGroupsLen; out }, - AggState::AggregatedList(s) | AggState::AggregatedScalar(s) => s, + AggState::AggregatedList(s) | AggState::AggregatedScalar(s) => s.into_column(), AggState::Literal(s) => { self.groups(); let rows = self.groups.len(); @@ -421,21 +421,21 @@ impl<'a> AggregationContext<'a> { ]) .unwrap(); self.state = AggState::AggregatedList(out.clone()); - out + out.into_column() }, } } /// Get the final aggregated version of the series. - pub fn finalize(&mut self) -> Series { + pub fn finalize(&mut self) -> Column { // we clone, because we only want to call `self.groups()` if needed. // self groups may instantiate new groups and thus can be expensive. match &self.state { - AggState::Literal(s) => { - let s = s.clone(); + AggState::Literal(c) => { + let c = c.clone(); self.groups(); let rows = self.groups.len(); - s.new_from_index(0, rows) + c.new_from_index(0, rows) }, _ => self.aggregated(), } @@ -452,15 +452,15 @@ impl<'a> AggregationContext<'a> { } } - pub fn get_final_aggregation(mut self) -> (Series, Cow<'a, GroupsProxy>) { + pub fn get_final_aggregation(mut self) -> (Column, Cow<'a, GroupsProxy>) { let _ = self.groups(); let groups = self.groups; match self.state { - AggState::NotAggregated(s) => (s, groups), - AggState::AggregatedScalar(s) => (s, groups), - AggState::Literal(s) => (s, groups), - AggState::AggregatedList(s) => { - let flattened = s.explode().unwrap(); + AggState::NotAggregated(c) => (c, groups), + AggState::AggregatedScalar(c) => (c, groups), + AggState::Literal(c) => (c, groups), + AggState::AggregatedList(c) => { + let flattened = c.explode().unwrap(); let groups = groups.into_owned(); // unroll the possible flattened state // say we have groups with overlapping windows: @@ -496,10 +496,10 @@ impl<'a> AggregationContext<'a> { /// Note that we call it naive, because if a previous expr /// has filtered or sorted this, this information is in the /// group tuples not the flattened series. 
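+    ///
+    /// `AggregatedList` is exploded into a flat `Column`; all other states
+    /// are borrowed as-is.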
- pub(crate) fn flat_naive(&self) -> Cow<'_, Series> { + pub(crate) fn flat_naive(&self) -> Cow<'_, Column> { match &self.state { - AggState::NotAggregated(s) => Cow::Borrowed(s), - AggState::AggregatedList(s) => { + AggState::NotAggregated(c) => Cow::Borrowed(c), + AggState::AggregatedList(c) => { #[cfg(debug_assertions)] { // panic so we find cases where we accidentally explode overlapping groups @@ -509,22 +509,22 @@ impl<'a> AggregationContext<'a> { } } - Cow::Owned(s.explode().unwrap()) + Cow::Owned(c.explode().unwrap()) }, - AggState::AggregatedScalar(s) => Cow::Borrowed(s), - AggState::Literal(s) => Cow::Borrowed(s), + AggState::AggregatedScalar(c) => Cow::Borrowed(c), + AggState::Literal(c) => Cow::Borrowed(c), } } /// Take the series. - pub(crate) fn take(&mut self) -> Series { - let s = match &mut self.state { - AggState::NotAggregated(s) - | AggState::AggregatedScalar(s) - | AggState::AggregatedList(s) => s, - AggState::Literal(s) => s, + pub(crate) fn take(&mut self) -> Column { + let c = match &mut self.state { + AggState::NotAggregated(c) + | AggState::AggregatedScalar(c) + | AggState::AggregatedList(c) => c, + AggState::Literal(c) => c, }; - std::mem::take(s) + std::mem::take(c) } } diff --git a/crates/polars-expr/src/expressions/slice.rs b/crates/polars-expr/src/expressions/slice.rs index 2b805edd1bb0..0c2688d7999a 100644 --- a/crates/polars-expr/src/expressions/slice.rs +++ b/crates/polars-expr/src/expressions/slice.rs @@ -1,5 +1,5 @@ use polars_core::prelude::*; -use polars_core::utils::{slice_offsets, Container, CustomIterTools}; +use polars_core::utils::{slice_offsets, CustomIterTools}; use polars_core::POOL; use rayon::prelude::*; use AnyValue::Null; @@ -14,7 +14,7 @@ pub struct SliceExpr { pub(crate) expr: Expr, } -fn extract_offset(offset: &Series, expr: &Expr) -> PolarsResult { +fn extract_offset(offset: &Column, expr: &Expr) -> PolarsResult { polars_ensure!( offset.len() <= 1, expr = expr, ComputeError: "invalid argument to slice; expected an offset literal, got series of length {}", @@ -25,7 +25,7 @@ fn extract_offset(offset: &Series, expr: &Expr) -> PolarsResult { ) } -fn extract_length(length: &Series, expr: &Expr) -> PolarsResult { +fn extract_length(length: &Column, expr: &Expr) -> PolarsResult { polars_ensure!( length.len() <= 1, expr = expr, ComputeError: "invalid argument to slice; expected a length literal, got series of length {}", @@ -39,11 +39,11 @@ fn extract_length(length: &Series, expr: &Expr) -> PolarsResult { } } -fn extract_args(offset: &Series, length: &Series, expr: &Expr) -> PolarsResult<(i64, usize)> { +fn extract_args(offset: &Column, length: &Column, expr: &Expr) -> PolarsResult<(i64, usize)> { Ok((extract_offset(offset, expr)?, extract_length(length, expr)?)) } -fn check_argument(arg: &Series, groups: &GroupsProxy, name: &str, expr: &Expr) -> PolarsResult<()> { +fn check_argument(arg: &Column, groups: &GroupsProxy, name: &str, expr: &Expr) -> PolarsResult<()> { polars_ensure!( !matches!(arg.dtype(), DataType::List(_)), expr = expr, ComputeError: "invalid slice argument: cannot use an array as {} argument", name, @@ -92,11 +92,7 @@ impl PhysicalExpr for SliceExpr { let offset = &results[0]; let length = &results[1]; let series = &results[2]; - let (offset, length) = extract_args( - offset.as_materialized_series(), - length.as_materialized_series(), - &self.expr, - )?; + let (offset, length) = extract_args(offset, length, &self.expr)?; Ok(series.slice(offset, length)) } diff --git a/crates/polars-expr/src/expressions/sort.rs 
diff --git a/crates/polars-expr/src/expressions/sort.rs b/crates/polars-expr/src/expressions/sort.rs
index be9fe57e29ce..df816f9b48e7 100644
--- a/crates/polars-expr/src/expressions/sort.rs
+++ b/crates/polars-expr/src/expressions/sort.rs
@@ -63,7 +63,7 @@ impl PhysicalExpr for SortExpr {
             AggState::AggregatedList(s) => {
                 let ca = s.list().unwrap();
                 let out = ca.lst_sort(self.options)?;
-                ac.with_series(out.into_series(), true, Some(&self.expr))?;
+                ac.with_values(out.into_column(), true, Some(&self.expr))?;
             },
             _ => {
                 let series = ac.flat_naive().into_owned();
diff --git a/crates/polars-expr/src/expressions/sortby.rs b/crates/polars-expr/src/expressions/sortby.rs
index fad081cb49ed..ed34ed6414cd 100644
--- a/crates/polars-expr/src/expressions/sortby.rs
+++ b/crates/polars-expr/src/expressions/sortby.rs
@@ -133,8 +133,8 @@ fn sort_by_groups_no_match_single<'a>(
             })
             .collect_ca_with_dtype(PlSmallStr::EMPTY, dtype)
     });
-    let s = ca?.with_name(s_in.name().clone()).into_series();
-    ac_in.with_series(s, true, Some(expr))?;
+    let c = ca?.with_name(s_in.name().clone()).into_column();
+    ac_in.with_values(c, true, Some(expr))?;
     Ok(ac_in)
 }
 
@@ -281,12 +281,16 @@ impl PhysicalExpr for SortByExpr {
                 .collect::<PolarsResult<Vec<_>>>()?;
 
             let mut sort_by_s = ac_sort_by
                 .iter()
-                .map(|s| {
-                    let s = s.flat_naive();
-                    match s.dtype() {
+                .map(|c| {
+                    let c = c.flat_naive();
+                    match c.dtype() {
                         #[cfg(feature = "dtype-categorical")]
-                        DataType::Categorical(_, _) | DataType::Enum(_, _) => s.into_owned(),
-                        _ => s.to_physical_repr().into_owned(),
+                        DataType::Categorical(_, _) | DataType::Enum(_, _) => {
+                            c.as_materialized_series().clone()
+                        },
+                        // @scalar-opt
+                        // @partition-opt
+                        _ => c.to_physical_repr().take_materialized_series(),
                     }
                 })
                 .collect::<Vec<_>>();
@@ -363,7 +367,7 @@ impl PhysicalExpr for SortByExpr {
             // group_by operation - we must ensure that we are as well.
             if ordered_by_group_operation {
                 let s = ac_in.aggregated();
-                ac_in.with_series(s.explode().unwrap(), false, None)?;
+                ac_in.with_values(s.explode().unwrap(), false, None)?;
             }
             ac_in.with_groups(groups);
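[editor's note] In the `SortByExpr` hunk above, key columns are reduced to their physical representation (categoricals and enums excepted) before the sort order is computed; conceptually this is an arg-sort over the keys followed by a gather of the values. A stand-alone sketch of that idea, with plain slices instead of `Series`/`Column` (names illustrative, not the patch's API):

    // Compute the permutation that sorts `keys`, then apply it to `vals`.
    fn arg_sort_by<K: Ord>(keys: &[K]) -> Vec<usize> {
        let mut idx: Vec<usize> = (0..keys.len()).collect();
        idx.sort_by(|&a, &b| keys[a].cmp(&keys[b]));
        idx
    }

    fn main() {
        let keys = [3, 1, 2];
        let vals = ["c", "a", "b"];
        let order = arg_sort_by(&keys);
        let sorted: Vec<&str> = order.iter().map(|&i| vals[i]).collect();
        assert_eq!(sorted, ["a", "b", "c"]);
    }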
diff --git a/crates/polars-expr/src/expressions/ternary.rs b/crates/polars-expr/src/expressions/ternary.rs
index 2d1035c22eb7..bbd0c5f7d936 100644
--- a/crates/polars-expr/src/expressions/ternary.rs
+++ b/crates/polars-expr/src/expressions/ternary.rs
@@ -56,13 +56,13 @@ fn finish_as_iters<'a>(
                 .transpose()
         })
         .collect::<PolarsResult<ListChunked>>()?
-        .with_name(ac_truthy.series().name().clone());
+        .with_name(ac_truthy.get_values().name().clone());
 
     // Aggregation leaves only a single chunk.
     let arr = ca.downcast_iter().next().unwrap();
     let list_vals_len = arr.values().len();
 
-    let mut out = ca.into_series();
+    let mut out = ca.into_column();
     if ac_truthy.arity_should_explode() && ac_falsy.arity_should_explode() && ac_mask.arity_should_explode() &&
         // Exploded list should be equal to groups length.
         list_vals_len == ac_truthy.groups.len()
     {
         out = out.explode()?
     }
 
-    ac_truthy.with_series(out, true, None)?;
+    ac_truthy.with_values(out, true, None)?;
     Ok(ac_truthy)
 }
 
@@ -168,8 +168,8 @@ impl PhysicalExpr for TernaryExpr {
         }
 
         let out = ac_truthy
-            .series()
-            .zip_with(ac_mask.series().bool()?, ac_falsy.series())?;
+            .get_values()
+            .zip_with(ac_mask.get_values().bool()?, ac_falsy.get_values())?;
 
         for ac in [&ac_mask, &ac_truthy, &ac_falsy].into_iter() {
             if matches!(ac.agg_state(), NotAggregated(_)) {
@@ -257,21 +257,21 @@
                 }
 
                 let truthy = if let AggregatedList(s) = ac_truthy.agg_state() {
-                    s.list().unwrap().get_inner()
+                    s.list().unwrap().get_inner().into_column()
                 } else {
-                    ac_truthy.series().clone()
+                    ac_truthy.get_values().clone()
                 };
                 let falsy = if let AggregatedList(s) = ac_falsy.agg_state() {
-                    s.list().unwrap().get_inner()
+                    s.list().unwrap().get_inner().into_column()
                 } else {
-                    ac_falsy.series().clone()
+                    ac_falsy.get_values().clone()
                 };
                 let mask = if let AggregatedList(s) = ac_mask.agg_state() {
-                    s.list().unwrap().get_inner()
+                    s.list().unwrap().get_inner().into_column()
                 } else {
-                    ac_mask.series().clone()
+                    ac_mask.get_values().clone()
                 };
 
                 let out = truthy.zip_with(mask.bool()?, &falsy)?;
@@ -280,8 +280,10 @@
                 // offsets buffer of the result, so we construct the result
                 // ListChunked directly from the 2.
                 let out = out.rechunk();
-                let values = out.array_ref(0);
-                let offsets = ac_target.series().list().unwrap().offsets()?;
+                // @scalar-opt
+                // @partition-opt
+                let values = out.as_materialized_series().array_ref(0);
+                let offsets = ac_target.get_values().list().unwrap().offsets()?;
                 let inner_type = out.dtype();
 
                 let dtype = LargeListArray::default_datatype(values.dtype().clone());
@@ -291,11 +293,11 @@
                 let mut out = ListChunked::with_chunk(truthy.name().clone(), out);
                 unsafe { out.to_logical(inner_type.clone()) };
 
-                if ac_target.series().list().unwrap()._can_fast_explode() {
+                if ac_target.get_values().list().unwrap()._can_fast_explode() {
                     out.set_fast_explode();
                 };
 
-                let out = out.into_series();
+                let out = out.into_column();
                 AggregatedList(out)
             },
@@ -305,8 +307,8 @@
                 }
 
                 let out = ac_truthy
-                    .series()
-                    .zip_with(ac_mask.series().bool()?, ac_falsy.series())?;
+                    .get_values()
+                    .zip_with(ac_mask.get_values().bool()?, ac_falsy.get_values())?;
                 AggregatedScalar(out)
             },
             _ => {
diff --git a/crates/polars-expr/src/expressions/window.rs b/crates/polars-expr/src/expressions/window.rs
index e15a301f68b4..bbb9a1cface1 100644
--- a/crates/polars-expr/src/expressions/window.rs
+++ b/crates/polars-expr/src/expressions/window.rs
@@ -45,13 +45,13 @@ enum MapStrategy {
 impl WindowExpr {
     fn map_list_agg_by_arg_sort(
         &self,
-        out_column: Series,
-        flattened: Series,
+        out_column: Column,
+        flattened: Column,
         mut ac: AggregationContext,
         gb: GroupBy,
         state: &ExecutionState,
         cache_key: &str,
-    ) -> PolarsResult<Series> {
+    ) -> PolarsResult<Column> {
         // idx (new-idx, original-idx)
         let mut idx_mapping = Vec::with_capacity(out_column.len());
 
@@ -124,14 +124,14 @@ impl WindowExpr {
     fn map_by_arg_sort(
         &self,
         df: &DataFrame,
-        out_column: Series,
-        flattened: Series,
+        out_column: Column,
+        flattened: Column,
         mut ac: AggregationContext,
         group_by_columns: &[Column],
         gb: GroupBy,
         state: &ExecutionState,
         cache_key: &str,
-    ) -> PolarsResult<Series> {
+    ) -> PolarsResult<Column> {
         // we use an arg_sort to map the values back
 
         // This is a bit more complicated because the final group tuples may differ from the original
@@ -656,7 +656,7 @@ impl PhysicalExpr for WindowExpr {
     }
 }
 
-fn materialize_column(join_opt_ids: &ChunkJoinOptIds, out_column: &Series) -> Series {
+fn materialize_column(join_opt_ids: &ChunkJoinOptIds, out_column: &Column) -> Column {
     {
         use arrow::Either;
         use polars_ops::chunked_array::TakeChunked;
@@ -680,11 +680,11 @@ fn cache_gb(gb: GroupBy, state: &ExecutionState, cache_key: &str) {
 
 /// Simple reducing aggregation can be set by the groups
 fn set_by_groups(
-    s: &Series,
+    s: &Column,
     groups: &GroupsProxy,
     len: usize,
     update_groups: bool,
-) -> Option<Series> {
+) -> Option<Column> {
     if update_groups {
         return None;
     }
@@ -697,7 +697,9 @@ fn set_by_groups(
                 Some(set_numeric($ca, groups, len))
             }};
         }
-        downcast_as_macro_arg_physical!(&s, dispatch).map(|s| s.cast(dtype).unwrap())
+        downcast_as_macro_arg_physical!(&s, dispatch)
+            .map(|s| s.cast(dtype).unwrap())
+            .map(Column::from)
     } else {
         None
     }
 }
diff --git a/crates/polars-lazy/src/dsl/list.rs b/crates/polars-lazy/src/dsl/list.rs
index c706ee9b6957..d23d99b90e5c 100644
--- a/crates/polars-lazy/src/dsl/list.rs
+++ b/crates/polars-lazy/src/dsl/list.rs
@@ -138,7 +138,7 @@ fn run_on_group_by_engine(
     let out = match ac.agg_state() {
         AggState::AggregatedScalar(_) => {
             let out = ac.aggregated();
-            out.as_list().into_series()
+            out.as_list().into_column()
         },
         _ => ac.aggregated(),
     };
diff --git a/crates/polars-lazy/src/frame/pivot.rs b/crates/polars-lazy/src/frame/pivot.rs
index 4d89eebef010..70eed4d8f58c 100644
--- a/crates/polars-lazy/src/frame/pivot.rs
+++ b/crates/polars-lazy/src/frame/pivot.rs
@@ -29,7 +29,7 @@ impl PhysicalAggExpr for PivotExpr {
         )?;
         phys_expr
             .evaluate_on_groups(df, groups, &state)
-            .map(|mut ac| ac.aggregated())
+            .map(|mut ac| ac.aggregated().take_materialized_series())
     }
 
     fn root_name(&self) -> PolarsResult<&PlSmallStr> {
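[editor's note] The `FilterExec` hunks below keep the same contract under the new `Column` predicate type: the predicate must evaluate to a Boolean column, which is turned into a mask and applied row-wise (the `@scalar-opt`/`@partition-opt` tags mark spots where materializing the mask could later be avoided). A stand-alone sketch of mask-based filtering, with plain vectors instead of `DataFrame`/`Column` (names illustrative):

    // Keep only the rows whose mask entry is true, mirroring
    // `df.filter(column_to_mask(&c)?)` in the diff below.
    fn filter_rows<T: Clone>(rows: &[T], mask: &[bool]) -> Vec<T> {
        rows.iter()
            .zip(mask)
            .filter_map(|(row, &keep)| keep.then(|| row.clone()))
            .collect()
    }

    fn main() {
        let rows = vec!["a", "b", "c"];
        let mask = [true, false, true];
        assert_eq!(filter_rows(&rows, &mask), vec!["a", "c"]);
    }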
diff --git a/crates/polars-mem-engine/src/executors/filter.rs b/crates/polars-mem-engine/src/executors/filter.rs
index 417a7ecf766e..a47e9b6f5ed9 100644
--- a/crates/polars-mem-engine/src/executors/filter.rs
+++ b/crates/polars-mem-engine/src/executors/filter.rs
@@ -10,10 +10,10 @@ pub struct FilterExec {
     streamable: bool,
 }
 
-fn series_to_mask(s: &Series) -> PolarsResult<&BooleanChunked> {
-    s.bool().map_err(|_| {
+fn column_to_mask(c: &Column) -> PolarsResult<&BooleanChunked> {
+    c.bool().map_err(|_| {
         polars_err!(
-            ComputeError: "filter predicate must be of type `Boolean`, got `{}`", s.dtype()
+            ComputeError: "filter predicate must be of type `Boolean`, got `{}`", c.dtype()
         )
     })
 }
@@ -41,11 +41,14 @@ impl FilterExec {
         if self.has_window {
             state.insert_has_window_function_flag()
         }
-        let s = self.predicate.evaluate(&df, state)?;
+        let c = self.predicate.evaluate(&df, state)?;
         if self.has_window {
             state.clear_window_expr_cache()
         }
-        df.filter(series_to_mask(s.as_materialized_series())?)
+
+        // @scalar-opt
+        // @partition-opt
+        df.filter(column_to_mask(&c)?)
     }
 
     fn execute_chunks(
         &mut self,
         chunks: Vec<DataFrame>,
         state: &ExecutionState,
     ) -> PolarsResult<DataFrame> {
         let iter = chunks.into_par_iter().map(|df| {
-            let s = self.predicate.evaluate(&df, state)?;
-            df.filter(series_to_mask(s.as_materialized_series())?)
+            let c = self.predicate.evaluate(&df, state)?;
+
+            // @scalar-opt
+            // @partition-opt
+            df.filter(column_to_mask(&c)?)
         });
         let df = POOL.install(|| iter.collect::<PolarsResult<Vec<_>>>())?;
         Ok(accumulate_dataframes_vertical_unchecked(df))
diff --git a/crates/polars-mem-engine/src/executors/group_by.rs b/crates/polars-mem-engine/src/executors/group_by.rs
index 1ae612f64d67..437b7fb574aa 100644
--- a/crates/polars-mem-engine/src/executors/group_by.rs
+++ b/crates/polars-mem-engine/src/executors/group_by.rs
@@ -7,7 +7,7 @@ pub(super) fn evaluate_aggs(
     aggs: &[Arc<dyn PhysicalExpr>],
     groups: &GroupsProxy,
     state: &ExecutionState,
-) -> PolarsResult<Vec<Series>> {
+) -> PolarsResult<Vec<Column>> {
     POOL.install(|| {
         aggs.par_iter()
             .map(|expr| {
diff --git a/crates/polars-mem-engine/src/executors/stack.rs b/crates/polars-mem-engine/src/executors/stack.rs
index a93d4fc72d89..0b2dbfd01da3 100644
--- a/crates/polars-mem-engine/src/executors/stack.rs
+++ b/crates/polars-mem-engine/src/executors/stack.rs
@@ -38,12 +38,7 @@ impl StackExec {
                     self.options.run_parallel,
                 )?;
                 // We don't have to do a broadcast check as cse is not allowed to hit this.
-                df._add_series(
-                    res.into_iter()
-                        .map(|c| c.take_materialized_series())
-                        .collect(),
-                    schema,
-                )?;
+                df._add_columns(res.into_iter().collect(), schema)?;
 
                 Ok(df)
             });
@@ -100,12 +95,7 @@ impl StackExec {
                         }
                     }
                 }
-                df._add_series(
-                    res.into_iter()
-                        .map(|v| v.take_materialized_series())
-                        .collect(),
-                    schema,
-                )?;
+                df._add_columns(res.into_iter().collect(), schema)?;
             }
             df
         };
diff --git a/crates/polars-ops/src/series/ops/index.rs b/crates/polars-ops/src/series/ops/index.rs
index 51811cf0c319..b56f499895ff 100644
--- a/crates/polars-ops/src/series/ops/index.rs
+++ b/crates/polars-ops/src/series/ops/index.rs
@@ -1,7 +1,9 @@
 use num_traits::{Signed, Zero};
 use polars_core::error::{polars_ensure, PolarsResult};
 use polars_core::prelude::arity::unary_elementwise_values;
-use polars_core::prelude::{ChunkedArray, DataType, IdxCa, PolarsIntegerType, Series, IDX_DTYPE};
+use polars_core::prelude::{
+    ChunkedArray, Column, DataType, IdxCa, PolarsIntegerType, Series, IDX_DTYPE,
+};
 use polars_utils::index::ToIdx;
 
 fn convert<T>(ca: &ChunkedArray<T>, target_len: usize) -> PolarsResult<IdxCa>
 where
@@ -97,3 +99,10 @@ pub fn is_positive_idx_uncertain(s: &Series) -> bool {
         _ => unreachable!(),
     }
 }
+
+/// May give false negatives because it ignores the null values.
+pub fn is_positive_idx_uncertain_col(c: &Column) -> bool {
+    // @scalar-opt
+    // @partition-opt
+    is_positive_idx_uncertain(c.as_materialized_series())
+}
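[editor's note] A hedged usage sketch for the new `is_positive_idx_uncertain_col` helper added above. It assumes the `polars-core`/`polars-ops` crates from this PR and their prelude re-exports (in particular `IntoColumn` for `.into_column()`); treat it as illustrative rather than a tested snippet:

    use polars_core::prelude::*;
    use polars_ops::series::is_positive_idx_uncertain_col;

    fn main() {
        // Non-negative indices: the helper reports them as positive.
        let idx = Series::new("idx".into(), &[0i64, 1, 2]).into_column();
        assert!(is_positive_idx_uncertain_col(&idx));

        // A negative index is detected through the materialized Series path;
        // the Column wrapper simply delegates via `as_materialized_series()`.
        let neg = Series::new("idx".into(), &[-1i64]).into_column();
        assert!(!is_positive_idx_uncertain_col(&neg));
    }

As the doc comment notes, nulls are ignored, so the check can return false negatives on columns containing null entries.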