From d14d7e93e702dc28bb239043ae822eff586e7b64 Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Wed, 10 Apr 2024 10:49:43 -0500 Subject: [PATCH] Support expression-based Dask Dataframe API (#5835) Resolves current/recent CI failures related to dask-expr migration in `dask.dataframe`/`dask_cudf`. **Notes**: - Most CI failures are resolved by avoiding imports from `dask_cudf.core` for instance checks. Types should be imported from the top-level `dask_cudf` package to ensure the proper API (legacy vs dask-expr) is used. - `kneighbors_classifier` failures are caused by https://github.com/dask/dask-expr/issues/1015 - The temporary workaround is to convert `y` to a legacy collection in this particular case. Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) - Dante Gama Dessavre (https://github.com/dantegd) Approvers: - Bradley Dice (https://github.com/bdice) - GALI PREM SAGAR (https://github.com/galipremsagar) - Jake Awe (https://github.com/AyodeAwe) - Peter Andreas Entschev (https://github.com/pentschev) URL: https://github.com/rapidsai/cuml/pull/5835 --- ci/test_wheel.sh | 4 ++-- python/cuml/dask/__init__.py | 6 +++++- python/cuml/dask/common/base.py | 6 +++--- python/cuml/dask/common/input_utils.py | 6 +++--- python/cuml/dask/common/part_utils.py | 6 +++--- python/cuml/dask/neighbors/kneighbors_classifier.py | 11 ++++++++--- python/cuml/dask/preprocessing/LabelEncoder.py | 8 ++++---- python/cuml/dask/preprocessing/encoders.py | 10 +++++----- 8 files changed, 33 insertions(+), 24 deletions(-) diff --git a/ci/test_wheel.sh b/ci/test_wheel.sh index 0ecbd6a430..47ad8e0b1b 100755 --- a/ci/test_wheel.sh +++ b/ci/test_wheel.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2023, NVIDIA CORPORATION. +# Copyright (c) 2023-2024, NVIDIA CORPORATION. set -euo pipefail @@ -27,7 +27,7 @@ else ./ci/run_cuml_singlegpu_pytests.sh \ --numprocesses=8 \ --dist=worksteal \ - -k 'not test_sparse_pca_inputs' \ + -k 'not test_sparse_pca_inputs and not test_fil_skl_classification' \ --junitxml="${RAPIDS_TESTS_DIR}/junit-cuml.xml" # Run test_sparse_pca_inputs separately diff --git a/python/cuml/dask/__init__.py b/python/cuml/dask/__init__.py index f2dc448552..6aaf17a3b3 100644 --- a/python/cuml/dask/__init__.py +++ b/python/cuml/dask/__init__.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # +from dask import config from cuml.dask import cluster from cuml.dask import common @@ -27,6 +28,9 @@ from cuml.dask import preprocessing from cuml.dask import solvers +# Avoid "p2p" shuffling in dask for now +config.set({"dataframe.shuffle.method": "tasks"}) + __all__ = [ "cluster", "common", diff --git a/python/cuml/dask/common/base.py b/python/cuml/dask/common/base.py index a9949310be..1f2f71542c 100644 --- a/python/cuml/dask/common/base.py +++ b/python/cuml/dask/common/base.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,7 @@ from distributed.client import Future from functools import wraps -from dask_cudf.core import Series as dcSeries +from dask_cudf import Series as dcSeries from cuml.internals.safe_imports import gpu_only_import_from from cuml.internals.base import Base from cuml.internals import BaseMetaClass @@ -37,7 +37,7 @@ dask_cudf = gpu_only_import("dask_cudf") -dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame") +dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame") class BaseEstimator(object, metaclass=BaseMetaClass): diff --git a/python/cuml/dask/common/input_utils.py b/python/cuml/dask/common/input_utils.py index 688de03219..5c4f7d0913 100644 --- a/python/cuml/dask/common/input_utils.py +++ b/python/cuml/dask/common/input_utils.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -24,7 +24,7 @@ from cuml.dask.common.dask_arr_utils import validate_dask_array from cuml.dask.common.dask_df_utils import to_dask_cudf from cuml.dask.common.utils import get_client -from dask_cudf.core import Series as dcSeries +from dask_cudf import Series as dcSeries from dask.dataframe import Series as daskSeries from dask.dataframe import DataFrame as daskDataFrame from cudf import Series @@ -43,7 +43,7 @@ DataFrame = gpu_only_import_from("cudf", "DataFrame") -dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame") +dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame") class DistributedDataHandler: diff --git a/python/cuml/dask/common/part_utils.py b/python/cuml/dask/common/part_utils.py index a6aa892a76..c92f2c351d 100644 --- a/python/cuml/dask/common/part_utils.py +++ b/python/cuml/dask/common/part_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019-2023, NVIDIA CORPORATION. +# Copyright (c) 2019-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ # from cuml.dask.common.utils import parse_host_port -from dask_cudf.core import Series as dcSeries +from dask_cudf import Series as dcSeries from cuml.internals.safe_imports import gpu_only_import_from from dask.dataframe import Series as daskSeries from dask.dataframe import DataFrame as daskDataFrame @@ -30,7 +30,7 @@ np = cpu_only_import("numpy") -dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame") +dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame") def hosts_to_parts(futures): diff --git a/python/cuml/dask/neighbors/kneighbors_classifier.py b/python/cuml/dask/neighbors/kneighbors_classifier.py index 2844823e06..18babd75c0 100644 --- a/python/cuml/dask/neighbors/kneighbors_classifier.py +++ b/python/cuml/dask/neighbors/kneighbors_classifier.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -111,9 +111,14 @@ def fit(self, X, y): if isinstance(y, DaskSeries): uniq_labels.append(y.unique()) else: - n_targets = len(y.columns) + # Dask-expr does not support numerical column names + # See: https://github.com/dask/dask-expr/issues/1015 + _y = y + if hasattr(y, "to_legacy_dataframe"): + _y = y.to_legacy_dataframe() + n_targets = len(_y.columns) for i in range(n_targets): - uniq_labels.append(y.iloc[:, i].unique()) + uniq_labels.append(_y.iloc[:, i].unique()) uniq_labels = da.compute(uniq_labels)[0] if hasattr(uniq_labels[0], "values_host"): # for cuDF Series diff --git a/python/cuml/dask/preprocessing/LabelEncoder.py b/python/cuml/dask/preprocessing/LabelEncoder.py index 07a6ac2479..fcc35c07a9 100644 --- a/python/cuml/dask/preprocessing/LabelEncoder.py +++ b/python/cuml/dask/preprocessing/LabelEncoder.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2023, NVIDIA CORPORATION. +# Copyright (c) 2021-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +14,6 @@ # from cuml.preprocessing import LabelEncoder as LE from cuml.common.exceptions import NotFittedError -from dask_cudf.core import Series as daskSeries from cuml.dask.common.base import BaseEstimator from cuml.dask.common.base import DelayedTransformMixin from cuml.dask.common.base import DelayedInverseTransformMixin @@ -24,7 +23,8 @@ from collections.abc import Sequence from cuml.internals.safe_imports import gpu_only_import_from -dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame") +dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame") +dcSeries = gpu_only_import_from("dask_cudf", "Series") class LabelEncoder( @@ -148,7 +148,7 @@ def fit(self, y): _classes = y.unique().compute().sort_values(ignore_index=True) el = first(y) if isinstance(y, Sequence) else y self.datatype = ( - "cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy" + "cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy" ) self._set_internal_model(LE(**self.kwargs).fit(y, _classes=_classes)) return self diff --git a/python/cuml/dask/preprocessing/encoders.py b/python/cuml/dask/preprocessing/encoders.py index 8bf2503578..e574a53c0c 100644 --- a/python/cuml/dask/preprocessing/encoders.py +++ b/python/cuml/dask/preprocessing/encoders.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION. +# Copyright (c) 2020-2024, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,11 +21,11 @@ DelayedTransformMixin, ) from cuml.internals.safe_imports import gpu_only_import_from, gpu_only_import -from dask_cudf.core import Series as daskSeries from toolz import first dask_cudf = gpu_only_import("dask_cudf") -dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame") +dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame") +dcSeries = gpu_only_import_from("dask_cudf", "Series") class DelayedFitTransformMixin: @@ -123,7 +123,7 @@ def fit(self, X): el = first(X) if isinstance(X, Sequence) else X self.datatype = ( - "cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy" + "cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy" ) self._set_internal_model(OneHotEncoderMG(**self.kwargs).fit(X)) @@ -233,7 +233,7 @@ def fit(self, X): el = first(X) if isinstance(X, Sequence) else X self.datatype = ( - "cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy" + "cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy" ) self._set_internal_model(OrdinalEncoderMG(**self.kwargs).fit(X))