Enable UMAP to properly handle sparse data #772

Merged
15 commits merged on Nov 9, 2024
119 changes: 102 additions & 17 deletions python/src/spark_rapids_ml/umap.py
@@ -34,6 +34,7 @@
import numpy as np
import pandas as pd
import pyspark
import scipy
from pandas import DataFrame as PandasDataFrame
from pyspark.ml.param.shared import (
HasFeaturesCol,
@@ -66,12 +67,21 @@
_CumlModelReader,
_CumlModelWriter,
_EvaluateFunc,
_read_csr_matrix_from_unwrapped_spark_vec,
_TransformFunc,
_use_sparse_in_cuml,
alias,
param_alias,
)
from .metrics import EvalMetricInfo
from .params import DictTypeConverters, HasFeaturesCols, P, _CumlClass, _CumlParams
from .params import (
DictTypeConverters,
HasEnableSparseDataOptim,
HasFeaturesCols,
P,
_CumlClass,
_CumlParams,
)
from .utils import (
_ArrayOrder,
_concat_and_free,
@@ -120,7 +130,12 @@ def _pyspark_class(self) -> Optional[ABCMeta]:


class _UMAPCumlParams(
_CumlParams, HasFeaturesCol, HasFeaturesCols, HasLabelCol, HasOutputCol
_CumlParams,
HasFeaturesCol,
HasFeaturesCols,
HasLabelCol,
HasOutputCol,
HasEnableSparseDataOptim,
):
def __init__(self) -> None:
super().__init__()
@@ -894,6 +909,9 @@ def __init__(
labelCol: Optional[str] = None,
outputCol: Optional[str] = None,
num_workers: Optional[int] = None,
enable_sparse_data_optim: Optional[
bool
] = None, # will enable SparseVector inputs if first row is sparse (for any metric).
**kwargs: Any,
) -> None:
super().__init__()
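
# Note (not part of the diff): a minimal usage sketch of the new sparse path, assuming an active
# SparkSession named `spark`, GPU workers, and the HasEnableSparseDataOptim semantics used elsewhere
# in the library (True forces the sparse path; None lets the first row decide, per the inline comment
# above). Data and names here are illustrative only.
from pyspark.ml.linalg import SparseVector
from spark_rapids_ml.umap import UMAP

df = spark.createDataFrame(
    [
        (SparseVector(4, {0: 1.0, 2: 1.0}),),
        (SparseVector(4, {1: 1.0, 3: 1.0}),),
    ],
    ["features"],
)
umap = UMAP(metric="jaccard", enable_sparse_data_optim=True).setFeaturesCol("features")
model = umap.fit(df)
output = model.transform(df)  # per the new test, the default output column is "embedding"
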
@@ -983,7 +1001,8 @@ def _chunk_arr(
model = UMAPModel(
embedding_=broadcast_embeddings,
raw_data_=broadcast_raw_data,
n_cols=len(raw_data[0]),
sparse_fit=self._sparse_fit,
n_cols=self._n_cols,
dtype=type(raw_data[0][0]).__name__,
)

@@ -1065,7 +1084,8 @@ def _call_cuml_fit_func_dataframe(

cls = self.__class__

select_cols, multi_col_names, _, _ = self._pre_process_data(dataset)
select_cols, multi_col_names, dimension, _ = self._pre_process_data(dataset)
self._n_cols = dimension # for sparse data, this info may be lost after converting to COO format; store beforehand.

dataset = dataset.select(*select_cols)

@@ -1091,6 +1111,9 @@

cuml_verbose = self.cuml_params.get("verbose", False)

use_sparse_array = _use_sparse_in_cuml(dataset)
self._sparse_fit = use_sparse_array # param stored internally by cuml model

chunk_size = self.max_records_per_batch

def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
@@ -1100,6 +1123,7 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
logger.info("Initializing cuml context")

import cupy as cp
import cupyx

if cuda_managed_mem_enabled:
import rmm
@@ -1118,18 +1142,31 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
# handle the input
# inputs = [(X, Optional(y)), (X, Optional(y))]
logger.info("Loading data into python worker memory")
inputs = []
sizes = []
inputs: List[Any] = []
sizes: List[int] = []
for pdf in pdf_iter:
sizes.append(pdf.shape[0])
if multi_col_names:
features = np.array(pdf[multi_col_names], order=array_order)
elif use_sparse_array:
# sparse vector input
features = _read_csr_matrix_from_unwrapped_spark_vec(pdf)
else:
features = np.array(list(pdf[alias.data]), order=array_order)
# experiments indicate it is faster to convert to numpy array and then to cupy array than directly
# invoking cupy array on the list
if cuda_managed_mem_enabled:
features = cp.array(features)
if use_sparse_array:
concated_nnz = sum(triplet[0].nnz for triplet in inputs) # type: ignore
if concated_nnz > np.iinfo(np.int32).max:
logger.warn(
"the number of non-zero values of a partition is larger than the int32 index dtype of cupyx csr_matrix"
)
else:
inputs = [
(cupyx.scipy.sparse.csr_matrix(row[0]), row[1], row[2])
for row in inputs
]
else:
features = cp.array(features)

label = pdf[alias.label] if alias.label in pdf.columns else None
row_number = (
@@ -1141,19 +1178,26 @@ def _train_udf(pdf_iter: Iterable[pd.DataFrame]) -> Iterable[pd.DataFrame]:
# *note*: cuml_fit_func may delete components of inputs to free
# memory. do not rely on inputs after this call.
embedding, raw_data = cuml_fit_func(inputs, params).values()

logger.info("Cuml fit complete")

num_sections = (len(embedding) + chunk_size - 1) // chunk_size

for i in range(num_sections):
start = i * chunk_size
end = min((i + 1) * chunk_size, len(embedding))
if use_sparse_array:
# return sparse array as a list in COO format
coo = raw_data[start:end].tocoo()
raw_data = list(zip(coo.row, coo.col, coo.data))
else:
raw_data = raw_data[start:end].tolist()

yield pd.DataFrame(
data=[
{
"embedding_": embedding[start:end].tolist(),
"raw_data_": raw_data[start:end].tolist(),
"raw_data_": raw_data,
}
]
)
@@ -1206,17 +1250,22 @@ def __init__(
self,
embedding_: List[pyspark.broadcast.Broadcast],
raw_data_: List[pyspark.broadcast.Broadcast],
sparse_fit: bool,
n_cols: int,
dtype: str,
) -> None:
super(UMAPModel, self).__init__(
embedding_=embedding_,
raw_data_=raw_data_,
sparse_fit=sparse_fit,
n_cols=n_cols,
dtype=dtype,
)
self.embedding_ = embedding_
self.raw_data_ = raw_data_
self.sparse_fit = (
sparse_fit # If true, raw_data_ is a sparse array in COO format
)

@property
def embedding(self) -> List[List[float]]:
@@ -1259,11 +1308,31 @@ def _construct_umap() -> CumlT:
if len(driver_embedding) == 1
else np.concatenate([chunk.value for chunk in driver_embedding])
)
raw_data = (
driver_raw_data[0].value
if len(driver_raw_data) == 1
else np.concatenate([chunk.value for chunk in driver_raw_data])
)

if self.sparse_fit:
if len(driver_raw_data) == 1:
coo_list = driver_raw_data[0].value.tolist()
else:
coo_list = np.concatenate(
[chunk.value for chunk in driver_raw_data]
).tolist()
# Convert from COO format to CSR Matrix
rows, cols, values = zip(*coo_list)
rows_indices = np.array(rows, dtype=int)
cols_indices = np.array(cols, dtype=int)

from scipy.sparse import coo_matrix

csr_shape = (np.max(rows_indices) + 1, self.n_cols)
raw_data = coo_matrix(
(values, (rows_indices, cols_indices)), shape=csr_shape
).tocsr()
else:
raw_data = (
driver_raw_data[0].value
if len(driver_raw_data) == 1
else np.concatenate([chunk.value for chunk in driver_raw_data])
)

del driver_embedding
del driver_raw_data
@@ -1273,7 +1342,9 @@ def _construct_umap() -> CumlT:
raw_data = raw_data.astype(np.float32)

if is_sparse(raw_data):
raw_data_cuml = SparseCumlArray(raw_data, convert_format=False)
raw_data_cuml = SparseCumlArray(
raw_data,
)
else:
raw_data_cuml = cudf_to_cuml_array(
raw_data,
@@ -1283,15 +1354,29 @@
internal_model = CumlUMAP(**cuml_alg_params)
internal_model.embedding_ = cp.array(embedding).data
internal_model._raw_data = raw_data_cuml
internal_model.sparse_fit = self.sparse_fit

return internal_model

def _transform_internal(
umap: CumlT,
df: Union[pd.DataFrame, np.ndarray],
df: Union[pd.DataFrame, np.ndarray, scipy.sparse._csr.csr_matrix],
) -> pd.Series:
from cuml.common.sparse_utils import is_sparse

embedding = umap.transform(df)

if is_sparse(df):
# scipy csr matrix to dense numpy array
df = df.toarray() # type: ignore
"""
TBD: we may want to convert this back to pyspark SparseVector so it matches the user input, e.g.,
vector_udt_rows = [
SparseVector(df.shape[1], row.indices, row.data) for row in df
]
...and edit schema accordingly.
"""

is_df_np = isinstance(df, np.ndarray)
is_emb_np = isinstance(embedding, np.ndarray)

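# Note (not part of the diff): the sparse fit data takes a round trip through COO triplets in this
# change: _train_udf flattens each chunk's CSR matrix into (row, col, value) tuples for broadcasting,
# and _construct_umap rebuilds a CSR matrix from the concatenated triplets on the driver. Below is a
# standalone sketch of that round trip; the helper names are illustrative, not part of the PR.
import numpy as np
from scipy.sparse import coo_matrix, csr_matrix

def to_coo_triplets(csr):
    # Flatten a scipy CSR matrix into (row, col, value) tuples, as done per chunk in _train_udf.
    coo = csr.tocoo()
    return list(zip(coo.row, coo.col, coo.data))

def from_coo_triplets(triplets, n_rows, n_cols):
    # Rebuild the CSR matrix, as done in _construct_umap; n_cols must be stored separately because
    # trailing all-zero columns cannot be recovered from the COO triplets alone.
    rows, cols, values = zip(*triplets)
    return coo_matrix(
        (values, (np.array(rows, dtype=int), np.array(cols, dtype=int))),
        shape=(n_rows, n_cols),
    ).tocsr()

original = csr_matrix(
    np.array([[0.0, 1.0, 0.0, 0.0], [2.0, 0.0, 3.0, 0.0]], dtype=np.float32)
)
rebuilt = from_coo_triplets(to_coo_triplets(original), n_rows=2, n_cols=4)
assert (original != rebuilt).nnz == 0  # same equality check used by the new test
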
84 changes: 81 additions & 3 deletions python/tests/test_umap.py
@@ -301,7 +301,7 @@ def test_umap_model_persistence(gpu_number: int, tmp_path: str) -> None:
X, _ = make_blobs(
100,
20,
centers=42,
centers=5,
cluster_std=0.1,
dtype=np.float32,
random_state=10,
@@ -343,7 +343,7 @@ def test_umap_broadcast_chunks(gpu_number: int, BROADCAST_LIMIT: int) -> None:
X, _ = make_blobs(
5000,
3000,
centers=42,
centers=5,
cluster_std=0.1,
dtype=np.float32,
random_state=10,
@@ -393,7 +393,7 @@ def test_umap_sample_fraction(gpu_number: int) -> None:
X, _ = make_blobs(
n_rows,
10,
centers=42,
centers=5,
cluster_std=0.1,
dtype=np.float32,
random_state=10,
@@ -493,3 +493,81 @@ def assert_umap_model(model: UMAPModel) -> None:
trust_diff = loc_umap - dist_umap

assert trust_diff <= 0.15


@pytest.mark.parametrize("n_rows", [1000])
@pytest.mark.parametrize("n_cols", [8, 64])
@pytest.mark.parametrize("nnz", [3, 5])
def test_umap_sparse_vector(
n_rows: int, n_cols: int, nnz: int, gpu_number: int
) -> None:
import pyspark
from packaging import version
from pyspark.ml.linalg import SparseVector

if version.parse(pyspark.__version__) < version.parse("3.4.0"):
import logging

err_msg = "pyspark < 3.4 is detected. Cannot import pyspark `unwrap_udt` function for SparseVector. "
"The test case will be skipped. Please install pyspark>=3.4."
logging.info(err_msg)
return

with CleanSparkSession() as spark:
data = []
for i in range(n_rows):
# Generate binary sparse data compatible with Jaccard, with nnz non-zero values per row.
indices = [(i + j) % n_cols for j in range(nnz)]
values = [1] * nnz
sparse_vector = SparseVector(n_cols, dict(zip(indices, values)))
data.append((sparse_vector,))
df = spark.createDataFrame(data, ["features"])

umap_estimator = UMAP(metric="jaccard", num_workers=gpu_number).setFeaturesCol(
"features"
)
umap_model = umap_estimator.fit(df)

# Get internally stored raw data as CSR array:
raw_data_coo = umap_model.raw_data
assert len(raw_data_coo) == n_rows * nnz
internal_rows, internal_cols, internal_values = zip(*raw_data_coo)
rows_indices = np.array(internal_rows, dtype=int)
cols_indices = np.array(internal_cols, dtype=int)
from scipy.sparse import coo_matrix

internal_raw_data = coo_matrix(
(internal_values, (rows_indices, cols_indices)), shape=(n_rows, n_cols)
).tocsr()
# Get Spark input data as CSR array:
sparse_vectors = (
df.select("features").rdd.map(lambda row: row.features).collect()
)
rows = []
cols: List[Any] = []
values = []
for row_idx, sparse_vector in enumerate(sparse_vectors):
rows.extend([row_idx] * len(sparse_vector.indices))
cols.extend(sparse_vector.indices)
values.extend(sparse_vector.values)
input_raw_data = coo_matrix(
(values, (rows, cols)), shape=(n_rows, n_cols)
).tocsr()
# Ensure CSR arrays match
assert (internal_raw_data != input_raw_data).nnz == 0

# Local vs dist trustworthiness check
output = umap_model.transform(df).toPandas()
embedding = cp.asarray(output["embedding"].to_list())
dist_umap = trustworthiness(input_raw_data.toarray(), embedding, n_neighbors=15)

from cuml.manifold import UMAP as cumlUMAP

local_model = cumlUMAP(n_neighbors=15, random_state=42, metric="jaccard")
local_model.fit(input_raw_data)
embedding = local_model.transform(input_raw_data)
loc_umap = trustworthiness(input_raw_data.toarray(), embedding, n_neighbors=15)

trust_diff = loc_umap - dist_umap

assert trust_diff <= 0.15
Collaborator comment:

What is the range of the trustworthiness score? Would a tolerance of 0.15 be too large, given that CI actually runs single-GPU UMAP?