Skip to content

Commit

Permalink
Add createEmptyDF to SparkSessionExt (#71)
Browse files Browse the repository at this point in the history
  • Loading branch information
manuzhang authored and MrPowers committed Apr 24, 2019
1 parent 8e28aef commit e1550e2
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ object SparkSessionExt {
)
}

/**
* Creates an empty DataFrame given schema fields
*
* This is a handy fallback when you fail to read from a data source
*
* val schema = List(StructField("col1", IntegerType))
* val df = Try {
* spark.read.parquet("non-existent-path")
* }.getOrElse(spark.createEmptyDf(schema))
*/
def createEmptyDF[T](fields: List[T]): DataFrame = {
spark.createDataFrame(spark.sparkContext.emptyRDD[Row],
StructType(asSchema(fields)))
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,50 @@ object SparkSessionExtTest extends TestSuite with DataFrameComparer with SparkSe

}

'createEmptyDF - {
"creates an empty DataFrame with a list of StructFields" - {
val actualDF =
spark.createEmptyDF(
List(
StructField(
"num1",
IntegerType,
true
),
StructField(
"num2",
IntegerType,
true
)
)
)


val expectedSchema = List(
StructField(
"num1",
IntegerType,
true
),
StructField(
"num2",
IntegerType,
true
)
)

val expectedDF =
spark.createDataFrame(
spark.sparkContext.parallelize(Seq.empty[Row]),
StructType(expectedSchema)
)

assertSmallDataFrameEquality(
actualDF,
expectedDF
)
}
}
}

}

0 comments on commit e1550e2

Please sign in to comment.