Update explicit-comms for dask-expr support #1323

Merged
23 commits merged from expr-explicit-comms into rapidsai:branch-24.06 on Apr 3, 2024

Conversation

@rjzamora (Member) commented Mar 20, 2024

Makes a few small changes to explicit-comms to support dask-expr.

EDIT: The changes are no longer "small".

@rjzamora rjzamora added the 'bug', '2 - In Progress', and 'non-breaking' labels Mar 20, 2024
@rjzamora rjzamora self-assigned this Mar 20, 2024
@rjzamora rjzamora requested a review from a team as a code owner March 20, 2024 19:10
@github-actions github-actions bot added the 'python' label Mar 20, 2024
@pentschev (Member) left a comment

Apart from the linting issues, looks good to me. I think it should be good to go as soon as CI passes. Also tagging @madsbk for awareness.

@quasiben (Member)

I gave this a try and I'm seeing errors. It's been a while since I've run this query so it could be user error:

python local_cudf_merge.py -p ucx -d 0,1,2,3,4,5,6,7 -c 10_000_000

2024-03-20 12:49:49,925 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 637a8bdfdc46dc6083f254fce7023e81 initialized by task ('shuffle-transfer-637a8bdfdc46dc6083f254fce7023e81', 0) executed on worker ucx://127.0.0.1:50385
2024-03-20 12:49:50,535 - distributed.worker - WARNING - Compute Failed
Key:       ('shuffle-transfer-637a8bdfdc46dc6083f254fce7023e81', 6)
Function:  shuffle_transfer
args:      (              key  shuffle  payload  _partitions
0        60000000        1  9459992            1
1        60000001        5  5005508            5
2        60000002        3  3183193            3
3        60000003        1   536845            1
4        60000004        3  8484645            3
...           ...      ...      ...          ...
9999995  69999995        3  9745632            3
9999996  69999996        5  8371927            5
9999997  69999997        3  8066013            3
9999998  69999998        1  3762504            1
9999999  69999999        7  9411065            7

[10000000 rows x 4 columns], '637a8bdfdc46dc6083f254fce7023e81', 6, 8, '_partitions', Empty DataFrame
Columns: [key, shuffle, payload, _partitions]
Index: [], {0, 1, 2, 3, 4, 5, 6, 7}, True, True)
kwargs:    {}
Exception: "RuntimeError('P2P shuffling 637a8bdfdc46dc6083f254fce7023e81 failed during transfer phase')"

And with explicit-comms:

python local_cudf_merge.py -p ucx -d 0,1,2,3,4,5,6,7 -c 10_000_000 -b explicit-comms

2024-03-20 12:51:47,067 - distributed.worker.state_machine - WARNING - Async instruction for <Task cancelled name="execute(('shuffle-transfer-637a8bdfdc46dc6083f254fce7023e81', 5))" coro=<Worker.execute() done, defined at /datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/worker_state_machine.py:3615>> ended with CancelledError
Traceback (most recent call last):
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_core.py", line 494, in handle_transfer_errors
    yield
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 79, in shuffle_transfer
    return get_worker_plugin().add_partition(
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_worker_plugin.py", line 346, in add_partition
    return shuffle_run.add_partition(
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_core.py", line 343, in add_partition
    shards = self._shard_partition(data, partition_id)
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 521, in _shard_partition
    out = split_by_worker(
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/distributed/shuffle/_shuffle.py", line 339, in split_by_worker
    t = to_pyarrow_table_dispatch(df, preserve_index=True)
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/dask/utils.py", line 772, in __call__
    meth = self.dispatch(type(arg))
  File "/datasets/bzaitlen/miniconda3/envs/dask-expr-rapids-24.06/lib/python3.10/site-packages/dask/utils.py", line 766, in dispatch
    raise TypeError(f"No dispatch for {cls}")
TypeError: No dispatch for <class 'cudf.core.dataframe.DataFrame'>

@rjzamora rjzamora marked this pull request as draft March 20, 2024 20:12
@rjzamora (Member Author)

I gave this a try and I'm seeing errors.

Hmm - I was actually expecting errors in this PR. (I just realized that I never addressed the fact that we literally patch/wrap a bunch of deprecated shuffling code to automatically dispatch to explicit-comms.) That said, I wasn't expecting a local-cudf-merge problem, so I'll try to reproduce.

@rjzamora (Member Author)

It looks like explicit-comms is going to be a pretty big headache to migrate over to dask-expr :/

I have the basic benchmarks working locally, but there is no longer a clean way to support the "explicit-comms" config that we used to support by monkey-patching dask.dataframe. Dask-expr no longer uses a monolithic function (like rearrange_by_column) to inject a shuffle operation for sorting and joining. Instead, these operations are now a composition of abstract expressions. It may be necessary to deprecate/prohibit this behavior when query-planning is enabled (for now).
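
For context, the legacy behavior being discussed was roughly of this shape (an illustrative sketch only, not the actual dask_cuda code; it assumes the legacy dask.dataframe.shuffle.rearrange_by_column entry point and that explicit_comms.dataframe.shuffle.shuffle accepts the frame plus a list of column names):

import dask
import dask.dataframe as dd

from dask_cuda.explicit_comms.dataframe.shuffle import shuffle as ec_shuffle

# Keep a handle on the original implementation so normal shuffles
# still go through the default code path.
_original_rearrange_by_column = dd.shuffle.rearrange_by_column


def _patched_rearrange_by_column(df, col, *args, **kwargs):
    # When the "explicit-comms" config is set, route the shuffle
    # through the explicit-comms implementation instead.
    if dask.config.get("explicit-comms", False):
        return ec_shuffle(df, [col])
    return _original_rearrange_by_column(df, col, *args, **kwargs)


dd.shuffle.rearrange_by_column = _patched_rearrange_by_column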

@pentschev (Member)

It looks like explicit-comms is going to be a pretty big headache to migrate over to dask-expr :/

I have the basic benchmarks working locally, but there is no longer a clean way to support the "explicit-comms" config that we used to support by monkey-patching dask.dataframe. Dask-expr no longer uses a monolithic function (like rearrange_by_column) to inject a shuffle operation for sorting and joining. Instead, these operations are now a composition of abstract expressions. It may be necessary to deprecate/prohibit this behavior when query-planning is enabled (for now).

Thanks for debugging this, Rick. I must admit I'm not of any help when it comes to either Dask DataFrame or explicit-comms; @madsbk, on the other hand, may have great ideas here.

@madsbk (Member) commented Mar 21, 2024

Hmm, maybe it is time to deprecate the monkey patching of dask and tell users to call shuffle() directly? And maybe create a basic merge() function, built on shuffle(), that people can use?

Do you guys know how many are using explicit-comms?

@rjzamora (Member Author)

Hmm, maybe it is time to deprecate the monkey patching of dask and tell users to call shuffle() directly? And maybe create a basic merge() function, built on shuffle(), that people can use?

Yes, I think this route makes the most sense for now. I have a feeling that there are clean ways to introduce an "official" explicit-comms shuffle using the new API, but I'd like to take some time to get it right.

Do you guys know how many are using explicit-comms?

The only "official" user that I personally know of is nemo curator. However, (1) they are not using the monkey patching approach, and (2) they do not develop against the latest RAPIDS version. cc @VibhuJawa @ayushdg

@github-actions github-actions bot added the ci label Mar 21, 2024
@quasiben (Member)

I'm not aware of any external usage. It is useful to have for benchmarking and for continuing to explore shuffling generally. With that said, I agree with @rjzamora that we should take our time getting it right.

# Setting a seed that triggers max amount of comm in the two-GPU case.
if gpu:
    import cupy as xp

    import cudf as xdf
    import dask_cudf  # noqa: F401
@rjzamora (Member Author) commented on the diff above:

Generating a collection in this way puts us in a strange edge case where dask_cudf may never get imported on the worker, and so some type-based dispatching may fail. The only way to avoid this possibility is to import dask_cudf before we deploy the cluster, or to import dask_cudf here.
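
A minimal sketch of the workaround described above (illustrative only; scheduler_address is a placeholder): ensure dask_cudf is imported on every worker so that its type-based dispatch registrations exist before any shuffle runs.

from distributed import Client

client = Client(scheduler_address)  # assumes an existing dask-cuda cluster


def _import_dask_cudf():
    # Importing dask_cudf registers the cuDF-specific dispatch
    # implementations used by type-based dispatching.
    import dask_cudf  # noqa: F401


# Run the import on every worker currently in the cluster.
client.run(_import_dask_cudf)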

@rjzamora (Member Author)

CI failures without the hacky test-skip logic are due to a bad/old dask-expr version being pulled (dask/distributed#8574)

@rjzamora rjzamora marked this pull request as ready for review April 2, 2024 16:55
@rjzamora rjzamora requested a review from a team as a code owner April 2, 2024 16:55
@rjzamora (Member Author) commented Apr 2, 2024

@pentschev - The upstream nightly situation should be resolved now. You approved this PR a while back when it looked pretty different. Did you have time for another pass?

@rjzamora (Member Author):

Note on these changes: For 24.06, we want to explicitly test with both DASK_DATAFRAME__QUERY_PLANNING=True and DASK_DATAFRAME__QUERY_PLANNING=False.
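
For reference, a minimal sketch (illustrative, not from this PR) of setting this config from Python rather than via the DASK_DATAFRAME__QUERY_PLANNING environment variable; either way, the setting has to be in place before dask.dataframe is first imported:

import dask

# Must run before dask.dataframe is imported anywhere in the process.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # legacy (non-dask-expr) API is now in effect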

@pentschev (Member) left a comment

Thanks @rjzamora , left a few minor requests but otherwise it looks good.

Comment on lines 380 to 394
# Raise error early if "explicit-comms" is not allowed
if (
    args.backend == "explicit-comms"
    and dask.config.get(
        "dataframe.query-planning",
        None,
    )
    is not False
):
    raise NotImplementedError(
        "The 'explicit-comms' config is not yet supported when "
        "query-planning is enabled in dask. Please use the legacy "
        "dask-dataframe API (set the 'dataframe.query-planning' "
        "config to `False` before executing).",
    )
@pentschev (Member) commented on the diff above:

It seems you're not addressing this for local_cudf_groupby and local_cudf_merge; is explicit-comms still supported for those? If not, I'd suggest moving this into parse_benchmark_args just before the function returns, and as such you'd cover all three cases.

@rjzamora (Member Author):

Right - sorry. I'll use the same message for local_cudf_groupby. This is local_cudf_merge, so I assume you are also asking why I am not checking for this in local_cudf_shuffle? If so, it's because that benchmark does not use the dask config (it uses dask_cuda.explicit_comms.dataframe.shuffle.shuffle directly).

@pentschev (Member):

Right - sorry. I'll use the same message for local_cudf_groupby.

I'd still suggest simply moving this to parse_benchmark_args; this will cover all benchmarks and avoid code duplication, which is essentially why we wrote it that way to begin with.

This is local_cudf_merge, so I assume you are also asking why I am not checking for this in local_cudf_shuffle?

Yes, sorry, that is what I meant. 😅

If so, it's because that benchmark does not use the dask config (it uses dask_cuda.explicit_comms.dataframe.shuffle.shuffle directly).

But would using it directly still work with query-planning enabled? If not, then moving the exception above to parse_benchmark_args would also cover it and avoid more boilerplate code.

@rjzamora (Member Author):

I'd still suggest simply moving this to parse_benchmark_args; this will cover all benchmarks and avoid code duplication, which is essentially why we wrote it that way to begin with.

Ah - I see what you mean. Using dask_cuda.explicit_comms.dataframe.shuffle.shuffle directly works fine, so it would still be good to allow that option for the shuffle benchmark somehow.
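
For reference, a minimal usage sketch of that direct call path (illustrative only; assumes a running dask-cuda cluster and that shuffle accepts the collection plus a list of column names):

from distributed import Client

import dask.dataframe as dd
from dask_cuda import LocalCUDACluster
from dask_cuda.explicit_comms.dataframe.shuffle import shuffle

cluster = LocalCUDACluster()  # assumes local GPUs are available
client = Client(cluster)

ddf = dd.from_dict({"key": range(1_000), "payload": range(1_000)}, npartitions=8)

# Shuffle on "key" via explicit-comms directly, without going through
# the "explicit-comms" dask config (and its query-planning restriction).
shuffled = shuffle(ddf, ["key"])
result = shuffled.compute()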

@pentschev (Member):

Ok, then how about still moving this to parse_benchmark_args but adding another kwarg to it that would still allow running select benchmarks (e.g., local_cudf_shuffle) with query-planning enabled? Something like below:

def parse_benchmark_args(description="Generic dask-cuda Benchmark", args_list=[], *, force_explicit_comms=False):
    ...

    if args.multi_node and len(args.hosts.split(",")) < 2:
        raise ValueError("--multi-node requires at least 2 hosts")

    # Raise error early if "explicit-comms" is not allowed
    if (
        args.backend == "explicit-comms"
        and dask.config.get(
            "dataframe.query-planning",
            None,
        )
        is not False
        and force_explicit_comms is False
    ):
        raise NotImplementedError(
            "The 'explicit-comms' config is not yet supported when "
            "query-planning is enabled in dask. Please use the legacy "
            "dask-dataframe API (set the 'dataframe.query-planning' "
            "config to `False` before executing).",
        )

    return args

Then local_cudf_{groupby,merge} remain as they were, and for local_cudf_shuffle you simply add force_explicit_comms=True to this line.
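
For example, the call in local_cudf_shuffle would then look something like this (hypothetical values; SHUFFLE_ARGS and the description string are placeholders for whatever the benchmark currently passes):

args = parse_benchmark_args(
    description="Distributed shuffle (dask/cudf) benchmark",
    args_list=SHUFFLE_ARGS,
    force_explicit_comms=True,  # allow "explicit-comms" even with query-planning enabled
)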

WDYT?

@rjzamora (Member Author) commented Apr 2, 2024

Okay, sounds good. Pushed something very similar to your solution just now (just used a check_explicit_comms arg instead of force_explicit_comms).

@pentschev (Member) left a comment

Thanks Rick, everything looks good. I'll let you hit merge if there's nothing else here, or in case someone else still wants to review this.

UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 30m pytest \
@pentschev (Member) commented on the diff above:

Unfortunately, 15 minutes was insufficient; the explicit-comms tests are some of the longer-running ones. It also took me a few minutes to figure out what exactly had happened, so I've opened #1330 to make this debugging process easier in the future.

@rjzamora rjzamora added the '5 - Ready to Merge' label and removed the '2 - In Progress' label Apr 3, 2024
@rjzamora (Member Author) commented Apr 3, 2024

/merge

@rapids-bot rapids-bot bot merged commit 58e4b95 into rapidsai:branch-24.06 Apr 3, 2024
27 checks passed
@rjzamora rjzamora deleted the expr-explicit-comms branch April 3, 2024 23:11
Labels: 5 - Ready to Merge, bug, ci, non-breaking, python