Skip to content

Commit

Permalink
Support expression-based Dask Dataframe API (#5835)
Browse files Browse the repository at this point in the history
Resolves current/recent CI failures related to dask-expr migration in `dask.dataframe`/`dask_cudf`.

**Notes**:

- Most CI failures are resolved by avoiding imports from `dask_cudf.core` for instance checks. Types should be imported from the top-level `dask_cudf` package to ensure the proper API (legacy vs dask-expr) is used.
- `kneighbors_classifier` failures are caused by dask/dask-expr#1015 - The temporary workaround is to convert `y` to a legacy collection in this particular case.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Dante Gama Dessavre (https://github.com/dantegd)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Jake Awe (https://github.com/AyodeAwe)
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #5835
  • Loading branch information
rjzamora authored Apr 10, 2024
1 parent 6b6952e commit d14d7e9
Show file tree
Hide file tree
Showing 8 changed files with 33 additions and 24 deletions.
4 changes: 2 additions & 2 deletions ci/test_wheel.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/bin/bash
# Copyright (c) 2023, NVIDIA CORPORATION.
# Copyright (c) 2023-2024, NVIDIA CORPORATION.

set -euo pipefail

Expand Down Expand Up @@ -27,7 +27,7 @@ else
./ci/run_cuml_singlegpu_pytests.sh \
--numprocesses=8 \
--dist=worksteal \
-k 'not test_sparse_pca_inputs' \
-k 'not test_sparse_pca_inputs and not test_fil_skl_classification' \
--junitxml="${RAPIDS_TESTS_DIR}/junit-cuml.xml"

# Run test_sparse_pca_inputs separately
Expand Down
6 changes: 5 additions & 1 deletion python/cuml/dask/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from dask import config

from cuml.dask import cluster
from cuml.dask import common
Expand All @@ -27,6 +28,9 @@
from cuml.dask import preprocessing
from cuml.dask import solvers

# Avoid "p2p" shuffling in dask for now
config.set({"dataframe.shuffle.method": "tasks"})

__all__ = [
"cluster",
"common",
Expand Down
6 changes: 3 additions & 3 deletions python/cuml/dask/common/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,7 +15,7 @@

from distributed.client import Future
from functools import wraps
from dask_cudf.core import Series as dcSeries
from dask_cudf import Series as dcSeries
from cuml.internals.safe_imports import gpu_only_import_from
from cuml.internals.base import Base
from cuml.internals import BaseMetaClass
Expand All @@ -37,7 +37,7 @@


dask_cudf = gpu_only_import("dask_cudf")
dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")


class BaseEstimator(object, metaclass=BaseMetaClass):
Expand Down
6 changes: 3 additions & 3 deletions python/cuml/dask/common/input_utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,7 @@
from cuml.dask.common.dask_arr_utils import validate_dask_array
from cuml.dask.common.dask_df_utils import to_dask_cudf
from cuml.dask.common.utils import get_client
from dask_cudf.core import Series as dcSeries
from dask_cudf import Series as dcSeries
from dask.dataframe import Series as daskSeries
from dask.dataframe import DataFrame as daskDataFrame
from cudf import Series
Expand All @@ -43,7 +43,7 @@


DataFrame = gpu_only_import_from("cudf", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")


class DistributedDataHandler:
Expand Down
6 changes: 3 additions & 3 deletions python/cuml/dask/common/part_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2019-2023, NVIDIA CORPORATION.
# Copyright (c) 2019-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,7 @@
#

from cuml.dask.common.utils import parse_host_port
from dask_cudf.core import Series as dcSeries
from dask_cudf import Series as dcSeries
from cuml.internals.safe_imports import gpu_only_import_from
from dask.dataframe import Series as daskSeries
from dask.dataframe import DataFrame as daskDataFrame
Expand All @@ -30,7 +30,7 @@
np = cpu_only_import("numpy")


dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")


def hosts_to_parts(futures):
Expand Down
11 changes: 8 additions & 3 deletions python/cuml/dask/neighbors/kneighbors_classifier.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -111,9 +111,14 @@ def fit(self, X, y):
if isinstance(y, DaskSeries):
uniq_labels.append(y.unique())
else:
n_targets = len(y.columns)
# Dask-expr does not support numerical column names
# See: https://github.com/dask/dask-expr/issues/1015
_y = y
if hasattr(y, "to_legacy_dataframe"):
_y = y.to_legacy_dataframe()
n_targets = len(_y.columns)
for i in range(n_targets):
uniq_labels.append(y.iloc[:, i].unique())
uniq_labels.append(_y.iloc[:, i].unique())

uniq_labels = da.compute(uniq_labels)[0]
if hasattr(uniq_labels[0], "values_host"): # for cuDF Series
Expand Down
8 changes: 4 additions & 4 deletions python/cuml/dask/preprocessing/LabelEncoder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,7 +14,6 @@
#
from cuml.preprocessing import LabelEncoder as LE
from cuml.common.exceptions import NotFittedError
from dask_cudf.core import Series as daskSeries
from cuml.dask.common.base import BaseEstimator
from cuml.dask.common.base import DelayedTransformMixin
from cuml.dask.common.base import DelayedInverseTransformMixin
Expand All @@ -24,7 +23,8 @@
from collections.abc import Sequence
from cuml.internals.safe_imports import gpu_only_import_from

dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")
dcSeries = gpu_only_import_from("dask_cudf", "Series")


class LabelEncoder(
Expand Down Expand Up @@ -148,7 +148,7 @@ def fit(self, y):
_classes = y.unique().compute().sort_values(ignore_index=True)
el = first(y) if isinstance(y, Sequence) else y
self.datatype = (
"cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy"
"cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy"
)
self._set_internal_model(LE(**self.kwargs).fit(y, _classes=_classes))
return self
Expand Down
10 changes: 5 additions & 5 deletions python/cuml/dask/preprocessing/encoders.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -21,11 +21,11 @@
DelayedTransformMixin,
)
from cuml.internals.safe_imports import gpu_only_import_from, gpu_only_import
from dask_cudf.core import Series as daskSeries
from toolz import first

dask_cudf = gpu_only_import("dask_cudf")
dcDataFrame = gpu_only_import_from("dask_cudf.core", "DataFrame")
dcDataFrame = gpu_only_import_from("dask_cudf", "DataFrame")
dcSeries = gpu_only_import_from("dask_cudf", "Series")


class DelayedFitTransformMixin:
Expand Down Expand Up @@ -123,7 +123,7 @@ def fit(self, X):

el = first(X) if isinstance(X, Sequence) else X
self.datatype = (
"cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy"
"cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy"
)

self._set_internal_model(OneHotEncoderMG(**self.kwargs).fit(X))
Expand Down Expand Up @@ -233,7 +233,7 @@ def fit(self, X):

el = first(X) if isinstance(X, Sequence) else X
self.datatype = (
"cudf" if isinstance(el, (dcDataFrame, daskSeries)) else "cupy"
"cudf" if isinstance(el, (dcDataFrame, dcSeries)) else "cupy"
)

self._set_internal_model(OrdinalEncoderMG(**self.kwargs).fit(X))
Expand Down

0 comments on commit d14d7e9

Please sign in to comment.