From 51079a75c2ada1ef002785e2db0343e984869eb0 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 3 Jul 2024 12:48:18 -0700
Subject: [PATCH 1/4] adjust shuffle to produce partitions that are consistent with dask.dataframe.shuffle

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 43 ++++++++++---------
 dask_cuda/tests/test_explicit_comms.py        | 41 ++++++++++++++++--
 2 files changed, 60 insertions(+), 24 deletions(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index 3f7b7951..0e5cc819 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -8,6 +8,9 @@
 from operator import getitem
 from typing import Any, Callable, Dict, List, Optional, Set, TypeVar
 
+import numpy as np
+import pandas as pd
+
 import dask
 import dask.config
 import dask.dataframe
@@ -155,9 +158,16 @@ def compute_map_index(
     if column_names[0] == "_partitions":
         ind = df[column_names[0]]
     else:
-        ind = hash_object_dispatch(
-            df[column_names] if column_names else df, index=False
-        )
+        # Need to cast numerical dtypes to be consistent
+        # with `dask.dataframe.shuffle.partitioning_index`
+        dtypes = {}
+        index = df[column_names] if column_names else df
+        for col, dtype in index.dtypes.items():
+            if pd.api.types.is_numeric_dtype(dtype):
+                dtypes[col] = np.float64
+        if dtypes:
+            index = index.astype(dtypes, errors="ignore")
+        ind = hash_object_dispatch(index, index=False)
     return ind % npartitions
 
 
@@ -187,15 +197,7 @@ def partition_dataframe(
     partitions
         Dict of dataframe-partitions, mapping partition-ID to dataframe
     """
-    if column_names[0] != "_partitions" and hasattr(df, "partition_by_hash"):
-        return dict(
-            zip(
-                range(npartitions),
-                df.partition_by_hash(
-                    column_names, npartitions, keep_index=not ignore_index
-                ),
-            )
-        )
+    # TODO: Use `partition_by_hash` after dtype-casting is added
     map_index = compute_map_index(df, column_names, npartitions)
     return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)
 
@@ -529,18 +531,19 @@ def shuffle(
     # TODO: can we do this without using `submit()` to avoid the overhead
     # of creating a Future for each dataframe partition?
 
-    futures = []
+    _futures = {}
     for rank in ranks:
         for part_id in rank_to_out_part_ids[rank]:
-            futures.append(
-                c.client.submit(
-                    getitem,
-                    shuffle_result[rank],
-                    part_id,
-                    workers=[c.worker_addresses[rank]],
-                )
+            _futures[part_id] = c.client.submit(
+                getitem,
+                shuffle_result[rank],
+                part_id,
+                workers=[c.worker_addresses[rank]],
             )
 
+    # Make sure partitions are properly ordered
+    futures = [_futures.pop(i) for i in range(npartitions)]
+
     # Create a distributed Dataframe from all the pieces
     divs = [None] * (len(futures) + 1)
     kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index f495648e..ec931e03 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -109,7 +109,14 @@ def test_dataframe_merge_empty_partitions():
 
 def check_partitions(df, npartitions):
     """Check that all values in `df` hashes to the same"""
-    hashes = partitioning_index(df, npartitions)
+    dtypes = {}
+    for col, dtype in df.dtypes.items():
+        if pd.api.types.is_numeric_dtype(dtype):
+            dtypes[col] = np.float64
+    if not dtypes:
+        dtypes = None
+
+    hashes = partitioning_index(df, npartitions, cast_dtype=dtypes)
     if len(hashes) > 0:
         return len(hashes.unique()) == 1
     else:
@@ -132,7 +139,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
             all_workers = list(client.get_worker_logs().keys())
             comms.default_comms()
             np.random.seed(42)
-            df = pd.DataFrame({"key": np.random.random(100)})
+            df = pd.DataFrame({"key": np.random.randint(0, high=100, size=100)})
             if backend == "cudf":
                 df = cudf.DataFrame.from_pandas(df)
@@ -141,7 +148,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
 
             for input_nparts in range(1, 5):
                 for output_nparts in range(1, 5):
-                    ddf = dd.from_pandas(df.copy(), npartitions=input_nparts).persist(
+                    ddf1 = dd.from_pandas(df.copy(), npartitions=input_nparts).persist(
                         workers=all_workers
                     )
                     # To reduce test runtime, we change the batchsizes here instead
@@ -149,7 +156,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
                     for batchsize in (-1, 1, 2):
                         with dask.config.set(explicit_comms_batchsize=batchsize):
                             ddf = explicit_comms_shuffle(
-                                ddf,
+                                ddf1,
                                 ["_partitions"] if _partitions else ["key"],
                                 npartitions=output_nparts,
                                 batchsize=batchsize,
@@ -177,6 +184,32 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
                                 got = ddf.compute().sort_values("key")
                                 assert_eq(got, expected)
 
+                                # Check that partitioning is consistent with "tasks"
+                                ddf_tasks = ddf1.shuffle(
+                                    ["key"],
+                                    npartitions=output_nparts,
+                                    shuffle_method="tasks",
+                                )
+                                for i in range(output_nparts):
+                                    expected_partition = ddf_tasks.partitions[
+                                        i
+                                    ].compute()["key"]
+                                    actual_partition = ddf.partitions[i].compute()[
+                                        "key"
+                                    ]
+                                    if backend == "cudf":
+                                        expected_partition = (
+                                            expected_partition.values_host
+                                        )
+                                        actual_partition = actual_partition.values_host
+                                    else:
+                                        expected_partition = expected_partition.values
+                                        actual_partition = actual_partition.values
+                                    assert all(
+                                        np.sort(expected_partition)
+                                        == np.sort(actual_partition)
+                                    )
+
 
 @pytest.mark.parametrize("nworkers", [1, 2, 3])
 @pytest.mark.parametrize("backend", ["pandas", "cudf"])

From 463ab8d2e5acc6c8fa095b0251da17269bfd6f27 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 3 Jul 2024 12:58:24 -0700
Subject: [PATCH 2/4] add link to PR to track partition_by_hash change

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index 0e5cc819..356b08ba 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -197,7 +197,8 @@ def partition_dataframe(
     partitions
         Dict of dataframe-partitions, mapping partition-ID to dataframe
     """
-    # TODO: Use `partition_by_hash` after dtype-casting is added
+    # TODO: Use `partition_by_hash` if/when dtype-casting is added
+    # (See: https://github.com/rapidsai/dask-cuda/pull/1356)
     map_index = compute_map_index(df, column_names, npartitions)
     return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)
 

From 8bc757082cc8e55d2c6a9ffb4d449125e3022777 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Fri, 5 Jul 2024 07:11:34 -0700
Subject: [PATCH 3/4] fix strange error after persisting

---
 dask_cuda/tests/test_explicit_comms.py | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index ec931e03..2806dc1c 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -135,8 +135,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
         worker_class=IncreasedCloseTimeoutNanny,
         processes=True,
     ) as cluster:
-        with Client(cluster) as client:
-            all_workers = list(client.get_worker_logs().keys())
+        with Client(cluster):
             comms.default_comms()
             np.random.seed(42)
             df = pd.DataFrame({"key": np.random.randint(0, high=100, size=100)})
@@ -148,9 +147,7 @@ def _test_dataframe_shuffle(backend, protocol, n_workers, _partitions):
 
             for input_nparts in range(1, 5):
                 for output_nparts in range(1, 5):
-                    ddf1 = dd.from_pandas(df.copy(), npartitions=input_nparts).persist(
-                        workers=all_workers
-                    )
+                    ddf1 = dd.from_pandas(df.copy(), npartitions=input_nparts)
                     # To reduce test runtime, we change the batchsizes here instead
                     # of using a test parameter.
                     for batchsize in (-1, 1, 2):

From 234477713714aef22738555678f1026e7b071355 Mon Sep 17 00:00:00 2001
From: "Richard (Rick) Zamora"
Date: Mon, 8 Jul 2024 16:15:25 -0500
Subject: [PATCH 4/4] Update dask_cuda/explicit_comms/dataframe/shuffle.py

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index 356b08ba..70f12335 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -198,7 +198,7 @@ def partition_dataframe(
         Dict of dataframe-partitions, mapping partition-ID to dataframe
     """
     # TODO: Use `partition_by_hash` if/when dtype-casting is added
-    # (See: https://github.com/rapidsai/dask-cuda/pull/1356)
+    # (See: https://github.com/rapidsai/cudf/issues/16221)
     map_index = compute_map_index(df, column_names, npartitions)
     return group_split_dispatch(df, map_index, npartitions, ignore_index=ignore_index)
 
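
A minimal standalone sketch of the idea behind these patches, not part of the
patch series itself: numeric key columns are cast to float64 before hashing so
that partition assignment agrees with dask.dataframe's "tasks" shuffle. The
helper name `consistent_partition_index` is hypothetical; it only mirrors the
casting added to `compute_map_index` in PATCH 1/4.

# Hypothetical sketch -- mirrors the dtype-casting added to
# dask_cuda.explicit_comms.dataframe.shuffle.compute_map_index in PATCH 1/4.
import numpy as np
import pandas as pd
from dask.dataframe.dispatch import hash_object_dispatch


def consistent_partition_index(df, column_names, npartitions):
    """Return the output-partition id for every row of ``df``."""
    index = df[column_names] if column_names else df
    # Cast numeric columns to float64 so the hash agrees with
    # dask.dataframe.shuffle.partitioning_index.
    dtypes = {
        col: np.float64
        for col, dtype in index.dtypes.items()
        if pd.api.types.is_numeric_dtype(dtype)
    }
    if dtypes:
        index = index.astype(dtypes, errors="ignore")
    return hash_object_dispatch(index, index=False) % npartitions


# Example: integer keys hash differently from their float64 equivalents, so
# skipping the cast could place rows in different partitions than
# ddf.shuffle(..., shuffle_method="tasks") would.
df = pd.DataFrame({"key": np.random.randint(0, 100, size=10)})
print(consistent_partition_index(df, ["key"], npartitions=4))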