review comments
VenkataKarthikP committed Jan 28, 2024
1 parent 1a750c1 commit 312cc3b
Showing 5 changed files with 55 additions and 58 deletions.
@@ -40,8 +40,8 @@ import scala.util.Try
* @param columnMappings A map where each key-value pair represents a column in the primary DataFrame
 * and its corresponding column in dfToCompare.
* @param matchColumnMappings A map defining the column correlations between the current DataFrame and otherDf.
- * These are the columns which we will check for equality, post joining. It's an optional value
- * with defaults to None.
+ * These are the columns which we will check for equality, post joining.
+ * It's an optional value that defaults to None.
* @param assertion A function that takes a Double (the match ratio) and returns a Boolean.
* It defines the condition for successful synchronization.
*
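For orientation, here is a minimal sketch of how the parameters documented above fit together when the analyzer is constructed directly. The DataFrames, key names, and the 0.9 threshold are illustrative assumptions, and the positional argument order follows the DatasetMatchAnalyzer call in Check.scala further down in this commit:

import org.apache.spark.sql.DataFrame
import com.amazon.deequ.analyzers.DatasetMatchAnalyzer

// Hypothetical DataFrames that share an "id" key column.
val primary: DataFrame = ???    // the DataFrame under analysis
val reference: DataFrame = ???  // dfToCompare

val analyzer = DatasetMatchAnalyzer(
  reference,                    // dfToCompare
  Map("id" -> "id"),            // columnMappings: join keys, primary -> reference
  _ > 0.9,                      // assertion: pass when the match ratio exceeds 0.9
  Some(Map("name" -> "name"))   // matchColumnMappings: checked for equality post join
)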
@@ -1,5 +1,5 @@
/**
- * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -19,19 +19,17 @@ package com.amazon.deequ.analyzers
/**
* Represents the state of datasetMatch between two DataFrames in Deequ.
 * This state keeps track of the matched record count and the total record count.
- * It is used to calculate a ratio of match, which is a measure of how well the data
- * in the two DataFrames are matched.
+ * It measures how well the data in the two DataFrames matches.
*
 * @param matchedDataCount The count of records that are considered a match between the two DataFrames.
 * @param totalDataCount The total count of records checked.
*
* The `sum` method allows for aggregation of this state with another, combining the counts from both states.
* This is useful in distributed computations where states from different partitions need to be aggregated.
*
- * The `metricValue` method computes the synchronization ratio. It is the ratio of `matchedDataCount`
- * to `dataCount`.
- * If `dataCount` is zero, which means no data points were examined, the method returns `Double.NaN`
- * to indicate the undefined state.
+ * The `metricValue` method computes the synchronization ratio. It is the ratio of `matchedDataCount` to `totalDataCount`.
+ * If `totalDataCount` is zero, which means no data points were examined, the method returns `Double.NaN` to indicate
+ * the undefined state.
*
*/
case class DatasetMatchState(matchedDataCount: Long, totalDataCount: Long)
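The diff elides the method bodies, but from the description above a plausible sketch of the two methods is the following; this is an illustration consistent with the documented behavior, not the committed implementation:

case class DatasetMatchState(matchedDataCount: Long, totalDataCount: Long) {

  // Aggregate this state with another, e.g. states computed on different partitions.
  def sum(other: DatasetMatchState): DatasetMatchState =
    DatasetMatchState(matchedDataCount + other.matchedDataCount,
      totalDataCount + other.totalDataCount)

  // Match ratio; Double.NaN when no records were examined.
  def metricValue(): Double =
    if (totalDataCount == 0L) Double.NaN
    else matchedDataCount.toDouble / totalDataCount
}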
35 changes: 17 additions & 18 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -1,5 +1,5 @@
/**
- * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -368,7 +368,7 @@ case class Check(
* val assertionFunction: Double => Boolean = _ > 0.7
*
* val check = new Check(CheckLevel.Error, "Data Synchronization Check")
- *   .isDatasetMatched(otherDataFrame, columnMappings, assertionFunction)
+ *   .doesDatasetMatch(otherDataFrame, columnMappings, assertionFunction)
*
* val verificationResult = VerificationSuite()
* .onData(baseDataFrame)
@@ -379,30 +379,29 @@ case class Check(
* This will add a dataset match check to the VerificationSuite, comparing the specified columns of
* baseDataFrame and otherDataFrame based on the provided assertion function.
*
- * @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the
- * current DataFrame to assess data synchronization.
+ * @param otherDataset The DataFrame to be compared with the current one. Analyzed in conjunction with the
+ * current DataFrame to assess data synchronization.
* @param keyColumnMappings A map defining the column correlations between the current DataFrame and otherDf.
- * Keys represent column names in the current DataFrame,
- * and values are corresponding column names in otherDf.
- * @param assertion A function that takes a Double (result of the comparison) and returns a Boolean.
- * Defines the condition under which the data in both DataFrames is considered synchronized.
- * For example (_ > 0.7) denoting metric value > 0.7 or 70% of records.
+ * Keys represent column names in the current DataFrame, and values are corresponding
+ * column names in otherDf.
+ * @param assertion A function that takes a Double (result of the comparison) and returns a Boolean. Defines the
+ * condition under which the data in both DataFrames is considered synchronized. For example
+ * (_ > 0.7) denoting metric value > 0.7 or 70% of records.
* @param matchColumnMappings A map defining the column correlations between the current DataFrame and otherDf.
- * These are the columns which we will check for equality, post joining. It's an optional value
- * with defaults to None, which will be derived from `keyColumnMappings` if None.
- * @param hint Optional. Additional context or information about the synchronization check.
- * Helpful for understanding the intent or specifics of the check. Default is None.
- * @return A [[com.amazon.deequ.checks.Check]] object representing the outcome
- * of the dataset match check. This object can be used in Deequ's verification suite to
- * assert data quality constraints.
+ * These are the columns which we will check for equality, post joining. It's an optional
+ * value that defaults to None; when None, it is derived from `keyColumnMappings`.
+ * @param hint Optional. Additional context or information about the synchronization check.
+ * Helpful for understanding the intent or specifics of the check. Default is None.
+ * @return A [[com.amazon.deequ.checks.Check]] object representing the outcome of the dataset match check.
+ * This object can be used in Deequ's verification suite to assert data quality constraints.
*
*/
-  def isDatasetMatched(otherDf: DataFrame,
+  def doesDatasetMatch(otherDataset: DataFrame,
keyColumnMappings: Map[String, String],
assertion: Double => Boolean,
matchColumnMappings: Option[Map[String, String]] = None,
hint: Option[String] = None): Check = {
-    val dataMatchAnalyzer = DatasetMatchAnalyzer(otherDf, keyColumnMappings, assertion, matchColumnMappings)
+    val dataMatchAnalyzer = DatasetMatchAnalyzer(otherDataset, keyColumnMappings, assertion, matchColumnMappings)
val constraint = AnalysisBasedConstraint[DatasetMatchState, Double, Double](dataMatchAnalyzer, assertion,
hint = hint)
addConstraint(constraint)
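End to end, the renamed check reads as follows. This sketch assumes two DataFrames, orders and referenceOrders, joined on an order_id key; the names and the 0.95 threshold are illustrative, while the argument order matches the signature above:

import org.apache.spark.sql.DataFrame
import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}

val orders: DataFrame = ???          // primary DataFrame, assumed
val referenceOrders: DataFrame = ??? // comparison DataFrame, assumed

val check = Check(CheckLevel.Error, "orders match reference")
  .doesDatasetMatch(
    referenceOrders,                  // otherDataset
    Map("order_id" -> "order_id"),    // keyColumnMappings
    _ >= 0.95,                        // assertion: at least 95% of records match
    Some(Map("status" -> "status")),  // matchColumnMappings
    hint = Some("orders drifted from the reference table")
  )

val result = VerificationSuite()
  .onData(orders)
  .addCheck(check)
  .run()

assert(result.status == CheckStatus.Success)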
36 changes: 18 additions & 18 deletions src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -1,5 +1,5 @@
/**
- * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -812,7 +812,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
.hasCompleteness("fake", x => x > 0)

val checkHasDataInSyncTest = Check(CheckLevel.Error, "shouldSucceedForAge")
-      .isDatasetMatched(df, Map("age" -> "age"), _ > 0.99, hint = Some("shouldPass"))
+      .doesDatasetMatch(df, Map("age" -> "age"), _ > 0.99, hint = Some("shouldPass"))

val verificationResult = VerificationSuite()
.onData(df)
@@ -994,30 +994,30 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
val dfColRenamed = df.withColumnRenamed("id", "id_renamed")

val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check pass")
-      .isDatasetMatched(dfModified, Map("id" -> "id"), _ > 0.7, hint = Some("shouldPass"))
+      .doesDatasetMatch(dfModified, Map("id" -> "id"), _ > 0.7, hint = Some("shouldPass"))

val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check fail")
-      .isDatasetMatched(dfModified, Map("id" -> "id"), _ > 0.9, hint = Some("shouldFail"))
+      .doesDatasetMatch(dfModified, Map("id" -> "id"), _ > 0.9, hint = Some("shouldFail"))

val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema)
val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame")
-      .isDatasetMatched(emptyDf, Map("id" -> "id"), _ < 0.5)
+      .doesDatasetMatch(emptyDf, Map("id" -> "id"), _ < 0.5)

val dataSyncCheckColMismatchDestination =
Check(CheckLevel.Error, "data synchronization check col mismatch in destination")
-        .isDatasetMatched(dfModified, Map("id" -> "id2"), _ < 0.5)
+        .doesDatasetMatch(dfModified, Map("id" -> "id2"), _ < 0.5)

val dataSyncCheckColMismatchSource =
Check(CheckLevel.Error, "data synchronization check col mismatch in source")
-        .isDatasetMatched(dfModified, Map("id2" -> "id"), _ < 0.5)
+        .doesDatasetMatch(dfModified, Map("id2" -> "id"), _ < 0.5)

val dataSyncCheckColRenamed =
Check(CheckLevel.Error, "data synchronization check col names renamed")
-        .isDatasetMatched(dfColRenamed, Map("id" -> "id_renamed"), _ == 1.0)
+        .doesDatasetMatch(dfColRenamed, Map("id" -> "id_renamed"), _ == 1.0)

val dataSyncFullMatch =
Check(CheckLevel.Error, "data synchronization check full match")
-        .isDatasetMatched(df, Map("id" -> "id"), _ == 1.0)
+        .doesDatasetMatch(df, Map("id" -> "id"), _ == 1.0)


val verificationResult = VerificationSuite()
@@ -1080,40 +1080,40 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
val matchColMap = Map("product" -> "product")
val dataSyncCheckWithMatchColumns = Check(CheckLevel.Error,
"data synchronization check with matchColumnMappings")
-      .isDatasetMatched(df, colMap, _ > 0.7, Some(matchColMap),
+      .doesDatasetMatch(df, colMap, _ > 0.7, Some(matchColMap),
hint = Some("Check with matchColumnMappings"))

val dataSyncCheckWithAdditionalCols = Check(CheckLevel.Error,
"data synchronization check with additional columns")
-      .isDatasetMatched(dfWithAdditionalColumns, colMap, _ > 0.7, Some(matchColMap),
+      .doesDatasetMatch(dfWithAdditionalColumns, colMap, _ > 0.7, Some(matchColMap),
hint = Some("Check with additional columns and matchColumnMappings"))

val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check")
-      .isDatasetMatched(dfModified, colMap, _ > 0.7, hint = Some("shouldPass"))
+      .doesDatasetMatch(dfModified, colMap, _ > 0.7, hint = Some("shouldPass"))

val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check")
-      .isDatasetMatched(dfModified, colMap, _ > 0.9, hint = Some("shouldFail"))
+      .doesDatasetMatch(dfModified, colMap, _ > 0.9, hint = Some("shouldFail"))

val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema)
val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame")
-      .isDatasetMatched(emptyDf, colMap, _ < 0.5)
+      .doesDatasetMatch(emptyDf, colMap, _ < 0.5)

val dataSyncCheckColMismatchDestination =
Check(CheckLevel.Error, "data synchronization check col mismatch in destination")
-        .isDatasetMatched(dfModified, colMap, _ > 0.9)
+        .doesDatasetMatch(dfModified, colMap, _ > 0.9)

val dataSyncCheckColMismatchSource =
Check(CheckLevel.Error, "data synchronization check col mismatch in source")
-        .isDatasetMatched(dfModified, Map("id2" -> "id", "product" -> "product"), _ < 0.5)
+        .doesDatasetMatch(dfModified, Map("id2" -> "id", "product" -> "product"), _ < 0.5)

val dataSyncCheckColRenamed =
Check(CheckLevel.Error, "data synchronization check col names renamed")
-        .isDatasetMatched(dfColRenamed, Map("id" -> "id_renamed", "product" -> "product"), _ == 1.0,
+        .doesDatasetMatch(dfColRenamed, Map("id" -> "id_renamed", "product" -> "product"), _ == 1.0,
hint = Some("shouldPass"))

val dataSyncFullMatch =
Check(CheckLevel.Error, "data synchronization check col full match")
-        .isDatasetMatched(df, colMap, _ == 1, hint = Some("shouldPass"))
+        .doesDatasetMatch(df, colMap, _ == 1, hint = Some("shouldPass"))


val verificationResult = VerificationSuite()
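The fixtures these tests lean on (df, dfModified, dfColRenamed) are defined earlier in the suite and are not part of this diff; judging from how they are used, they are shaped roughly like the sketch below, where the concrete rows are invented for illustration:

import org.apache.spark.sql.functions.{col, when}
import sparkSession.implicits._  // assumes a SparkSession in scope named sparkSession

val df = Seq((1, "Product A"), (2, "Product B"), (3, "Product C"))
  .toDF("id", "product")

// Perturb some ids so only a fraction of the keys still match.
val dfModified = df.withColumn("id",
  when(col("id") === 3, 33).otherwise(col("id")))

val dfColRenamed = df.withColumnRenamed("id", "id_renamed")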
26 changes: 13 additions & 13 deletions src/test/scala/com/amazon/deequ/checks/CheckTest.scala
@@ -1,5 +1,5 @@
/**
- * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
* use this file except in compliance with the License. A copy of the License
@@ -1129,13 +1129,13 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)

val check = Check(CheckLevel.Error, "must have data in sync")
-      .isDatasetMatched(dfInformative, colMapAtt1, _ > 0.9, hint = Some("show be in sync"))
+      .doesDatasetMatch(dfInformative, colMapAtt1, _ > 0.9, hint = Some("show be in sync"))
val context = runChecks(dfInformative, check)

assertSuccess(check, context)

val check2 = Check(CheckLevel.Error, "must have data in sync")
-      .isDatasetMatched(dfInformative, colMapAtt1, _ > 0.9, Some(colMapAtt1), Some("show be in sync with match col"))
+      .doesDatasetMatch(dfInformative, colMapAtt1, _ > 0.9, Some(colMapAtt1), Some("show be in sync with match col"))
val context2 = runChecks(dfInformative, check2)

assertSuccess(check2, context2)
@@ -1146,7 +1146,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed")

val check = Check(CheckLevel.Error, "must fail as columns does not exist")
-      .isDatasetMatched(dfInformativeRenamed, colMapAtt1, _ > 0.9,
+      .doesDatasetMatch(dfInformativeRenamed, colMapAtt1, _ > 0.9,
hint = Some("must fail as columns does not exist"))
val context = runChecks(dfInformative, check)
assertEvaluatesTo(check, context, CheckStatus.Error)
@@ -1158,7 +1158,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val dfInformativeFiltered = dfInformative.filter("att1 > 2")

val check = Check(CheckLevel.Error, "must fail as columns does not exist")
-      .isDatasetMatched(dfInformativeFiltered, colMapAtt1, _ > 0.9,
+      .doesDatasetMatch(dfInformativeFiltered, colMapAtt1, _ > 0.9,
hint = Some("must fail as columns does not exist"))
val context = runChecks(dfInformative, check)
assertEvaluatesTo(check, context, CheckStatus.Error)
@@ -1170,7 +1170,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
.otherwise(col("att1")))

val check = Check(CheckLevel.Error, "must fail as rows mismatches")
-      .isDatasetMatched(modifiedDf, colMapAtt1, _ > 0.9, hint = Some("must fail as rows mismatches"))
+      .doesDatasetMatch(modifiedDf, colMapAtt1, _ > 0.9, hint = Some("must fail as rows mismatches"))
val context = runChecks(df, check)
assertEvaluatesTo(check, context, CheckStatus.Error)

@@ -1182,7 +1182,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
.otherwise(col("att1")))

val check = Check(CheckLevel.Error, "must be success as rows count mismatches at assertion 0.6")
-      .isDatasetMatched(modifiedDf, colMapAtt1, _ > 0.6,
+      .doesDatasetMatch(modifiedDf, colMapAtt1, _ > 0.6,
hint = Some("must be success as rows count mismatches at assertion 0.6"))
val context = runChecks(df, check)
assertSuccess(check, context)
@@ -1193,7 +1193,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)

val check = Check(CheckLevel.Error, "must have data in sync")
-      .isDatasetMatched(dfInformative, colMapTwoCols, _ > 0.9, hint = Some("show be in sync"))
+      .doesDatasetMatch(dfInformative, colMapTwoCols, _ > 0.9, hint = Some("show be in sync"))
val context = runChecks(dfInformative, check)

assertSuccess(check, context)
@@ -1204,7 +1204,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession)

val check = Check(CheckLevel.Error, "must have data in sync")
-      .isDatasetMatched(dfInformative, colMapTwoCols, _ > 0.9, Some(colMapAtt1), hint = Some("show be in sync"))
+      .doesDatasetMatch(dfInformative, colMapTwoCols, _ > 0.9, Some(colMapAtt1), hint = Some("show be in sync"))
val context = runChecks(dfInformative, check)

assertSuccess(check, context)
@@ -1216,7 +1216,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed")

val check = Check(CheckLevel.Error, "must fail as columns does not exist")
-      .isDatasetMatched(dfInformativeRenamed, colMapTwoCols, _ > 0.9,
+      .doesDatasetMatch(dfInformativeRenamed, colMapTwoCols, _ > 0.9,
hint = Some("must fail as columns does not exist"))
val context = runChecks(dfInformative, check)

@@ -1228,7 +1228,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
val dfInformativeFiltered = dfInformative.filter("att1 > 2")

val check = Check(CheckLevel.Error, "must fail as columns does not exist")
-      .isDatasetMatched(dfInformativeFiltered, colMapTwoCols, _ > 0.9,
+      .doesDatasetMatch(dfInformativeFiltered, colMapTwoCols, _ > 0.9,
hint = Some("must fail as columns does not exist"))
val context = runChecks(dfInformative, check)

@@ -1241,7 +1241,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
.otherwise(col("att1")))

val check = Check(CheckLevel.Error, "must fail as rows mismatches")
-      .isDatasetMatched(modifiedDf, colMapTwoCols, _ > 0.9, hint = Some("must fail as rows mismatches"))
+      .doesDatasetMatch(modifiedDf, colMapTwoCols, _ > 0.9, hint = Some("must fail as rows mismatches"))
val context = runChecks(df, check)

assertEvaluatesTo(check, context, CheckStatus.Error)
@@ -1254,7 +1254,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
.otherwise(col("att1")))

val check = Check(CheckLevel.Error, "must be success as metric value is 0.66")
-      .isDatasetMatched(modifiedDf, colMapTwoCols, _ > 0.6,
+      .doesDatasetMatch(modifiedDf, colMapTwoCols, _ > 0.6,
hint = Some("must be success as metric value is 0.66"))
val context = runChecks(df, check)

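Likewise, the column maps threaded through these checks (colMapAtt1, colMapTwoCols) come from earlier in CheckTest and are not shown in the diff; from their usage against att1 and a second attribute, they are presumably along these lines:

val colMapAtt1 = Map("att1" -> "att1")
val colMapTwoCols = Map("att1" -> "att1", "att2" -> "att2")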
