From 875e0382d9ae12f0b211e60ec22a5146bb1a6801 Mon Sep 17 00:00:00 2001 From: martinbomio Date: Wed, 12 Dec 2018 17:10:57 -0300 Subject: [PATCH] Add multikey case class --- .../com/spotify/ratatool/diffy/BigDiffy.scala | 42 +++++++++++-------- .../spotify/ratatool/diffy/BigDiffyTest.scala | 33 ++++++++------- .../examples/diffy/PreProcessBigDiffy.scala | 6 +-- .../diffy/ProtobufBigDiffyExample.scala | 7 ++-- .../ratatool/shapeless/CaseClassDiffy.scala | 5 +-- 5 files changed, 51 insertions(+), 42 deletions(-) diff --git a/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala b/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala index 9cca7ef1..e32a1c38 100644 --- a/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala +++ b/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala @@ -49,6 +49,14 @@ object DiffType extends Enumeration { val SAME, DIFFERENT, MISSING_LHS, MISSING_RHS = Value } +case class MultiKey(keys: Seq[String]) extends AnyVal { + override def toString: String = keys.mkString("_") +} + +object MultiKey { + def apply(key: String): MultiKey = MultiKey(Seq(key)) +} + /** * Key-field level [[DiffType]] and delta. * @@ -56,14 +64,14 @@ object DiffType extends Enumeration { * If DiffType is DIFFERENT, there is one KeyStats for every field that is different for that key * with that field's Delta * - * key - primary being compared. + * keys - primary keys being compared. * diffType - how the two records of the given key compares. 
* delta - a single field's difference including field name, values, and distance */ -case class KeyStats(key: String, diffType: DiffType.Value, delta: Option[Delta]) { +case class KeyStats(keys: MultiKey, diffType: DiffType.Value, delta: Option[Delta]) { override def toString: String = { val deltaStr = delta.map(_.toString).getOrElse("") - s"$key\t$diffType\t$deltaStr" + s"$keys\t$diffType\t$deltaStr" } } @@ -120,9 +128,9 @@ case class FieldStats(field: String, /** Big diff between two data sets given a primary key. */ class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T], - diffy: Diffy[T], keyFn: T => String) { + diffy: Diffy[T], keyFn: T => MultiKey) { - private lazy val _deltas: SCollection[(String, (Seq[Delta], DiffType.Value))] = + private lazy val _deltas: BigDiffy.DeltaSCollection = BigDiffy.computeDeltas(lhs, rhs, diffy, keyFn) private lazy val globalAndFieldStats: SCollection[(GlobalStats, Iterable[FieldStats])] = @@ -133,7 +141,7 @@ class BigDiffy[T](lhs: SCollection[T], rhs: SCollection[T], * * Output tuples are (key, field, LHS, RHS). Note that LHS and RHS may not be serializable. 
*/ - lazy val deltas: SCollection[(String, String, Any, Any)] = + lazy val deltas: SCollection[(MultiKey, String, Any, Any)] = _deltas.flatMap { case (k, (ds, dt)) => ds.map(d => (k, d.field, d.left, d.right)) } @@ -164,10 +172,10 @@ object BigDiffy extends Command { val command: String = "bigDiffy" // (field, deltas, diff type) - type DeltaSCollection = SCollection[(String, (Seq[Delta], DiffType.Value))] + type DeltaSCollection = SCollection[(MultiKey, (Seq[Delta], DiffType.Value))] private def computeDeltas[T](lhs: SCollection[T], rhs: SCollection[T], - d: Diffy[T], keyFn: T => String): DeltaSCollection = { + d: Diffy[T], keyFn: T => MultiKey): DeltaSCollection = { // extract keys and prefix records with L/R sub-key val lKeyed = lhs.map(t => (keyFn(t), ("l", t))) val rKeyed = rhs.map(t => (keyFn(t), ("r", t))) @@ -253,13 +261,13 @@ object BigDiffy extends Command { /** Diff two data sets. */ def diff[T: ClassTag](lhs: SCollection[T], rhs: SCollection[T], - d: Diffy[T], keyFn: T => String): BigDiffy[T] = + d: Diffy[T], keyFn: T => MultiKey): BigDiffy[T] = new BigDiffy[T](lhs, rhs, d, keyFn) /** Diff two Avro data sets. */ def diffAvro[T <: GenericRecord : ClassTag](sc: ScioContext, lhs: String, rhs: String, - keyFn: T => String, + keyFn: T => MultiKey, diffy: AvroDiffy[T], schema: Schema = null): BigDiffy[T] = diff(sc.avroFile[T](lhs, schema), sc.avroFile[T](rhs, schema), diffy, keyFn) @@ -267,14 +275,14 @@ object BigDiffy extends Command { /** Diff two ProtoBuf data sets. */ def diffProtoBuf[T <: AbstractMessage : ClassTag](sc: ScioContext, lhs: String, rhs: String, - keyFn: T => String, + keyFn: T => MultiKey, diffy: ProtoBufDiffy[T]): BigDiffy[T] = diff(sc.protobufFile(lhs), sc.protobufFile(rhs), diffy, keyFn) /** Diff two TableRow data sets. 
*/ def diffTableRow(sc: ScioContext, lhs: String, rhs: String, - keyFn: TableRow => String, + keyFn: TableRow => MultiKey, diffy: TableRowDiffy): BigDiffy[TableRow] = diff(sc.bigQueryTable(lhs), sc.bigQueryTable(rhs), diffy, keyFn) @@ -327,7 +335,7 @@ object BigDiffy extends Command { case BQ => // Saving to BQ, header irrelevant bigDiffy.keyStats.map(stat => - KeyStatsBigQuery(stat.key, stat.diffType.toString, stat.delta.map(d => { + KeyStatsBigQuery(stat.keys.toString, stat.diffType.toString, stat.delta.map(d => { val dv = d.delta match { case TypedDelta(dt, v) => DeltaValueBigQuery(dt.toString, Option(v)) @@ -411,7 +419,7 @@ object BigDiffy extends Command { sys.exit(1) } - private[diffy] def avroKeyFn(keys: Seq[String]): GenericRecord => String = { + private[diffy] def avroKeyFn(keys: Seq[String]): GenericRecord => MultiKey = { @tailrec def get(xs: Array[String], i: Int, r: GenericRecord): String = if (i == xs.length - 1) { @@ -421,10 +429,10 @@ object BigDiffy extends Command { } val xs = keys.map(_.split('.')) - (r: GenericRecord) => xs.map { x => get(x, 0, r) }.mkString("_") + (r: GenericRecord) => MultiKey(xs.map(x => get(x, 0, r))) } - private[diffy] def tableRowKeyFn(keys: Seq[String]): TableRow => String = { + private[diffy] def tableRowKeyFn(keys: Seq[String]): TableRow => MultiKey = { @tailrec def get(xs: Array[String], i: Int, r: java.util.Map[String, AnyRef]): String = if (i == xs.length - 1) { @@ -434,7 +442,7 @@ object BigDiffy extends Command { } val xs = keys.map(_.split('.')) - (r: TableRow) => xs.map { x => get(x, 0, r) }.mkString("_") + (r: TableRow) => MultiKey(xs.map(x => get(x, 0, r))) } def pathWithShards(path: String): String = path.replaceAll("\\/+$", "") + "/part" diff --git a/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala b/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala index cd1f399c..0a15c62c 100644 --- 
a/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala +++ b/ratatool-diffy/src/test/scala/com/spotify/ratatool/diffy/BigDiffyTest.scala @@ -31,7 +31,7 @@ import com.google.api.services.bigquery.model.TableRow class BigDiffyTest extends PipelineSpec { - val keys = (1 to 1000).map("key" + _) + val keys = (1 to 1000).map(k => MultiKey("key" + k)) val coder = AvroCoder.of(classOf[TestRecord]) /** Fixed to a small range so that Std. Dev. & Variance calculations are easier to predict */ @@ -53,7 +53,7 @@ class BigDiffyTest extends PipelineSpec { val rhs = lhs.map(CoderUtils.clone(coder, _)) val result = BigDiffy.diff[TestRecord]( sc.parallelize(lhs), sc.parallelize(rhs), - new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString) + new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString)) result.globalStats should containSingleValue (GlobalStats(1000L, 1000L, 0L, 0L, 0L)) result.deltas should beEmpty result.keyStats should containInAnyOrder (keys.map(KeyStats(_, DiffType.SAME, None))) @@ -63,19 +63,20 @@ class BigDiffyTest extends PipelineSpec { it should "work with deltas" in { runWithContext { sc => - val keyedDoubles = lhs.map(i => - (i.getRequiredFields.getStringField.toString , i.getRequiredFields.getDoubleField)) + val keyedDoubles = lhs.map { i => + (MultiKey(i.getRequiredFields.getStringField.toString), i.getRequiredFields.getDoubleField) + } val rhs = lhs.map(CoderUtils.clone(coder, _)).map { r => r.getRequiredFields.setDoubleField(r.getRequiredFields.getDoubleField + 10.0) r } val result = BigDiffy.diff[TestRecord]( sc.parallelize(lhs), sc.parallelize(rhs), - new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString) + new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString)) result.globalStats should containSingleValue (GlobalStats(1000L, 0L, 1000L, 0L, 0L)) result.deltas.map(d => (d._1, d._2)) should containInAnyOrder ( keys.map((_, field))) - 
result.keyStats should containInAnyOrder (keyedDoubles.map{case (k, d) => + result.keyStats should containInAnyOrder (keyedDoubles.map { case (k, d) => KeyStats(k, DiffType.DIFFERENT, Option(Delta("required_fields.double_field", d, d + 10.0, TypedDelta(DeltaType.NUMERIC, 10.0))))}) result.fieldStats.map(f => (f.field, f.count, f.fraction)) should containSingleValue ( @@ -94,12 +95,12 @@ class BigDiffyTest extends PipelineSpec { val rhs = lhs.map(CoderUtils.clone(coder, _)) val result = BigDiffy.diff[TestRecord]( sc.parallelize(lhs2), sc.parallelize(rhs), - new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString) + new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString)) result.globalStats should containSingleValue (GlobalStats(1000L, 500L, 0L, 500L, 0L)) result.deltas should beEmpty result.keyStats should containInAnyOrder ( - (1 to 500).map(i => KeyStats("key" + i, DiffType.SAME, None)) ++ - (501 to 1000).map(i => KeyStats("key" + i, DiffType.MISSING_LHS, None))) + (1 to 500).map(i => KeyStats(MultiKey("key" + i), DiffType.SAME, None)) ++ + (501 to 1000).map(i => KeyStats(MultiKey("key" + i), DiffType.MISSING_LHS, None))) result.fieldStats should beEmpty } } @@ -109,12 +110,12 @@ class BigDiffyTest extends PipelineSpec { val rhs = lhs.filter(_.getRequiredFields.getIntField <= 500).map(CoderUtils.clone(coder, _)) val result = BigDiffy.diff[TestRecord]( sc.parallelize(lhs), sc.parallelize(rhs), - new AvroDiffy[TestRecord](), _.getRequiredFields.getStringField.toString) + new AvroDiffy[TestRecord](), x => MultiKey(x.getRequiredFields.getStringField.toString)) result.globalStats should containSingleValue (GlobalStats(1000L, 500L, 0L, 0L, 500L)) result.deltas should beEmpty result.keyStats should containInAnyOrder ( - (1 to 500).map(i => KeyStats("key" + i, DiffType.SAME, None)) ++ - (501 to 1000).map(i => KeyStats("key" + i, DiffType.MISSING_RHS, None))) + (1 to 500).map(i => KeyStats(MultiKey("key" + i), DiffType.SAME, 
None)) ++ + (501 to 1000).map(i => KeyStats(MultiKey("key" + i), DiffType.MISSING_RHS, None))) result.fieldStats should beEmpty } } @@ -123,7 +124,7 @@ class BigDiffyTest extends PipelineSpec { val record = specificRecordOf[TestRecord].sample.get val keyValue = BigDiffy.avroKeyFn(Seq("required_fields.int_field"))(record) - keyValue shouldBe record.getRequiredFields.getIntField.toString + keyValue.toString shouldBe record.getRequiredFields.getIntField.toString } "BigDiffy avroKeyFn" should "work with multiple key" in { @@ -133,7 +134,7 @@ class BigDiffyTest extends PipelineSpec { val expectedKey = s"${record.getRequiredFields.getIntField}_${record.getRequiredFields.getDoubleField}" - keyValues shouldBe expectedKey + keyValues.toString shouldBe expectedKey } "BigDiffy tableRowKeyFn" should "work with single key" in { @@ -141,7 +142,7 @@ class BigDiffyTest extends PipelineSpec { record.set("key", "foo") val keyValue = BigDiffy.tableRowKeyFn(Seq("key"))(record) - keyValue shouldBe "foo" + keyValue.toString shouldBe "foo" } "BigDiffy tableRowKeyFn" should "work with multiple key" in { @@ -154,6 +155,6 @@ class BigDiffyTest extends PipelineSpec { val keys = Seq("record.key", "record.other_key") val keyValues = BigDiffy.tableRowKeyFn(keys)(record.asInstanceOf[TableRow]) - keyValues shouldBe "foo_bar" + keyValues.toString shouldBe "foo_bar" } } diff --git a/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/PreProcessBigDiffy.scala b/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/PreProcessBigDiffy.scala index df3c72ad..a8ccc573 100644 --- a/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/PreProcessBigDiffy.scala +++ b/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/PreProcessBigDiffy.scala @@ -18,14 +18,14 @@ package com.spotify.ratatool.examples.diffy import com.spotify.ratatool.avro.specific.ExampleRecord -import com.spotify.ratatool.diffy.{AvroDiffy, BigDiffy} +import 
com.spotify.ratatool.diffy.{AvroDiffy, BigDiffy, MultiKey} import com.spotify.scio.ContextAndArgs import org.apache.beam.sdk.coders.AvroCoder import org.apache.beam.sdk.util.CoderUtils object PreProcessBigDiffy { - def recordKeyFn(r: ExampleRecord): String = { - r.getRecordId.toString + def recordKeyFn(r: ExampleRecord): MultiKey = { + MultiKey(r.getRecordId.toString) } def mapFn(coder: => AvroCoder[ExampleRecord])(r: ExampleRecord): ExampleRecord = { diff --git a/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/ProtobufBigDiffyExample.scala b/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/ProtobufBigDiffyExample.scala index 1ec1019e..bd860ab8 100644 --- a/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/ProtobufBigDiffyExample.scala +++ b/ratatool-examples/src/main/scala/com/spotify/ratatool/examples/diffy/ProtobufBigDiffyExample.scala @@ -20,14 +20,15 @@ package com.spotify.ratatool.examples.diffy import java.net.URI import com.spotify.ratatool.GcsConfiguration -import com.spotify.ratatool.diffy.{BigDiffy, ProtoBufDiffy} +import com.spotify.ratatool.diffy.{BigDiffy, MultiKey, ProtoBufDiffy} import com.spotify.ratatool.examples.proto.Schemas.ExampleRecord import org.apache.hadoop.fs.{FileSystem, Path} + import com.spotify.scio._ object ProtobufBigDiffyExample { - def recordKeyFn(t: ExampleRecord): String = { - t.getStringField + def recordKeyFn(t: ExampleRecord): MultiKey = { + MultiKey(t.getStringField) } def main(cmdlineArgs: Array[String]): Unit = { diff --git a/ratatool-shapeless/src/main/scala/com/spotify/ratatool/shapeless/CaseClassDiffy.scala b/ratatool-shapeless/src/main/scala/com/spotify/ratatool/shapeless/CaseClassDiffy.scala index c06f1fa4..2a03953d 100644 --- a/ratatool-shapeless/src/main/scala/com/spotify/ratatool/shapeless/CaseClassDiffy.scala +++ b/ratatool-shapeless/src/main/scala/com/spotify/ratatool/shapeless/CaseClassDiffy.scala @@ -18,12 +18,11 @@ package 
com.spotify.ratatool.shapeless import com.spotify.ratatool.Command -import com.spotify.ratatool.diffy.{BigDiffy, Delta, Diffy} +import com.spotify.ratatool.diffy.{BigDiffy, Delta, Diffy, MultiKey} import com.spotify.ratatool.diffy.BigDiffy.diff import com.spotify.scio.values.SCollection import shapeless._ import shapeless.labelled.FieldType - import scala.reflect.ClassTag @SerialVersionUID(42L) @@ -173,7 +172,7 @@ object CaseClassDiffy { /** Diff two SCollection[T] **/ def diffCaseClass[T : ClassTag : MapEncoder](lhs: SCollection[T], rhs: SCollection[T], - keyFn: T => String, + keyFn: T => MultiKey, diffy: CaseClassDiffy[T]) : BigDiffy[T] = diff(lhs, rhs, diffy, keyFn) }