diff --git a/build.sbt b/build.sbt
index ac8205e..848fab5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -11,6 +11,9 @@ libraryDependencies ++= Seq(
   "org.apache.spark" % "spark-mllib_2.10" % versions.spark % "provided",
   "org.apache.spark" % "spark-sql_2.10" % versions.spark % "provided",
+  "com.databricks" % "spark-csv_2.10" % versions.sparkCsv,
+  "org.apache.commons" % "commons-csv" % versions.apacheCommonsCsv,
+  "org.scalactic" %% "scalactic" % versions.scalatest,
   "org.scalatest" %% "scalatest" % versions.scalatest % "test"
 )
diff --git a/project/versions.scala b/project/versions.scala
index 5c8e20c..aac44b3 100644
--- a/project/versions.scala
+++ b/project/versions.scala
@@ -3,4 +3,7 @@ object versions {
   val scalatest = "2.2.6"
 
   val spark = sys.env.getOrElse("SPARK_VERSION", "1.6.1")
+
+  val sparkCsv = "1.4.0"
+  val apacheCommonsCsv = "1.4"
 }
\ No newline at end of file
diff --git a/src/main/scala/com/vngrs/etl/codecs/Codec.scala b/src/main/scala/com/vngrs/etl/codecs/Codec.scala
new file mode 100644
index 0000000..f206671
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/codecs/Codec.scala
@@ -0,0 +1,15 @@
+package com.vngrs.etl.codecs
+
+import com.vngrs.etl.configs.Configs
+
+/**
+  * Generic Converter Codec Interface
+  *
+  * @tparam A    Type of the input
+  * @tparam B    Type of the output
+  * @tparam CONF Type of the configurations
+  */
+trait Codec[A, B, CONF <: Configs] extends Serializable {
+
+  def apply(a: A, conf: CONF): B
+}
diff --git a/src/main/scala/com/vngrs/etl/codecs/RowToCsvCodec.scala b/src/main/scala/com/vngrs/etl/codecs/RowToCsvCodec.scala
new file mode 100644
index 0000000..7a6c895
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/codecs/RowToCsvCodec.scala
@@ -0,0 +1,33 @@
+package com.vngrs.etl.codecs
+
+import java.util.Date
+
+import com.vngrs.etl.configs.ToCsvCodecConfigs
+import org.apache.spark.sql.Row
+
+/**
+  * [[org.apache.spark.sql.Row]] to `CSV` convertible format codec
+  */
+@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.AsInstanceOf"))
+final case class RowToCsvCodec() extends ToCsvCodec[Row] {
+
+  /**
+    * Converts a [[org.apache.spark.sql.Row]] to `CSV` convertible format
+    *
+    * @param row  [[org.apache.spark.sql.Row]]
+    * @param conf Configurations
+    * @return `CSV` convertible data
+    */
+  override def apply(row: Row, conf: ToCsvCodecConfigs): Seq[AnyRef] = {
+    val seq = row.toSeq
+
+    // format dates with the configured formatter, box everything else as AnyRef (null stays null)
+    val formatVal = seq.map(_ => (v: Any) => Option(v).map {
+      case d: Date => conf.dateFormatter.format(d).asInstanceOf[AnyRef]
+      case a: Any => a.asInstanceOf[AnyRef]
+    }.orNull)
+
+    seq.zipWithIndex.map {
+      case (fieldVal, i) => formatVal(i)(fieldVal)
+    }
+  }
+}
diff --git a/src/main/scala/com/vngrs/etl/codecs/ToCsvCodec.scala b/src/main/scala/com/vngrs/etl/codecs/ToCsvCodec.scala
new file mode 100644
index 0000000..14ef97b
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/codecs/ToCsvCodec.scala
@@ -0,0 +1,8 @@
+package com.vngrs.etl.codecs
+
+import com.vngrs.etl.configs.ToCsvCodecConfigs
+
+/**
+  * Generic to `CSV` convertible format codec interface
+  */
+trait ToCsvCodec[A] extends Codec[A, Seq[AnyRef], ToCsvCodecConfigs]
diff --git a/src/main/scala/com/vngrs/etl/configs/Configs.scala b/src/main/scala/com/vngrs/etl/configs/Configs.scala
new file mode 100644
index 0000000..61148bc
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/configs/Configs.scala
@@ -0,0 +1,6 @@
+package com.vngrs.etl.configs
+
+/**
+  * Generic configuration interface
+  */
+trait Configs
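Note for reviewers: the `Codec` trait is the seam all the CSV pieces hang off. A minimal sketch of a custom implementation, for orientation only — `UpperCaseCodec` and `NoConfigs` are hypothetical names, not part of this diff:

```scala
import com.vngrs.etl.codecs.Codec
import com.vngrs.etl.configs.Configs

// Hypothetical no-op configuration, for illustration only.
case object NoConfigs extends Configs

// A codec that upper-cases its input; shows the A => B conversion contract.
final case class UpperCaseCodec() extends Codec[String, String, NoConfigs.type] {
  override def apply(a: String, conf: NoConfigs.type): String = a.toUpperCase
}
```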
diff --git a/src/main/scala/com/vngrs/etl/configs/CsvToRowTransformerConfigs.scala b/src/main/scala/com/vngrs/etl/configs/CsvToRowTransformerConfigs.scala
new file mode 100644
index 0000000..068d272
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/configs/CsvToRowTransformerConfigs.scala
@@ -0,0 +1,46 @@
+package com.vngrs.etl.configs
+
+/**
+  * `CSV` to [[org.apache.spark.sql.Row]] transformer configurations.
+  * See companion object for default values.
+  *
+  * @param useHeader   whether to use the first record as header
+  * @param inferSchema whether to try to infer the schema
+  * @param dateFormat  date format
+  * @param delimiter   delimiter
+  * @param nullValue   this value will be interpreted as `null`
+  * @param quoteChar   quote char
+  * @param escapeChar  escape char
+  */
+@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
+final case class CsvToRowTransformerConfigs
+(
+  useHeader: Boolean = CsvToRowTransformerConfigs.defaultUseHeader,
+  inferSchema: Boolean = CsvToRowTransformerConfigs.defaultInferSchema,
+  dateFormat: String = CsvToRowTransformerConfigs.defaultDateFormat,
+  delimiter: Char = CsvToRowTransformerConfigs.defaultDelimiter,
+  nullValue: String = CsvToRowTransformerConfigs.defaultNullValue,
+  quoteChar: Char = CsvToRowTransformerConfigs.defaultQuoteChar,
+  escapeChar: Char = CsvToRowTransformerConfigs.defaultEscapeChar
+) extends Configs
+
+/**
+  * [[com.vngrs.etl.configs.CsvToRowTransformerConfigs]] companion object.
+  */
+object CsvToRowTransformerConfigs {
+
+  val defaultUseHeader = true
+
+  val defaultInferSchema = true
+
+  val defaultDateFormat = "yyyy-MM-dd'T'HH:mm:ssX"
+
+  val defaultDelimiter = ','
+
+  val defaultNullValue = ""
+
+  val defaultQuoteChar = '\"'
+
+  val defaultEscapeChar = '\\'
+}
+
diff --git a/src/main/scala/com/vngrs/etl/configs/ToCsvCodecConfigs.scala b/src/main/scala/com/vngrs/etl/configs/ToCsvCodecConfigs.scala
new file mode 100644
index 0000000..06b173d
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/configs/ToCsvCodecConfigs.scala
@@ -0,0 +1,10 @@
+package com.vngrs.etl.configs
+
+import java.text.DateFormat
+
+/**
+  * To `CSV` convertible format codec configurations
+  *
+  * @param dateFormatter Date formatter
+  */
+final case class ToCsvCodecConfigs(dateFormatter: DateFormat) extends Configs
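Usage note: the defaults mean a bare `CsvToRowTransformerConfigs()` reads headered, schema-inferred, comma-delimited CSV. A sketch of how a headerless tab-separated file would be configured (not part of the diff):

```scala
import com.vngrs.etl.configs.CsvToRowTransformerConfigs

// Override only what differs from the defaults; the rest comes from the companion object.
val tsvConfigs = CsvToRowTransformerConfigs(
  useHeader = false,   // no header record
  inferSchema = false, // keep every column as StringType
  delimiter = '\t'     // tab-separated values
)
```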
diff --git a/src/main/scala/com/vngrs/etl/configs/ToCsvTransformerConfigs.scala b/src/main/scala/com/vngrs/etl/configs/ToCsvTransformerConfigs.scala
new file mode 100644
index 0000000..b26ab4b
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/configs/ToCsvTransformerConfigs.scala
@@ -0,0 +1,47 @@
+package com.vngrs.etl.configs
+
+import org.apache.commons.csv.QuoteMode
+
+/**
+  * To `CSV` transformer configurations.
+  * See companion object for default values.
+  *
+  * @param generateHeader whether to generate a header
+  * @param dateFormat     date format
+  * @param delimiter      delimiter
+  * @param nullValue      `null` values will be written as this value
+  * @param escapeChar     escape char
+  * @param quoteChar      quote char
+  * @param quoteMode      quote mode
+  */
+@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
+final case class ToCsvTransformerConfigs
+(
+  generateHeader: Boolean = ToCsvTransformerConfigs.defaultGenerateHeader,
+  dateFormat: String = ToCsvTransformerConfigs.defaultDateFormat,
+  delimiter: Char = ToCsvTransformerConfigs.defaultDelimiter,
+  nullValue: String = ToCsvTransformerConfigs.defaultNullValue,
+  escapeChar: Char = ToCsvTransformerConfigs.defaultEscapeChar,
+  quoteChar: Char = ToCsvTransformerConfigs.defaultQuoteChar,
+  quoteMode: QuoteMode = ToCsvTransformerConfigs.defaultQuoteMode
+) extends Configs
+
+/**
+  * [[com.vngrs.etl.configs.ToCsvTransformerConfigs]] companion object.
+  */
+object ToCsvTransformerConfigs {
+
+  val defaultGenerateHeader = false
+
+  val defaultDateFormat = "yyyy-MM-dd'T'HH:mm:ssX"
+
+  val defaultDelimiter = ','
+
+  val defaultNullValue = ""
+
+  val defaultEscapeChar = '\\'
+
+  val defaultQuoteChar = '\"'
+
+  val defaultQuoteMode = QuoteMode.MINIMAL
+}
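For the write side, the specs at the end of this diff exercise the `quoteMode` interaction; a sketch of overriding the `MINIMAL` default (not part of the diff):

```scala
import com.vngrs.etl.configs.ToCsvTransformerConfigs
import org.apache.commons.csv.QuoteMode

// Emit a header row and quote every field instead of only the fields that need it.
val csvOutConfigs = ToCsvTransformerConfigs(
  generateHeader = true,
  quoteMode = QuoteMode.ALL
)
```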
diff --git a/src/main/scala/com/vngrs/etl/transformers/CsvToRowTransformer.scala b/src/main/scala/com/vngrs/etl/transformers/CsvToRowTransformer.scala
new file mode 100644
index 0000000..2fe4c79
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/transformers/CsvToRowTransformer.scala
@@ -0,0 +1,50 @@
+package com.vngrs.etl.transformers
+
+import com.databricks.spark.csv.CsvParser
+import com.vngrs.etl.Transform
+import com.vngrs.etl.configs.CsvToRowTransformerConfigs
+import com.vngrs.etl.utils.rdd._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+/**
+  * Transforms `CSV` formatted [[String]]s to [[org.apache.spark.sql.Row]]s.
+  *
+  * @param config `CSV` to [[org.apache.spark.sql.Row]] transformer configs
+  */
+final case class CsvToRowTransformer(config: CsvToRowTransformerConfigs) extends Transform[String, Row] {
+
+  /**
+    * Transforms `CSV` formatted [[String]]s to [[org.apache.spark.sql.Row]]s.
+    *
+    * @param input [[org.apache.spark.rdd.RDD]]
+    * @return Transformed [[org.apache.spark.rdd.RDD]]
+    */
+  override def apply(input: RDD[String]): RDD[Row] = {
+    new CsvParser()
+      .withUseHeader(config.useHeader)
+      .withDelimiter(config.delimiter)
+      .withInferSchema(config.inferSchema)
+      .withDateFormat(config.dateFormat)
+      .withQuoteChar(config.quoteChar)
+      .withEscape(config.escapeChar)
+      .withNullValue(config.nullValue)
+      .withTreatEmptyValuesAsNulls(true)
+      .csvRdd(input.sqlContext, input)
+      .rdd
+  }
+}
+
+/**
+  * [[com.vngrs.etl.transformers.CsvToRowTransformer]]'s companion object
+  */
+object CsvToRowTransformer {
+
+  /**
+    * Creates a [[com.vngrs.etl.transformers.CsvToRowTransformer]] with default configs
+    *
+    * @return [[com.vngrs.etl.transformers.CsvToRowTransformer]]
+    * @see [[com.vngrs.etl.configs.CsvToRowTransformerConfigs]]
+    */
+  def apply(): CsvToRowTransformer = new CsvToRowTransformer(CsvToRowTransformerConfigs())
+}
\ No newline at end of file
diff --git a/src/main/scala/com/vngrs/etl/transformers/RowToCsvTransformer.scala b/src/main/scala/com/vngrs/etl/transformers/RowToCsvTransformer.scala
new file mode 100644
index 0000000..a5f4dd9
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/transformers/RowToCsvTransformer.scala
@@ -0,0 +1,45 @@
+package com.vngrs.etl.transformers
+
+import com.vngrs.etl.codecs.RowToCsvCodec
+import com.vngrs.etl.configs.ToCsvTransformerConfigs
+import com.vngrs.etl.utils.rdd._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+/**
+  * Transforms [[org.apache.spark.sql.Row]]s to `CSV` formatted [[String]]s.
+  *
+  * @param config [[org.apache.spark.sql.Row]] to `CSV` transformer configs
+  */
+final case class RowToCsvTransformer
+(
+  config: ToCsvTransformerConfigs
+) extends ToCsvTransformer[Row](RowToCsvCodec(), config) {
+
+  /**
+    * Converts the header to `CSV` convertible format
+    *
+    * @param input [[org.apache.spark.rdd.RDD]]
+    * @return Header fields in `CSV` convertible format
+    */
+  @inline
+  override protected def createHeader(input: RDD[Row]): Array[AnyRef] = {
+    input.schema()
+      .orThrow("A schema is required to generate a header when converting Rows to CSV!")
+      .fields.map(_.name)
+  }
+}
+
+/**
+  * [[com.vngrs.etl.transformers.RowToCsvTransformer]]'s companion object
+  */
+object RowToCsvTransformer {
+
+  /**
+    * Creates a [[com.vngrs.etl.transformers.RowToCsvTransformer]] with default configs
+    *
+    * @return [[com.vngrs.etl.transformers.RowToCsvTransformer]]
+    * @see [[com.vngrs.etl.configs.ToCsvTransformerConfigs]]
+    */
+  def apply(): RowToCsvTransformer = RowToCsvTransformer(ToCsvTransformerConfigs())
+}
\ No newline at end of file
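To illustrate how the two transformers compose — a sketch assuming an available `SparkContext` named `sc`, not part of the diff:

```scala
import com.vngrs.etl.transformers.{CsvToRowTransformer, RowToCsvTransformer}

// Parse CSV lines into Rows with the default configs (header + schema inference),
// then serialize them back to CSV strings (no header, since generateHeader defaults to false).
val lines = sc.parallelize(Seq(
  "id,name",
  "1,John",
  "2,Jane"
))

val rows = CsvToRowTransformer()(lines) // RDD[Row] with an inferred schema
val csv  = RowToCsvTransformer()(rows)  // RDD[String], e.g. "1,John"
```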
diff --git a/src/main/scala/com/vngrs/etl/transformers/ToCsvTransformer.scala b/src/main/scala/com/vngrs/etl/transformers/ToCsvTransformer.scala
new file mode 100644
index 0000000..7e54ad2
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/transformers/ToCsvTransformer.scala
@@ -0,0 +1,86 @@
+package com.vngrs.etl.transformers
+
+import java.text.SimpleDateFormat
+
+import com.vngrs.etl.Transform
+import com.vngrs.etl.codecs.ToCsvCodec
+import com.vngrs.etl.configs.{ToCsvCodecConfigs, ToCsvTransformerConfigs}
+import org.apache.commons.csv.CSVFormat
+import org.apache.spark.rdd.RDD
+
+/**
+  * Transforms [[A]]s to `CSV` formatted [[String]]s with the help of the given `toCsvCodec`.
+  *
+  * @param toCsvCodec [[A]] to `CSV` codec
+  * @param config     To `CSV` transformer configurations
+  * @tparam A Type of the data ([[org.apache.spark.rdd.RDD]]).
+  */
+@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.AsInstanceOf", "org.wartremover.warts.Var"))
+abstract class ToCsvTransformer[A]
+(
+  toCsvCodec: ToCsvCodec[A],
+  config: ToCsvTransformerConfigs
+) extends Transform[A, String] with Serializable {
+
+  /**
+    * Transforms [[A]]s to `CSV` formatted [[String]]s with the help of the given `toCsvCodec`.
+    *
+    * @param input [[org.apache.spark.rdd.RDD]]
+    * @return Transformed [[org.apache.spark.rdd.RDD]]
+    */
+  override def apply(input: RDD[A]): RDD[String] = {
+
+    val header =
+      if (config.generateHeader) {
+        createHeader(input)
+      } else {
+        Array[AnyRef]()
+      }
+
+    input.mapPartitions[String](iterator =>
+      new Iterator[String] {
+
+        private var firstRowAndShouldGenerateHeader = config.generateHeader
+
+        private val codecConf = ToCsvCodecConfigs(new SimpleDateFormat(config.dateFormat))
+
+        private val csvFormat = createCsvFormat()
+
+        override def hasNext: Boolean = iterator.hasNext
+
+        override def next: String = {
+          if (firstRowAndShouldGenerateHeader) {
+            firstRowAndShouldGenerateHeader = false
+            csvFormat.format(header: _*)
+          } else {
+            csvFormat.format(toCsvCodec(iterator.next(), codecConf): _*)
+          }
+        }
+      }
+    )
+  }
+
+  /**
+    * Creates [[org.apache.commons.csv.CSVFormat]]
+    *
+    * @return [[org.apache.commons.csv.CSVFormat]]
+    */
+  @inline
+  private def createCsvFormat(): CSVFormat = {
+    CSVFormat.DEFAULT
+      .withDelimiter(config.delimiter)
+      .withQuote(config.quoteChar)
+      .withEscape(config.escapeChar)
+      .withNullString(config.nullValue)
+      .withQuoteMode(config.quoteMode)
+  }
+
+  /**
+    * Converts the header to `CSV` convertible format
+    *
+    * @param input [[org.apache.spark.rdd.RDD]]
+    * @return Header fields in `CSV` convertible format
+    */
+  @inline
+  protected def createHeader(input: RDD[A]): Array[AnyRef]
+}
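Any type can be serialized by pairing a `ToCsvCodec` with a `createHeader` implementation. A hypothetical sketch — `Person`, `PersonToCsvCodec`, and `PersonToCsvTransformer` are illustrative names, not part of this diff:

```scala
import com.vngrs.etl.codecs.ToCsvCodec
import com.vngrs.etl.configs.{ToCsvCodecConfigs, ToCsvTransformerConfigs}
import com.vngrs.etl.transformers.ToCsvTransformer
import org.apache.spark.rdd.RDD

// Hypothetical domain type, for illustration only.
final case class Person(id: Long, name: String)

// Codec: turn a Person into the Seq[AnyRef] the CSV formatter expects.
final case class PersonToCsvCodec() extends ToCsvCodec[Person] {
  override def apply(p: Person, conf: ToCsvCodecConfigs): Seq[AnyRef] =
    Seq(Long.box(p.id), p.name)
}

// Transformer: header names are static here, so no schema lookup is needed.
final case class PersonToCsvTransformer(config: ToCsvTransformerConfigs)
  extends ToCsvTransformer[Person](PersonToCsvCodec(), config) {

  override protected def createHeader(input: RDD[Person]): Array[AnyRef] =
    Array[AnyRef]("id", "name")
}
```

One caveat worth flagging in review: the header is emitted by the per-partition iterator, so on a multi-partition RDD each partition would start with a header line; the specs below only exercise single-partition data.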
diff --git a/src/main/scala/com/vngrs/etl/utils/rdd.scala b/src/main/scala/com/vngrs/etl/utils/rdd.scala
index 2e45b58..1562b56 100644
--- a/src/main/scala/com/vngrs/etl/utils/rdd.scala
+++ b/src/main/scala/com/vngrs/etl/utils/rdd.scala
@@ -3,7 +3,7 @@ package com.vngrs.etl.utils
 import com.vngrs.etl.exceptions.NoSchemaException
 import com.vngrs.etl.utils.rdd.SchemaResult.{EmptyRdd, Found, NotFound}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.sql.types.StructType
 
 import scala.util.{Failure, Success, Try}
@@ -26,6 +26,13 @@
      * @return
      */
    def nonEmpty(): Boolean = !rdd.isEmpty()
+
+    /**
+      * Creates a new [[org.apache.spark.sql.SQLContext]] from the [[org.apache.spark.SparkContext]] of the `rdd`.
+      *
+      * @return newly created [[org.apache.spark.sql.SQLContext]]
+      */
+    def sqlContext: SQLContext = new SQLContext(rdd.sparkContext)
   }
 
   /**
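This is the extension `CsvToRowTransformer` relies on in `csvRdd(input.sqlContext, input)`; a minimal sketch of the syntax it enables (not part of the diff):

```scala
import com.vngrs.etl.utils.rdd._
import org.apache.spark.rdd.RDD

// Derive a SQLContext from any RDD via the implicit extension.
def describe(input: RDD[String]): Unit = {
  val sqlCtx = input.sqlContext // new SQLContext(input.sparkContext)
  println(sqlCtx.sparkContext.appName)
}
```

Design note: this constructs a fresh `SQLContext` on every call; in Spark 1.x, `SQLContext.getOrCreate(rdd.sparkContext)` would reuse a single instance, which may be worth considering.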
diff --git a/src/test/scala/com/vngrs/etl/codecs/RowToCsvCodecSpec.scala b/src/test/scala/com/vngrs/etl/codecs/RowToCsvCodecSpec.scala
new file mode 100644
index 0000000..4a02036
--- /dev/null
+++ b/src/test/scala/com/vngrs/etl/codecs/RowToCsvCodecSpec.scala
@@ -0,0 +1,94 @@
+package com.vngrs.etl.codecs
+
+import java.sql.Timestamp
+import java.text.SimpleDateFormat
+import java.util.{Calendar, TimeZone}
+
+import com.vngrs.etl.BaseSpec
+import com.vngrs.etl.configs.ToCsvCodecConfigs
+import org.apache.spark.sql.catalyst.expressions.GenericRow
+
+/**
+  * [[org.apache.spark.sql.Row]] to `CSV` convertible format codec specs
+  */
+@SuppressWarnings(Array("org.wartremover.warts.Null"))
+class RowToCsvCodecSpec extends BaseSpec {
+
+  "A Row to CSV codec" should "convert a Row to CSV convertible format" in {
+    val data = new GenericRow(Array[Any](1L, "John", "Doe", 35L))
+
+    val codec = RowToCsvCodec()
+    val conf = ToCsvCodecConfigs(new SimpleDateFormat())
+
+    codec(data, conf) should contain theSameElementsAs Seq[Any](1L, "John", "Doe", 35L)
+  }
+
+  it should "convert Timestamp to string" in {
+    // to prevent this test from failing in different environments
+    val defaultTimeZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    cal.clear()
+    cal.set(2000, Calendar.JANUARY, 1, 0, 0)
+
+    val data = new GenericRow(Array[Any](new Timestamp(cal.getTimeInMillis)))
+
+    val codec = RowToCsvCodec()
+    val conf = ToCsvCodecConfigs(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"))
+
+    codec(data, conf) should contain theSameElementsAs Seq[Any]("2000-01-01T00:00:00Z")
+
+    // restore the timezone so this test does not break other tests
+    TimeZone.setDefault(defaultTimeZone)
+  }
+
+  it should "convert util.Date to string" in {
+    // to prevent this test from failing in different environments
+    val defaultTimeZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    cal.clear()
+    cal.set(2000, Calendar.JANUARY, 1, 0, 0)
+
+    val data = new GenericRow(Array[Any](cal.getTime))
+
+    val codec = RowToCsvCodec()
+    val conf = ToCsvCodecConfigs(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"))
+
+    codec(data, conf) should contain theSameElementsAs Seq[Any]("2000-01-01T00:00:00Z")
+
+    // restore the timezone so this test does not break other tests
+    TimeZone.setDefault(defaultTimeZone)
+  }
+
+  it should "convert sql.Date to string" in {
+    // to prevent this test from failing in different environments
+    val defaultTimeZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    cal.clear()
+    cal.set(2000, Calendar.JANUARY, 1, 0, 0)
+
+    val data = new GenericRow(Array[Any](new java.sql.Date(cal.getTimeInMillis)))
+
+    val codec = RowToCsvCodec()
+    val conf = ToCsvCodecConfigs(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX"))
+
+    codec(data, conf) should contain theSameElementsAs Seq[Any]("2000-01-01T00:00:00Z")
+
+    // restore the timezone so this test does not break other tests
+    TimeZone.setDefault(defaultTimeZone)
+  }
+
+  it should "handle empty rows" in {
+    val data = new GenericRow(Array[Any]())
+
+    val codec = RowToCsvCodec()
+    val conf = ToCsvCodecConfigs(new SimpleDateFormat(""))
+
+    codec(data, conf) should contain theSameElementsAs Seq[Any]()
+  }
+}
diff --git a/src/test/scala/com/vngrs/etl/configs/CsvToRowTransformerConfigsSpec.scala b/src/test/scala/com/vngrs/etl/configs/CsvToRowTransformerConfigsSpec.scala
new file mode 100644
index 0000000..01c82bd
--- /dev/null
+++ b/src/test/scala/com/vngrs/etl/configs/CsvToRowTransformerConfigsSpec.scala
@@ -0,0 +1,21 @@
+package com.vngrs.etl.configs
+
+import com.vngrs.etl.BaseSpec
+
+/**
+  * `CSV` to [[org.apache.spark.sql.Row]] Transformer Configs Specs
+  */
+class CsvToRowTransformerConfigsSpec extends BaseSpec {
+
+  "A CSV to Row transformer configs" should "have default values" in {
+    val config = CsvToRowTransformerConfigs()
+
+    config.useHeader should equal (CsvToRowTransformerConfigs.defaultUseHeader)
+    config.inferSchema should equal (CsvToRowTransformerConfigs.defaultInferSchema)
+    config.dateFormat should equal (CsvToRowTransformerConfigs.defaultDateFormat)
+    config.delimiter should equal (CsvToRowTransformerConfigs.defaultDelimiter)
+    config.nullValue should equal (CsvToRowTransformerConfigs.defaultNullValue)
+    config.escapeChar should equal (CsvToRowTransformerConfigs.defaultEscapeChar)
+    config.quoteChar should equal (CsvToRowTransformerConfigs.defaultQuoteChar)
+  }
+}
diff --git a/src/test/scala/com/vngrs/etl/configs/ToCsvTransformerConfigsSpec.scala b/src/test/scala/com/vngrs/etl/configs/ToCsvTransformerConfigsSpec.scala
new file mode 100644
index 0000000..91260b9
--- /dev/null
+++ b/src/test/scala/com/vngrs/etl/configs/ToCsvTransformerConfigsSpec.scala
@@ -0,0 +1,21 @@
+package com.vngrs.etl.configs
+
+import com.vngrs.etl.BaseSpec
+
+/**
+  * To `CSV` Transformer Configs Specs
+  */
+class ToCsvTransformerConfigsSpec extends BaseSpec {
+
+  "A to CSV transformer configs" should "have default values" in {
+    val config = ToCsvTransformerConfigs()
+
+    config.generateHeader should equal (ToCsvTransformerConfigs.defaultGenerateHeader)
+    config.dateFormat should equal (ToCsvTransformerConfigs.defaultDateFormat)
+    config.delimiter should equal (ToCsvTransformerConfigs.defaultDelimiter)
+    config.nullValue should equal (ToCsvTransformerConfigs.defaultNullValue)
+    config.escapeChar should equal (ToCsvTransformerConfigs.defaultEscapeChar)
+    config.quoteChar should equal (ToCsvTransformerConfigs.defaultQuoteChar)
+    config.quoteMode should equal (ToCsvTransformerConfigs.defaultQuoteMode)
+  }
+}
diff --git a/src/test/scala/com/vngrs/etl/transformers/CsvToRowTransformerSpec.scala b/src/test/scala/com/vngrs/etl/transformers/CsvToRowTransformerSpec.scala
new file mode 100644
index 0000000..43d513b
--- /dev/null
+++ b/src/test/scala/com/vngrs/etl/transformers/CsvToRowTransformerSpec.scala
@@ -0,0 +1,344 @@
+package com.vngrs.etl.transformers
+
+import java.util.{Calendar, TimeZone}
+
+import com.vngrs.etl.SparkSpec
+import com.vngrs.etl.configs.CsvToRowTransformerConfigs
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+
+/**
+  * `CSV` formatted [[String]]s to [[org.apache.spark.sql.Row]]s Transformer Specs
+  */
+class CsvToRowTransformerSpec extends SparkSpec {
+
+  "A CSV to Row transformer" should "read data with header" in {
+    val csvData = Seq(
+      """id,name,surname,birth_date""",
+      """1,John,Doe,1980-01-01T00:00:00Z""",
+      """2,Jane,Doe,1985-01-01T00:00:00Z"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = true))
+
+    val rowRdd = transformer(csvRdd)
+
+    val rowData = rowRdd.collect()
+
+    rowData.length should equal (2)
+    rowData.head.getString(1) should equal ("John")
+    rowData.last.getString(2) should equal ("Doe")
+  }
+
+  it should "read data without header" in {
+    val csvData = Seq(
+      """1,John,Doe,1980-01-01T00:00:00Z""",
+      """2,Jane,Doe,1985-01-01T00:00:00Z"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = false))
+
+    val rowRdd = transformer(csvRdd)
+
+    val rowData = rowRdd.collect()
+
+    rowData.length should equal (2)
+    rowData.head.getString(1) should equal ("John")
+    rowData.last.getString(2) should equal ("Doe")
+  }
+
+  it should "infer schema with header" in {
+    val expectedSchema = StructType(Seq(
+      StructField("id", DataTypes.IntegerType),
+      StructField("name", DataTypes.StringType),
+      StructField("surname", DataTypes.StringType),
+      StructField("birth_date", DataTypes.TimestampType)
+    ))
+
+    val csvData = Seq(
+      """id,name,surname,birth_date""",
+      """1,John,Doe,1980-01-01T00:00:00Z""",
+      """2,Jane,Doe,1985-01-01T00:00:00Z"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer()
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().schema should equal (expectedSchema)
+  }
+
+  it should "infer schema without header" in {
+    val expectedSchema = StructType(Seq(
+      StructField("C0", DataTypes.IntegerType),
+      StructField("C1", DataTypes.StringType),
+      StructField("C2", DataTypes.StringType),
+      StructField("C3", DataTypes.TimestampType)
+    ))
+
+    val csvData = Seq(
+      """1,John,Doe,1980-01-01T00:00:00Z""",
+      """2,Jane,Doe,1985-01-01T00:00:00Z"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = false))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().schema should equal (expectedSchema)
+  }
+
+  it should "parse date time with timezone with default date time pattern" in {
+    // to prevent this test from failing in different environments
+    val defaultTimeZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+    val csvData = Seq(
+      "2000-01-01T03:00:00+03:00"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+
+    cal.clear()
+    cal.set(2000, Calendar.JANUARY, 1, 0, 0, 0)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = false))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().getTimestamp(0).getTime should equal (cal.getTimeInMillis)
+
+    // restore the timezone so this test does not break other tests
+    TimeZone.setDefault(defaultTimeZone)
+  }
+
+  it should "parse date time with custom date time pattern" in {
+    val csvData = Seq(
+      "2000/01/01 00:00Z"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+
+    cal.clear()
+    cal.set(2000, Calendar.JANUARY, 1, 0, 0)
+
+    val transformer = CsvToRowTransformer(
+      CsvToRowTransformerConfigs(useHeader = false, inferSchema = true, dateFormat = "yyyy/MM/dd HH:mmX")
+    )
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().getTimestamp(0).getTime should equal (cal.getTimeInMillis)
+  }
+
+  it should "accept tab as delimiter char" in {
+    val csvData = Seq(
+      "1\tJohn\tDoe\t1980-01-01T00:00:00Z",
+      "2\tJane\tDoe\t1985-01-01T00:00:00Z"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(
+      useHeader = false,
+      inferSchema = false,
+      delimiter = '\t'
+    ))
+
+    val rowRdd = transformer(csvRdd)
+
+    val rowData = rowRdd.collect()
+
+    rowData.length should equal (2)
+    rowData.head.getString(1) should equal ("John")
+  }
+
+  it should "accept other delimiter chars" in {
+    val csvData = Seq(
+      """1|John|Doe|1980-01-01T00:00:00Z""",
+      """2|Jane|Doe|1985-01-01T00:00:00Z"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(
+      useHeader = false,
+      inferSchema = false,
+      delimiter = '|'
+    ))
+
+    val rowRdd = transformer(csvRdd)
+
+    val rowData = rowRdd.collect()
+
+    rowData.length should equal (2)
+    rowData.head.getString(1) should equal ("John")
+  }
+
+  it should "infer schema when custom delimiter char is given" in {
+    val expectedSchema = StructType(Seq(
+      StructField("id", DataTypes.IntegerType),
+      StructField("name", DataTypes.StringType),
+      StructField("surname", DataTypes.StringType),
+      StructField("birth_date", DataTypes.TimestampType)
+    ))
+
+    val csvData = Seq(
+      "id\tname\tsurname\tbirth_date",
+      "1\tJohn\tDoe\t1980-01-01T00:00:00Z",
+      "2\tJane\tDoe\t1985-01-01T00:00:00Z"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(
+      useHeader = true,
+      inferSchema = true,
+      delimiter = '\t'
+    ))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().schema should equal (expectedSchema)
+  }
+
+  it should "handle null values in data" in {
+    val csvData = Seq(
+      "test,,test"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = false, inferSchema = false))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().isNullAt(1) should equal (true)
+  }
+
+  it should "handle custom null values in data" in {
+    val csvData = Seq(
+      "null,test"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(
+      useHeader = false,
+      inferSchema = false,
+      nullValue = "null"
+    ))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().isNullAt(0) should equal (true)
+  }
+
+  it should "not infer empty string as null when custom null value is given" in {
+    val csvData = Seq(
+      "null,"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(
+      useHeader = false,
+      inferSchema = false,
+      nullValue = "null"
+    ))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().isNullAt(1) should equal (false)
+  }
+
+  it should "infer a column even when its first row is null" in {
+    val csvData = Seq(
+      ",test",
+      "1,test"
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = false))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().schema.fields(0).dataType should equal (DataTypes.IntegerType)
+  }
+
+  it should "handle default escape character" in {
+    val csvData = Seq(
+      """test\,o\,test"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = false, inferSchema = false))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().schema.fields.length should equal (1)
+  }
+
+  it should "handle custom escape character" in {
+    val csvData = Seq(
+      """test|,o|,test"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(
+      useHeader = false,
+      inferSchema = false,
+      escapeChar = '|'
+    ))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().schema.fields.length should equal (1)
+  }
+
+  it should "handle default quote character" in {
+    val csvData = Seq(
+      """"test_quotes""""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(useHeader = false, inferSchema = false))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().getString(0) should equal ("test_quotes")
+  }
+
+  it should "handle custom quote character" in {
+    val csvData = Seq(
+      """|test_quotes|"""
+    )
+
+    val csvRdd = parallelize(csvData)
+
+    val transformer = CsvToRowTransformer(CsvToRowTransformerConfigs(
+      useHeader = false,
+      inferSchema = false,
+      quoteChar = '|'
+    ))
+
+    val rowRdd = transformer(csvRdd)
+
+    rowRdd.first().getString(0) should equal ("test_quotes")
+  }
+}
diff --git a/src/test/scala/com/vngrs/etl/transformers/RowToCsvTransformerSpec.scala b/src/test/scala/com/vngrs/etl/transformers/RowToCsvTransformerSpec.scala
new file mode 100644
index 0000000..69def39
--- /dev/null
+++ b/src/test/scala/com/vngrs/etl/transformers/RowToCsvTransformerSpec.scala
@@ -0,0 +1,281 @@
+package com.vngrs.etl.transformers
+
+import java.text.SimpleDateFormat
+import java.util.{Calendar, TimeZone}
+
+import com.vngrs.etl.SparkSpec
+import com.vngrs.etl.codecs.RowToCsvCodec
+import com.vngrs.etl.configs.{ToCsvCodecConfigs, ToCsvTransformerConfigs}
+import com.vngrs.etl.exceptions.NoSchemaException
+import org.apache.commons.csv.QuoteMode
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+
+/**
+  * [[org.apache.spark.sql.Row]]s to `CSV` formatted [[String]]s Transformer Specs
+  */
+// The following warts do not work with scalatest's intercept functionality
+@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements", "org.wartremover.warts.Null"))
+class RowToCsvTransformerSpec extends SparkSpec {
+
+  "A Row to CSV transformer" should "convert rows to csv formatted strings" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any](1L, "John", "Doe", 35L)),
+      new GenericRow(Array[Any](2L, "Jane", "Doe", 30L))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer()
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("1,John,Doe,35", "2,Jane,Doe,30")
+  }
+
+  it should "handle empty rows" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]()),
+      new GenericRow(Array[Any]())
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer()
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("", "")
+  }
+
+  it should "generate header if requested" in {
+    val schema = StructType(Seq(
+      StructField("id", DataTypes.LongType),
+      StructField("name", DataTypes.StringType),
+      StructField("surname", DataTypes.StringType),
+      StructField("age", DataTypes.LongType)
+    ))
+
+    val data = Seq[Row](
+      new GenericRowWithSchema(Array[Any](1L, "John", "Doe", 35L), schema),
+      new GenericRowWithSchema(Array[Any](2L, "Jane", "Doe", 30L), schema)
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(generateHeader = true))
+
+    transformer(dataRdd).collect().head should equal ("id,name,surname,age")
+  }
+
+  it should "generate header with custom delimiter if requested" in {
+    val schema = StructType(Seq(
+      StructField("id", DataTypes.LongType),
+      StructField("name", DataTypes.StringType),
+      StructField("surname", DataTypes.StringType),
+      StructField("age", DataTypes.LongType)
+    ))
+
+    val data = Seq[Row](
+      new GenericRowWithSchema(Array[Any](1L, "John", "Doe", 35L), schema),
+      new GenericRowWithSchema(Array[Any](2L, "Jane", "Doe", 30L), schema)
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(generateHeader = true, delimiter = '\t'))
+
+    transformer(dataRdd).collect().head should equal ("id\tname\tsurname\tage")
+  }
+
+  it should "throw an exception if header generation is requested and there is no schema in the row" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any](1L, "John", "Doe", 35L)),
+      new GenericRow(Array[Any](2L, "Jane", "Doe", 30L))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(generateHeader = true))
+
+    intercept[NoSchemaException] {
+      transformer(dataRdd).collect()
+    }
+  }
+
+  it should "convert date objects to string" in {
+    // to prevent this test from failing in different environments
+    val defaultTimeZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    cal.clear()
+    cal.set(2000, Calendar.JANUARY, 1, 0, 0)
+
+    val data = Seq[Row](
+      new GenericRow(Array[Any](cal.getTime))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer()
+
+    transformer(dataRdd).collect().head should equal ("2000-01-01T00:00:00Z")
+
+    // restore the timezone so this test does not break other tests
+    TimeZone.setDefault(defaultTimeZone)
+  }
+
+  it should "convert date objects to string with custom date format" in {
+    // to prevent this test from failing in different environments
+    val defaultTimeZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+    val cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
+    cal.clear()
+    cal.set(2000, Calendar.JANUARY, 1, 0, 0)
+
+    val data = Seq[Row](
+      new GenericRow(Array[Any](cal.getTime))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(dateFormat = "yyyy/MM/dd HH:mm:ss"))
+
+    transformer(dataRdd).collect().head should equal ("2000/01/01 00:00:00")
+
+    // restore the timezone so this test does not break other tests
+    TimeZone.setDefault(defaultTimeZone)
+  }
+
+  it should "use custom delimiter supplied with config" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any](1L, "John", "Doe", 35L)),
+      new GenericRow(Array[Any](2L, "Jane", "Doe", 30L))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(delimiter = '\t'))
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("1\tJohn\tDoe\t35", "2\tJane\tDoe\t30")
+  }
+
+  it should "put default null value string for null values" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any](null, null)),
+      new GenericRow(Array[Any](null, null))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer()
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array(",", ",")
+  }
+
+  it should "put custom null value string for null values" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any](null, null)),
+      new GenericRow(Array[Any](null, null))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(nullValue = "null_value"))
+
+    val expectedData = Array("null_value,null_value", "null_value,null_value")
+
+    transformer(dataRdd).collect() should contain theSameElementsAs expectedData
+  }
+
+  it should "put default escape char when comma found in string values (when quote mode is none)" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]("test", ",data")),
+      new GenericRow(Array[Any]("test,", "data"))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(quoteMode = QuoteMode.NONE))
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("test,\\,data", "test\\,,data")
+  }
+
+  it should "put custom escape char when comma found in string values (when quote mode is none)" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]("test", ",data")),
+      new GenericRow(Array[Any]("test,", "data"))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(escapeChar = '|', quoteMode = QuoteMode.NONE))
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("test,|,data", "test|,,data")
+  }
+
+  it should "put default quotes when special char is in data" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]("test", "!data")),
+      new GenericRow(Array[Any]("!test", "data"))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer()
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("test,\"!data\"", "\"!test\",data")
+  }
+
+  it should "put custom quotes when special char is in data" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]("test", "!data")),
+      new GenericRow(Array[Any]("!test", "data"))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(quoteChar = '|'))
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("test,|!data|", "|!test|,data")
+  }
+
+  it should "put no quotes when quote mode is none" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]("test", "!data")),
+      new GenericRow(Array[Any]("!test", "data"))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(quoteMode = QuoteMode.NONE))
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("test,!data", "!test,data")
+  }
+
+  it should "always put quotes when quote mode is all" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]("test", "data")),
+      new GenericRow(Array[Any]("test", "data"))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(quoteMode = QuoteMode.ALL))
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("\"test\",\"data\"", "\"test\",\"data\"")
+  }
+
+  it should "put quotes on non-numeric values when quote mode is non numeric" in {
+    val data = Seq[Row](
+      new GenericRow(Array[Any]("test", 1)),
+      new GenericRow(Array[Any]("test", 1))
+    )
+
+    val dataRdd = parallelize(data)
+
+    val transformer = RowToCsvTransformer(ToCsvTransformerConfigs(quoteMode = QuoteMode.NON_NUMERIC))
+
+    transformer(dataRdd).collect() should contain theSameElementsAs Array("\"test\",1", "\"test\",1")
+  }
+}
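For reviewers unfamiliar with commons-csv: the quote-mode behavior these specs assert comes straight from `CSVFormat`, which `ToCsvTransformer` uses to render one record per `next()` call. A minimal standalone sketch (not part of the diff; `CSVFormat.format` and `withQuoteMode` are existing commons-csv 1.4 APIs):

```scala
import org.apache.commons.csv.{CSVFormat, QuoteMode}

// CSVFormat.format(values: _*) renders a single CSV record as a String.
val minimal = CSVFormat.DEFAULT.format("test", "da,ta") // test,"da,ta" (MINIMAL quotes only when needed)
val all = CSVFormat.DEFAULT
  .withQuoteMode(QuoteMode.ALL)
  .format("test", "data")                               // "test","data"

println(minimal)
println(all)
```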