-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SETL-8] Implement CSV <> Row Transform #15
base: develop
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package com.vngrs.etl.codecs | ||
|
||
import com.vngrs.etl.configs.Configs | ||
|
||
/**
 * Generic converter codec interface.
 *
 * Implementations turn a value of type `A` into a value of type `B`,
 * driven by a configuration object of type `CONF`.
 *
 * @tparam A    Type of the input
 * @tparam B    Type of the output
 * @tparam CONF Type of the configurations
 */
trait Codec[A, B, CONF <: Configs] extends Serializable {

  /**
   * Converts the given input to the output type using the supplied configurations.
   *
   * @param a    Input value
   * @param conf Configurations steering the conversion
   * @return Converted value
   */
  def apply(a: A, conf: CONF): B
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
package com.vngrs.etl.codecs | ||
|
||
import java.util.Date | ||
|
||
import com.vngrs.etl.configs.ToCsvCodecConfigs | ||
import org.apache.spark.sql.Row | ||
|
||
/**
 * [[org.apache.spark.sql.Row]] to `CSV` convertible format codec.
 */
@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.AsInstanceOf"))
final case class RowToCsvCodec() extends ToCsvCodec[Row] {

  /**
   * Converts [[org.apache.spark.sql.Row]] to `CSV` convertible format.
   *
   * [[java.util.Date]] fields are rendered with the configured date formatter;
   * `null` fields stay `null` so the CSV writer can apply its null string.
   *
   * @param row  [[org.apache.spark.sql.Row]]
   * @param conf Configurations
   * @return `CSV` convertible data
   */
  override def apply(row: Row, conf: ToCsvCodecConfigs): Seq[AnyRef] =
    // Map each field value directly. The previous implementation allocated one
    // formatting closure per field (ignoring the field it mapped over) and then
    // re-joined values via zipWithIndex — needless indirection for the same result.
    row.toSeq.map { v =>
      Option(v).map {
        case d: Date => conf.dateFormatter.format(d).asInstanceOf[AnyRef]
        case a: Any  => a.asInstanceOf[AnyRef]
      }.orNull
    }
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package com.vngrs.etl.codecs | ||
|
||
import com.vngrs.etl.configs.ToCsvCodecConfigs | ||
|
||
/**
 * Generic to `CSV` convertible format codec interface.
 *
 * A [[com.vngrs.etl.codecs.Codec]] specialised to produce a sequence of CSV
 * cell values from an `A`, configured by
 * [[com.vngrs.etl.configs.ToCsvCodecConfigs]].
 *
 * @tparam A Type of the input
 */
trait ToCsvCodec[A] extends Codec[A, Seq[AnyRef], ToCsvCodecConfigs]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package com.vngrs.etl.configs | ||
|
||
/**
 * Generic configuration marker interface.
 *
 * Serves as the common upper type bound for configuration classes accepted by
 * [[com.vngrs.etl.codecs.Codec]] implementations; it declares no members.
 */
trait Configs
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of that interface? Is it only to limit upper type of typeclass? |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
package com.vngrs.etl.configs | ||
|
||
/**
 * Configurations for the `CSV` to [[org.apache.spark.sql.Row]] transformer.
 * Default values live in the companion object.
 *
 * @param useHeader   whether the first record is treated as a header
 * @param inferSchema whether to attempt schema inference
 * @param dateFormat  date format pattern
 * @param delimiter   field delimiter character
 * @param nullValue   string that is read back as `null`
 * @param quoteChar   quote character
 * @param escapeChar  escape character
 */
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
final case class CsvToRowTransformerConfigs(
  useHeader: Boolean = CsvToRowTransformerConfigs.defaultUseHeader,
  inferSchema: Boolean = CsvToRowTransformerConfigs.defaultInferSchema,
  dateFormat: String = CsvToRowTransformerConfigs.defaultDateFormat,
  delimiter: Char = CsvToRowTransformerConfigs.defaultDelimiter,
  nullValue: String = CsvToRowTransformerConfigs.defaultNullValue,
  quoteChar: Char = CsvToRowTransformerConfigs.defaultQuoteChar,
  escapeChar: Char = CsvToRowTransformerConfigs.defaultEscapeChar
) extends Configs
|
||
/**
 * [[com.vngrs.etl.configs.CsvToRowTransformerConfigs]] companion object,
 * holding the default configuration values.
 */
object CsvToRowTransformerConfigs {
  val defaultUseHeader: Boolean = true
  val defaultInferSchema: Boolean = true
  val defaultDateFormat: String = "yyyy-MM-dd'T'HH:mm:ssX"
  val defaultDelimiter: Char = ','
  val defaultNullValue: String = ""
  val defaultQuoteChar: Char = '"'
  val defaultEscapeChar: Char = '\\'
}
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
package com.vngrs.etl.configs | ||
|
||
import java.text.DateFormat | ||
|
||
/**
 * Configurations for the to-`CSV` convertible format codecs.
 *
 * @param dateFormatter formatter applied to [[java.util.Date]] field values
 */
final case class ToCsvCodecConfigs(dateFormatter: DateFormat) extends Configs
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
package com.vngrs.etl.configs | ||
|
||
import org.apache.commons.csv.QuoteMode | ||
|
||
/**
 * Configurations for the to-`CSV` transformers.
 * Default values live in the companion object.
 *
 * @param generateHeader whether a header row should be generated
 * @param dateFormat     date format pattern
 * @param delimiter      field delimiter character
 * @param nullValue      string written in place of `null`
 * @param escapeChar     escape character
 * @param quoteChar      quote character
 * @param quoteMode      quoting policy
 */
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
final case class ToCsvTransformerConfigs(
  generateHeader: Boolean = ToCsvTransformerConfigs.defaultGenerateHeader,
  dateFormat: String = ToCsvTransformerConfigs.defaultDateFormat,
  delimiter: Char = ToCsvTransformerConfigs.defaultDelimiter,
  nullValue: String = ToCsvTransformerConfigs.defaultNullValue,
  escapeChar: Char = ToCsvTransformerConfigs.defaultEscapeChar,
  quoteChar: Char = ToCsvTransformerConfigs.defaultQuoteChar,
  quoteMode: QuoteMode = ToCsvTransformerConfigs.defaultQuoteMode
) extends Configs
|
||
/**
 * [[com.vngrs.etl.configs.ToCsvTransformerConfigs]] companion object,
 * holding the default configuration values.
 */
object ToCsvTransformerConfigs {
  val defaultGenerateHeader: Boolean = false
  val defaultDateFormat: String = "yyyy-MM-dd'T'HH:mm:ssX"
  val defaultDelimiter: Char = ','
  val defaultNullValue: String = ""
  val defaultEscapeChar: Char = '\\'
  val defaultQuoteChar: Char = '"'
  val defaultQuoteMode: QuoteMode = QuoteMode.MINIMAL
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package com.vngrs.etl.transformers | ||
|
||
import com.databricks.spark.csv.CsvParser | ||
import com.vngrs.etl.Transform | ||
import com.vngrs.etl.configs.CsvToRowTransformerConfigs | ||
import com.vngrs.etl.utils.rdd._ | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.sql.Row | ||
|
||
/**
 * Transforms `CSV` formatted [[String]]s to [[org.apache.spark.sql.Row]]s.
 *
 * @param config `CSV` to [[org.apache.spark.sql.Row]] transformer configs
 */
final case class CsvToRowTransformer(config: CsvToRowTransformerConfigs) extends Transform[String, Row] {

  /**
   * Parses each `CSV` line of the input into a [[org.apache.spark.sql.Row]].
   *
   * @param input [[org.apache.spark.rdd.RDD]] of `CSV` lines
   * @return Transformed [[org.apache.spark.rdd.RDD]] of rows
   */
  override def apply(input: RDD[String]): RDD[Row] = {
    // Configure the spark-csv parser from our configs; empty values are
    // always treated as nulls.
    val parser = new CsvParser()
      .withUseHeader(config.useHeader)
      .withDelimiter(config.delimiter)
      .withInferSchema(config.inferSchema)
      .withDateFormat(config.dateFormat)
      .withQuoteChar(config.quoteChar)
      .withEscape(config.escapeChar)
      .withNullValue(config.nullValue)
      .withTreatEmptyValuesAsNulls(true)

    parser.csvRdd(input.sqlContext, input).rdd
  }
}
|
||
/**
 * [[com.vngrs.etl.transformers.CsvToRowTransformer]]'s companion object
 */
object CsvToRowTransformer {

  /**
   * Creates a [[com.vngrs.etl.transformers.CsvToRowTransformer]] with default configs.
   *
   * @return [[com.vngrs.etl.transformers.CsvToRowTransformer]]
   * @see [[com.vngrs.etl.configs.CsvToRowTransformerConfigs]]
   */
  def apply(): CsvToRowTransformer = CsvToRowTransformer(CsvToRowTransformerConfigs())
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package com.vngrs.etl.transformers | ||
|
||
import com.vngrs.etl.codecs.RowToCsvCodec | ||
import com.vngrs.etl.configs.ToCsvTransformerConfigs | ||
import com.vngrs.etl.utils.rdd._ | ||
import org.apache.spark.rdd.RDD | ||
import org.apache.spark.sql.Row | ||
|
||
/**
 * Transforms [[org.apache.spark.sql.Row]]s to `CSV` formatted [[String]]s.
 *
 * @param config [[org.apache.spark.sql.Row]] to `CSV` transformer configs
 */
final case class RowToCsvTransformer
(
  config: ToCsvTransformerConfigs
) extends ToCsvTransformer[Row](RowToCsvCodec(), config) {

  /**
   * Builds the `CSV` header from the input's schema field names.
   *
   * @param input [[org.apache.spark.rdd.RDD]] of rows
   * @return Header cells, one per schema field
   */
  @inline
  override protected def createHeader(input: RDD[Row]): Array[AnyRef] = {
    val schema = input.schema()
      .orThrow("A schema is required when generating header in order to convert Row to CSV!")
    schema.fields.map(f => f.name: AnyRef)
  }
}
|
||
/**
 * [[com.vngrs.etl.transformers.RowToCsvTransformer]]'s companion object
 */
object RowToCsvTransformer {

  /**
   * Creates a [[com.vngrs.etl.transformers.RowToCsvTransformer]] with default configs.
   *
   * @return [[com.vngrs.etl.transformers.RowToCsvTransformer]]
   * @see [[com.vngrs.etl.configs.ToCsvTransformerConfigs]]
   */
  def apply(): RowToCsvTransformer = new RowToCsvTransformer(ToCsvTransformerConfigs())
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package com.vngrs.etl.transformers | ||
|
||
import java.text.SimpleDateFormat | ||
|
||
import com.vngrs.etl.Transform | ||
import com.vngrs.etl.codecs.ToCsvCodec | ||
import com.vngrs.etl.configs.{ToCsvCodecConfigs, ToCsvTransformerConfigs} | ||
import org.apache.commons.csv.CSVFormat | ||
import org.apache.spark.rdd.RDD | ||
|
||
/**
 * Transforms [[A]]s to `CSV` formatted [[String]]s with help of given `toCsvCodec`.
 *
 * When `config.generateHeader` is set, a header row (built by [[createHeader]])
 * is emitted at the start of every non-empty partition — matching the
 * one-file-per-partition layout produced by `saveAsTextFile`.
 *
 * @param toCsvCodec [[A]] to `CSV` codec
 * @param config     To `CSV` transformer configurations
 * @tparam A Type of the data ([[org.apache.spark.rdd.RDD]]).
 */
@SuppressWarnings(Array("org.wartremover.warts.Any", "org.wartremover.warts.AsInstanceOf"))
abstract class ToCsvTransformer[A]
(
  toCsvCodec: ToCsvCodec[A],
  config: ToCsvTransformerConfigs
) extends Transform[A, String] with Serializable {

  /**
   * Transforms [[A]]s to `CSV` formatted [[String]]s with help of given `toCsvCodec`.
   *
   * @param input [[org.apache.spark.rdd.RDD]]
   * @return Transformed [[org.apache.spark.rdd.RDD]]
   */
  override def apply(input: RDD[A]): RDD[String] = {
    // Compute the header on the driver: createHeader may inspect the RDD itself.
    val header =
      if (config.generateHeader) createHeader(input) else Array.empty[AnyRef]

    input.mapPartitions[String] { iterator =>
      // Per-partition helpers are created inside the closure:
      // SimpleDateFormat is mutable and not serializable-safe to share.
      val codecConf = ToCsvCodecConfigs(new SimpleDateFormat(config.dateFormat))
      val csvFormat = createCsvFormat()

      val records = iterator.map(a => csvFormat.format(toCsvCodec(a, codecConf): _*))

      // Prepend the header only for non-empty partitions — same observable
      // behavior as before, without the hand-rolled stateful Iterator and
      // its mutable flag (the Var wart suppression is no longer needed).
      if (config.generateHeader && records.hasNext) {
        Iterator.single(csvFormat.format(header: _*)) ++ records
      } else {
        records
      }
    }
  }

  /**
   * Creates [[org.apache.commons.csv.CSVFormat]] from the configurations.
   *
   * @return [[org.apache.commons.csv.CSVFormat]]
   */
  @inline
  private def createCsvFormat(): CSVFormat = {
    CSVFormat.DEFAULT
      .withDelimiter(config.delimiter)
      .withQuote(config.quoteChar)
      .withEscape(config.escapeChar)
      .withNullString(config.nullValue)
      .withQuoteMode(config.quoteMode)
  }

  /**
   * Converts the header to `CSV` convertible format.
   *
   * @param input [[org.apache.spark.rdd.RDD]]
   * @return Header cells
   */
  @inline
  protected def createHeader(input: RDD[A]): Array[AnyRef]
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This needs better documentation. What is the purpose? How can I use it?