From 85c64d4bdedfbeebc2e278f1e0b31a343b0f9666 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Mon, 18 Mar 2024 10:21:35 -0700
Subject: [PATCH 01/18] relax type-check

---
 dask_cuda/tests/test_proxy.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/dask_cuda/tests/test_proxy.py b/dask_cuda/tests/test_proxy.py
index 5458c5ba..31a9e996 100644
--- a/dask_cuda/tests/test_proxy.py
+++ b/dask_cuda/tests/test_proxy.py
@@ -537,10 +537,10 @@ def test_from_cudf_of_proxy_object():
     assert has_parallel_type(df)

     ddf = dask_cudf.from_cudf(df, npartitions=1)
-    assert has_parallel_type(ddf)
+    assert has_parallel_type(ddf._meta)

     # Notice, the output is a dask-cudf dataframe and not a proxy object
-    assert type(ddf) is dask_cudf.core.DataFrame
+    assert type(ddf._meta) is cudf.DataFrame


 def test_proxy_object_parquet(tmp_path):

From be984be912f66e4b9c010c9bc986ffb33144e6b6 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Mon, 18 Mar 2024 11:50:20 -0700
Subject: [PATCH 02/18] work toward dask-expr support in dask-cuda

---
 dask_cuda/benchmarks/local_cudf_merge.py      |  4 ++--
 dask_cuda/benchmarks/local_cudf_shuffle.py    |  7 +++----
 dask_cuda/explicit_comms/dataframe/shuffle.py |  5 +++--
 dask_cuda/tests/test_explicit_comms.py        |  5 +++++
 dask_cuda/utils.py                            | 15 +++++++++++++++
 5 files changed, 28 insertions(+), 8 deletions(-)

diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index ba3a9d56..243fb545 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -8,7 +8,6 @@
 import dask
 from dask.base import tokenize
-from dask.dataframe.core import new_dd_object
 from dask.distributed import performance_report, wait
 from dask.utils import format_bytes, parse_bytes
@@ -20,6 +19,7 @@
     print_separator,
     print_throughput_bandwidth,
 )
+from dask_cuda.utils import _make_collection

 # Benchmarking cuDF merge operation based on
 #
@@ -123,7 +123,7 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):
         for i, part in enumerate(parts)
     }

-    ddf = new_dd_object(graph, name, meta, divisions)
+    ddf = _make_collection(graph, name, meta, divisions)

     if chunk_type == "build":
         if not args.no_shuffle:
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index a3492b66..249137b1 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -8,8 +8,6 @@
 import dask
 import dask.dataframe
-from dask.dataframe.core import new_dd_object
-from dask.dataframe.shuffle import shuffle
 from dask.distributed import Client, performance_report, wait
 from dask.utils import format_bytes, parse_bytes
@@ -22,6 +20,7 @@
     print_separator,
     print_throughput_bandwidth,
 )
+from dask_cuda.utils import _make_collection

 try:
     import cupy
@@ -33,7 +32,7 @@


 def shuffle_dask(df, args):
-    result = shuffle(df, index="data", shuffle="tasks", ignore_index=args.ignore_index)
+    result = df.shuffle("data", shuffle_method="tasks", ignore_index=args.ignore_index)
     if args.backend == "dask-noop":
         result = as_noop(result)
     t1 = perf_counter()
@@ -105,7 +104,7 @@ def create_data(

     df_meta = create_df(0, args.type)
     divs = [None] * (len(dsk) + 1)
-    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    ret = _make_collection(dsk, name, df_meta, divs).persist()
     wait(ret)

     data_processed = args.in_parts * args.partition_size
diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index ca69156d..b370cd57 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -14,13 +14,14 @@
 import dask.utils
 import distributed.worker
 from dask.base import tokenize
-from dask.dataframe.core import DataFrame, Series, _concat as dd_concat, new_dd_object
+from dask.dataframe.core import DataFrame, Series, _concat as dd_concat
 from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch
 from distributed import wait
 from distributed.protocol import nested_deserialize, to_serialize
 from distributed.worker import Worker

 from .. import comms
+from dask_cuda.utils import _make_collection

 T = TypeVar("T")
@@ -538,7 +539,7 @@ def shuffle(

     # Create a distributed Dataframe from all the pieces
     divs = [None] * (len(dsk) + 1)
-    ret = new_dd_object(dsk, name, df_meta, divs).persist()
+    ret = _make_collection(dsk, name, df_meta, divs).persist()
     wait([ret])

     # Release all temporary dataframes
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index ed34f21f..54724754 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -28,6 +28,11 @@
 # Notice, all of the following tests is executed in a new process such
 # that UCX options of the different tests doesn't conflict.

+# Skip these tests when dask-expr is active (for now)
+pytest.mark.skipif(
+    dask.config.get("dataframe.query-planning", None) is not False,
+    reason="https://github.com/rapidsai/dask-cuda/issues/1311",
+)

 async def my_rank(state, arg):
     return state["rank"] + arg
diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index ff4dbbae..fec8a92f 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -764,3 +764,18 @@ def get_rmm_memory_resource_stack(mr) -> list:
     if isinstance(mr, rmm.mr.StatisticsResourceAdaptor):
         return mr.allocation_counts["current_bytes"]
     return None
+
+def _make_collection(graph, name, meta, divisions):
+    # Create a DataFrame collection from a task graph.
+    # Accounts for legacy vs dask-expr API
+    try:
+        # New expression-based API
+        from dask.dataframe import from_graph
+
+        keys = [(name, i) for i in range(len(divisions))]
+        return from_graph(graph, meta, divisions, keys, name)
+    except ImportError:
+        # Legacy API
+        from dask.dataframe.core import new_dd_object
+
+        return new_dd_object(graph, name, meta, divisions)

From c11c47c7cbf0e26fab571aa4b76df20a360efbf5 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 19 Mar 2024 09:23:11 -0700
Subject: [PATCH 03/18] remove skip

---
 dask_cuda/tests/test_explicit_comms.py | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index 3db76252..105d3b10 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -22,12 +22,6 @@
 from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as explicit_comms_shuffle
 from dask_cuda.utils_test import IncreasedCloseTimeoutNanny

-# Skip these tests when dask-expr is active (for now)
-pytestmark = pytest.mark.skipif(
-    dask.config.get("dataframe.query-planning", None) is not False,
-    reason="https://github.com/rapidsai/dask-cuda/issues/1311",
-)
-
 mp = mp.get_context("spawn")  # type: ignore
 ucp = pytest.importorskip("ucp")

 # Notice, all of the following tests is executed in a new process such
 # that UCX options of the different tests doesn't conflict.
-# Skip these tests when dask-expr is active (for now)
-pytest.mark.skipif(
-    dask.config.get("dataframe.query-planning", None) is not False,
-    reason="https://github.com/rapidsai/dask-cuda/issues/1311",
-)

 async def my_rank(state, arg):
     return state["rank"] + arg

From dc3212788d0006c2edea8b5b55af2b65f4556cba Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 20 Mar 2024 12:04:57 -0700
Subject: [PATCH 04/18] use futures_of

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index b370cd57..c43a9016 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -14,15 +14,17 @@
 import dask.utils
 import distributed.worker
 from dask.base import tokenize
-from dask.dataframe.core import DataFrame, Series, _concat as dd_concat
+from dask.dataframe import DataFrame, Series
+from dask.dataframe.core import _concat as dd_concat
 from dask.dataframe.shuffle import group_split_dispatch, hash_object_dispatch
 from distributed import wait
 from distributed.protocol import nested_deserialize, to_serialize
 from distributed.worker import Worker

-from .. import comms
 from dask_cuda.utils import _make_collection

+from .. import comms
+
 T = TypeVar("T")
@@ -469,8 +471,9 @@ def shuffle(
     npartitions = df.npartitions

     # Step (a):
-    df = df.persist()  # Make sure optimizations are apply on the existing graph
+    df = df.persist()  # Make sure optimizations are applied on the existing graph
     wait([df])  # Make sure all keys has been materialized on workers
+    persisted_keys = [f.key for f in c.client.futures_of(df)]
     name = (
         "explicit-comms-shuffle-"
         f"{tokenize(df, column_names, npartitions, ignore_index)}"
@@ -480,7 +483,7 @@ def shuffle(
     # Stage all keys of `df` on the workers and cancel them, which makes it possible
     # for the shuffle to free memory as the partitions of `df` are consumed.
     # See CommsContext.stage_keys() for a description of staging.
-    rank_to_inkeys = c.stage_keys(name=name, keys=df.__dask_keys__())
+    rank_to_inkeys = c.stage_keys(name=name, keys=persisted_keys)
     c.client.cancel(df)

     # Get batchsize

From ad2ea62b185af5a0418ceff56869ed32317cf3a5 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 20 Mar 2024 13:53:27 -0700
Subject: [PATCH 05/18] fix p2p problem

---
 dask_cuda/benchmarks/local_cudf_merge.py | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 243fb545..1eb6d8d1 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -25,12 +25,18 @@
 #

+# Set default shuffle method to "tasks"
+if dask.config.get("dataframe.shuffle.method", None) is None:
+    dask.config.set({"dataframe.shuffle.method": "tasks"})
+
+
 def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
     # Setting a seed that triggers max amount of comm in the two-GPU case.
     if gpu:
         import cupy as xp

         import cudf as xdf
+        import dask_cudf  # noqa: F401
     else:
         import numpy as xp
         import pandas as xdf

From 107e31af49973838f1caa030f765cc1640d336e0 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 20 Mar 2024 14:19:35 -0700
Subject: [PATCH 06/18] format

---
 dask_cuda/utils.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index fec8a92f..ffd549e0 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -765,6 +765,7 @@ def get_rmm_memory_resource_stack(mr) -> list:
         return mr.allocation_counts["current_bytes"]
     return None

+
 def _make_collection(graph, name, meta, divisions):
     # Create a DataFrame collection from a task graph.
     # Accounts for legacy vs dask-expr API

From 71b9956ef3a0b169a1e87b697419c5ab942bcebe Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Wed, 20 Mar 2024 16:11:47 -0700
Subject: [PATCH 07/18] test commit

---
 dask_cuda/tests/test_explicit_comms.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index 105d3b10..b9b2ec9a 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -83,6 +83,7 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
         pd.testing.assert_frame_equal(got, expected)


+@pytest.mark.skip(reason="TEST DO NOT MERGE")
 def test_dataframe_merge_empty_partitions():
     # Notice, we use more partitions than rows
     p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
@@ -221,6 +222,7 @@ def check_shuffle():
         check_shuffle()


+@pytest.mark.skip(reason="TEST DO NOT MERGE")
 @pytest.mark.parametrize("in_cluster", [True, False])
 def test_dask_use_explicit_comms(in_cluster):
     def _timeout(process, function, timeout):
@@ -283,6 +285,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
     assert_eq(got, expected)


+@pytest.mark.skip(reason="TEST DO NOT MERGE")
 @pytest.mark.parametrize("nworkers", [1, 2, 4])
 @pytest.mark.parametrize("backend", ["pandas", "cudf"])
 @pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])

From 0f8bb29e0f243a1dcd9ac4eeeae302ff5f30c556 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 07:41:31 -0700
Subject: [PATCH 08/18] update ci and add NotImplementedError

---
 ci/test_python.sh                      | 43 ++++++++++++++++++++++++--
 dask_cuda/__init__.py                  | 12 +++++++
 dask_cuda/tests/test_explicit_comms.py | 20 ++++++++++--
 3 files changed, 70 insertions(+), 5 deletions(-)

diff --git a/ci/test_python.sh b/ci/test_python.sh
index f700c935..e633bee6 100755
--- a/ci/test_python.sh
+++ b/ci/test_python.sh
@@ -38,8 +38,9 @@ EXITCODE=0
 trap "EXITCODE=1" ERR
 set +e

-rapids-logger "pytest dask-cuda"
+rapids-logger "pytest dask-cuda (dask-expr)"
 pushd dask_cuda
+DASK_DATAFRAME__QUERY_PLANNING=True \
 DASK_CUDA_TEST_SINGLE_GPU=1 \
 DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
 UCXPY_IFNAME=eth0 \
@@ -58,13 +59,51 @@ timeout 60m pytest \
   tests -k "not ucxx"
 popd

-rapids-logger "Run local benchmark"
+rapids-logger "pytest explicit-comms (legacy dd)"
+pushd dask_cuda
+DASK_DATAFRAME__QUERY_PLANNING=False \
+DASK_CUDA_TEST_SINGLE_GPU=1 \
+DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
+UCXPY_IFNAME=eth0 \
+UCX_WARN_UNUSED_ENV_VARS=n \
+UCX_MEMTYPE_CACHE=n \
+timeout 60m pytest \
+  -vv \
+  --durations=0 \
+  --capture=no \
+  --cache-clear \
+  --junitxml="${RAPIDS_TESTS_DIR}/junit-dask-cuda-legacy.xml" \
+  --cov-config=../pyproject.toml \
+  --cov=dask_cuda \
+  --cov-report=xml:"${RAPIDS_COVERAGE_DIR}/dask-cuda-coverage-legacy.xml" \
+  --cov-report=term \
+  tests/test_explicit_comms.py -k "not ucxx"
+popd
+
+rapids-logger "Run local benchmark (dask-expr)"
+DASK_DATAFRAME__QUERY_PLANNING=True \
 python dask_cuda/benchmarks/local_cudf_shuffle.py \
   --partition-size="1 KiB" \
   -d 0 \
   --runs 1 \
   --backend dask

+DASK_DATAFRAME__QUERY_PLANNING=True \
 python dask_cuda/benchmarks/local_cudf_shuffle.py \
   --partition-size="1 KiB" \
   -d 0 \
   --runs 1 \
   --backend explicit-comms

+rapids-logger "Run local benchmark (legacy dd)"
+DASK_DATAFRAME__QUERY_PLANNING=False \
+python dask_cuda/benchmarks/local_cudf_shuffle.py \
+  --partition-size="1 KiB" \
+  -d 0 \
+  --runs 1 \
+  --backend dask
+
+DASK_DATAFRAME__QUERY_PLANNING=False \
+python dask_cuda/benchmarks/local_cudf_shuffle.py \
+  --partition-size="1 KiB" \
+  -d 0 \
+  --runs 1 \
+  --backend explicit-comms
diff --git a/dask_cuda/__init__.py b/dask_cuda/__init__.py
index 30f987ac..516599da 100644
--- a/dask_cuda/__init__.py
+++ b/dask_cuda/__init__.py
@@ -20,6 +20,18 @@
 from .proxify_device_objects import proxify_decorator, unproxify_decorator


+if dask.config.get("dataframe.query-planning", None) is not False and dask.config.get(
+    "explicit-comms", False
+):
+    raise NotImplementedError(
+        "The 'explicit-comms' config is not yet supported when "
+        "query-planning is enabled in dask. Please use the shuffle "
+        "API directly, or use the legacy dask-dataframe API "
+        "(set the 'dataframe.query-planning' config to `False`"
+        "before importing `dask.dataframe`).",
+    )
+
+
 # Monkey patching Dask to make use of explicit-comms when `DASK_EXPLICIT_COMMS=True`
 dask.dataframe.shuffle.rearrange_by_column = get_rearrange_by_column_wrapper(
     dask.dataframe.shuffle.rearrange_by_column
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index b9b2ec9a..49995592 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -26,6 +26,20 @@
 ucp = pytest.importorskip("ucp")


+# Skip these tests when dask-expr is active (for now)
+query_planning_skip = pytest.mark.skipif(
+    dask.config.get("dataframe.query-planning", None) is not False,
+    reason=(
+        "The 'explicit-comms' config is not supported "
+        "when query planning is enabled.",
+    ),
+)
+
+# Set default shuffle method to "tasks"
+if dask.config.get("dataframe.shuffle.method", None) is None:
+    dask.config.set({"dataframe.shuffle.method": "tasks"})
+
+
 # Notice, all of the following tests is executed in a new process such
 # that UCX options of the different tests doesn't conflict.
@@ -83,7 +97,7 @@ def _test_dataframe_merge_empty_partitions(nrows, npartitions):
         pd.testing.assert_frame_equal(got, expected)


-@pytest.mark.skip(reason="TEST DO NOT MERGE")
+@query_planning_skip
 def test_dataframe_merge_empty_partitions():
     # Notice, we use more partitions than rows
     p = mp.Process(target=_test_dataframe_merge_empty_partitions, args=(2, 4))
@@ -222,7 +236,7 @@ def check_shuffle():
         check_shuffle()


-@pytest.mark.skip(reason="TEST DO NOT MERGE")
+@query_planning_skip
 @pytest.mark.parametrize("in_cluster", [True, False])
 def test_dask_use_explicit_comms(in_cluster):
     def _timeout(process, function, timeout):
@@ -285,7 +299,7 @@ def _test_dataframe_shuffle_merge(backend, protocol, n_workers):
     assert_eq(got, expected)


-@pytest.mark.skip(reason="TEST DO NOT MERGE")
+@query_planning_skip
 @pytest.mark.parametrize("nworkers", [1, 2, 4])
 @pytest.mark.parametrize("backend", ["pandas", "cudf"])
 @pytest.mark.parametrize("protocol", ["tcp", "ucx", "ucxx"])

From d04c7556fbd4a783e28737975e35555aef8bb607 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 08:07:39 -0700
Subject: [PATCH 09/18] add NotImplementedError to local_cudf_merge.py

---
 dask_cuda/benchmarks/local_cudf_merge.py | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 1eb6d8d1..154b70c7 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -374,9 +374,27 @@ def parse_args():


 if __name__ == "__main__":
+    args = parse_args()
+
+    # Raise error early if "explicit-comms" is not allowed
+    if (
+        args.backend == "explicit-comms"
+        and dask.config.get(
+            "dataframe.query-planning",
+            None,
+        )
+        is not False
+    ):
+        raise NotImplementedError(
+            "The 'explicit-comms' config is not yet supported when "
+            "query-planning is enabled in dask. Please use the legacy "
+            "dask-dataframe API (set the 'dataframe.query-planning' "
+            "config to `False` before executing).",
+        )
+
     execute_benchmark(
         Config(
-            args=parse_args(),
+            args=args,
             bench_once=bench_once,
             create_tidy_results=create_tidy_results,
             pretty_print_results=pretty_print_results,

From 1b6ef7d847e86131adbf72242ca9665f466423db Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 08:19:05 -0700
Subject: [PATCH 10/18] use from_delayed - TODO: cleanup

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index c43a9016..0e6a2f96 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -11,6 +11,7 @@
 import dask
 import dask.config
 import dask.dataframe
+import dask.dataframe as dd
 import dask.utils
 import distributed.worker
 from dask.base import tokenize
@@ -21,8 +22,6 @@
 from distributed.protocol import nested_deserialize, to_serialize
 from distributed.worker import Worker

-from dask_cuda.utils import _make_collection
-
 from .. import comms

 T = TypeVar("T")
@@ -542,7 +541,7 @@ def shuffle(

     # Create a distributed Dataframe from all the pieces
     divs = [None] * (len(dsk) + 1)
-    ret = _make_collection(dsk, name, df_meta, divs).persist()
+    ret = dd.from_delayed(dsk.values(), meta=df_meta, divisions=divs).persist()
     wait([ret])

     # Release all temporary dataframes

From fcc0e04a7c21f163df04d4eea49272e194146dd8 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 08:58:57 -0700
Subject: [PATCH 11/18] remove _make_collection - isn't needed

---
 dask_cuda/benchmarks/local_cudf_merge.py      | 41 ++++++++++---------
 dask_cuda/benchmarks/local_cudf_shuffle.py    | 19 +++++----
 dask_cuda/explicit_comms/dataframe/shuffle.py | 20 +++++----
 dask_cuda/utils.py                            | 16 --------
 4 files changed, 44 insertions(+), 52 deletions(-)

diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 154b70c7..4095dd39 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -7,7 +7,7 @@
 import pandas as pd

 import dask
-from dask.base import tokenize
+import dask.dataframe as dd
 from dask.distributed import performance_report, wait
 from dask.utils import format_bytes, parse_bytes
@@ -19,7 +19,6 @@
     print_separator,
     print_throughput_bandwidth,
 )
-from dask_cuda.utils import _make_collection

 # Benchmarking cuDF merge operation based on
 #
@@ -30,7 +29,9 @@
     dask.config.set({"dataframe.shuffle.method": "tasks"})


-def generate_chunk(i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu):
+def generate_chunk(input):
+    i_chunk, local_size, num_chunks, chunk_type, frac_match, gpu = input
+
     # Setting a seed that triggers max amount of comm in the two-GPU case.
     if gpu:
         import cupy as xp
@@ -111,25 +112,25 @@ def get_random_ddf(chunk_size, num_chunks, frac_match, chunk_type, args):
     parts = [chunk_size for _ in range(num_chunks)]
     device_type = True if args.type == "gpu" else False
-    meta = generate_chunk(0, 4, 1, chunk_type, None, device_type)
+    meta = generate_chunk((0, 4, 1, chunk_type, None, device_type))
     divisions = [None] * (len(parts) + 1)

-    name = "generate-data-" + tokenize(chunk_size, num_chunks, frac_match, chunk_type)
-
-    graph = {
-        (name, i): (
-            generate_chunk,
-            i,
-            part,
-            len(parts),
-            chunk_type,
-            frac_match,
-            device_type,
-        )
-        for i, part in enumerate(parts)
-    }
-
-    ddf = _make_collection(graph, name, meta, divisions)
+    ddf = dd.from_map(
+        generate_chunk,
+        [
+            (
+                i,
+                part,
+                len(parts),
+                chunk_type,
+                frac_match,
+                device_type,
+            )
+            for i, part in enumerate(parts)
+        ],
+        meta=meta,
+        divisions=divisions,
+    )

     if chunk_type == "build":
         if not args.no_shuffle:
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index 249137b1..6899243e 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -20,7 +20,6 @@
     print_separator,
     print_throughput_bandwidth,
 )
-from dask_cuda.utils import _make_collection

 try:
     import cupy
@@ -93,18 +92,24 @@ def create_data(
     )

     # Create partition based to the specified partition distribution
-    dsk = {}
+    futures = []
     for i, part_size in enumerate(dist):
         for _ in range(part_size):
             # We use `client.submit` to control placement of the partition.
-            dsk[(name, len(dsk))] = client.submit(
-                create_df, chunksize, args.type, workers=[workers[i]], pure=False
+            futures.append(
+                client.submit(
+                    create_df, chunksize, args.type, workers=[workers[i]], pure=False
+                )
             )

-    wait(dsk.values())
+    wait(futures)

     df_meta = create_df(0, args.type)
-    divs = [None] * (len(dsk) + 1)
-    ret = _make_collection(dsk, name, df_meta, divs).persist()
+    divs = [None] * (len(futures) + 1)
+    ret = dask.dataframe.from_delayed(
+        futures,
+        meta=df_meta,
+        divisions=divs,
+    ).persist()
     wait(ret)

     data_processed = args.in_parts * args.partition_size
diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index 0e6a2f96..71e47f5b 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -529,23 +529,25 @@ def shuffle(

     # TODO: can we do this without using `submit()` to avoid the overhead
     # of creating a Future for each dataframe partition?
-    dsk = {}
+    futures = []
     for rank in ranks:
         for part_id in rank_to_out_part_ids[rank]:
-            dsk[(name, part_id)] = c.client.submit(
-                getitem,
-                shuffle_result[rank],
-                part_id,
-                workers=[c.worker_addresses[rank]],
+            futures.append(
+                c.client.submit(
+                    getitem,
+                    shuffle_result[rank],
+                    part_id,
+                    workers=[c.worker_addresses[rank]],
+                )
             )

     # Create a distributed Dataframe from all the pieces
-    divs = [None] * (len(dsk) + 1)
-    ret = dd.from_delayed(dsk.values(), meta=df_meta, divisions=divs).persist()
+    divs = [None] * (len(futures) + 1)
+    ret = dd.from_delayed(futures, meta=df_meta, divisions=divs).persist()
     wait([ret])

     # Release all temporary dataframes
-    for fut in [*shuffle_result.values(), *dsk.values()]:
+    for fut in [*shuffle_result.values(), *futures]:
         fut.release()

     return ret
diff --git a/dask_cuda/utils.py b/dask_cuda/utils.py
index ffd549e0..ff4dbbae 100644
--- a/dask_cuda/utils.py
+++ b/dask_cuda/utils.py
@@ -764,19 +764,3 @@ def get_rmm_memory_resource_stack(mr) -> list:
     if isinstance(mr, rmm.mr.StatisticsResourceAdaptor):
         return mr.allocation_counts["current_bytes"]
     return None
-
-def _make_collection(graph, name, meta, divisions):
-    # Create a DataFrame collection from a task graph.
-    # Accounts for legacy vs dask-expr API
-    try:
-        # New expression-based API
-        from dask.dataframe import from_graph
-
-        keys = [(name, i) for i in range(len(divisions))]
-        return from_graph(graph, meta, divisions, keys, name)
-    except ImportError:
-        # Legacy API
-        from dask.dataframe.core import new_dd_object
-
-        return new_dd_object(graph, name, meta, divisions)

From 69421df293c9e14162922049f5abeb63ead02a9b Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 12:06:11 -0700
Subject: [PATCH 12/18] use prefix kwarg - coming soon to dask-expr anyway

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 9 ++++++++-
 dask_cuda/tests/test_explicit_comms.py        | 2 +-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index 71e47f5b..d5dda66d 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -543,7 +543,14 @@ def shuffle(

     # Create a distributed Dataframe from all the pieces
     divs = [None] * (len(futures) + 1)
-    ret = dd.from_delayed(futures, meta=df_meta, divisions=divs).persist()
+    kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
+    try:
+        ret = dd.from_delayed(futures, **kwargs).persist()
+    except TypeError:
+        # This version of dask may not support `prefix`
+        # See: https://github.com/dask/dask-expr/pull/991
+        kwargs.pop("prefix")
+        ret = dd.from_delayed(futures, **kwargs).persist()
     wait([ret])

     # Release all temporary dataframes
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index 49995592..b39f2f2b 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -31,7 +31,7 @@
     dask.config.get("dataframe.query-planning", None) is not False,
     reason=(
         "The 'explicit-comms' config is not supported "
-        "when query planning is enabled.",
+        "when query planning is enabled."
     ),
 )

From c79b1f1ec3a6782458e7ef64c9d14ec17627e460 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 12:17:47 -0700
Subject: [PATCH 13/18] make sure batchsize is used in key

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index d5dda66d..6e690f29 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -475,7 +475,7 @@ def shuffle(
     persisted_keys = [f.key for f in c.client.futures_of(df)]
     name = (
         "explicit-comms-shuffle-"
-        f"{tokenize(df, column_names, npartitions, ignore_index)}"
+        f"{tokenize(df, column_names, npartitions, ignore_index, batchsize)}"
     )
     df_meta: DataFrame = df._meta

From 68079bbf3c948ceff5fe89b9af7b05a2f5452daf Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 17:22:51 -0700
Subject: [PATCH 14/18] skip for dask<2024.2.1

---
 dask_cuda/tests/test_explicit_comms.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index b39f2f2b..25a2b257 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -25,10 +25,11 @@
 mp = mp.get_context("spawn")  # type: ignore
 ucp = pytest.importorskip("ucp")

+QUERY_PLANNING_ON = dask.config.get("dataframe.query-planning", None) is not False

 # Skip these tests when dask-expr is active (for now)
 query_planning_skip = pytest.mark.skipif(
-    dask.config.get("dataframe.query-planning", None) is not False,
+    QUERY_PLANNING_ON,
     reason=(
         "The 'explicit-comms' config is not supported "
         "when query planning is enabled."
@@ -185,6 +186,10 @@ def test_dataframe_shuffle(backend, protocol, nworkers, _partitions):
     if backend == "cudf":
         pytest.importorskip("cudf")

+    if QUERY_PLANNING_ON:
+        # There seem to be problems with dask-expr + dask<2024.2.1
+        pytest.importorskip("dask", minversion="2024.2.1")
+
     p = mp.Process(
         target=_test_dataframe_shuffle, args=(backend, protocol, nworkers, _partitions)
     )

From af6aa675c225002548354f88473e4b278f56e42b Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Thu, 21 Mar 2024 18:17:59 -0700
Subject: [PATCH 15/18] more tweaks

---
 dask_cuda/tests/test_explicit_comms.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index 25a2b257..a7e8fe88 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -187,8 +187,9 @@ def test_dataframe_shuffle(backend, protocol, nworkers, _partitions):
         pytest.importorskip("cudf")

     if QUERY_PLANNING_ON:
-        # There seem to be problems with dask-expr + dask<2024.2.1
+        # There seem to be problems with dask_expr<1.0 and dask<2024.2.1
         pytest.importorskip("dask", minversion="2024.2.1")
+        pytest.importorskip("dask_expr", minversion="1.0")

     p = mp.Process(
         target=_test_dataframe_shuffle, args=(backend, protocol, nworkers, _partitions)
     )

From 4a93bc9baf710cb1a1c34115cc9c91d409a43daa Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 2 Apr 2024 08:40:31 -0700
Subject: [PATCH 16/18] remove workaround logic

---
 dask_cuda/explicit_comms/dataframe/shuffle.py | 8 +-------
 dask_cuda/tests/test_explicit_comms.py        | 5 -----
 2 files changed, 1 insertion(+), 12 deletions(-)

diff --git a/dask_cuda/explicit_comms/dataframe/shuffle.py b/dask_cuda/explicit_comms/dataframe/shuffle.py
index 6e690f29..3f7b7951 100644
--- a/dask_cuda/explicit_comms/dataframe/shuffle.py
+++ b/dask_cuda/explicit_comms/dataframe/shuffle.py
@@ -544,13 +544,7 @@ def shuffle(

     # Create a distributed Dataframe from all the pieces
     divs = [None] * (len(futures) + 1)
     kwargs = {"meta": df_meta, "divisions": divs, "prefix": "explicit-comms-shuffle"}
-    try:
-        ret = dd.from_delayed(futures, **kwargs).persist()
-    except TypeError:
-        # This version of dask may not support `prefix`
-        # See: https://github.com/dask/dask-expr/pull/991
-        kwargs.pop("prefix")
-        ret = dd.from_delayed(futures, **kwargs).persist()
+    ret = dd.from_delayed(futures, **kwargs).persist()
     wait([ret])

     # Release all temporary dataframes
diff --git a/dask_cuda/tests/test_explicit_comms.py b/dask_cuda/tests/test_explicit_comms.py
index a7e8fe88..f495648e 100644
--- a/dask_cuda/tests/test_explicit_comms.py
+++ b/dask_cuda/tests/test_explicit_comms.py
@@ -186,11 +186,6 @@ def test_dataframe_shuffle(backend, protocol, nworkers, _partitions):
     if backend == "cudf":
         pytest.importorskip("cudf")

-    if QUERY_PLANNING_ON:
-        # There seem to be problems with dask_expr<1.0 and dask<2024.2.1
-        pytest.importorskip("dask", minversion="2024.2.1")
-        pytest.importorskip("dask_expr", minversion="1.0")
-
     p = mp.Process(
         target=_test_dataframe_shuffle, args=(backend, protocol, nworkers, _partitions)
     )

From 92b496c4562b47edcf16e5707351cd4df3f70e68 Mon Sep 17 00:00:00 2001
From: rjzamora
Date: Tue, 2 Apr 2024 14:44:32 -0700
Subject: [PATCH 17/18] respond to code review

---
 ci/test_python.sh                          |  2 +-
 dask_cuda/benchmarks/local_cudf_merge.py   | 20 +----------------
 dask_cuda/benchmarks/local_cudf_shuffle.py |  4 +++-
 dask_cuda/benchmarks/utils.py              | 25 +++++++++++++++++++++-
 4 files changed, 29 insertions(+), 22 deletions(-)

diff --git a/ci/test_python.sh b/ci/test_python.sh
index c6cf55a2..ea66165d 100755
--- a/ci/test_python.sh
+++ b/ci/test_python.sh
@@ -67,7 +67,7 @@ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
 UCXPY_IFNAME=eth0 \
 UCX_WARN_UNUSED_ENV_VARS=n \
 UCX_MEMTYPE_CACHE=n \
-timeout 60m pytest \
+timeout 15m pytest \
   -vv \
   --durations=0 \
   --capture=no \
diff --git a/dask_cuda/benchmarks/local_cudf_merge.py b/dask_cuda/benchmarks/local_cudf_merge.py
index 4095dd39..6a68ad78 100644
--- a/dask_cuda/benchmarks/local_cudf_merge.py
+++ b/dask_cuda/benchmarks/local_cudf_merge.py
@@ -375,27 +375,9 @@ def parse_args():


 if __name__ == "__main__":
-    args = parse_args()
-
-    # Raise error early if "explicit-comms" is not allowed
-    if (
-        args.backend == "explicit-comms"
-        and dask.config.get(
-            "dataframe.query-planning",
-            None,
-        )
-        is not False
-    ):
-        raise NotImplementedError(
-            "The 'explicit-comms' config is not yet supported when "
-            "query-planning is enabled in dask. Please use the legacy "
-            "dask-dataframe API (set the 'dataframe.query-planning' "
-            "config to `False` before executing).",
-        )
-
     execute_benchmark(
         Config(
-            args=args,
+            args=parse_args(),
             bench_once=bench_once,
             create_tidy_results=create_tidy_results,
             pretty_print_results=pretty_print_results,
diff --git a/dask_cuda/benchmarks/local_cudf_shuffle.py b/dask_cuda/benchmarks/local_cudf_shuffle.py
index 6899243e..a1129dd3 100644
--- a/dask_cuda/benchmarks/local_cudf_shuffle.py
+++ b/dask_cuda/benchmarks/local_cudf_shuffle.py
@@ -258,7 +258,9 @@ def parse_args():
     ]

     return parse_benchmark_args(
-        description="Distributed shuffle (dask/cudf) benchmark", args_list=special_args
+        description="Distributed shuffle (dask/cudf) benchmark",
+        args_list=special_args,
+        check_explicit_comms=False,
     )
diff --git a/dask_cuda/benchmarks/utils.py b/dask_cuda/benchmarks/utils.py
index 51fae720..5ac79a88 100644
--- a/dask_cuda/benchmarks/utils.py
+++ b/dask_cuda/benchmarks/utils.py
@@ -11,6 +11,7 @@
 import numpy as np
 import pandas as pd

+from dask import config
 from dask.distributed import Client, SSHCluster
 from dask.utils import format_bytes, format_time, parse_bytes
 from distributed.comm.addressing import get_address_host
@@ -47,7 +48,11 @@ def as_noop(dsk):
         raise RuntimeError("Requested noop computation but dask-noop not installed.")


-def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[]):
+def parse_benchmark_args(
+    description="Generic dask-cuda Benchmark",
+    args_list=[],
+    check_explicit_comms=True,
+):
     parser = argparse.ArgumentParser(description=description)
     worker_args = parser.add_argument_group(description="Worker configuration")
     worker_args.add_argument(
@@ -317,6 +322,24 @@
     if args.multi_node and len(args.hosts.split(",")) < 2:
         raise ValueError("--multi-node requires at least 2 hosts")

+    # Raise error early if "explicit-comms" is not allowed
+    if (
+        check_explicit_comms
+        and args.backend == "explicit-comms"
+        and config.get(
+            "dataframe.query-planning",
+            None,
+        )
+        is not False
+    ):
+        raise NotImplementedError(
+            "The 'explicit-comms' config is not yet supported when "
+            "query-planning is enabled in dask. Please use the legacy "
+            "dask-dataframe API by setting the following environment "
+            "variable before executing:",
+            " DASK_DATAFRAME__QUERY_PLANNING=False",
+        )
+
     return args

From 48e661589579d0479ed8593de20590b6fbc261b1 Mon Sep 17 00:00:00 2001
From: Peter Andreas Entschev
Date: Wed, 3 Apr 2024 09:28:54 +0200
Subject: [PATCH 18/18] Increase timeout of explicit comms tests

---
 ci/test_python.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ci/test_python.sh b/ci/test_python.sh
index ea66165d..79a058a7 100755
--- a/ci/test_python.sh
+++ b/ci/test_python.sh
@@ -67,7 +67,7 @@ DASK_CUDA_WAIT_WORKERS_MIN_TIMEOUT=20 \
 UCXPY_IFNAME=eth0 \
 UCX_WARN_UNUSED_ENV_VARS=n \
 UCX_MEMTYPE_CACHE=n \
-timeout 15m pytest \
+timeout 30m pytest \
   -vv \
   --durations=0 \
   --capture=no \
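
Usage notes on the series above (editorial sketches, not additional patches).

The `_make_collection` helper added in patch 02 (and removed again in patch 11) captures the core compatibility question of the series: legacy dask.dataframe builds a collection from a raw task graph with `new_dd_object`, while query-planning builds expose `from_graph`. Below is a minimal, self-contained sketch of the same dispatch, exercised on a toy graph. The toy data and the `len(divisions) - 1` key count are editorial choices; the patch itself used `len(divisions)`, which looks off by one given that `divisions` holds npartitions + 1 entries.

import pandas as pd

def _make_collection(graph, name, meta, divisions):
    # Create a DataFrame collection from a task graph,
    # accounting for the legacy vs dask-expr API.
    try:
        from dask.dataframe import from_graph  # query-planning builds only

        keys = [(name, i) for i in range(len(divisions) - 1)]
        return from_graph(graph, meta, divisions, keys, name)
    except ImportError:
        from dask.dataframe.core import new_dd_object  # legacy API

        return new_dd_object(graph, name, meta, divisions)

name = "toy-frame"
graph = {(name, i): pd.DataFrame({"x": [i, i + 1]}) for i in range(3)}
meta = pd.DataFrame({"x": pd.Series([], dtype="int64")})  # schema-only frame
divisions = [None] * 4  # npartitions + 1 entries; None means "unknown"

ddf = _make_collection(graph, name, meta, divisions)
print(ddf.compute())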
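Patch 04's switch to `futures_of` is easy to miss but important: once query planning is active, the graph that actually runs is the optimized one, so the collection's pre-optimization key names from `df.__dask_keys__()` are no longer a reliable way to stage partitions. The shuffle therefore reads the keys off the futures of the persisted collection. Sketched in isolation, assuming only dask, pandas, and distributed:

import dask.dataframe as dd
import pandas as pd
from distributed import Client, wait
from distributed.client import futures_of

if __name__ == "__main__":
    client = Client(processes=False)
    df = dd.from_pandas(pd.DataFrame({"x": range(8)}), npartitions=4)

    df = df.persist()  # make sure optimizations are applied to the graph
    wait([df])         # ...and that every partition is materialized
    persisted_keys = [f.key for f in futures_of(df)]  # the keys that really exist
    print(persisted_keys)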
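Patches 10 through 12 then drop graph construction entirely: each output partition becomes a distributed Future, and `dd.from_delayed` stitches the futures into a single collection, which works unchanged under both APIs. Roughly, outside the shuffle code, with a local client and a trivial partition builder standing in for the real worker-placement logic:

import dask.dataframe as dd
import pandas as pd
from distributed import Client, wait

def make_part(i):
    return pd.DataFrame({"x": [i, i + 1]})

if __name__ == "__main__":
    client = Client(processes=False)

    # One future per output partition, as in the patched create_data()/shuffle()
    futures = [client.submit(make_part, i, pure=False) for i in range(4)]
    meta = make_part(0).iloc[:0]        # empty frame describing the schema
    divs = [None] * (len(futures) + 1)  # unknown divisions

    ddf = dd.from_delayed(futures, meta=meta, divisions=divs).persist()
    wait(ddf)
    print(ddf.npartitions)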
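Finally, the user-facing consequence of the NotImplementedError added in patch 08: explicit-comms still requires the legacy dask-dataframe API, and the query-planning setting only takes effect if it is applied before `dask.dataframe` is first imported. A sketch of the fallback from user code:

import dask

# Must run before the first `import dask.dataframe`; afterwards the setting
# has no effect. The shell equivalent, used by the updated ci/test_python.sh,
# is to export DASK_DATAFRAME__QUERY_PLANNING=False.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # noqa: E402

assert dd.DataFrame is not None  # legacy collection types are in use
print(dask.config.get("explicit-comms", False))  # opt-in flag read by dask_cuda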