diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index e08929ff082..3e4d34f0a38 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2232,8 +2232,8 @@ def map( def window( self, axis: Union[int, Axis], - reduce_fn: Callable, window_size: int, + reduce_fn: Callable, result_schema: Optional[Dict[Hashable, type]] = None, ) -> "PandasDataframe": """ @@ -2244,9 +2244,9 @@ def window( axis : int or modin.core.dataframe.base.utils.Axis The axis to slide over. reduce_fn : callable(rowgroup|colgroup) -> row|col - The reduce function to apply over the data. + The reduce function to apply over the data. This must not change metadata. window_size : int - The number of row/columns to pass to the function. + The number of rows/columns to pass to the reduce function. (The size of the sliding window). result_schema : dict, optional Mapping from column labels to data types that represents the types of the output dataframe. @@ -2256,13 +2256,136 @@ def window( PandasDataframe A new PandasDataframe with the reduce function applied over windows of the specified axis. - - Notes - ----- - The user-defined reduce function must reduce each window’s column - (row if axis=1) down to a single value. """ - pass + + axis = Axis(axis) + + # applies reduction function over entire virtual partition + def window_function_complete(virtual_partition): + # have to copy the pandas dataframe on ray because it's immutable + virtual_partition_copy = virtual_partition.copy() + window_result = reduce_fn(virtual_partition_copy) + return window_result + + # applies reduction function over one window of virtual partition + def window_function_partition(virtual_partition): + virtual_partition_copy = virtual_partition.copy() + window_result = reduce_fn(virtual_partition_copy) + return ( + window_result.iloc[:, window_size - 1 :] + if axis == Axis.COL_WISE + else window_result.iloc[window_size - 1 :, :] + ) + + num_parts = ( + len(self._partitions[0]) if axis == Axis.COL_WISE else len(self._partitions) + ) + results = [] + + for i in range(num_parts): + # get the ith partition + starting_part = ( + self._partitions[:, [i]] + if axis == Axis.COL_WISE + else self._partitions[i] + ) + + # partitions to join in virtual partition + parts_to_join = ( + [starting_part] + if (axis == Axis.ROW_WISE) + else [[partition[0]] for partition in starting_part] + ) + + # used to determine if window continues into next partition or if we can create virtual partition + last_window_span = window_size - 1 + k = i + 1 + + while last_window_span > 0 and k < num_parts: + # new partition + new_parts = ( + self._partitions[:, [k]] + if axis == Axis.COL_WISE + else self._partitions[k] + ) + part_len = ( + new_parts[0][0].width() + if axis == Axis.COL_WISE + else new_parts[0].length() + ) + + if last_window_span <= part_len: + if axis == Axis.COL_WISE: + masked_new_parts = [ + [ + part[0].mask( + row_labels=slice(None), + col_labels=slice(0, last_window_span), + ) + ] + for part in new_parts + ] + for x, r in enumerate(parts_to_join): + r.append(masked_new_parts[x][0]) + else: + masked_new_parts = np.array( + [ + part.mask( + row_labels=slice(0, last_window_span), + col_labels=slice(None), + ) + for part in new_parts + ] + ) + parts_to_join.append(masked_new_parts) + break + else: + # window continues into next part, so just add this part to parts_to_join + if axis == Axis.COL_WISE: + for x, r in enumerate(parts_to_join): + r.append(new_parts[x][0]) + else: + parts_to_join.append(new_parts) + last_window_span -= part_len + k += 1 + + # create virtual partition and perform window operation + virtual_partitions = ( + self._partition_mgr_cls.row_partitions( + np.array(parts_to_join), full_axis=False + ) + if axis == Axis.COL_WISE + else self._partition_mgr_cls.column_partitions( + np.array(parts_to_join), full_axis=False + ) + ) + + if i == 0: + reduce_result = [ + virtual_partition.apply(window_function_complete) + for virtual_partition in virtual_partitions + ] + else: + reduce_result = [ + virtual_partition.apply(window_function_partition) + for virtual_partition in virtual_partitions + ] + + # append reduction result to results array + if axis == Axis.ROW_WISE: + results.append(reduce_result) + else: + if results == []: + results = [[x] for x in reduce_result] + else: + for x, r in enumerate(results): + r.append(reduce_result[x]) + + results = np.array(results) + + return self.__constructor__( + np.array(results), self.index, self.columns, None, None, result_schema + ) @lazy_metadata_decorator(apply_axis="both") def fold(self, axis, func, new_columns=None): diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 500818421b6..d913a4483a2 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -229,7 +229,7 @@ def column_partitions(cls, partitions, full_axis=True): ] @classmethod - def row_partitions(cls, partitions): + def row_partitions(cls, partitions, full_axis=True): """ List of `BaseDataframeAxisPartition` objects representing row-wise partitions. @@ -250,7 +250,11 @@ def row_partitions(cls, partitions): """ if not isinstance(partitions, list): partitions = [partitions] - return [cls._row_partition_class(row) for frame in partitions for row in frame] + return [ + cls._row_partition_class(row, full_axis=full_axis) + for frame in partitions + for row in frame + ] @classmethod def axis_partition(cls, partitions, axis, full_axis: bool = True): diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 5f98683425b..dc2a1fb7aab 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -216,6 +216,32 @@ def dt_op_builder(df, *args, **kwargs): return dt_op_builder +def _current_engine_supports_virtual_partitions(): + return Engine.get() in ("Ray", "Dask", "Unidist") + + +def _can_use_cell_wise_window(center, window, win_type): + return ( + # need to make virtual partitions to use cell-wise window + _current_engine_supports_virtual_partitions() + # modin frame window assumes center is True + and center is False + # window can be time intervals like "1d" or even more exotic objects like + # FixedForwardWindowIndexer. modin frame window only handles integers + and isinstance(window, int) + # window types like triangular window can normalize the input values by the + # maximum value, but finding the maximum value requires access to the entire + # axis that we are windowing. For example: + # the 4th value of + # pd.Series([1, 10, 1000, 10_000]).rolling(3, win_type="triang").std() + # on pandas 1.5.3 is 5461.392260, but if we do the rolling from the second value + # onwards with + # pd.Series([10, 1000, 10_000]).rolling(3, win_type="triang").std(), the 3rd + # value is 5426.261196. So we can't decompose the windowing cell-wise. + and win_type is None + ) + + def copy_df_for_func(func, display_name: str = None): """ Build function that execute specified `func` against passed frame inplace. @@ -1498,82 +1524,316 @@ def expanding_corr( ) ) - window_mean = Fold.register( + _window_mean = Fold.register( lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).mean(*args, **kwargs) ) ) - window_sum = Fold.register( + + def window_mean(self, axis, window_kwargs, *args, **kwargs): + window = window_kwargs["window"] + center = window_kwargs["center"] + win_type = window_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**window_kwargs).mean(*args, **kwargs), + ) + ) + else: + return self._window_mean(axis, window_kwargs, *args, **kwargs) + + _window_sum = Fold.register( lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).sum(*args, **kwargs) ) ) - window_var = Fold.register( + + def window_sum(self, axis, window_kwargs, *args, **kwargs): + window = window_kwargs["window"] + center = window_kwargs["center"] + win_type = window_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**window_kwargs).sum(*args, **kwargs), + ) + ) + else: + return self._window_sum(axis, window_kwargs, *args, **kwargs) + + _window_var = Fold.register( lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).var(ddof=ddof, *args, **kwargs) ) ) - window_std = Fold.register( + + def window_var(self, axis, window_kwargs, ddof, *args, **kwargs): + window = window_kwargs["window"] + center = window_kwargs["center"] + win_type = window_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**window_kwargs).var(ddof, *args, **kwargs), + ) + ) + else: + return self._window_var(axis, window_kwargs, ddof, *args, **kwargs) + + _window_std = Fold.register( lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).std(ddof=ddof, *args, **kwargs) ) ) - rolling_count = Fold.register( - lambda df, rolling_kwargs: pandas.DataFrame( - df.rolling(**rolling_kwargs).count() - ) + + def window_std(self, axis, window_kwargs, ddof, *args, **kwargs): + window = window_kwargs["window"] + center = window_kwargs["center"] + win_type = window_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**window_kwargs).std(ddof=ddof, *args, **kwargs), + ) + ) + else: + return self._window_std(axis, window_kwargs, ddof, *args, **kwargs) + + _rolling_count = Fold.register( + lambda df, rolling_kwargs: pandas.DataFrame(df.rolling(**rolling_kwargs).count()) ) - rolling_sum = Fold.register( + + def rolling_count(self, axis, rolling_kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, window, lambda df: df.rolling(**rolling_kwargs).count() + ) + ) + else: + return self._rolling_count(axis, rolling_kwargs) + + _rolling_sum = Fold.register( lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).sum(*args, **kwargs) ) ) - rolling_sem = Fold.register( + + def rolling_sum(self, axis, rolling_kwargs, *args, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).sum(*args, **kwargs), + ) + ) + else: + return self._rolling_sum(axis, rolling_kwargs, *args, **kwargs) + + _rolling_sem = Fold.register( lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).sem(*args, **kwargs) ) ) - rolling_mean = Fold.register( + + def rolling_sem(self, axis, rolling_kwargs, *args, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).sem(*args, **kwargs), + ) + ) + else: + return self._rolling_sem(axis, rolling_kwargs, *args, **kwargs) + + _rolling_mean = Fold.register( lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).mean(*args, **kwargs) ) ) - rolling_median = Fold.register( + + def rolling_mean(self, axis, rolling_kwargs, *args, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).mean(*args, **kwargs), + ) + ) + else: + return self._rolling_mean(axis, rolling_kwargs, *args, **kwargs) + + _rolling_median = Fold.register( lambda df, rolling_kwargs, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).median(**kwargs) ) ) - rolling_var = Fold.register( + + def rolling_median(self, axis, rolling_kwargs, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, window, lambda df: df.rolling(**rolling_kwargs).median(**kwargs) + ) + ) + else: + return self._rolling_median(axis, rolling_kwargs, **kwargs) + + _rolling_var = Fold.register( lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).var(ddof=ddof, *args, **kwargs) ) ) - rolling_std = Fold.register( + + def rolling_var(self, axis, rolling_kwargs, ddof, *args, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).var( + ddof=ddof, *args, **kwargs + ), + ) + ) + else: + return self._rolling_var(axis, rolling_kwargs, ddof, *args, **kwargs) + + _rolling_std = Fold.register( lambda df, rolling_kwargs, ddof, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).std(ddof=ddof, *args, **kwargs) ) ) - rolling_min = Fold.register( + + def rolling_std(self, axis, rolling_kwargs, ddof, *args, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).std( + ddof=ddof, *args, **kwargs + ), + ) + ) + else: + return self._rolling_std(axis, rolling_kwargs, ddof, *args, **kwargs) + + _rolling_min = Fold.register( lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).min(*args, **kwargs) ) ) - rolling_max = Fold.register( + + def rolling_min(self, axis, rolling_kwargs, *args, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).min(*args, **kwargs), + ) + ) + else: + return self._rolling_min(axis, rolling_kwargs, *args, **kwargs) + + _rolling_max = Fold.register( lambda df, rolling_kwargs, *args, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).max(*args, **kwargs) ) ) - rolling_skew = Fold.register( + + def rolling_max(self, axis, rolling_kwargs, *args, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).max(*args, **kwargs), + ) + ) + else: + return self._rolling_max(axis, rolling_kwargs, *args, **kwargs) + + _rolling_skew = Fold.register( lambda df, rolling_kwargs, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).skew(**kwargs) ) ) - rolling_kurt = Fold.register( + + def rolling_skew(self, axis, rolling_kwargs, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, window, lambda df: df.rolling(**rolling_kwargs).skew(**kwargs) + ) + ) + else: + return self._rolling_skew(axis, rolling_kwargs, **kwargs) + + _rolling_kurt = Fold.register( lambda df, rolling_kwargs, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).kurt(**kwargs) ) ) - rolling_apply = Fold.register( + + def rolling_kurt(self, axis, rolling_kwargs, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, window, lambda df: df.rolling(**rolling_kwargs).kurt(**kwargs) + ) + ) + else: + return self._rolling_kurt(axis, rolling_kwargs, **kwargs) + + _rolling_apply = Fold.register( lambda df, rolling_kwargs, func, raw, engine, engine_kwargs, args, kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).apply( func=func, @@ -1585,14 +1845,41 @@ def expanding_corr( ) ) ) - rolling_quantile = Fold.register( - lambda df, rolling_kwargs, q, interpolation, **kwargs: pandas.DataFrame( - df.rolling(**rolling_kwargs).quantile( - q=q, interpolation=interpolation, **kwargs + + def rolling_apply( + self, axis, rolling_kwargs, func, raw, engine, engine_kwargs, args, kwargs + ): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).apply( + func=func, + raw=raw, + engine=engine, + engine_kwargs=engine_kwargs, + args=args, + kwargs=kwargs, + ), + ) ) - ) - ) - rolling_rank = Fold.register( + else: + return self._rolling_apply( + axis, + rolling_kwargs, + func, + raw, + engine, + engine_kwargs, + args, + kwargs, + ) + + _rolling_rank = Fold.register( lambda df, rolling_kwargs, method, ascending, pct, numeric_only, **kwargs: pandas.DataFrame( df.rolling(**rolling_kwargs).rank( method=method, @@ -1604,6 +1891,66 @@ def expanding_corr( ) ) + def rolling_rank( + self, axis, rolling_kwargs, method, ascending, pct, numeric_only, **kwargs + ): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).rank( + method=method, + ascending=ascending, + pct=pct, + numeric_only=numeric_only, + **kwargs, + ), + ) + ) + else: + return self._rolling_rank( + axis, rolling_kwargs, method, ascending, pct, numeric_only, **kwargs + ) + + _rolling_quantile = Fold.register( + lambda df, rolling_kwargs, quantile, interpolation, **kwargs: pandas.DataFrame( + df.rolling(**rolling_kwargs).quantile( + quantile=quantile, interpolation=interpolation, **kwargs + ) + ) + ) + + def rolling_quantile(self, axis, rolling_kwargs, quantile, interpolation, **kwargs): + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).quantile( + quantile=quantile, interpolation=interpolation, **kwargs + ), + ) + ) + else: + return self._rolling_quantile( + axis, rolling_kwargs, quantile, interpolation, **kwargs + ) + + _rolling_corr = Fold.register( + lambda df, rolling_kwargs, other, pairwise, *args, **kwargs: pandas.DataFrame( + df.rolling(**rolling_kwargs).corr( + other=other, pairwise=pairwise, *args, **kwargs + ) + ) + ) + def rolling_corr(self, axis, rolling_kwargs, other, pairwise, *args, **kwargs): if len(self.columns) > 1: return self.default_to_pandas( @@ -1612,13 +1959,31 @@ def rolling_corr(self, axis, rolling_kwargs, other, pairwise, *args, **kwargs): ) ) else: - return Fold.register( - lambda df: pandas.DataFrame( - df.rolling(**rolling_kwargs).corr( - other=other, pairwise=pairwise, *args, **kwargs + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window(center, window, win_type): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).corr( + other=other, pairwise=pairwise, *args, **kwargs + ), ) ) - )(self, axis) + else: + return self._rolling_corr( + axis, rolling_kwargs, other, pairwise, *args, **kwargs + ) + + _rolling_cov = Fold.register( + lambda df, rolling_kwargs, other, pairwise, ddof, **kwargs: pandas.DataFrame( + df.rolling(**rolling_kwargs).cov( + other=other, pairwise=pairwise, ddof=ddof, **kwargs + ) + ) + ) def rolling_cov(self, axis, rolling_kwargs, other, pairwise, ddof, **kwargs): if len(self.columns) > 1: @@ -1628,15 +1993,31 @@ def rolling_cov(self, axis, rolling_kwargs, other, pairwise, ddof, **kwargs): ) ) else: - return Fold.register( - lambda df: pandas.DataFrame( - df.rolling(**rolling_kwargs).cov( - other=other, pairwise=pairwise, ddof=ddof, **kwargs + window = rolling_kwargs["window"] + center = rolling_kwargs["center"] + win_type = rolling_kwargs["win_type"] + if _can_use_cell_wise_window( + center, + window, + win_type, + ): + return self.__constructor__( + self._modin_frame.window( + axis, + window, + lambda df: df.rolling(**rolling_kwargs).cov( + other=other, pairwise=pairwise, **kwargs + ), ) ) - )(self, axis) + else: + return self._rolling_cov( + axis, rolling_kwargs, other, pairwise, ddof, **kwargs + ) def rolling_aggregate(self, axis, rolling_kwargs, func, *args, **kwargs): + # note that we can't use the modin frame's window(), which assumes that the + # metadata doesn't change. new_modin_frame = self._modin_frame.apply_full_axis( axis, lambda df: pandas.DataFrame( diff --git a/modin/pandas/window.py b/modin/pandas/window.py index 4f3ca62c276..1c4594e04f3 100644 --- a/modin/pandas/window.py +++ b/modin/pandas/window.py @@ -68,28 +68,28 @@ def mean(self, *args, **kwargs): query_compiler=self._query_compiler.window_mean( self.axis, self.window_kwargs, *args, **kwargs ) - ) + ) def sum(self, *args, **kwargs): return self._dataframe.__constructor__( query_compiler=self._query_compiler.window_sum( self.axis, self.window_kwargs, *args, **kwargs ) - ) + ) def var(self, ddof=1, *args, **kwargs): return self._dataframe.__constructor__( query_compiler=self._query_compiler.window_var( self.axis, self.window_kwargs, ddof, *args, **kwargs ) - ) + ) def std(self, ddof=1, *args, **kwargs): return self._dataframe.__constructor__( query_compiler=self._query_compiler.window_std( self.axis, self.window_kwargs, ddof, *args, **kwargs ) - ) + ) @_inherit_docstrings( @@ -169,7 +169,7 @@ def _aggregate(self, method_name, *args, **kwargs): qc_result = self._call_qc_method(method_name, *args, **kwargs) return self._dataframe.__constructor__(query_compiler=qc_result) - def count(self): + def count(self, *args, **kwargs): return self._aggregate("count") def sem(self, *args, **kwargs): @@ -179,7 +179,7 @@ def sum(self, *args, **kwargs): return self._aggregate("sum", *args, **kwargs) def mean(self, *args, **kwargs): - return self._aggregate("mean", *args, **kwargs) + return self._aggregate("mean", *args, **kwargs) def median(self, **kwargs): return self._aggregate("median", **kwargs) @@ -194,12 +194,11 @@ def min(self, *args, **kwargs): return self._aggregate("min", *args, **kwargs) def max(self, *args, **kwargs): - return self._aggregate("max", *args, **kwargs) + return self._aggregate("max", *args, **kwargs) def corr(self, other=None, pairwise=None, *args, **kwargs): from .dataframe import DataFrame from .series import Series - if isinstance(other, DataFrame): other = other._query_compiler.to_pandas() elif isinstance(other, Series): @@ -210,7 +209,6 @@ def corr(self, other=None, pairwise=None, *args, **kwargs): def cov(self, other=None, pairwise=None, ddof: Optional[int] = 1, **kwargs): from .dataframe import DataFrame from .series import Series - if isinstance(other, DataFrame): other = other._query_compiler.to_pandas() elif isinstance(other, Series): diff --git a/modin/tests/pandas/test_rolling.py b/modin/tests/pandas/test_rolling.py index e156701056a..461780bd7f2 100644 --- a/modin/tests/pandas/test_rolling.py +++ b/modin/tests/pandas/test_rolling.py @@ -54,6 +54,8 @@ @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) +#@pytest.mark.parametrize("axis", [0, 1]) +#@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize("axis", [lib.no_default, 1]) @pytest.mark.parametrize( "method, kwargs", @@ -74,7 +76,7 @@ ("median", {}), ], ) -def test_dataframe_rolling(data, window, min_periods, axis, method, kwargs): +def test_dataframe_rolling(data, window, min_periods, axis, method, kwargs, center): # Testing of Rolling class modin_df, pandas_df = create_test_dfs(data) if window > len(pandas_df): @@ -87,7 +89,7 @@ def test_dataframe_rolling(data, window, min_periods, axis, method, kwargs): window=window, min_periods=min_periods, win_type=None, - center=True, + center=center, axis=axis, ), method, @@ -98,16 +100,19 @@ def test_dataframe_rolling(data, window, min_periods, axis, method, kwargs): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) +#@pytest.mark.parametrize("axis", [0, 1]) +#@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") +#def test_dataframe_agg(data, window, min_periods, axis, center): @pytest.mark.parametrize("axis", [lib.no_default, 1]) def test_dataframe_agg(data, window, min_periods, axis): modin_df, pandas_df = create_test_dfs(data) if window > len(pandas_df): window = len(pandas_df) modin_rolled = modin_df.rolling( - window=window, min_periods=min_periods, win_type=None, center=True, axis=axis + window=window, min_periods=min_periods, win_type=None, center=center, axis=axis ) pandas_rolled = pandas_df.rolling( - window=window, min_periods=min_periods, win_type=None, center=True, axis=axis + window=window, min_periods=min_periods, win_type=None, center=center, axis=axis ) df_equals(pandas_rolled.aggregate(np.sum), modin_rolled.aggregate(np.sum)) # TODO(https://github.com/modin-project/modin/issues/4260): Once pandas @@ -123,6 +128,8 @@ def test_dataframe_agg(data, window, min_periods, axis): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) +#@pytest.mark.parametrize("axis", [0, 1]) +#@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize("axis", [lib.no_default, 1]) @pytest.mark.parametrize( "method, kwargs", @@ -133,7 +140,7 @@ def test_dataframe_agg(data, window, min_periods, axis): ("std", {"ddof": 0}), ], ) -def test_dataframe_window(data, window, min_periods, axis, method, kwargs): +def test_dataframe_window(data, window, min_periods, axis, method, kwargs, center): # Testing of Window class modin_df, pandas_df = create_test_dfs(data) if window > len(pandas_df): @@ -146,7 +153,7 @@ def test_dataframe_window(data, window, min_periods, axis, method, kwargs): window=window, min_periods=min_periods, win_type="triang", - center=True, + center=center, axis=axis, ), method, @@ -208,6 +215,7 @@ def test_dataframe_dt_index(axis, on, closed, window): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize( "method, kwargs", [ @@ -229,7 +237,7 @@ def test_dataframe_dt_index(axis, on, closed, window): ("median", {}), ], ) -def test_series_rolling(data, window, min_periods, method, kwargs): +def test_series_rolling(data, window, min_periods, method, kwargs, center): # Test of Rolling class modin_series, pandas_series = create_test_series(data) if window > len(pandas_series): @@ -242,7 +250,7 @@ def test_series_rolling(data, window, min_periods, method, kwargs): window=window, min_periods=min_periods, win_type=None, - center=True, + center=center, ), method, )(**kwargs), @@ -252,15 +260,16 @@ def test_series_rolling(data, window, min_periods, method, kwargs): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) -def test_series_corr_cov(data, window, min_periods): +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") +def test_series_corr_cov(data, window, min_periods, center): modin_series, pandas_series = create_test_series(data) if window > len(pandas_series): window = len(pandas_series) modin_rolled = modin_series.rolling( - window=window, min_periods=min_periods, win_type=None, center=True + window=window, min_periods=min_periods, win_type=None, center=center ) pandas_rolled = pandas_series.rolling( - window=window, min_periods=min_periods, win_type=None, center=True + window=window, min_periods=min_periods, win_type=None, center=center ) df_equals(modin_rolled.corr(modin_series), pandas_rolled.corr(pandas_series)) df_equals( @@ -274,6 +283,7 @@ def test_series_corr_cov(data, window, min_periods): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("window", [5, 100]) @pytest.mark.parametrize("min_periods", [None, 5]) +@pytest.mark.parametrize("center", [True, False], ids=lambda x: f"center={x}") @pytest.mark.parametrize( "method, kwargs", [ @@ -283,7 +293,7 @@ def test_series_corr_cov(data, window, min_periods): ("std", {"ddof": 0}), ], ) -def test_series_window(data, window, min_periods, method, kwargs): +def test_series_window(data, window, min_periods, method, kwargs, center): # Test of Window class modin_series, pandas_series = create_test_series(data) if window > len(pandas_series): @@ -296,7 +306,7 @@ def test_series_window(data, window, min_periods, method, kwargs): window=window, min_periods=min_periods, win_type="triang", - center=True, + center=center, ), method, )(**kwargs),