Enable UMAP to properly handle sparse data #772

Merged
merged 15 commits on Nov 9, 2024
11 changes: 0 additions & 11 deletions python/benchmark/benchmark/bench_kmeans.py
@@ -192,17 +192,6 @@ def gpu_cache_df(df: DataFrame) -> DataFrame:

cluster_centers = gpu_model.cluster_centers_

# temporary patch for DB with spark-rapids plugin
# this part is not timed so overhead is not critical, but should be reverted
# once https://github.com/NVIDIA/spark-rapids/issues/10770 is fixed
db_version = os.environ.get("DATABRICKS_RUNTIME_VERSION")
if db_version:
dim = len(cluster_centers[0])
# inject unsupported expr (slice) that is essentially a noop
df_for_scoring = df_for_scoring.select(
F.slice(feature_col, 1, dim).alias(feature_col), output_col
)

Collaborator Author

Removing since it looks resolved: NVIDIA/spark-rapids#10770

Collaborator

Good find.

if num_cpus > 0:
from pyspark.ml.clustering import KMeans as SparkKMeans

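For reference, the removed workaround leaned on pyspark.sql.functions.slice being unsupported by the spark-rapids plugin at the time: slicing an array from index 1 for its full length returns it unchanged, so the injected expression is a no-op on the data but forces that part of the plan off the GPU. A minimal standalone sketch of the same trick (column names and the dimension are illustrative, not taken from the benchmark):

# Sketch of the (now removed) no-op slice workaround; names are illustrative.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1.0, 2.0, 3.0], 0)], ["features", "prediction"])

dim = 3  # length of the feature array
# slice(features, 1, dim) keeps the whole array, but an expression the plugin
# did not support made the surrounding stage fall back to the CPU.
df_for_scoring = df.select(
    F.slice("features", 1, dim).alias("features"), "prediction"
)
df_for_scoring.show()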
63 changes: 43 additions & 20 deletions python/benchmark/benchmark/bench_umap.py
@@ -19,10 +19,10 @@

import numpy as np
from pandas import DataFrame as PandasDataFrame
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, sum
from pyspark.sql.functions import array, col, sum

from benchmark.base import BenchmarkBase
from benchmark.utils import inspect_default_params_from_func, with_benchmark
@@ -105,7 +105,7 @@ def score(

pdf: PandasDataFrame = transformed_df.toPandas()
embedding = np.array(pdf[transformed_col].to_list())
input = np.array(pdf[data_col].to_list())
input = np.array(pdf[data_col].to_list()).astype(np.float32)
score = trustworthiness(input, embedding, n_neighbors=15)

return score
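For context on what score() measures: trustworthiness checks how well neighborhoods in the embedding match neighborhoods in the original data (1.0 means perfect preservation). A small standalone sketch using scikit-learn's implementation, which may differ from the metric backend the benchmark actually uses:

# Standalone sketch of a trustworthiness check; assumes umap-learn and
# scikit-learn are installed. Data and parameters are illustrative.
import numpy as np
from sklearn.manifold import trustworthiness
from umap import UMAP

rng = np.random.default_rng(0)
X = rng.random((500, 20)).astype(np.float32)  # float32, as in the patched score()

embedding = UMAP(n_neighbors=15, random_state=0).fit_transform(X)

# Values near 1.0 mean embedded neighborhoods are also neighborhoods in the input.
print(trustworthiness(X, embedding, n_neighbors=15))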
@@ -162,39 +162,45 @@ def gpu_cache_df(df: DataFrame) -> DataFrame:
else:
gpu_estimator = gpu_estimator.setFeaturesCols(input_cols)

output_col = "embedding"
gpu_estimator = gpu_estimator.setOutputCol(output_col)

gpu_model, fit_time = with_benchmark(
"gpu fit", lambda: gpu_estimator.fit(train_df)
)

def transform(model: UMAPModel, df: DataFrame) -> DataFrame:
transformed_df = model.transform(df)
transformed_df.count()
return transformed_df

transformed_df, transform_time = with_benchmark(
"gpu transform", lambda: transform(gpu_model, train_df)
output_col = "embedding"
transformed_df = gpu_model.setOutputCol(output_col).transform(train_df)
_, transform_time = with_benchmark(
"gpu transform", lambda: transformed_df.foreach(lambda _: None)
)
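Because Spark transformations are lazy, the transform is only timed once an action runs; foreach(lambda _: None) touches every row without collecting results to the driver. The real with_benchmark helper comes from benchmark.utils; a hedged sketch with the signature implied by its call sites here (the actual implementation may differ):

# Sketch of a with_benchmark-style helper; only mirrors how it is called above.
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")

def with_benchmark(name: str, func: Callable[[], T]) -> Tuple[T, float]:
    start = time.time()
    result = func()  # the lambda should trigger a Spark action to be meaningful
    elapsed = round(time.time() - start, 2)
    print(f"{name} took: {elapsed} sec")
    return result, elapsed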

total_time = round(time.time() - func_start_time, 2)
print(f"gpu total took: {total_time} sec")
data_col = "features"

df_for_scoring = transformed_df
feature_col = first_col
if not is_single_col:
feature_col = "features_array"
df_for_scoring = transformed_df.select(
array(*input_cols).alias("features_array"), output_col
)
elif is_vector_col:
df_for_scoring = transformed_df.select(
vector_to_array(col(feature_col)).alias(feature_col), output_col
)
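Since score() calls toPandas() and rebuilds a NumPy array from the feature column, the column has to arrive as a plain array: multiple scalar columns are packed with array(), and Spark ML vector columns are flattened with vector_to_array(). A small standalone sketch of both conversions (column names are illustrative):

# Sketch of the two conversions behind df_for_scoring; names are illustrative.
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Multiple scalar feature columns -> one array column.
multi = spark.createDataFrame([(1.0, 2.0, 3.0)], ["f0", "f1", "f2"])
packed = multi.select(F.array("f0", "f1", "f2").alias("features_array"))

# A Spark ML vector column -> an array column that pandas/NumPy can consume.
vec = spark.createDataFrame([(Vectors.dense([1.0, 2.0, 3.0]),)], ["features"])
flat = vec.select(vector_to_array(F.col("features")).alias("features"))

packed.show()
flat.show(truncate=False)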

if num_cpus > 0:
from pyspark.ml.feature import PCA as SparkPCA

assert num_gpus <= 0

if is_array_col:
vector_df = train_df.select(
array_to_vector(train_df[first_col]).alias(first_col)
)
elif not is_vector_col:
vector_assembler = VectorAssembler(outputCol="features").setInputCols(
vector_assembler = VectorAssembler(outputCol=first_col).setInputCols(
input_cols
)
vector_df = vector_assembler.transform(train_df).drop(*input_cols)
first_col = "features"
else:
vector_df = train_df
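On the CPU path Spark's PCA expects a single Vector input column, so array columns go through array_to_vector and multi-column input through VectorAssembler (with this change the assembled column is named after first_col instead of the hard-coded "features"). A standalone sketch of both preparations (column names are illustrative):

# Sketch of the CPU-side input preparation; column names are illustrative.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import array_to_vector
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Array column -> Spark ML Vector column (the is_array_col branch).
arr_df = spark.createDataFrame([([1.0, 2.0, 3.0],)], ["features"])
vec_from_array = arr_df.select(array_to_vector(arr_df["features"]).alias("features"))

# Multiple scalar columns -> one assembled Vector column (the multi-column branch;
# in the benchmark the output column name is taken from first_col).
input_cols = ["f0", "f1", "f2"]
multi_df = spark.createDataFrame([(1.0, 2.0, 3.0)], input_cols)
assembler = VectorAssembler(outputCol="assembled").setInputCols(input_cols)
vec_from_cols = assembler.transform(multi_df).drop(*input_cols)

vec_from_array.show(truncate=False)
vec_from_cols.show(truncate=False)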

@@ -209,11 +215,10 @@ def cpu_cache_df(df: DataFrame) -> DataFrame:
"prepare dataset", lambda: cpu_cache_df(vector_df)
)

output_col = "pca_features"

params = self.class_params
print(f"Passing {params} to SparkPCA")

output_col = "pca_features"
cpu_pca = SparkPCA(**params).setInputCol(first_col).setOutputCol(output_col)

cpu_model, fit_time = with_benchmark(
@@ -233,9 +238,27 @@ def cpu_transform(df: DataFrame) -> None:

total_time = round(time.time() - func_start_time, 2)
print(f"cpu total took: {total_time} sec")
data_col = first_col

score = self.score(transformed_df, data_col, output_col)
# spark ml does not remove the mean in the transformed features, so do that here
# needed for scoring
standard_scaler = (
StandardScaler()
.setWithStd(False)
.setWithMean(True)
.setInputCol(output_col)
.setOutputCol(output_col + "_mean_removed")
)

scaler_model = standard_scaler.fit(transformed_df)
transformed_df = scaler_model.transform(transformed_df).drop(output_col)

feature_col = first_col
output_col = output_col + "_mean_removed"
df_for_scoring = transformed_df.select(
vector_to_array(col(output_col)).alias(output_col), feature_col
)
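The StandardScaler step above only centers the PCA output: with withStd=False and withMean=True it subtracts the per-column mean and leaves the scale untouched. A tiny standalone sketch of that centering, with illustrative data:

# Sketch of mean removal with StandardScaler; data and names are illustrative.
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(Vectors.dense([1.0, 10.0]),), (Vectors.dense([3.0, 14.0]),)], ["pca_features"]
)

scaler = (
    StandardScaler()
    .setWithStd(False)  # keep the original scale
    .setWithMean(True)  # subtract the per-column mean
    .setInputCol("pca_features")
    .setOutputCol("pca_features_mean_removed")
)
centered = scaler.fit(df).transform(df)
centered.show(truncate=False)  # rows become [-1.0, -2.0] and [1.0, 2.0]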

score = self.score(df_for_scoring, feature_col, output_col)
print(f"trustworthiness score: {score}")

report_dict = {