dataframe_to_mds fails with non-nullable ArrayType #870

Open
sthudium25 opened this issue Jan 30, 2025 · 2 comments
Labels: bug (Something isn't working)

Comments

sthudium25 commented Jan 30, 2025

Environment

  • OS: Ubuntu 22.04.3 LTS
  • Python: 3.10.12
  • Java: Zulu 8.74.0.17-CA-linux64
  • Run in Databricks 14.3 LTS, Spark 3.5.0, Scala 2.12

To reproduce

Steps to reproduce the behavior:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType
from streaming.base.converters import dataframe_to_mds

spark = SparkSession.builder.appName("ArrayDoubleTypeDF").getOrCreate()

# 1. Create a minimal DataFrame with two double columns
data = [
    (1.0, 2.0),
    (3.0, 4.0),
    (5.0, 7.5),
]
columns = ["grouping1", "grouping2"]
df = spark.createDataFrame(data, columns)

# 2. Build an expression that gathers the numeric columns into an array of DoubleType()
gather_numerical_features = f"array({', '.join(columns)})"

# 3. Create the new array column using F.expr
df = df.withColumn("numerical_features", F.expr(gather_numerical_features))

# NOTE: For me, this adds StructField("numerical_features", ArrayType(DoubleType(), False), True)
# to the dataframe schema.

mds_kwargs = {'out': <local_path>, 'columns': {'id': 'int32', 'values': 'ndarray:float64:4'}}

# Convert the dataframe to MDS format. This divides the dataframe into 2 parts, one part per
# worker, and merges the `index.json` files from the 2 sub-parts into one in a parent directory.
dataframe_to_mds(df.repartition(2), merge_index=True, mds_kwargs=mds_kwargs)

I set the "columns" entry in mds_kwargs according to the recommendations here.

Expected behavior

Successful conversion of my spark dataframe to mds.

Additional context

However, the non-nullable arrays cannot pass the check in dataframe_to_mds. Specifically, line 112 of the converter returns None for a Spark type of the form ArrayType(<some_numeric_type>, False), which then raises the error on line 115.
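For illustration, here is a minimal sketch of why the lookup misses (SPARK_TO_MDS is a stand-in name for the converter's internal mapping, assuming it is keyed on full ArrayType instances). PySpark's ArrayType equality and hashing include the containsNull flag, so a non-nullable array never matches a key built with the default containsNull=True:

from pyspark.sql.types import ArrayType, DoubleType

# Stand-in for the converter's Spark-to-MDS mapping (name is illustrative).
SPARK_TO_MDS = {ArrayType(DoubleType()): 'ndarray:float64'}  # default containsNull=True

print(SPARK_TO_MDS.get(ArrayType(DoubleType(), True)))   # 'ndarray:float64'
print(SPARK_TO_MDS.get(ArrayType(DoubleType(), False)))  # None, which triggers the error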

Is this the desired behavior?

One last note on a fix that doesn't require modifying the streaming library:

If you cast the array explicitly to an array of double, you will get a compatible schema, like so:

Modify the gather SQL statement to:

gather_numerical_features = f"cast({gather_numerical_features} as array<double>)"
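End to end, the workaround looks like this (a sketch; casting to array<double> re-types the column with containsNull=True, which matches the mapping):

gather_numerical_features = f"cast({gather_numerical_features} as array<double>)"
df = df.withColumn("numerical_features", F.expr(gather_numerical_features))
print(df.schema["numerical_features"].dataType)  # ArrayType(DoubleType(), True)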

sthudium25 added the bug label on Jan 30, 2025
ethantang-db (Contributor) commented
Do you mind also posting the full stack trace for us to look into? Thanks!

sthudium25 (Author) commented
Sure thing, attached here. I should also have mentioned that I'm using mosaicml-streaming==0.10.0.

mds_to_dataframe_array_error.txt

To me, it looks like the SPARK_TO_MDS type mapping fails because it tries to match the full datatype of the Spark array. For instance, the mapping ArrayType(DoubleType()): 'ndarray:float64' fails to match the internal dataType when the array type carries nullable=False. Could this be changed to check the elementType in the case that the user's dataframe contains an array column?
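Something along these lines would do it (a hypothetical sketch of the suggested change, not the library's actual code; ELEMENT_TO_MDS and map_spark_dtype are illustrative names):

from pyspark.sql.types import ArrayType, DoubleType, FloatType, IntegerType

# Map array element types directly, so containsNull is ignored (names are illustrative).
ELEMENT_TO_MDS = {
    DoubleType(): 'ndarray:float64',
    FloatType(): 'ndarray:float32',
    IntegerType(): 'ndarray:int32',
}

def map_spark_dtype(spark_type):
    # For arrays, resolve by elementType instead of the full ArrayType instance.
    if isinstance(spark_type, ArrayType):
        return ELEMENT_TO_MDS.get(spark_type.elementType)
    return None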
