Skip to content

Commit

Permalink
Make test_rolling test center=False more and fix bugs
Browse files Browse the repository at this point in the history
Signed-off-by: mvashishtha <[email protected]>
  • Loading branch information
mvashishtha authored and christinafan committed Jun 26, 2023
1 parent fba5927 commit d9875d4
Show file tree
Hide file tree
Showing 3 changed files with 313 additions and 142 deletions.
109 changes: 75 additions & 34 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1995,7 +1995,7 @@ def window(
axis : int or modin.core.dataframe.base.utils.Axis
The axis to slide over.
reduce_fn : callable(rowgroup|colgroup) -> row|col
The reduce function to apply over the data.
The reduce function to apply over the data. This must not change metadata.
window_size : int
The number of rows/columns to pass to the reduce function.
(The size of the sliding window).
Expand All @@ -2007,13 +2007,8 @@ def window(
PandasDataframe
A new PandasDataframe with the reduce function applied over windows of the specified
axis.
Notes
-----
The user-defined reduce function must reduce each window’s column
(row if axis=1) down to a single value.
"""

axis = Axis(axis)

# applies reduction function over entire virtual partition
Expand All @@ -2026,34 +2021,72 @@ def window_function_complete(virtual_partition):
def window_function_partition(virtual_partition):
virtual_partition_copy = virtual_partition.copy()
window_result = reduce_fn(virtual_partition_copy)
return window_result.iloc[:, window_size - 1 : ] if axis == Axis.COL_WISE else window_result.iloc[window_size - 1: , :]
return (
window_result.iloc[:, window_size - 1 :]
if axis == Axis.COL_WISE
else window_result.iloc[window_size - 1 :, :]
)

num_parts = len(self._partitions[0]) if axis == Axis.COL_WISE else len(self._partitions)
num_parts = (
len(self._partitions[0]) if axis == Axis.COL_WISE else len(self._partitions)
)
results = []

for i in range(num_parts):
# get the ith partition
starting_part = self._partitions[:, [i]] if axis == Axis.COL_WISE else self._partitions[i]
# get the ith partition
starting_part = (
self._partitions[:, [i]]
if axis == Axis.COL_WISE
else self._partitions[i]
)

# partitions to join in virtual partition
parts_to_join = [starting_part] if (axis == Axis.ROW_WISE) else [[partition[0]] for partition in starting_part]
parts_to_join = (
[starting_part]
if (axis == Axis.ROW_WISE)
else [[partition[0]] for partition in starting_part]
)

# used to determine if window continues into next partition or if we can create virtual partition
last_window_span = window_size - 1
k = i + 1

while (last_window_span > 0 and k < num_parts):
while last_window_span > 0 and k < num_parts:
# new partition
new_parts = self._partitions[:, [k]] if axis == Axis.COL_WISE else self._partitions[k]
part_len = new_parts[0][0].width() if axis == Axis.COL_WISE else new_parts[0].length()
new_parts = (
self._partitions[:, [k]]
if axis == Axis.COL_WISE
else self._partitions[k]
)
part_len = (
new_parts[0][0].width()
if axis == Axis.COL_WISE
else new_parts[0].length()
)

if (last_window_span <= part_len):
if last_window_span <= part_len:
if axis == Axis.COL_WISE:
masked_new_parts = [[part[0].mask(row_labels = slice(None), col_labels = slice(0, last_window_span))] for part in new_parts]
masked_new_parts = [
[
part[0].mask(
row_labels=slice(None),
col_labels=slice(0, last_window_span),
)
]
for part in new_parts
]
for x, r in enumerate(parts_to_join):
r.append(masked_new_parts[x][0])
else:
masked_new_parts = np.array([part.mask(row_labels = slice(0, last_window_span), col_labels=slice(None)) for part in new_parts])
masked_new_parts = np.array(
[
part.mask(
row_labels=slice(0, last_window_span),
col_labels=slice(None),
)
for part in new_parts
]
)
parts_to_join.append(masked_new_parts)
break
else:
Expand All @@ -2064,37 +2097,45 @@ def window_function_partition(virtual_partition):
else:
parts_to_join.append(new_parts)
last_window_span -= part_len
k += 1
k += 1

# create virtual partition and perform window operation
virtual_partitions = self._partition_mgr_cls.row_partitions(np.array(parts_to_join), full_axis = False) if axis == Axis.COL_WISE else self._partition_mgr_cls.column_partitions(np.array(parts_to_join), full_axis=False)
virtual_partitions = (
self._partition_mgr_cls.row_partitions(
np.array(parts_to_join), full_axis=False
)
if axis == Axis.COL_WISE
else self._partition_mgr_cls.column_partitions(
np.array(parts_to_join), full_axis=False
)
)

if i == 0:
reduce_result = [virtual_partition.apply(window_function_complete) for virtual_partition in virtual_partitions]
reduce_result = [
virtual_partition.apply(window_function_complete)
for virtual_partition in virtual_partitions
]
else:
reduce_result = [virtual_partition.apply(window_function_partition) for virtual_partition in virtual_partitions]

reduce_result = [
virtual_partition.apply(window_function_partition)
for virtual_partition in virtual_partitions
]

# append reduction result to results array
if axis == Axis.ROW_WISE:
results.append(reduce_result)
else:
if results == []:
results = [[x] for x in reduce_result]
else:
else:
for x, r in enumerate(results):
r.append(reduce_result[x])
r.append(reduce_result[x])

results = np.array(results)

return self.__constructor__(
results,
self.index,
self.columns,
None,
None,
result_schema
)

results, self.index, self.columns, None, None, result_schema
)

@lazy_metadata_decorator(apply_axis="both")
def fold(self, axis, func, new_columns=None):
Expand Down
Loading

0 comments on commit d9875d4

Please sign in to comment.