Skip to content

Commit

Permalink
Add multikey case class
Browse files Browse the repository at this point in the history
  • Loading branch information
martinbomio committed Dec 12, 2018
1 parent db1a2ab commit 875e038
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,29 @@ object DiffType extends Enumeration {
val SAME, DIFFERENT, MISSING_LHS, MISSING_RHS = Value
}

case class MultiKey(keys: Seq[String]) extends AnyVal {
override def toString: String = keys.mkString("_")
}

object MultiKey {
def apply(key: String): MultiKey = MultiKey(Seq(key))
}

/**
* Key-field level [[DiffType]] and delta.
*
* If DiffType are SAME, MISSING_LHS, or MISSING_RHS they will appear once with no Delta
* If DiffType is DIFFERENT, there is one KeyStats for every field that is different for that key
* with that field's Delta
*
* key - primary being compared.
* keys - primary being compared.
* diffType - how the two records of the given key compares.
* delta - a single field's difference including field name, values, and distance
*/
case class KeyStats(key: String, diffType: DiffType.Value, delta: Option[Delta]) {
case class KeyStats(keys: MultiKey, diffType: DiffType.Value, delta: Option[Delta]) {
override def toString: String = {
val deltaStr = delta.map(_.toString).getOrElse("")
s"$key\t$diffType\t$deltaStr"
s"$keys\t$diffType\t$deltaStr"
}
}

Expand Down Expand Up @@ -120,9 +128,9 @@ case class FieldStats(field: String,

/** Big diff between two data sets given a primary key. */
class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T],
diffy: Diffy[T], keyFn: T => String) {
diffy: Diffy[T], keyFn: T => MultiKey) {

private lazy val _deltas: SCollection[(String, (Seq[Delta], DiffType.Value))] =
private lazy val _deltas: BigDiffy.DeltaSCollection =
BigDiffy.computeDeltas(lhs, rhs, diffy, keyFn)

private lazy val globalAndFieldStats: SCollection[(GlobalStats, Iterable[FieldStats])] =
Expand All @@ -133,7 +141,7 @@ class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T],
*
* Output tuples are (key, field, LHS, RHS). Note that LHS and RHS may not be serializable.
*/
lazy val deltas: SCollection[(String, String, Any, Any)] =
lazy val deltas: SCollection[(MultiKey, String, Any, Any)] =
_deltas.flatMap { case (k, (ds, dt)) =>
ds.map(d => (k, d.field, d.left, d.right))
}
Expand Down Expand Up @@ -164,10 +172,10 @@ object BigDiffy extends Command {
val command: String = "bigDiffy"

// (field, deltas, diff type)
type DeltaSCollection = SCollection[(String, (Seq[Delta], DiffType.Value))]
type DeltaSCollection = SCollection[(MultiKey, (Seq[Delta], DiffType.Value))]

private def computeDeltas[T](lhs: SCollection[T], rhs: SCollection[T],
d: Diffy[T], keyFn: T => String): DeltaSCollection = {
d: Diffy[T], keyFn: T => MultiKey): DeltaSCollection = {
// extract keys and prefix records with L/R sub-key
val lKeyed = lhs.map(t => (keyFn(t), ("l", t)))
val rKeyed = rhs.map(t => (keyFn(t), ("r", t)))
Expand Down Expand Up @@ -253,28 +261,28 @@ object BigDiffy extends Command {

/** Diff two data sets. */
def diff[T: ClassTag](lhs: SCollection[T], rhs: SCollection[T],
d: Diffy[T], keyFn: T => String): BigDiffy[T] =
d: Diffy[T], keyFn: T => MultiKey): BigDiffy[T] =
new BigDiffy[T](lhs, rhs, d, keyFn)

/** Diff two Avro data sets. */
def diffAvro[T <: GenericRecord : ClassTag](sc: ScioContext,
lhs: String, rhs: String,
keyFn: T => String,
keyFn: T => MultiKey,
diffy: AvroDiffy[T],
schema: Schema = null): BigDiffy[T] =
diff(sc.avroFile[T](lhs, schema), sc.avroFile[T](rhs, schema), diffy, keyFn)

/** Diff two ProtoBuf data sets. */
def diffProtoBuf[T <: AbstractMessage : ClassTag](sc: ScioContext,
lhs: String, rhs: String,
keyFn: T => String,
keyFn: T => MultiKey,
diffy: ProtoBufDiffy[T]): BigDiffy[T] =
diff(sc.protobufFile(lhs), sc.protobufFile(rhs), diffy, keyFn)

/** Diff two TableRow data sets. */
def diffTableRow(sc: ScioContext,
lhs: String, rhs: String,
keyFn: TableRow => String,
keyFn: TableRow => MultiKey,
diffy: TableRowDiffy): BigDiffy[TableRow] =
diff(sc.bigQueryTable(lhs), sc.bigQueryTable(rhs), diffy, keyFn)

Expand Down Expand Up @@ -327,7 +335,7 @@ object BigDiffy extends Command {
case BQ =>
// Saving to BQ, header irrelevant
bigDiffy.keyStats.map(stat =>
KeyStatsBigQuery(stat.key, stat.diffType.toString, stat.delta.map(d => {
KeyStatsBigQuery(stat.keys.toString, stat.diffType.toString, stat.delta.map(d => {
val dv = d.delta match {
case TypedDelta(dt, v) =>
DeltaValueBigQuery(dt.toString, Option(v))
Expand Down Expand Up @@ -411,7 +419,7 @@ object BigDiffy extends Command {
sys.exit(1)
}

private[diffy] def avroKeyFn(keys: Seq[String]): GenericRecord => String = {
private[diffy] def avroKeyFn(keys: Seq[String]): GenericRecord => MultiKey = {
@tailrec
def get(xs: Array[String], i: Int, r: GenericRecord): String =
if (i == xs.length - 1) {
Expand All @@ -421,10 +429,10 @@ object BigDiffy extends Command {
}

val xs = keys.map(_.split('.'))
(r: GenericRecord) => xs.map { x => get(x, 0, r) }.mkString("_")
(r: GenericRecord) => MultiKey(xs.map(x => get(x, 0, r)))
}

private[diffy] def tableRowKeyFn(keys: Seq[String]): TableRow => String = {
private[diffy] def tableRowKeyFn(keys: Seq[String]): TableRow => MultiKey = {
@tailrec
def get(xs: Array[String], i: Int, r: java.util.Map[String, AnyRef]): String =
if (i == xs.length - 1) {
Expand All @@ -434,7 +442,7 @@ object BigDiffy extends Command {
}

val xs = keys.map(_.split('.'))
(r: TableRow) => xs.map { x => get(x, 0, r) }.mkString("_")
(r: TableRow) => MultiKey(xs.map(x => get(x, 0, r)))
}

def pathWithShards(path: String): String = path.replaceAll("\\/+$", "") + "/part"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import com.google.api.services.bigquery.model.TableRow

class BigDiffyTest extends PipelineSpec {

val keys = (1 to 1000).map("key" + _)
val keys = (1 to 1000).map(k => MultiKey("key" + k))
val coder = AvroCoder.of(classOf[TestRecord])

/** Fixed to a small range so that Std. Dev. & Variance calculations are easier to predict */
Expand All @@ -53,7 +53,7 @@ class BigDiffyTest extends PipelineSpec {
val rhs = lhs.map(CoderUtils.clone(coder, _))
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 1000L, 0L, 0L, 0L))
result.deltas should beEmpty
result.keyStats should containInAnyOrder (keys.map(KeyStats(_, DiffType.SAME, None)))
Expand All @@ -63,19 +63,20 @@ class BigDiffyTest extends PipelineSpec {

it should "work with deltas" in {
runWithContext { sc =>
val keyedDoubles = lhs.map(i =>
(i.getRequiredFields.getStringField.toString , i.getRequiredFields.getDoubleField))
val keyedDoubles = lhs.map { i =>
(MultiKey(i.getRequiredFields.getStringField.toString), i.getRequiredFields.getDoubleField)
}
val rhs = lhs.map(CoderUtils.clone(coder, _)).map { r =>
r.getRequiredFields.setDoubleField(r.getRequiredFields.getDoubleField + 10.0)
r
}
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 0L, 1000L, 0L, 0L))
result.deltas.map(d => (d._1, d._2)) should containInAnyOrder (
keys.map((_, field)))
result.keyStats should containInAnyOrder (keyedDoubles.map{case (k, d) =>
result.keyStats should containInAnyOrder (keyedDoubles.map { case (k, d) =>
KeyStats(k, DiffType.DIFFERENT, Option(Delta("required_fields.double_field", d, d + 10.0,
TypedDelta(DeltaType.NUMERIC, 10.0))))})
result.fieldStats.map(f => (f.field, f.count, f.fraction)) should containSingleValue (
Expand All @@ -94,12 +95,12 @@ class BigDiffyTest extends PipelineSpec {
val rhs = lhs.map(CoderUtils.clone(coder, _))
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs2), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 500L, 0L, 500L, 0L))
result.deltas should beEmpty
result.keyStats should containInAnyOrder (
(1 to 500).map(i => KeyStats("key" + i, DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats("key" + i, DiffType.MISSING_LHS, None)))
(1 to 500).map(i => KeyStats(MultiKey("key" + i), DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats(MultiKey("key" + i), DiffType.MISSING_LHS, None)))
result.fieldStats should beEmpty
}
}
Expand All @@ -109,12 +110,12 @@ class BigDiffyTest extends PipelineSpec {
val rhs = lhs.filter(_.getRequiredFields.getIntField <= 500).map(CoderUtils.clone(coder, _))
val result = BigDiffy.diff[TestRecord](
sc.parallelize(lhs), sc.parallelize(rhs),
new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString)
new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString))
result.globalStats should containSingleValue (GlobalStats(1000L, 500L, 0L, 0L, 500L))
result.deltas should beEmpty
result.keyStats should containInAnyOrder (
(1 to 500).map(i => KeyStats("key" + i, DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats("key" + i, DiffType.MISSING_RHS, None)))
(1 to 500).map(i => KeyStats(MultiKey("key" + i), DiffType.SAME, None)) ++
(501 to 1000).map(i => KeyStats(MultiKey("key" + i), DiffType.MISSING_RHS, None)))
result.fieldStats should beEmpty
}
}
Expand All @@ -123,7 +124,7 @@ class BigDiffyTest extends PipelineSpec {
val record = specificRecordOf[TestRecord].sample.get
val keyValue = BigDiffy.avroKeyFn(Seq("required_fields.int_field"))(record)

keyValue shouldBe record.getRequiredFields.getIntField.toString
keyValue.toString shouldBe record.getRequiredFields.getIntField.toString
}

"BigDiffy avroKeyFn" should "work with multiple key" in {
Expand All @@ -133,15 +134,15 @@ class BigDiffyTest extends PipelineSpec {
val expectedKey =
s"${record.getRequiredFields.getIntField}_${record.getRequiredFields.getDoubleField}"

keyValues shouldBe expectedKey
keyValues.toString shouldBe expectedKey
}

"BigDiffy tableRowKeyFn" should "work with single key" in {
val record = new TableRow()
record.set("key", "foo")
val keyValue = BigDiffy.tableRowKeyFn(Seq("key"))(record)

keyValue shouldBe "foo"
keyValue.toString shouldBe "foo"
}

"BigDiffy tableRowKeyFn" should "work with multiple key" in {
Expand All @@ -154,6 +155,6 @@ class BigDiffyTest extends PipelineSpec {
val keys = Seq("record.key", "record.other_key")
val keyValues = BigDiffy.tableRowKeyFn(keys)(record.asInstanceOf[TableRow])

keyValues shouldBe "foo_bar"
keyValues.toString shouldBe "foo_bar"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
package com.spotify.ratatool.examples.diffy

import com.spotify.ratatool.avro.specific.ExampleRecord
import com.spotify.ratatool.diffy.{AvroDiffy, BigDiffy}
import com.spotify.ratatool.diffy.{AvroDiffy, BigDiffy, MultiKey}
import com.spotify.scio.ContextAndArgs
import org.apache.beam.sdk.coders.AvroCoder
import org.apache.beam.sdk.util.CoderUtils

object PreProcessBigDiffy {
def recordKeyFn(r: ExampleRecord): String = {
r.getRecordId.toString
def recordKeyFn(r: ExampleRecord): MultiKey = {
MultiKey(r.getRecordId.toString)
}

def mapFn(coder: => AvroCoder[ExampleRecord])(r: ExampleRecord): ExampleRecord = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package com.spotify.ratatool.examples.diffy
import java.net.URI

import com.spotify.ratatool.GcsConfiguration
import com.spotify.ratatool.diffy.{BigDiffy, ProtoBufDiffy}
import com.spotify.ratatool.diffy.{BigDiffy, MultiKey, ProtoBufDiffy}
import com.spotify.ratatool.examples.proto.Schemas.ExampleRecord
import org.apache.hadoop.fs.{FileSystem, Path}

import com.spotify.scio._

object ProtobufBigDiffyExample {
def recordKeyFn(t: ExampleRecord): String = {
t.getStringField
def recordKeyFn(t: ExampleRecord): MultiKey = {
MultiKey(t.getStringField)
}

def main(cmdlineArgs: Array[String]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@
package com.spotify.ratatool.shapeless

import com.spotify.ratatool.Command
import com.spotify.ratatool.diffy.{BigDiffy, Delta, Diffy}
import com.spotify.ratatool.diffy.{BigDiffy, Delta, Diffy, MultiKey}
import com.spotify.ratatool.diffy.BigDiffy.diff
import com.spotify.scio.values.SCollection
import shapeless._
import shapeless.labelled.FieldType

import scala.reflect.ClassTag

@SerialVersionUID(42L)
Expand Down Expand Up @@ -173,7 +172,7 @@ object CaseClassDiffy {
/** Diff two SCollection[T] **/
def diffCaseClass[T : ClassTag : MapEncoder](lhs: SCollection[T],
rhs: SCollection[T],
keyFn: T => String,
keyFn: T => MultiKey,
diffy: CaseClassDiffy[T]) : BigDiffy[T] =
diff(lhs, rhs, diffy, keyFn)
}

0 comments on commit 875e038

Please sign in to comment.