Fix partitioning in explicit-comms shuffle #1356

Merged · 4 commits · Jul 9, 2024
44 changes: 24 additions & 20 deletions dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -8,6 +8,9 @@
 from operator import getitem
 from typing import Any, Callable, Dict, List, Optional, Set, TypeVar
 
+import numpy as np
+import pandas as pd
+
 import dask
 import dask.config
 import dask.dataframe
@@ -155,9 +158,16 @@ def compute_map_index(
     if column_names[0] == "_partitions":
         ind = df[column_names[0]]
     else:
-        ind = hash_object_dispatch(
-            df[column_names] if column_names else df, index=False
-        )
+        # Need to cast numerical dtypes to be consistent
+        # with `dask.dataframe.shuffle.partitioning_index`
+        dtypes = {}
+        index = df[column_names] if column_names else df
+        for col, dtype in index.dtypes.items():
+            if pd.api.types.is_numeric_dtype(dtype):
+                dtypes[col] = np.float64
+        if dtypes:
+            index = index.astype(dtypes, errors="ignore")
+        ind = hash_object_dispatch(index, index=False)
     return ind % npartitions
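For context, here is a small standalone sketch (not taken from the PR) of why the cast is needed: pandas hashes the raw value representation, so an int64 column and its float64 copy generally hash differently and can land in different partitions. The example assumes the pandas backend, where dask's `hash_object_dispatch` routes to `pd.util.hash_pandas_object`.

import numpy as np
import pandas as pd

npartitions = 4
ints = pd.DataFrame({"key": np.arange(10, dtype="int64")})
floats = ints.astype({"key": np.float64})

int_ids = pd.util.hash_pandas_object(ints, index=False) % npartitions
float_ids = pd.util.hash_pandas_object(floats, index=False) % npartitions
print((int_ids == float_ids).all())  # typically False: the dtype changes the hash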


@@ -187,15 +197,8 @@ def partition_dataframe(
     partitions
         Dict of dataframe-partitions, mapping partition-ID to dataframe
     """
-    if column_names[0] != "_partitions" and hasattr(df, "partition_by_hash"):
-        return dict(
-            zip(
-                range(npartitions),
-                df.partition_by_hash(
-                    column_names, npartitions, keep_index=not ignore_index
-                ),
-            )
-        )
+    # TODO: Use `partition_by_hash` if/when dtype-casting is added
+    # (See: https://github.com/rapidsai/cudf/issues/16221)
     map_index = compute_map_index(df, column_names, npartitions)
     return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)
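A rough pandas-only sketch (simplified, not the module's actual helper) of the path `partition_dataframe` now always takes: compute a partition id per row, then split the frame so every id in `range(npartitions)` gets a (possibly empty) piece.

import pandas as pd

def split_by_map_index(df: pd.DataFrame, map_index: pd.Series, npartitions: int):
    # Group rows by their target partition id; in the real code path the ids
    # come from compute_map_index above.
    groups = {key: group for key, group in df.groupby(map_index.values)}
    empty = df.iloc[0:0]  # keeps columns and dtypes for empty partitions
    return {i: groups.get(i, empty) for i in range(npartitions)}

parts = split_by_map_index(
    pd.DataFrame({"key": [0.0, 1.0, 2.0, 3.0]}), pd.Series([2, 0, 2, 1]), 4
)
assert sorted(len(v) for v in parts.values()) == [0, 1, 1, 2]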

@@ -529,18 +532,19 @@ def shuffle(
     # TODO: can we do this without using `submit()` to avoid the overhead
     # of creating a Future for each dataframe partition?
 
-    futures = []
+    _futures = {}
     for rank in ranks:
         for part_id in rank_to_out_part_ids[rank]:
-            futures.append(
-                c.client.submit(
-                    getitem,
-                    shuffle_result[rank],
-                    part_id,
-                    workers=[c.worker_addresses[rank]],
-                )
+            _futures[part_id] = c.client.submit(
+                getitem,
+                shuffle_result[rank],
+                part_id,
+                workers=[c.worker_addresses[rank]],
             )
 
+    # Make sure partitions are properly ordered
+    futures = [_futures.pop(i) for i in range(npartitions)]
Comment on lines +545 to +546 (Member, Author):

@ayushdg - FYI: I think this means the ordering of partitions could have been "wrong" even before the dtype-casting change in dask :/

Reply (Member):

Ah, I see. Based on the limited testing I've done, the biggest change in results in my tests came from #1323, even when paired with an older version of Dask prior to the dtype change.


     # Create a distributed Dataframe from all the pieces
     divs = [None] * (len(futures) + 1)
     kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
46 changes: 38 additions & 8 deletions dask_cuda/tests/test_explicit_comms.py
@@ -109,7 +109,14 @@ def test_dataframe_merge_empty_partitions():
 
 def check_partitions(df, npartitions):
     """Check that all values in `df` hashes to the same"""
-    hashes = partitioning_index(df, npartitions)
+    dtypes = {}
+    for col, dtype in df.dtypes.items():
+        if pd.api.types.is_numeric_dtype(dtype):
+            dtypes[col] = np.float64
+    if not dtypes:
+        dtypes = None
+
+    hashes = partitioning_index(df, npartitions, cast_dtype=dtypes)
     if len(hashes) > 0:
         return len(hashes.unique()) == 1
     else:
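A hedged usage sketch of the idea behind the updated helper, assuming a dask version whose `partitioning_index` accepts the `cast_dtype` keyword (as the updated test relies on) and that it is importable from `dask.dataframe.shuffle`:

import numpy as np
import pandas as pd
from dask.dataframe.shuffle import partitioning_index  # import path assumed

part = pd.DataFrame({"key": np.array([7, 7, 7], dtype="int64")})
hashes = partitioning_index(part, 4, cast_dtype={"key": np.float64})
# Every row of a correctly shuffled partition maps to the same partition id
assert len(pd.Series(hashes).unique()) == 1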
@@ -128,11 +135,10 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
         worker_class=IncreasedCloseTimeoutNanny,
         processes=True,
     ) as cluster:
-        with Client(cluster) as client:
-            all_workers = list(client.get_worker_logs().keys())
+        with Client(cluster):
             comms.default_comms()
             np.random.seed(42)
-            df = pd.DataFrame({"key": np.random.random(100)})
+            df = pd.DataFrame({"key": np.random.randint(0, high=100, size=100)})
             if backend == "cudf":
                 df = cudf.DataFrame.from_pandas(df)
@@ -141,15 +147,13 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
 
             for input_nparts in range(1, 5):
                 for output_nparts in range(1, 5):
-                    ddf = dd.from_pandas(df.copy(), npartitions=input_nparts).persist(
-                        workers=all_workers
-                    )
+                    ddf1 = dd.from_pandas(df.copy(), npartitions=input_nparts)
Comment on lines -144 to +150 (Member, Author):

Strange finding: even without the changes in this PR, I get a "cancelled" error when I persist ddf and then perform both an explicit-comms shuffle and a task-based shuffle. I don't understand the cause of this yet.

Reply (Member):

Are we OK still moving ahead with the changes even though there's a cancellation error?

Reply (Member, Author):

Yes. The changes in this PR are independent of the cancellation error (it happens in branch-24.08 without these changes).

Another important detail: I only get the error when query-planning is disabled. Therefore, I assume the problem has to do with a key-name collision that doesn't happen with dask-expr (which is much more disciplined about key names than the legacy API is).

                     # To reduce test runtime, we change the batchsizes here instead
                     # of using a test parameter.
                     for batchsize in (-1, 1, 2):
                         with dask.config.set(explicit_comms_batchsize=batchsize):
                             ddf = explicit_comms_shuffle(
-                                ddf,
+                                ddf1,
Comment on lines -152 to +156 (Member, Author):

Seems a bit confusing to me that we were previously modifying the initial collection.

Reply (Member):

Makes sense to avoid that. I think @madsbk may have an idea if this was an oversight or was intentional when he gets back.

Reply (Member):

I think it was an oversight :)

["_partitions"] if _partitions else ["key"],
npartitions=output_nparts,
batchsize=batchsize,
@@ -177,6 +181,32 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
                             got = ddf.compute().sort_values("key")
                             assert_eq(got, expected)
 
+                            # Check that partitioning is consistent with "tasks"
+                            ddf_tasks = ddf1.shuffle(
+                                ["key"],
+                                npartitions=output_nparts,
+                                shuffle_method="tasks",
+                            )
+                            for i in range(output_nparts):
+                                expected_partition = ddf_tasks.partitions[
+                                    i
+                                ].compute()["key"]
+                                actual_partition = ddf.partitions[i].compute()[
+                                    "key"
+                                ]
+                                if backend == "cudf":
+                                    expected_partition = (
+                                        expected_partition.values_host
+                                    )
+                                    actual_partition = actual_partition.values_host
+                                else:
+                                    expected_partition = expected_partition.values
+                                    actual_partition = actual_partition.values
+                                assert all(
+                                    np.sort(expected_partition)
+                                    == np.sort(actual_partition)
+                                )
 
 
 @pytest.mark.parametrize("nworkers", [1, 2, 3])
 @pytest.mark.parametrize("backend", ["pandas", "cudf"])