Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support col match and change to DatasetMatch #529

Merged
merged 3 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package com.amazon.deequ.analyzers

import com.amazon.deequ.analyzers.Analyzers.metricFromFailure
import com.amazon.deequ.comparison.DataSynchronization
import com.amazon.deequ.comparison.DataSynchronizationFailed
import com.amazon.deequ.comparison.DataSynchronizationSucceeded
import com.amazon.deequ.comparison.DatasetMatchFailed
import com.amazon.deequ.comparison.DatasetMatchSucceeded
import com.amazon.deequ.metrics.DoubleMetric
import com.amazon.deequ.metrics.Entity
import org.apache.spark.sql.DataFrame
Expand All @@ -29,59 +29,67 @@ import scala.util.Try


/**
* An Analyzer for Deequ that performs a data synchronization check between two DataFrames.
* It evaluates the degree of synchronization based on specified column mappings and an assertion function.
* An Analyzer for Deequ that performs a dataset match check between two DataFrames.
* It evaluates the degree of match based on specified column mappings and an assertion function.
*
* The analyzer computes a ratio of synchronized data points to the total data points, represented as a DoubleMetric.
* Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for DataSynchronization implementation
* The analyzer computes a ratio of matched data points to the total data points, represented as a DoubleMetric.
* Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for dataset match implementation
*
* @param dfToCompare The DataFrame to compare with the primary DataFrame that is setup
* during [[com.amazon.deequ.VerificationSuite.onData]] setup.
* @param columnMappings A map where each key-value pair represents a column in the primary DataFrame
* and its corresponding column in dfToCompare.
* @param matchColumnMappings A map defining the column correlations between the current DataFrame and otherDf.
* These are the columns which we will check for equality, post joining. It's an optional value
* with defaults to None.
VenkataKarthikP marked this conversation as resolved.
Show resolved Hide resolved
* @param assertion A function that takes a Double (the match ratio) and returns a Boolean.
* It defines the condition for successful synchronization.
*
* Usage:
* This analyzer is used in Deequ's VerificationSuite based if `isDataSynchronized` check is defined or could be used
* This analyzer is used in Deequ's VerificationSuite based if `isDatasetMatched` check is defined or could be used
VenkataKarthikP marked this conversation as resolved.
Show resolved Hide resolved
* manually as well.
*
* Example:
* val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
* val analyzer = DatasetMatchAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8)
* val verificationResult = VerificationSuite().onData(df).addAnalyzer(analyzer).run()
*
* // or could do something like below
* val verificationResult = VerificationSuite().onData(df).isDataSynchronized(dfToCompare, Map("col1" -> "col2"),
* val verificationResult = VerificationSuite().onData(df).isDatasetMatched(dfToCompare, Map("col1" -> "col2"),
VenkataKarthikP marked this conversation as resolved.
Show resolved Hide resolved
* _ > 0.8).run()
*
*
* The computeStateFrom method calculates the synchronization state by comparing the specified columns of the two
* The computeStateFrom method calculates the datasetmatch state by comparing the specified columns of the two
* DataFrames.
* The computeMetricFrom method then converts this state into a DoubleMetric representing the synchronization ratio.
* The computeMetricFrom method then converts this state into a DoubleMetric representing the match ratio.
*
*/
case class DataSynchronizationAnalyzer(dfToCompare: DataFrame,
columnMappings: Map[String, String],
assertion: Double => Boolean)
extends Analyzer[DataSynchronizationState, DoubleMetric] {
case class DatasetMatchAnalyzer(dfToCompare: DataFrame,
columnMappings: Map[String, String],
assertion: Double => Boolean,
matchColumnMappings: Option[Map[String, String]] = None)
extends Analyzer[DatasetMatchState, DoubleMetric] {

override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = {
override def computeStateFrom(data: DataFrame): Option[DatasetMatchState] = {

val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)
val result = if (matchColumnMappings.isDefined) {
DataSynchronization.columnMatch(data, dfToCompare, columnMappings, matchColumnMappings.get, assertion)
} else {
DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion)
}

result match {
case succeeded: DataSynchronizationSucceeded =>
Some(DataSynchronizationState(succeeded.passedCount, succeeded.totalCount))
case failed: DataSynchronizationFailed =>
Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
case succeeded: DatasetMatchSucceeded =>
Some(DatasetMatchState(succeeded.passedCount, succeeded.totalCount))
case failed: DatasetMatchFailed =>
Some(DatasetMatchState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0)))
case _ => None
}
}

override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = {
override def computeMetricFrom(state: Option[DatasetMatchState]): DoubleMetric = {

val metric = state match {
case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble)
case Some(s) => Try(s.matchedDataCount.toDouble / s.totalDataCount.toDouble)
case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,32 @@
package com.amazon.deequ.analyzers

/**
* Represents the state of data synchronization between two DataFrames in Deequ.
* This state keeps track of the count of synchronized record count and the total record count.
* It is used to calculate a ratio of synchronization, which is a measure of how well the data
* in the two DataFrames are synchronized.
* Represents the state of datasetMatch between two DataFrames in Deequ.
* This state keeps track of the count of matched record count and the total record count.
* It is used to calculate a ratio of match, which is a measure of how well the data
* in the two DataFrames are matched.
*
VenkataKarthikP marked this conversation as resolved.
Show resolved Hide resolved
* @param synchronizedDataCount The count of records that are considered synchronized between the two DataFrames.
* @param matchedDataCount The count of records that are considered match between the two DataFrames.
* @param totalDataCount The total count of records for check.
*
* The `sum` method allows for aggregation of this state with another, combining the counts from both states.
* This is useful in distributed computations where states from different partitions need to be aggregated.
*
* The `metricValue` method computes the synchronization ratio. It is the ratio of `synchronizedDataCount`
* The `metricValue` method computes the synchronization ratio. It is the ratio of `matchedDataCount`
* to `dataCount`.
* If `dataCount` is zero, which means no data points were examined, the method returns `Double.NaN`
* to indicate the undefined state.
*
*/
case class DataSynchronizationState(synchronizedDataCount: Long, totalDataCount: Long)
extends DoubleValuedState[DataSynchronizationState] {
override def sum(other: DataSynchronizationState): DataSynchronizationState = {
DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, totalDataCount + other.totalDataCount)
case class DatasetMatchState(matchedDataCount: Long, totalDataCount: Long)
extends DoubleValuedState[DatasetMatchState] {
override def sum(other: DatasetMatchState): DatasetMatchState = {
DatasetMatchState(matchedDataCount + other.matchedDataCount, totalDataCount + other.totalDataCount)
}

override def metricValue(): Double = {
if (totalDataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / totalDataCount.toDouble
if (totalDataCount == 0L) Double.NaN else matchedDataCount.toDouble / totalDataCount.toDouble
}
}

object DataSynchronizationState
object DatasetMatchState
35 changes: 20 additions & 15 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package com.amazon.deequ.checks
import com.amazon.deequ.analyzers.runners.AnalyzerContext
import com.amazon.deequ.analyzers.Analyzer
import com.amazon.deequ.analyzers.AnalyzerOptions
import com.amazon.deequ.analyzers.DataSynchronizationAnalyzer
import com.amazon.deequ.analyzers.DataSynchronizationState
import com.amazon.deequ.analyzers.DatasetMatchAnalyzer
import com.amazon.deequ.analyzers.DatasetMatchState
import com.amazon.deequ.analyzers.Histogram
import com.amazon.deequ.analyzers.KLLParameters
import com.amazon.deequ.analyzers.Patterns
Expand Down Expand Up @@ -351,13 +351,13 @@ case class Check(
}

/**
* Performs a data synchronization check between the base DataFrame supplied to
* Performs a dataset check between the base DataFrame supplied to
* [[com.amazon.deequ.VerificationSuite.onData]] and other DataFrame supplied to this check using Deequ's
* [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework.
* This method compares specified columns of both DataFrames and assesses synchronization based on a custom assertion.
* This method compares specified columns of both DataFrames and assesses match based on a custom assertion.
*
* Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data
* and Constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]].
* Utilizes [[com.amazon.deequ.analyzers.DatasetMatchAnalyzer]] for comparing the data
* and Constraint [[com.amazon.deequ.constraints.DatasetMatchConstraint]].
*
* Usage:
* To use this method, create a VerificationSuite and invoke this method as part of adding checks:
Expand All @@ -368,37 +368,42 @@ case class Check(
* val assertionFunction: Double => Boolean = _ > 0.7
*
* val check = new Check(CheckLevel.Error, "Data Synchronization Check")
* .isDataSynchronized(otherDataFrame, columnMappings, assertionFunction)
* .isDatasetMatched(otherDataFrame, columnMappings, assertionFunction)
VenkataKarthikP marked this conversation as resolved.
Show resolved Hide resolved
*
* val verificationResult = VerificationSuite()
* .onData(baseDataFrame)
* .addCheck(check)
* .run()
* }}}
*
* This will add a data synchronization check to the VerificationSuite, comparing the specified columns of
* This will add a dataset match check to the VerificationSuite, comparing the specified columns of
* baseDataFrame and otherDataFrame based on the provided assertion function.
*
*
* @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
* current DataFrame to assess data synchronization.
* @param columnMappings A map defining the column correlations between the current DataFrame and otherDf.
* @param keyColumnMappings A map defining the column correlations between the current DataFrame and otherDf.
* Keys represent column names in the current DataFrame,
* and values are corresponding column names in otherDf.
* @param assertion A function that takes a Double (result of the comparison) and returns a Boolean.
* Defines the condition under which the data in both DataFrames is considered synchronized.
* For example (_ > 0.7) denoting metric value > 0.7 or 70% of records.
* @param matchColumnMappings A map defining the column correlations between the current DataFrame and otherDf.
* These are the columns which we will check for equality, post joining. It's an optional value
* with defaults to None, which will be derived from `keyColumnMappings` if None.
VenkataKarthikP marked this conversation as resolved.
Show resolved Hide resolved
* @param hint Optional. Additional context or information about the synchronization check.
* Helpful for understanding the intent or specifics of the check. Default is None.
* @return A [[com.amazon.deequ.checks.Check]] object representing the outcome
* of the synchronization check. This object can be used in Deequ's verification suite to
* of the dataset match check. This object can be used in Deequ's verification suite to
* assert data quality constraints.
*
*/
def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean,
hint: Option[String] = None): Check = {
val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion)
val constraint = AnalysisBasedConstraint[DataSynchronizationState, Double, Double](dataSyncAnalyzer, assertion,
def isDatasetMatched(otherDf: DataFrame,
VenkataKarthikP marked this conversation as resolved.
Show resolved Hide resolved
keyColumnMappings: Map[String, String],
assertion: Double => Boolean,
matchColumnMappings: Option[Map[String, String]] = None,
hint: Option[String] = None): Check = {
val dataMatchAnalyzer = DatasetMatchAnalyzer(otherDf, keyColumnMappings, assertion, matchColumnMappings)
val constraint = AnalysisBasedConstraint[DatasetMatchState, Double, Double](dataMatchAnalyzer, assertion,
hint = hint)
addConstraint(constraint)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ sealed trait ComparisonResult
case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult
case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult

case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None,
totalCount: Option[Long] = None) extends ComparisonResult
case class DataSynchronizationSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult
case class DatasetMatchFailed(errorMessage: String, passedCount: Option[Long] = None,
totalCount: Option[Long] = None) extends ComparisonResult
case class DatasetMatchSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,13 @@ object DataSynchronization extends ComparisonBase {
val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _))

if (!nonKeyColsMatch) {
DataSynchronizationFailed("Non key columns in the given data frames do not match.")
DatasetMatchFailed("Non key columns in the given data frames do not match.")
} else {
val mergedMaps = colKeyMap ++ colsDS1.map(x => x -> x).toMap
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
DataSynchronizationFailed(columnErrors.get)
DatasetMatchFailed(columnErrors.get)
}
}

Expand Down Expand Up @@ -138,17 +138,17 @@ object DataSynchronization extends ComparisonBase {
val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _))

if (nonKeyColumns1NotInDataset.nonEmpty) {
DataSynchronizationFailed(s"The following columns were not found in the first dataset: " +
DatasetMatchFailed(s"The following columns were not found in the first dataset: " +
s"${nonKeyColumns1NotInDataset.mkString(", ")}")
} else if (nonKeyColumns2NotInDataset.nonEmpty) {
DataSynchronizationFailed(s"The following columns were not found in the second dataset: " +
DatasetMatchFailed(s"The following columns were not found in the second dataset: " +
s"${nonKeyColumns2NotInDataset.mkString(", ")}")
} else {
val mergedMaps = colKeyMap ++ compCols
finalAssertion(ds1, ds2, mergedMaps, assertion)
}
} else {
DataSynchronizationFailed(keyColumnErrors.get)
DatasetMatchFailed(keyColumnErrors.get)
}
}

Expand All @@ -157,23 +157,23 @@ object DataSynchronization extends ComparisonBase {
colKeyMap: Map[String, String],
optionalCompCols: Option[Map[String, String]] = None,
optionalOutcomeColumnName: Option[String] = None):
Either[DataSynchronizationFailed, DataFrame] = {
Either[DatasetMatchFailed, DataFrame] = {
val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap)
if (columnErrors.isEmpty) {
val compColsEither: Either[DataSynchronizationFailed, Map[String, String]] = if (optionalCompCols.isDefined) {
val compColsEither: Either[DatasetMatchFailed, Map[String, String]] = if (optionalCompCols.isDefined) {
optionalCompCols.get match {
case compCols if compCols.isEmpty => Left(DataSynchronizationFailed("Empty column comparison map provided."))
case compCols if compCols.isEmpty => Left(DatasetMatchFailed("Empty column comparison map provided."))
case compCols =>
val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _))
val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _))
if (ds1CompColsNotInDataset.nonEmpty) {
Left(
DataSynchronizationFailed(s"The following columns were not found in the first dataset: " +
DatasetMatchFailed(s"The following columns were not found in the first dataset: " +
s"${ds1CompColsNotInDataset.mkString(", ")}")
)
} else if (ds2CompColsNotInDataset.nonEmpty) {
Left(
DataSynchronizationFailed(s"The following columns were not found in the second dataset: " +
DatasetMatchFailed(s"The following columns were not found in the second dataset: " +
s"${ds2CompColsNotInDataset.mkString(", ")}")
)
} else {
Expand All @@ -186,7 +186,7 @@ object DataSynchronization extends ComparisonBase {
val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _))

if (!nonKeyColsMatch) {
Left(DataSynchronizationFailed("Non key columns in the given data frames do not match."))
Left(DatasetMatchFailed("Non key columns in the given data frames do not match."))
} else {
Right(ds1NonKeyCols.map { c => c -> c}.toMap)
}
Expand All @@ -198,11 +198,11 @@ object DataSynchronization extends ComparisonBase {
case Success(df) => Right(df)
case Failure(ex) =>
ex.printStackTrace()
Left(DataSynchronizationFailed(s"Comparison failed due to ${ex.getCause.getClass}"))
Left(DatasetMatchFailed(s"Comparison failed due to ${ex.getCause.getClass}"))
}
}
} else {
Left(DataSynchronizationFailed(columnErrors.get))
Left(DatasetMatchFailed(columnErrors.get))
}
}

Expand Down Expand Up @@ -255,7 +255,7 @@ object DataSynchronization extends ComparisonBase {
val ds2Count = ds2.count()

if (ds1Count != ds2Count) {
DataSynchronizationFailed(s"The row counts of the two data frames do not match.")
DatasetMatchFailed(s"The row counts of the two data frames do not match.")
} else {
val joinExpression: Column = mergedMaps
.map { case (col1, col2) => ds1(col1) === ds2(col2)}
Expand All @@ -267,9 +267,9 @@ object DataSynchronization extends ComparisonBase {
val ratio = passedCount.toDouble / totalCount.toDouble

if (assertion(ratio)) {
DataSynchronizationSucceeded(passedCount, totalCount)
DatasetMatchSucceeded(passedCount, totalCount)
} else {
DataSynchronizationFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint" +
DatasetMatchFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint" +
s"requirement.", Some(passedCount), Some(totalCount))
}
}
Expand Down
Loading