test: Create test data in-memory
instead of reading from files
nightscape committed Nov 1, 2024
1 parent 2472e8f commit 7af6889
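
For context, the pattern this commit adopts is to build test fixtures directly in memory rather than loading CSV files from src/test/resources. A minimal, self-contained sketch of that pattern follows; the standalone SparkSession and the object name InMemoryFixtureSketch are assumptions for illustration only, since the actual suites obtain spark from DataFrameSuiteBase.

// Sketch only: recreates the former simple.csv fixture in memory.
// Assumption: a standalone local SparkSession; the test suites instead
// get `spark` from DataFrameSuiteBase.
import org.apache.spark.sql.{DataFrame, SparkSession}

object InMemoryFixtureSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("in-memory-fixture-sketch")
      .getOrCreate()

    // Same rows the suite now builds with spark.createDataFrame(...).toDF(...)
    val simpleDf: DataFrame = spark
      .createDataFrame(Seq(
        ("foo", "bar", "1"),
        ("baz", "bang", "2")
      ))
      .toDF("col1", "col2", "col3")

    simpleDf.show()
    spark.stop()
  }
}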
Showing 5 changed files with 47 additions and 228 deletions.
200 changes: 0 additions & 200 deletions src/test/resources/v2readwritetest/partition_csv/partition.csv

This file was deleted.

3 changes: 0 additions & 3 deletions src/test/resources/v2readwritetest/simple_csv/simple.csv

This file was deleted.

Binary file not shown.
@@ -22,13 +22,12 @@ import org.scalatest.wordspec.AnyWordSpec

class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteBase with LocalFileTestingUtilities {

private def readSimpleCsv = {
spark.read
.format("csv")
.option("delimiter", ";")
.option("header", "true")
.option("path", "src/test/resources/v2readwritetest/simple_csv/simple.csv")
.load()
private def simpleDf = {
val data = Seq(
("foo", "bar", "1"),
("baz", "bang", "2")
)
spark.createDataFrame(data).toDF("col1", "col2", "col3")
}

/** Checks that the excel data files in given folder equal the provided dataframe */
@@ -56,9 +55,9 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
for (writeMode <- writeModes) {
s"write a dataframe to xlsx with ${writeMode.toString}" in withExistingCleanTempDir("v2") { targetDir =>
// create a df from csv then write as xlsx
val dfCsv = readSimpleCsv
val df = simpleDf

dfCsv.write
df.write
.format("excel")
.option("path", targetDir)
.option("header", value = true)
@@ -69,16 +68,16 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
assert(listOfFiles.nonEmpty, s"expected at least one excel file")

// is the result really the same?
assertWrittenExcelData(dfCsv, targetDir)
assertWrittenExcelData(df, targetDir)

}
s"write a dataframe to xlsx with ${writeMode.toString} (partitioned)" in withExistingCleanTempDir("v2") {
targetDir =>
assume(spark.sparkContext.version >= "3.0.1")
// create a df from csv then write as xlsx
val dfCsv = readSimpleCsv
val df = simpleDf

dfCsv.write
df.write
.partitionBy("col1")
.format("excel")
.option("path", targetDir)
@@ -96,7 +95,7 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
}

// is the result really the same?
assertWrittenExcelData(dfCsv, targetDir)
assertWrittenExcelData(df, targetDir)

}
}
@@ -107,9 +106,9 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
assume(spark.sparkContext.version >= "3.0.1")
}

val dfCsv = readSimpleCsv
val df = simpleDf

val dfWriter = if (isPartitioned) dfCsv.write else dfCsv.write.partitionBy("col1")
val dfWriter = if (isPartitioned) df.write else df.write.partitionBy("col1")

dfWriter
.format("excel")
@@ -124,9 +123,9 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
.mode(SaveMode.Append)
.save()

val orderedSchemaColumns = dfCsv.schema.fields.map(f => f.name).sorted
val orderedSchemaColumns = df.schema.fields.map(f => f.name).sorted
val expectedDf =
dfCsv.union(dfCsv).select(orderedSchemaColumns.head, orderedSchemaColumns.tail.toIndexedSeq: _*)
df.union(df).select(orderedSchemaColumns.head, orderedSchemaColumns.tail.toIndexedSeq: _*)

assertWrittenExcelData(expectedDf, targetDir)
}
@@ -21,6 +21,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType
import org.scalatest.wordspec.AnyWordSpec
import org.apache.spark.sql.types.{StructType, StructField, StringType}

class ManyPartitionReadSuite extends AnyWordSpec with DataFrameSuiteBase with LocalFileTestingUtilities {

@@ -45,14 +46,36 @@ class ManyPartitionReadSuite extends AnyWordSpec with DataFrameSuiteBase with Lo

def createExpected(targetDir: String): DataFrame = {

val dfCsv = spark.read
.format("csv")
.option("delimiter", ",")
.option("header", "true")
.option("path", "src/test/resources/v2readwritetest/partition_csv/partition.csv")
.load()
// Generate data programmatically
val data = (1 to 19).flatMap { col1 =>
// Each col1 value gets its own block of rows: 8 when col1 == 1, 16 when col1 == 2, 11 otherwise
val rowsPerPartition = if (col1 == 1) 8 else if (col1 == 2) 16 else 11
(0 until rowsPerPartition).map { i =>
val index = (col1 - 1) * 11 + i + 1234 // Starting from 1234 as in original data
Row(
Integer.valueOf(col1), // Make it nullable Integer
s"fubar_$index",
s"bazbang_${index + 77000}",
s"barfang_${index + 237708}",
s"lorem_ipsum_$index"
)
}
}

// Define schema explicitly to match expected nullability
val schema = StructType(Array(
StructField("col1", IntegerType, nullable = true),
StructField("col2", StringType, nullable = true),
StructField("col3", StringType, nullable = true),
StructField("col4", StringType, nullable = true),
StructField("col5", StringType, nullable = true)
))

val dfInput = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)



val dfFinal = dfCsv.union(dfCsv)
val dfFinal = dfInput.union(dfInput)

val dfWriter = dfFinal.write
.partitionBy("col1")
@@ -64,7 +87,7 @@ class ManyPartitionReadSuite extends AnyWordSpec with DataFrameSuiteBase with Lo
dfWriter.save()
dfWriter.save()

val orderedSchemaColumns = dfCsv.schema.fields.map(f => f.name).sorted
val orderedSchemaColumns = dfInput.schema.fields.map(f => f.name).sorted

dfFinal
.union(dfFinal)
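
A side note on the "Define schema explicitly to match expected nullability" comment above: a DataFrame inferred from Scala tuples marks primitive columns such as Int as non-nullable, whereas Row-based data combined with an explicit StructType carries whatever nullability the schema declares, which is what the round trip through the Excel writer and reader is expected to produce. A small sketch of that difference follows; the standalone SparkSession and the object name NullabilitySketch are assumptions for illustration.

// Sketch: how the construction method affects column nullability.
// Assumption: standalone local SparkSession; behavior is worth confirming
// against the Spark version used by the build.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object NullabilitySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nullability-sketch")
      .getOrCreate()

    // Inferred from tuples: col1 (Int) is reported as nullable = false.
    val fromTuples = spark.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("col1", "col2")
    fromTuples.printSchema()

    // Built from Rows with an explicit schema: nullability comes from the schema.
    val schema = StructType(Array(
      StructField("col1", IntegerType, nullable = true),
      StructField("col2", StringType, nullable = true)
    ))
    val fromRows = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row(1, "a"), Row(2, "b"))),
      schema
    )
    fromRows.printSchema()

    spark.stop()
  }
}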
