test: Create test data in-memory #901

Merged 1 commit on Nov 1, 2024
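
This change replaces checked-in CSV fixtures with DataFrames built in memory, removing the suites' dependency on files under src/test/resources. As a rough, self-contained sketch of the pattern (the object name and session setup here are illustrative, not part of the diff):

import org.apache.spark.sql.SparkSession

object InMemoryTestDataSketch {
  def main(args: Array[String]): Unit = {
    // Local session for illustration; the suites below get theirs from DataFrameSuiteBase.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("in-memory-test-data")
      .getOrCreate()

    // Instead of spark.read.format("csv")...load() on a fixture file,
    // build the rows from a Seq of tuples and name the columns via toDF.
    val df = spark
      .createDataFrame(Seq(
        ("foo", "bar", "1"),
        ("baz", "bang", "2")
      ))
      .toDF("col1", "col2", "col3")

    df.show() // two rows, three string columns, no test resources needed
    spark.stop()
  }
}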
200 changes: 0 additions & 200 deletions src/test/resources/v2readwritetest/partition_csv/partition.csv

This file was deleted.

3 changes: 0 additions & 3 deletions src/test/resources/v2readwritetest/simple_csv/simple.csv

This file was deleted.

Binary file not shown.
@@ -22,13 +22,12 @@ import org.scalatest.wordspec.AnyWordSpec

 class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteBase with LocalFileTestingUtilities {

-  private def readSimpleCsv = {
-    spark.read
-      .format("csv")
-      .option("delimiter", ";")
-      .option("header", "true")
-      .option("path", "src/test/resources/v2readwritetest/simple_csv/simple.csv")
-      .load()
+  private def simpleDf = {
+    val data = Seq(
+      ("foo", "bar", "1"),
+      ("baz", "bang", "2")
+    )
+    spark.createDataFrame(data).toDF("col1", "col2", "col3")
   }

   /** Checks that the excel data files in given folder equal the provided dataframe */
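
A note on the new helper: the deleted CSV was read with header = "true" and no schema inference, which yields all-string, nullable columns, and a Seq of String tuples reproduces that same shape. A quick check of the inferred schema, assuming a SparkSession named spark is in scope:

val df = spark.createDataFrame(Seq(("foo", "bar", "1"))).toDF("col1", "col2", "col3")
df.printSchema()
// root
//  |-- col1: string (nullable = true)
//  |-- col2: string (nullable = true)
//  |-- col3: string (nullable = true)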
@@ -56,9 +55,9 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
     for (writeMode <- writeModes) {
       s"write a dataframe to xlsx with ${writeMode.toString}" in withExistingCleanTempDir("v2") { targetDir =>
         // create a df from csv then write as xlsx
-        val dfCsv = readSimpleCsv
+        val df = simpleDf

-        dfCsv.write
+        df.write
           .format("excel")
           .option("path", targetDir)
           .option("header", value = true)
@@ -69,16 +68,16 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
         assert(listOfFiles.nonEmpty, s"expected at least one excel file")

         // is the result really the same?
-        assertWrittenExcelData(dfCsv, targetDir)
+        assertWrittenExcelData(df, targetDir)

       }
       s"write a dataframe to xlsx with ${writeMode.toString} (partitioned)" in withExistingCleanTempDir("v2") {
         targetDir =>
           assume(spark.sparkContext.version >= "3.0.1")
           // create a df from csv then write as xlsx
-          val dfCsv = readSimpleCsv
+          val df = simpleDf

-          dfCsv.write
+          df.write
             .partitionBy("col1")
             .format("excel")
             .option("path", targetDir)
@@ -96,7 +95,7 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
           }

           // is the result really the same?
-          assertWrittenExcelData(dfCsv, targetDir)
+          assertWrittenExcelData(df, targetDir)

       }
     }
@@ -107,9 +106,9 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
        assume(spark.sparkContext.version >= "3.0.1")
      }

-      val dfCsv = readSimpleCsv
+      val df = simpleDf

-      val dfWriter = if (isPartitioned) dfCsv.write else dfCsv.write.partitionBy("col1")
+      val dfWriter = if (isPartitioned) df.write else df.write.partitionBy("col1")

      dfWriter
        .format("excel")
@@ -124,9 +123,9 @@ class DataFrameWriterApiComplianceSuite extends AnyWordSpec with DataFrameSuiteB
        .mode(SaveMode.Append)
        .save()

-      val orderedSchemaColumns = dfCsv.schema.fields.map(f => f.name).sorted
+      val orderedSchemaColumns = df.schema.fields.map(f => f.name).sorted
      val expectedDf =
-        dfCsv.union(dfCsv).select(orderedSchemaColumns.head, orderedSchemaColumns.tail.toIndexedSeq: _*)
+        df.union(df).select(orderedSchemaColumns.head, orderedSchemaColumns.tail.toIndexedSeq: _*)

      assertWrittenExcelData(expectedDf, targetDir)
    }
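
For readers skimming the append test: the dataframe is written twice (the visible second save with SaveMode.Append), so the expected result is the frame unioned with itself; and since partitionBy reorders columns on read-back, both sides are projected in sorted column-name order before comparison. The normalization idiom in isolation, assuming a DataFrame named df:

val ordered = df.schema.fields.map(_.name).sorted
val expected = df
  .union(df) // two saves append the same rows twice
  .select(ordered.head, ordered.tail.toIndexedSeq: _*) // fixed column order for comparison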
@@ -21,6 +21,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.IntegerType
 import org.scalatest.wordspec.AnyWordSpec
+import org.apache.spark.sql.types.{StructType, StructField, StringType}

class ManyPartitionReadSuite extends AnyWordSpec with DataFrameSuiteBase with LocalFileTestingUtilities {

@@ -45,14 +46,36 @@ class ManyPartitionReadSuite extends AnyWordSpec with DataFrameSuiteBase with Lo

   def createExpected(targetDir: String): DataFrame = {

-    val dfCsv = spark.read
-      .format("csv")
-      .option("delimiter", ",")
-      .option("header", "true")
-      .option("path", "src/test/resources/v2readwritetest/partition_csv/partition.csv")
-      .load()
+    // Generate the data programmatically
+    val data = (1 to 19).flatMap { col1 =>
+      // Each col1 value gets multiple rows (8, 16, or 11)
+      val rowsPerPartition = if (col1 == 1) 8 else if (col1 == 2) 16 else 11
+      (0 until rowsPerPartition).map { i =>
+        val index = (col1 - 1) * 11 + i + 1234 // starting from 1234, as in the original data
+        Row(
+          Integer.valueOf(col1), // boxed so the column stays nullable
+          s"fubar_$index",
+          s"bazbang_${index + 77000}",
+          s"barfang_${index + 237708}",
+          s"lorem_ipsum_$index"
+        )
+      }
+    }
+
+    // Define the schema explicitly to match the expected nullability
+    val schema = StructType(Array(
+      StructField("col1", IntegerType, nullable = true),
+      StructField("col2", StringType, nullable = true),
+      StructField("col3", StringType, nullable = true),
+      StructField("col4", StringType, nullable = true),
+      StructField("col5", StringType, nullable = true)
+    ))
+
+    val dfInput = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

-    val dfFinal = dfCsv.union(dfCsv)
+    val dfFinal = dfInput.union(dfInput)

     val dfWriter = dfFinal.write
       .partitionBy("col1")
@@ -64,7 +87,7 @@ class ManyPartitionReadSuite extends AnyWordSpec with DataFrameSuiteBase with Lo
     dfWriter.save()
     dfWriter.save()

-    val orderedSchemaColumns = dfCsv.schema.fields.map(f => f.name).sorted
+    val orderedSchemaColumns = dfInput.schema.fields.map(f => f.name).sorted

     dfFinal
       .union(dfFinal)
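
A closing note on why createExpected builds Rows against an explicit StructType instead of using tuples: tuple inference would type col1 as a non-nullable Int, while data read back from written files comes out nullable, so the test boxes the value as java.lang.Integer and declares every field nullable. A condensed sketch of the technique, assuming a SparkSession named spark:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Boxed Integer plus explicit nullable fields: matches the schema of data
// read back from files, unlike tuple inference, which would mark an Int
// column as nullable = false.
val rows = Seq(Row(Integer.valueOf(1), "fubar_1234"))
val schema = StructType(Array(
  StructField("col1", IntegerType, nullable = true),
  StructField("col2", StringType, nullable = true)
))
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
df.printSchema() // both columns report nullable = true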