Update explicit-comms for dask-expr support #1323
Conversation
Apart from the linting issues, looks good to me. I think it should be good to go as soon as CI passes. Also tagging @madsbk for awareness.
I gave this a try and I'm seeing errors. It's been a while since I've run this query, so it could be user error:

python local_cudf_merge.py -p ucx -d 0,1,2,3,4,5,6,7 -c 10_000_000
Hmm - I actually am expecting errors in this PR. (I just realized that I never addressed the fact that we literally patch/wrap a bunch of deprecated shuffling code to automatically dispatch to explicit-comms). That said, I wasn't expecting a local-cudf-merge problem, so I'll try to reproduce.
It looks like explicit-comms is going to be a pretty big headache to migrate over to dask-expr :/ I have the basic benchmarks working locally, but there is no longer a clean way to support the
Thanks for debugging this, Rick. I must admit I'm not of any help when it comes to either Dask DataFrame or explicit-comms; on the other hand, @madsbk may have great ideas here.
Yes, I think this route makes the most sense for now. I have a feeling that there are clean ways to introduce an "official" explicit-comms shuffle using the new API, but I'd like to take some time to get it right.
The only "official" user that I personally know of is NeMo Curator. However, (1) they are not using the monkey-patching approach, and (2) they do not develop against the latest RAPIDS version. cc @VibhuJawa @ayushdg
I'm not aware of any external usage. It is useful to have for benchmarking and continued exploration of shuffling generally. With that said, I agree with @rjzamora that we should take our time getting it right.
# Setting a seed that triggers max amount of comm in the two-GPU case.
if gpu:
    import cupy as xp

    import cudf as xdf
    import dask_cudf  # noqa: F401
Generating a collection in this way puts us in a strange edge case where dask_cudf may never get imported on the worker, and so some type-based dispatching may fail. The only way to avoid this possibility is to import dask_cudf before we deploy the cluster, or to import dask_cudf here.
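For illustration only (this snippet is not part of the PR, and the cluster setup is an assumption), one way to guarantee the import on already-running workers is Client.run:

from dask_cuda import LocalCUDACluster
from distributed import Client


def _import_dask_cudf():
    # Importing dask_cudf registers its cudf-based dispatch functions,
    # so type-based dispatching on the worker does not silently fall back.
    import dask_cudf  # noqa: F401


if __name__ == "__main__":
    with Client(LocalCUDACluster()) as client:
        client.run(_import_dask_cudf)  # force the import on every worker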
CI failures without the hacky test-skip logic are due to a bad/old dask-expr version being pulled (dask/distributed#8574)
@pentschev - The upstream nightly situation should be resolved now. You approved this PR a while back when it looked pretty different. Did you have time for another pass?
Note on these changes: For 24.06, we want to explicitly test with both DASK_DATAFRAME__QUERY_PLANNING=True and DASK_DATAFRAME__QUERY_PLANNING=False.
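As a minimal sketch (an assumption for illustration, not taken from the CI scripts), the environment variable used in CI maps onto the dask config key that the benchmark check below reads; underscores and hyphens are interchangeable in dask.config keys:

import os

# Must be set before dask is first imported so it is picked up from the environment.
os.environ["DASK_DATAFRAME__QUERY_PLANNING"] = "False"

import dask  # noqa: E402

assert dask.config.get("dataframe.query-planning", None) is False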
Thanks @rjzamora, left a few minor requests but otherwise it looks good.
# Raise error early if "explicit-comms" is not allowed
if (
    args.backend == "explicit-comms"
    and dask.config.get(
        "dataframe.query-planning",
        None,
    )
    is not False
):
    raise NotImplementedError(
        "The 'explicit-comms' config is not yet supported when "
        "query-planning is enabled in dask. Please use the legacy "
        "dask-dataframe API (set the 'dataframe.query-planning' "
        "config to `False` before executing).",
    )
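A small usage sketch of what the error message asks for (an assumption about typical usage, not code from the PR): the config has to be set before dask.dataframe is first imported, otherwise the legacy API is not selected.

import dask

# Disable query planning so the legacy dask-dataframe API is used,
# which the "explicit-comms" backend still relies on.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd  # noqa: E402,F401  # import only after the config is set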
It seems you're not addressing this for local_cudf_groupby and local_cudf_merge; is explicit-comms still supported for those? If not, I'd suggest moving this into parse_benchmark_args just before the function returns, and as such you'd cover all three cases.
Right - sorry. I'll use the same message for local_cudf_groupby. This is local_cudf_merge, so I assume you are also asking why I am not checking for this in local_cudf_shuffle? If so, it's because that benchmark does not use the dask config (it uses dask_cuda.explicit_comms.dataframe.shuffle.shuffle directly).
> Right - sorry. I'll use the same message for local_cudf_groupby.
I'd still suggest simply moving this to parse_benchmark_args; this will cover all benchmarks and avoid code duplication, which is essentially why we wrote it that way to begin with.
> This is local_cudf_merge, so I assume you are also asking why I am not checking for this in local_cudf_shuffle?
Yes, sorry, that is what I meant. 😅
> If so, it's because that benchmark does not use the dask config (it uses dask_cuda.explicit_comms.dataframe.shuffle.shuffle directly).
But would using it directly still work with query-planning enabled? If not, then moving the exception above to parse_benchmark_args would also cover it and avoid more boilerplate code.
> I'd still suggest simply moving this to parse_benchmark_args; this will cover all benchmarks and avoid code duplication, which is essentially why we wrote it that way to begin with.
Ah - I see what you mean. Using dask_cuda.explicit_comms.dataframe.shuffle.shuffle directly works fine, so it would still be good to allow that option for the shuffle benchmark somehow.
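For context, a rough sketch of that direct usage (the keyword names, data, and cluster setup here are assumptions and may differ between dask-cuda versions):

import dask.dataframe as dd
import pandas as pd
from distributed import Client

from dask_cuda import LocalCUDACluster
from dask_cuda.explicit_comms.dataframe.shuffle import shuffle

if __name__ == "__main__":
    with Client(LocalCUDACluster()):
        df = dd.from_pandas(
            pd.DataFrame({"key": range(1_000), "payload": 1.0}), npartitions=8
        )
        # Shuffle on "key" via explicit-comms, bypassing the dask config dispatch.
        shuffled = shuffle(df, column_names=["key"])  # assumed keyword name
        print(shuffled.npartitions)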
Ok, then how about still moving this to parse_benchmark_args but adding another kwarg that would still allow running select benchmarks (e.g., local_cudf_shuffle) with query-planning enabled? Something like below:
def parse_benchmark_args(
    description="Generic dask-cuda Benchmark", args_list=[], *, force_explicit_comms=False
):
    ...
    if args.multi_node and len(args.hosts.split(",")) < 2:
        raise ValueError("--multi-node requires at least 2 hosts")

    # Raise error early if "explicit-comms" is not allowed
    if (
        args.backend == "explicit-comms"
        and dask.config.get(
            "dataframe.query-planning",
            None,
        )
        is not False
        and force_explicit_comms is False
    ):
        raise NotImplementedError(
            "The 'explicit-comms' config is not yet supported when "
            "query-planning is enabled in dask. Please use the legacy "
            "dask-dataframe API (set the 'dataframe.query-planning' "
            "config to `False` before executing).",
        )

    return args
Then local_cudf_{groupby,merge} remain as they were, and for local_cudf_shuffle you simply add force_explicit_comms=True to this line.
WDYT?
Okay, sounds good. Pushed something very similar to your solution just now (just used a check_explicit_comms arg instead of force_explicit_comms).
Thanks Rick, everything looks good. I'll let you hit merge if there's nothing else here, or in case someone else still wants to review this.
UCXPY_IFNAME=eth0 \
UCX_WARN_UNUSED_ENV_VARS=n \
UCX_MEMTYPE_CACHE=n \
timeout 30m pytest \
Unfortunately, 15 minutes was insufficient; the explicit-comms tests are some of the longer-running ones. It also took me a few minutes to figure out what exactly happened, so I've opened #1330 to make this debugging process easier in the future.
/merge
Makes a few small changes to explicit-comms to support dask-expr. EDIT: The changes are no longer "small".