[SETL-8] Implement CSV <> Row Transform #15

Open · wants to merge 1 commit into base: develop
3 changes: 3 additions & 0 deletions build.sbt
@@ -11,6 +11,9 @@ libraryDependencies ++= Seq(
"org.apache.spark" % "spark-mllib_2.10" % versions.spark % "provided",
"org.apache.spark" % "spark-sql_2.10" % versions.spark % "provided",

"com.databricks" % "spark-csv_2.10" % versions.sparkCsv,
"org.apache.commons" % "commons-csv" % versions.apacheCommonsCsv,

"org.scalactic" %% "scalactic" % versions.scalatest,
"org.scalatest" %% "scalatest" % versions.scalatest % "test"
)
3 changes: 3 additions & 0 deletions project/versions.scala
@@ -3,4 +3,7 @@ object versions {
val scalatest = "2.2.6"

val spark = sys.env.getOrElse("SPARK_VERSION", "1.6.1")

val sparkCsv = "1.4.0"
val apacheCommonsCsv = "1.4"
}
15 changes: 15 additions & 0 deletions src/main/scala/com/vngrs/etl/codecs/Codec.scala
@@ -0,0 +1,15 @@
package com.vngrs.etl.codecs

import com.vngrs.etl.configs.Configs

/**
* Generic Converter Codec Interface
> Review comment: This needs better documentation. What is the purpose? How can I use it?

*
* @tparam A Type of the input
* @tparam B Type of the output
* @tparam CONF Type of the configurations
*/
trait Codec[A, B, CONF <: Configs] extends Serializable {
> Review comment: This type class has several problems:
>
>   1. Storing config in the type class is not a good idea. It doesn't need it; it should only know how to operate on one type.
>   2. The method should have a more descriptive name. If it is a codec, there should be `decode` and `encode` methods.
>   3. Type class instances should be composable; that is how complex type class instances are built from simple ones. We should have an instance for timestamps, and `Row`'s instance should use it. However, passing it via config prevents that, so it is an antipattern.


def apply(a: A, conf: CONF): B
}
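To make the reviewer's composability point concrete, here is a minimal sketch (hypothetical, not part of this PR) of a config-free type class whose instances compose, so a `Date` encoder can be reused by an encoder for whole records:

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical config-free encoder type class, as the review suggests:
// each instance only knows how to operate on one type.
trait CsvEncoder[A] extends Serializable {
  def encode(a: A): String
}

object CsvEncoder {
  // A standalone instance for timestamps...
  implicit val dateEncoder: CsvEncoder[Date] = new CsvEncoder[Date] {
    private val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")
    def encode(d: Date): String = fmt.format(d)
  }

  // ...which composes into an instance for sequences of timestamps,
  // instead of a formatter being threaded through a config object.
  implicit def seqEncoder[A](implicit enc: CsvEncoder[A]): CsvEncoder[Seq[A]] =
    new CsvEncoder[Seq[A]] {
      def encode(as: Seq[A]): String = as.map(enc.encode).mkString(",")
    }
}
```

With this shape, a `Row` instance would summon the `Date` instance implicitly rather than receiving a `dateFormatter` via `CONF`.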
33 changes: 33 additions & 0 deletions src/main/scala/com/vngrs/etl/codecs/RowToCsvCodec.scala
@@ -0,0 +1,33 @@
package com.vngrs.etl.codecs

import java.util.Date

import com.vngrs.etl.configs.ToCsvCodecConfigs
import org.apache.spark.sql.Row

/**
* [[org.apache.spark.sql.Row]] to `CSV` convertible format codec
*/
@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.AsInstanceOf"))
final case class RowToCsvCodec() extends ToCsvCodec[Row] {
> Review comment: This could be an `object`.

/**
* Converts [[org.apache.spark.sql.Row]] to `CSV` convertible format
*
* @param row [[org.apache.spark.sql.Row]]
* @param conf Configurations
* @return `CSV` convertible data
*/
override def apply(row: Row, conf: ToCsvCodecConfigs): Seq[AnyRef] = {
val seq = row.toSeq

val formatVal = seq.map(_ => (v: Any) => Option(v).map {
case d: Date => conf.dateFormatter.format(d).asInstanceOf[AnyRef]
case a: Any => a.asInstanceOf[AnyRef]
}.orNull)

seq.zipWithIndex.map {
case (fieldVal, i) => formatVal(i)(fieldVal)
}
}
}
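As an aside on the implementation above: `formatVal` builds one identical closure per column and then zips, but a single pass over `seq` computes the same result. A hypothetical simplification, shown with a plain `Seq[Any]` standing in for `Row`:

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Hypothetical single-pass equivalent of RowToCsvCodec.apply:
// nulls stay null, Dates are formatted, everything else passes through.
def toCsvValues(values: Seq[Any], dateFormatter: SimpleDateFormat): Seq[AnyRef] =
  values.map { v =>
    Option(v).map {
      case d: Date => dateFormatter.format(d): AnyRef
      case other   => other.asInstanceOf[AnyRef]
    }.orNull
  }
```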
8 changes: 8 additions & 0 deletions src/main/scala/com/vngrs/etl/codecs/ToCsvCodec.scala
@@ -0,0 +1,8 @@
package com.vngrs.etl.codecs

import com.vngrs.etl.configs.ToCsvCodecConfigs

/**
* Generic to `CSV` convertible format codec interface
*/
trait ToCsvCodec[A] extends Codec[A, Seq[AnyRef], ToCsvCodecConfigs]
6 changes: 6 additions & 0 deletions src/main/scala/com/vngrs/etl/configs/Configs.scala
@@ -0,0 +1,6 @@
package com.vngrs.etl.configs

/**
* Generic configuration interface
*/
trait Configs
> Review comment: What is the purpose of that interface? Is it only to limit the upper type bound of the type class?

src/main/scala/com/vngrs/etl/configs/CsvToRowTransformerConfigs.scala
@@ -0,0 +1,46 @@
package com.vngrs.etl.configs

/**
* `CSV` to [[org.apache.spark.sql.Row]] transformer configurations.
* See companion object for default values.
*
* @param useHeader whether to use first record as header
* @param inferSchema whether try to infer schema
* @param dateFormat date format
* @param delimiter delimiter
* @param nullValue this value will be interpreted as `null`
* @param quoteChar quote char
* @param escapeChar escape char
*/
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
final case class CsvToRowTransformerConfigs
(
useHeader: Boolean = CsvToRowTransformerConfigs.defaultUseHeader,
inferSchema: Boolean = CsvToRowTransformerConfigs.defaultInferSchema,
dateFormat: String = CsvToRowTransformerConfigs.defaultDateFormat,
delimiter: Char = CsvToRowTransformerConfigs.defaultDelimiter,
nullValue: String = CsvToRowTransformerConfigs.defaultNullValue,
quoteChar: Char = CsvToRowTransformerConfigs.defaultQuoteChar,
escapeChar: Char = CsvToRowTransformerConfigs.defaultEscapeChar
) extends Configs

/**
* [[com.vngrs.etl.configs.CsvToRowTransformerConfigs]] companion object.
*/
object CsvToRowTransformerConfigs {
> Review comment (style): I don't think the newlines make it more readable, and I don't like the `default` prefix.


val defaultUseHeader = true

val defaultInferSchema = true

val defaultDateFormat = "yyyy-MM-dd'T'HH:mm:ssX"

val defaultDelimiter = ','

val defaultNullValue = ""

val defaultQuoteChar = '\"'

val defaultEscapeChar = '\\'
}
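The default `yyyy-MM-dd'T'HH:mm:ssX` pattern is an ISO-8601-style timestamp with a zone offset; note the `X` specifier requires Java 7 or later. A quick sanity check of the pattern (illustrative, outside this PR):

```scala
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssX")
fmt.setTimeZone(TimeZone.getTimeZone("UTC"))

// The epoch formats as 1970-01-01T00:00:00Z ("X" prints "Z" for UTC).
println(fmt.format(new Date(0L)))
```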

10 changes: 10 additions & 0 deletions src/main/scala/com/vngrs/etl/configs/ToCsvCodecConfigs.scala
@@ -0,0 +1,10 @@
package com.vngrs.etl.configs

import java.text.DateFormat

/**
* To `CSV` convertible format codec configurations
*
* @param dateFormatter Date formatter
*/
final case class ToCsvCodecConfigs(dateFormatter: DateFormat) extends Configs
47 changes: 47 additions & 0 deletions src/main/scala/com/vngrs/etl/configs/ToCsvTransformerConfigs.scala
@@ -0,0 +1,47 @@
package com.vngrs.etl.configs

import org.apache.commons.csv.QuoteMode

/**
* To `CSV` transformer configurations.
* See companion object for default values.
*
* @param generateHeader whether to generate header
* @param dateFormat date format
* @param delimiter delimiter
* @param nullValue this value will be interpreted as `null`
* @param escapeChar escape char
* @param quoteChar quote char
* @param quoteMode quote mode
*/
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
final case class ToCsvTransformerConfigs
(
generateHeader: Boolean = ToCsvTransformerConfigs.defaultGenerateHeader,
dateFormat: String = ToCsvTransformerConfigs.defaultDateFormat,
delimiter: Char = ToCsvTransformerConfigs.defaultDelimiter,
nullValue: String = ToCsvTransformerConfigs.defaultNullValue,
escapeChar: Char = ToCsvTransformerConfigs.defaultEscapeChar,
quoteChar: Char = ToCsvTransformerConfigs.defaultQuoteChar,
quoteMode: QuoteMode = ToCsvTransformerConfigs.defaultQuoteMode
) extends Configs

/**
* [[com.vngrs.etl.configs.ToCsvTransformerConfigs]] companion object.
*/
object ToCsvTransformerConfigs {
> Review comment: If this and `CsvToRowTransformerConfigs` are the same, we can move them together.


val defaultGenerateHeader = false

val defaultDateFormat = "yyyy-MM-dd'T'HH:mm:ssX"

val defaultDelimiter = ','

val defaultNullValue = ""

val defaultEscapeChar = '\\'

val defaultQuoteChar = '\"'

val defaultQuoteMode = QuoteMode.MINIMAL
}
src/main/scala/com/vngrs/etl/transformers/CsvToRowTransformer.scala
@@ -0,0 +1,50 @@
package com.vngrs.etl.transformers

import com.databricks.spark.csv.CsvParser
import com.vngrs.etl.Transform
import com.vngrs.etl.configs.CsvToRowTransformerConfigs
import com.vngrs.etl.utils.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

/**
* Transforms `CSV` formatted [[String]]s to [[org.apache.spark.sql.Row]]s.
*
* @param config `CSV` to [[org.apache.spark.sql.Row]] transformers configs
*/
final case class CsvToRowTransformer(config: CsvToRowTransformerConfigs) extends Transform[String, Row] {

/**
* Transforms `CSV` formatted [[String]]s to [[org.apache.spark.sql.Row]]s.
*
* @param input [[org.apache.spark.rdd.RDD]]
* @return Transformed [[org.apache.spark.rdd.RDD]]
*/
override def apply(input: RDD[String]): RDD[Row] = {
new CsvParser()
.withUseHeader(config.useHeader)
.withDelimiter(config.delimiter)
.withInferSchema(config.inferSchema)
.withDateFormat(config.dateFormat)
.withQuoteChar(config.quoteChar)
.withEscape(config.escapeChar)
.withNullValue(config.nullValue)
.withTreatEmptyValuesAsNulls(true)
.csvRdd(input.sqlContext, input)
.rdd
}
}

/**
* [[com.vngrs.etl.transformers.CsvToRowTransformer]]'s companion object
*/
object CsvToRowTransformer {

/**
* Creates a [[com.vngrs.etl.transformers.CsvToRowTransformer]] with default configs
*
* @return [[com.vngrs.etl.transformers.CsvToRowTransformer]]
* @see [[com.vngrs.etl.configs.CsvToRowTransformerConfigs]]
*/
def apply(): CsvToRowTransformer = new CsvToRowTransformer(CsvToRowTransformerConfigs())
> Review comment (style): This is a pure function; you can drop the parentheses.

}
src/main/scala/com/vngrs/etl/transformers/RowToCsvTransformer.scala
@@ -0,0 +1,45 @@
package com.vngrs.etl.transformers

import com.vngrs.etl.codecs.RowToCsvCodec
import com.vngrs.etl.configs.ToCsvTransformerConfigs
import com.vngrs.etl.utils.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

/**
* Transforms [[org.apache.spark.sql.Row]]s to `CSV` formatted [[String]]s.
*
* @param config [[org.apache.spark.sql.Row]] to `CSV` transformers configs
*/
final case class RowToCsvTransformer
(
config: ToCsvTransformerConfigs
) extends ToCsvTransformer[Row](RowToCsvCodec(), config) {

/**
* Converts header `CSV` convertible format
*
* @param input [[org.apache.spark.rdd.RDD]]
* @return Header in `CSV` formatted string
*/
@inline
override protected def createHeader(input: RDD[Row]): Array[AnyRef] = {
input.schema()
.orThrow("A schema is required when generating header in order to convert Row to CSV!")
.fields.map(_.name)
}
}

/**
* [[com.vngrs.etl.transformers.RowToCsvTransformer]]'s companion object
*/
object RowToCsvTransformer {

/**
* Creates a [[com.vngrs.etl.transformers.RowToCsvTransformer]] with default configs
*
* @return [[com.vngrs.etl.transformers.RowToCsvTransformer]]
* @see [[com.vngrs.etl.configs.ToCsvTransformerConfigs]]
*/
def apply(): RowToCsvTransformer = RowToCsvTransformer(ToCsvTransformerConfigs())
}
86 changes: 86 additions & 0 deletions src/main/scala/com/vngrs/etl/transformers/ToCsvTransformer.scala
@@ -0,0 +1,86 @@
package com.vngrs.etl.transformers

import java.text.SimpleDateFormat

import com.vngrs.etl.Transform
import com.vngrs.etl.codecs.ToCsvCodec
import com.vngrs.etl.configs.{ToCsvCodecConfigs, ToCsvTransformerConfigs}
import org.apache.commons.csv.CSVFormat
import org.apache.spark.rdd.RDD

/**
* Transforms [[A]]s to `CSV` formatted [[String]]s with help of given `toCsvCodec`.
*
* @param toCsvCodec [[A]] to `CSV` codec
* @param config To `CSV` transformer configurations
* @tparam A Type of the data ([[org.apache.spark.rdd.RDD]]).
*/
@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.AsInstanceOf", "org.wartremover.warts.Var"))
abstract class ToCsvTransformer[A]
(
toCsvCodec: ToCsvCodec[A],
> Review comment (style):
>
>     WeDontNeedSomeJavaishLongNamesAbstractFactory weDontNeedSomeJavaishLongNamesAbstractFactory = WeDontNeedSomeJavaishLongNamesAbstractFactory.getDefaultWeDontNeedSomeJavaishLongNamesAbstractFactoryInstance()

config: ToCsvTransformerConfigs
) extends Transform[A, String] with Serializable {

/**
* Transforms [[A]]s to `CSV` formatted [[String]]s with help of given `toCsvCodec`.
*
* @param input [[org.apache.spark.rdd.RDD]]
* @return Transformed [[org.apache.spark.rdd.RDD]]
*/
override def apply(input: RDD[A]): RDD[String] = {

val header =
if (config.generateHeader) {
createHeader(input)
} else {
Array[AnyRef]()
}

input.mapPartitions[String](iterator =>
> Review comment: Can we implement this more simply? It looks like it only does two things:
>
>   1. Convert elements to CSV strings.
>   2. Add a header if configured to generate one.
>
> So it could be implemented like:
>
>     val mapped = xs.map(x => csvFormat.format(toCsvCodec(x, codecConf): _*))
>     if (config.generateHeader) csvFormat.format(header: _*) :: mapped else mapped

new Iterator[String] {

private var firstRowAndShouldGenerateHeader = config.generateHeader

private val codecConf = ToCsvCodecConfigs(new SimpleDateFormat(config.dateFormat))

private val csvFormat = createCsvFormat()

override def hasNext: Boolean = iterator.hasNext

override def next: String = {
if (firstRowAndShouldGenerateHeader) {
firstRowAndShouldGenerateHeader = false
csvFormat.format(header: _*)
} else {
csvFormat.format(toCsvCodec(iterator.next(), codecConf): _*)
}
}
}
)
}

/**
* Creates [[org.apache.commons.csv.CSVFormat]]
*
* @return [[org.apache.commons.csv.CSVFormat]]
*/
@inline
private def createCsvFormat(): CSVFormat = {
CSVFormat.DEFAULT
.withDelimiter(config.delimiter)
.withQuote(config.quoteChar)
.withEscape(config.escapeChar)
.withNullString(config.nullValue)
.withQuoteMode(config.quoteMode)
}

/**
* Converts header `CSV` convertible format
*
* @param input [[org.apache.spark.rdd.RDD]]
* @return Header in `CSV` formatted string
*/
@inline
protected def createHeader(input: RDD[A]): Array[AnyRef]
> Review comment: So should users of this transformer both provide a type class instance and extend the class in order to use it?

}
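One behaviour worth noting: since the header flag is reset inside `mapPartitions`, an `RDD` with several non-empty partitions would seem to emit a header at the start of each partition, not just once. The intended per-partition logic itself can be sketched without Spark using plain iterators (hypothetical, with a naive `mkString` standing in for `CSVFormat`):

```scala
// Hypothetical Spark-free sketch of the header-then-rows iterator logic.
def toCsvLines(rows: Iterator[Seq[Any]],
               header: Seq[String],
               generateHeader: Boolean): Iterator[String] = {
  def fmt(values: Seq[Any]): String = values.mkString(",")
  val body = rows.map(fmt)
  // Prepending with Iterator(...) ++ body avoids the mutable
  // firstRowAndShouldGenerateHeader flag entirely.
  if (generateHeader) Iterator(header.mkString(",")) ++ body else body
}
```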
9 changes: 8 additions & 1 deletion src/main/scala/com/vngrs/etl/utils/rdd.scala
@@ -3,7 +3,7 @@ package com.vngrs.etl.utils
import com.vngrs.etl.exceptions.NoSchemaException
import com.vngrs.etl.utils.rdd.SchemaResult.{EmptyRdd, Found, NotFound}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.StructType

import scala.util.{Failure, Success, Try}
@@ -26,6 +26,13 @@ object rdd {
* @return `true` if the `rdd` is not empty
*/
def nonEmpty(): Boolean = !rdd.isEmpty()

/**
* Creates a new [[org.apache.spark.sql.SQLContext]] with [[org.apache.spark.SparkContext]] inside the `rdd`.
*
* @return newly created [[org.apache.spark.sql.SQLContext]]
*/
def sqlContext: SQLContext = new SQLContext(rdd.sparkContext)
}

/**