diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 18c14b494eb..d652f6595a2 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -2016,13 +2016,14 @@ def window( axis = Axis(axis) + # applies reduction function over entire virtual partition def window_function_complete(virtual_partition): virtual_partition_copy = virtual_partition.copy() window_result = reduce_fn(virtual_partition_copy) return window_result + # applies reduction function over one window of virtual partition def window_function_partition(virtual_partition): - virtual_partition_copy = virtual_partition.copy() window_result = reduce_fn(virtual_partition_copy) return window_result.iloc[:, window_size - 1 : ] if axis == Axis.COL_WISE else window_result.iloc[window_size - 1: , :] @@ -2037,8 +2038,8 @@ def window_function_partition(virtual_partition): # partitions to join in virtual partition parts_to_join = [starting_part] if (axis == Axis.ROW_WISE) else [[partition[0]] for partition in starting_part] + # used to determine if window continues into next partition or if we can create virtual partition last_window_span = window_size - 1 - k = i + 1 while (last_window_span > 0 and k < num_parts): @@ -2073,6 +2074,7 @@ def window_function_partition(virtual_partition): else: reduce_result = [virtual_partition.apply(window_function_partition) for virtual_partition in virtual_partitions] + # append reduction result to results array if axis == Axis.ROW_WISE: results.append(reduce_result) else: diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 895a134b0d1..43621d17da7 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -1806,8 +1806,7 @@ def rolling_aggregate(self, axis, rolling_args, func, *args, **kwargs): if not center and isinstance(window, int): return self.__constructor__( self._modin_frame.window( - axis, window, - lambda df: df.rolling(*rolling_args).aggregate(func=func, *args, **kwargs) + axis, window, lambda df: df.rolling(*rolling_args).aggregate(func=func, *args, **kwargs) ) ) else: