Skip to content

Commit

Permalink
Validated data frames with not null columns against null schema columns.
Browse files Browse the repository at this point in the history
Previously, a dataframe would be considered invalid if it had not-null data,
 but the schema allowed the data to be nullable
  • Loading branch information
Stephen Kestle committed Jun 10, 2021
1 parent 791606f commit e857258
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,17 @@ private[sql] class DataFrameSchemaChecker(df: DataFrame, requiredSchema: StructT
case Success(namedField) =>
val basicMatch =
namedField.name == reqField.name &&
namedField.nullable == reqField.nullable &&
(!namedField.nullable || reqField.nullable) &&
namedField.metadata == reqField.metadata

val contentMatch = reqField.dataType match {
case reqSchema: StructType =>
namedField.dataType match {
case fieldSchema: StructType =>
diff(reqSchema, fieldSchema).isEmpty
case _ => false
}
case _ => reqField == namedField
case namedField.dataType => true
case _ => false
}

basicMatch && contentMatch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ object DataFrameSchemaCheckerTest extends TestSuite with SparkSessionTestWrapper

val expected =
"The [StructField(name,StringType,true)] StructFields are not included in the DataFrame with the following StructFields [StructType(StructField(num1,IntegerType,true), StructField(num2,IntegerType,true))]"

println(c.missingStructFieldsMessage())
assert(c.missingStructFieldsMessage() == expected)

}
Expand Down Expand Up @@ -363,6 +363,40 @@ object DataFrameSchemaCheckerTest extends TestSuite with SparkSessionTestWrapper

}

"validates non-null column against a null schema" - {
val sourceSchema = List(
StructField(
"num",
IntegerType,
false
)
)

val sourceDF =
spark.createDataFrame(
spark.sparkContext.parallelize(Seq[Row]()),
StructType(sourceSchema)
)

val requiredSchema =
StructType(
List(
StructField(
"num",
IntegerType,
true
)
)
)

val c = new DataFrameSchemaChecker(
sourceDF,
requiredSchema
)

c.validateSchema()
}

}

}
Expand Down

0 comments on commit e857258

Please sign in to comment.