add data synchronization test to verification Suite. #526

Merged: 3 commits, Jan 9, 2024
64 changes: 54 additions & 10 deletions src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -19,21 +19,15 @@ package com.amazon.deequ.analyzers
import com.amazon.deequ.analyzers.Analyzers._
import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
import com.amazon.deequ.analyzers.runners._
-import com.amazon.deequ.metrics.FullColumn
+import com.amazon.deequ.comparison.{DataSynchronization, DataSynchronizationFailed, DataSynchronizationSucceeded}
+import com.amazon.deequ.metrics.{DoubleMetric, Entity, FullColumn, Metric}
import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn
-import com.amazon.deequ.metrics.DoubleMetric
-import com.amazon.deequ.metrics.Entity
-import com.amazon.deequ.metrics.Metric
+import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
-import org.apache.spark.sql.Column
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.SparkSession

import scala.language.existentials
-import scala.util.Failure
-import scala.util.Success
+import scala.util.{Failure, Success, Try}

/**
* A state (sufficient statistic) computed from data, from which we can compute a metric.
@@ -299,6 +293,56 @@ abstract class GroupingAnalyzer[S <: State[_], +M <: Metric[_]] extends Analyzer
}
}

/**
* Data Synchronization Analyzer
*
* @param dfToCompare DataFrame to compare
 * @param columnMappings column mappings between the base DataFrame and dfToCompare
Reviewer: nit: We can add more documentation here. Are these the key column mappings? What about the other parameter, comparison column mappings? Will that be added in a future PR?

Author: Thanks, I will update the documentation. I was planning to do this in a phased manner instead of one big PR; I will definitely follow up with a PR to add that as well.
 * @param assertion assertion on the ratio of rows that are in sync
*/
case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
                                       columnMappings: Map[String, String],
                                       assertion: Double => Boolean)
  extends Analyzer[DataSynchronizationState, DoubleMetric] {

  override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {

    val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)

    result match {
      case succeeded: DataSynchronizationSucceeded =>
        Some(DataSynchronizationState(succeeded.passedCount.getOrElse(0), succeeded.totalCount.getOrElse(0)))
      case failed: DataSynchronizationFailed =>
        Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
      case _ => None
    }
  }

  override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {

    state match {
      case Some(s) => DoubleMetric(
        Entity.Dataset,
        "DataSynchronization",
        "",
        Try(s.synchronizedDataCount.toDouble / s.dataCount.toDouble),
        None
      )
      case None => DoubleMetric(
        Entity.Dataset,
        "DataSynchronization",
        "",
        Try(0.0),
        None
      )
    }
  }

  override private[deequ] def toFailureMetric(failure: Exception) =
    metricFromFailure(failure, "DataSynchronization", "", Entity.Dataset)
}
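For illustration (not part of this diff), a minimal sketch of driving the analyzer directly, assuming a SparkSession `spark` and two small DataFrames that share an "id" key column:

// Hypothetical standalone usage of DataSynchronizationAnalyzer.
val base = spark.createDataFrame(Seq((1, "a"), (2, "b"), (3, "c"))).toDF("id", "name")
val other = spark.createDataFrame(Seq((1, "a"), (2, "b"))).toDF("id", "name")

// Join on "id" and assert that more than half of the base rows are in sync.
val analyzer = DataSynchronizationAnalyzer(other, Map("id" -> "id"), _ > 0.5)
val metric = analyzer.calculate(base)
// Under these assumptions, metric.value should be roughly Success(2.0 / 3.0),
// since 2 of the 3 base rows have a matching row in `other`.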


/** Helper method to check conditions on the schema of the data */
object Preconditions {

36 changes: 36 additions & 0 deletions src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala
@@ -0,0 +1,36 @@
/**
* Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
* is located at
*
* http://aws.amazon.com/apache2.0/
*
* or in the "license" file accompanying this file. This file is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*
*/

package com.amazon.deequ.analyzers

/**
 * Stores the state of a DataSynchronization computation.
 *
 * @param synchronizedDataCount count of rows that are in sync
 * @param dataCount total count of rows, used to calculate the ratio
 */
case class DataSynchronizationState(synchronizedDataCount: Long, dataCount: Long)
  extends DoubleValuedState[DataSynchronizationState] {

  override def sum(other: DataSynchronizationState): DataSynchronizationState = {
    DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, dataCount + other.dataCount)
  }

  override def metricValue(): Double = {
    if (dataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / dataCount.toDouble
  }
}

object DataSynchronizationState
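To make the merge semantics concrete, a small sketch with assumed values (not from this PR): partial states add component-wise, e.g. when states from two runs are merged, and the metric is the resulting ratio.

val s1 = DataSynchronizationState(synchronizedDataCount = 8, dataCount = 10)
val s2 = DataSynchronizationState(synchronizedDataCount = 90, dataCount = 100)
val merged = s1.sum(s2) // DataSynchronizationState(98, 110)
merged.metricValue()    // 98.0 / 110.0 = 0.89..., and Double.NaN if dataCount were 0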
43 changes: 39 additions & 4 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -1,5 +1,5 @@
/**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -16,8 +16,7 @@

package com.amazon.deequ.checks

-import com.amazon.deequ.analyzers.AnalyzerOptions
-import com.amazon.deequ.analyzers.{Analyzer, Histogram, KLLParameters, Patterns, State}
+import com.amazon.deequ.analyzers.{Analyzer, AnalyzerOptions, DataSynchronizationState, DataSynchronizationAnalyzer, Histogram, KLLParameters, Patterns, State}
import com.amazon.deequ.anomalydetection.{AnomalyDetectionStrategy, AnomalyDetector, DataPoint}
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.constraints.Constraint._
@@ -27,6 +26,7 @@ import com.amazon.deequ.repository.MetricsRepository
import org.apache.spark.sql.expressions.UserDefinedFunction
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.checks.ColumnCondition.{isAnyNotNull, isEachNotNull}
+import org.apache.spark.sql.DataFrame

import scala.util.matching.Regex

@@ -338,6 +338,38 @@ case class Check(
uniqueValueRatioConstraint(columns, assertion, filter, hint) }
}

/**
 * Performs a data synchronization check between the base DataFrame supplied to
 * [[com.amazon.deequ.VerificationSuite.onData]] and another DataFrame supplied to this check, using Deequ's
 * [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework.
 * This method compares specified columns of both DataFrames and assesses synchronization based on a custom assertion.
 *
 * Uses [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] to compare the data,
 * wrapped in a [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
*
* @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
* current DataFrame to assess data synchronization.
* @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
* Keys represent column names in the current DataFrame,
* and values are corresponding column names in otherDf.
* @param assertion A function that takes a Double (result of the comparison) and returns a Boolean.
* Defines the condition under which the data in both DataFrames is considered synchronized.
 *                  For example, (_ > 0.7) requires the metric value to exceed 0.7, i.e. more than 70% of records in sync.
* @param hint Optional. Additional context or information about the synchronization check.
* Helpful for understanding the intent or specifics of the check. Default is None.
* @return A [[com.amazon.deequ.checks.Check]] object representing the outcome
* of the synchronization check. This object can be used in Deequ's verification suite to
* assert data quality constraints.
*/
  def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean,
                         hint: Option[String] = None): Check = {

    val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion)
    val constraint = DataSynchronizationConstraint(dataSyncAnalyzer, hint)
    addConstraint(constraint)
  }
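For illustration (not part of this diff), a usage sketch mirroring the test added later in this PR; `df` and `otherDf` are assumed DataFrames sharing an "id" key column:

val check = Check(CheckLevel.Error, "data sync check")
  .isDataSynchronized(otherDf, Map("id" -> "id"), _ > 0.9, Some("at least 90% of rows in sync"))

val result = VerificationSuite()
  .onData(df)
  .addCheck(check)
  .run()
// result.checkResults(check).status is CheckStatus.Success when the assertion holds.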

/**
* Creates a constraint that asserts on the number of distinct values a column has.
*
@@ -1092,7 +1124,10 @@ case class Check(
case nc: ConstraintDecorator => nc.inner
case c: Constraint => c
}
-      .collect { case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer }
+      .collect {
+        case constraint: AnalysisBasedConstraint[_, _, _] => constraint.analyzer
+        case constraint: DataSynchronizationConstraint => constraint.analyzer
Reviewer: Why can't the existing statement match DataSynchronizationConstraint?

Author: Updated; yes, we don't need this.
+      }
.map { _.asInstanceOf[Analyzer[_, Metric[_]]] }
.toSet
}
src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala
@@ -1,5 +1,5 @@
/**
- * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -17,5 +17,11 @@
package com.amazon.deequ.comparison

sealed trait ComparisonResult
-case class ComparisonFailed(errorMessage: String) extends ComparisonResult
-case class ComparisonSucceeded() extends ComparisonResult
+
+case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
+case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult
Reviewer (on lines +21 to +22): We should keep one set of states. Right now there are two sets: Comparison[Failed/Succeeded] vs. DataSynchronization[Failed/Succeeded]. Having too many states can result in confusion for the end user.

Author: I was planning to consolidate once ReferentialIntegrity is also integrated into the verification suite. I will handle that in the next PR.


+case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None,
+                                     totalCount: Option[Long] = None) extends ComparisonResult
+case class DataSynchronizationSucceeded(passedCount: Option[Long] = None, totalCount: Option[Long] = None)
+  extends ComparisonResult
src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala
@@ -1,5 +1,5 @@
/**
- * Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -107,7 +107,7 @@ object DataSynchronization extends ComparisonBase {
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
-      ComparisonFailed(columnErrors.get)
+      DataSynchronizationFailed(columnErrors.get)
}
}

@@ -147,7 +147,7 @@ object DataSynchronization extends ComparisonBase {
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
-      ComparisonFailed(keyColumnErrors.get)
+      DataSynchronizationFailed(keyColumnErrors.get)
}
}

@@ -260,12 +260,15 @@ object DataSynchronization extends ComparisonBase {
.reduce((e1, e2) => e1 && e2)

val joined = ds1.join(ds2, joinExpression, "inner")
-    val ratio = joined.count().toDouble / ds1Count
+    val passedCount = joined.count()
+    val totalCount = ds1Count
+    val ratio = passedCount.toDouble / totalCount.toDouble

    if (assertion(ratio)) {
-      ComparisonSucceeded()
+      DataSynchronizationSucceeded(Some(passedCount), Some(totalCount))
    } else {
-      ComparisonFailed(s"Value: $ratio does not meet the constraint requirement.")
+      DataSynchronizationFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint " +
+        s"requirement.", Some(passedCount), Some(totalCount))
    }
}
}
}
29 changes: 28 additions & 1 deletion src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -17,9 +17,10 @@
package com.amazon.deequ.constraints

import com.amazon.deequ.analyzers._
-import com.amazon.deequ.metrics.{BucketDistribution, Distribution, Metric}
+import com.amazon.deequ.metrics.{BucketDistribution, Distribution, DoubleMetric, Metric}
import org.apache.spark.sql.expressions.UserDefinedFunction

+import scala.util.{Success, Try}
import scala.util.matching.Regex

object ConstraintStatus extends Enumeration {
@@ -897,3 +898,29 @@ object Constraint {
}

}

/**
 * Data Synchronization Constraint
 *
 * @param analyzer the DataSynchronizationAnalyzer to evaluate
 * @param hint optional hint to include in the constraint result
 */
case class DataSynchronizationConstraint(analyzer: DataSynchronizationAnalyzer, hint: Option[String])
  extends Constraint {

  override def evaluate(metrics: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult = {

    val anz = Try(metrics.filter(i => i._1.isInstanceOf[DataSynchronizationAnalyzer]).head._2)
Reviewer: Can we write it as the following, for better readability?

    val (_, anz) = Try(metrics.filter { case (analyzer, _) => analyzer.isInstanceOf[DataSynchronizationAnalyzer] }.head)

What happens if .head is called on an empty list? Is that possible?

Author: Will refactor to improve readability; thanks for the suggestion.

    anz match {
      case Success(m: DoubleMetric) =>
        val result = m.value match {
          case Success(value) => analyzer.assertion(value)
          case _ => false
        }
        val status = if (result) ConstraintStatus.Success else ConstraintStatus.Failure
        ConstraintResult(this, status, hint, Some(m))

      case _ =>
        ConstraintResult(this, ConstraintStatus.Failure, hint, anz.toOption)
    }
  }
}
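For illustration (not part of this diff), one way the extraction could be refactored along the lines the reviewer suggests; collectFirst returns None instead of throwing the NoSuchElementException that .head would raise when no matching metric is present:

// Hypothetical alternative to the Try(... .head._2) extraction above.
val anz: Option[Metric[_]] = metrics.collectFirst {
  case (a, metric) if a.isInstanceOf[DataSynchronizationAnalyzer] => metric
}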
11 changes: 8 additions & 3 deletions src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -19,9 +19,7 @@ package com.amazon.deequ
import com.amazon.deequ.analyzers._
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.anomalydetection.AbsoluteChangeStrategy
-import com.amazon.deequ.checks.Check
-import com.amazon.deequ.checks.CheckLevel
-import com.amazon.deequ.checks.CheckStatus
+import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import com.amazon.deequ.constraints.{Constraint, ConstraintResult}
import com.amazon.deequ.io.DfsUtils
import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
@@ -806,6 +804,9 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
val complianceCheckThatShouldFailCompleteness = Check(CheckLevel.Error, "shouldErrorStringType")
.hasCompleteness("fake", x => x > 0)

+    val checkHasDataInSyncTest = Check(CheckLevel.Error, "shouldSucceedForAge")
+      .isDataSynchronized(df, Map("age" -> "age"), _ > 0.99, Some("shouldpass"))

val verificationResult = VerificationSuite()
.onData(df)
.addCheck(checkThatShouldSucceed)
@@ -815,6 +816,7 @@
.addCheck(checkThatShouldFail)
.addCheck(complianceCheckThatShouldFail)
.addCheck(complianceCheckThatShouldFailCompleteness)
+      .addCheck(checkHasDataInSyncTest)
.run()

val checkSuccessResult = verificationResult.checkResults(checkThatShouldSucceed)
@@ -846,6 +848,9 @@
checkFailedCompletenessResult.constraintResults.map(_.message) shouldBe
List(Some("Input data does not include column fake!"))
assert(checkFailedCompletenessResult.status == CheckStatus.Error)

+    val checkDataSyncResult = verificationResult.checkResults(checkHasDataInSyncTest)
+    checkDataSyncResult.status shouldBe CheckStatus.Success
}

"Well-defined checks should produce correct result even if another check throws an exception" in withSparkSession {