Environment
Run in Databricks 14.3 LTS, Spark 3.5.0, Scala 2.12
To reproduce
Steps to reproduce the behavior:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, ArrayType, DoubleType
from streaming.base.converters import dataframe_to_mds

# 1. Start a Spark session
spark = SparkSession.builder.appName("ArrayDoubleTypeDF").getOrCreate()

# 2. Create a minimal DataFrame with two double columns
data = [
    (1.0, 2.0),
    (3.0, 4.0),
    (5.0, 7.5),
]
columns = ["grouping1", "grouping2"]
df = spark.createDataFrame(data, columns)

# 3. Build a SQL expression that gathers the numerical columns into an array of DoubleType()
gather_numerical_features = f"array({', '.join(columns)})"  # -> "array(grouping1, grouping2)"

# 4. Create the new array column using F.expr
df = df.withColumn("numerical_features", F.expr(gather_numerical_features))
# NOTE: For me, this adds StructField("numerical_features", ArrayType(DoubleType(), False), True) to the DataFrame schema.

mds_kwargs = {'out': <local_path>, 'columns': {'id': 'int32', 'values': 'ndarray:float64:4'}}

# 5. Convert the DataFrame to MDS format. This splits the DataFrame into 2 parts, one part per
# worker, and merges the index.json files from the 2 sub-parts into one in a parent directory.
dataframe_to_mds(df.repartition(2), merge_index=True, mds_kwargs=mds_kwargs)
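To see what the converter will receive, you can inspect the array column's type directly; per the NOTE above, on my setup this prints a type with containsNull=False:

# Inspect the Spark type of the generated array column.
print(df.schema["numerical_features"].dataType)
# ArrayType(DoubleType(), False)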
I set the "columns" in mds_kwargs according to the recommendations here
Expected behavior
Successful conversion of my Spark DataFrame to MDS.
Additional context
However, the non-nullable arrays cannot pass the check in dataframe_to_mds. Specifically, line 112 in the converter returns None given a Spark type of the form ArrayType(<some_numeric_type>, False), which then raises the error on line 115.
Is this the desired behavior?
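For illustration, here is a minimal standalone sketch of the lookup failure. The mapping dict below is hypothetical and only mirrors the shape of the converter's SPARK_TO_MDS table:

from pyspark.sql.types import ArrayType, DoubleType

# Hypothetical stand-in for the converter's SPARK_TO_MDS mapping.
mapping = {ArrayType(DoubleType()): 'ndarray:float64'}  # key defaults to containsNull=True

print(mapping.get(ArrayType(DoubleType())))         # 'ndarray:float64'
print(mapping.get(ArrayType(DoubleType(), False)))  # None, since equality and hash include containsNull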
Last note: a fix that doesn't require modifying the streaming library
If you cast the array explicitly to an array of double, you will get a compatible schema. Like so, modify the gather SQL statement to:
gather_numerical_features = f"cast({gather_numerical_features} as array<double>)"
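Equivalently, the cast can be applied through the DataFrame API instead of SQL (a sketch, assuming the array column has already been created as above):

from pyspark.sql import functions as F

# Casting to array<double> resets the element type to the default containsNull=True,
# so the SPARK_TO_MDS lookup succeeds.
df = df.withColumn("numerical_features", F.col("numerical_features").cast("array<double>"))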
To me, it just looks like the SPARK_TO_MDS type mapping fails because it tries to match the full datatype of the Spark array.
For instance, the mapping ArrayType(DoubleType()): 'ndarray:float64' fails to match the internal dataType if that type carries containsNull=False. Could this be changed to check the elementType in the case that the user's DataFrame contains an array column?
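Something like the fallback below would let both variants resolve. This is purely a sketch of that proposal, not the library's actual code; map_spark_type and its mapping argument are hypothetical names:

from typing import Optional
from pyspark.sql.types import ArrayType, DataType

def map_spark_type(spark_type: DataType, mapping: dict) -> Optional[str]:
    """Look up the MDS type, retrying arrays with the default containsNull=True."""
    mds_type = mapping.get(spark_type)
    if mds_type is None and isinstance(spark_type, ArrayType):
        # Ignore the containsNull flag and match on the element type alone.
        mds_type = mapping.get(ArrayType(spark_type.elementType))
    return mds_type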