From b2e8065174c587e2729dbc85b009159ffb160a41 Mon Sep 17 00:00:00 2001 From: penikala <11186040+VenkataKarthikP@users.noreply.github.com> Date: Wed, 14 Feb 2024 07:20:04 -0800 Subject: [PATCH] support col match and change to DatasetMatch (#529) * update col match and other improvements * review comments * review comments --- .../analyzers/DataSynchronizationState.scala | 48 ------------ ...lyzer.scala => DatasetMatchAnalyzer.scala} | 54 ++++++++------ .../deequ/analyzers/DatasetMatchState.scala | 46 ++++++++++++ .../scala/com/amazon/deequ/checks/Check.scala | 58 ++++++++------- .../deequ/comparison/ComparisonResult.scala | 6 +- .../comparison/DataSynchronization.scala | 32 ++++---- .../amazon/deequ/constraints/Constraint.scala | 6 +- .../amazon/deequ/VerificationSuiteTest.scala | 62 +++++++++++----- .../com/amazon/deequ/checks/CheckTest.scala | 74 +++++++++++++------ .../comparison/DataSynchronizationTest.scala | 52 ++++++------- 10 files changed, 253 insertions(+), 185 deletions(-) delete mode 100644 src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala rename src/main/scala/com/amazon/deequ/analyzers/{DataSynchronizationAnalyzer.scala => DatasetMatchAnalyzer.scala} (51%) create mode 100644 src/main/scala/com/amazon/deequ/analyzers/DatasetMatchState.scala diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala b/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala deleted file mode 100644 index e0321df3..00000000 --- a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationState.scala +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not - * use this file except in compliance with the License. A copy of the License - * is located at - * - * http://aws.amazon.com/apache2.0/ - * - * or in the "license" file accompanying this file. 
This file is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - * - */ - -package com.amazon.deequ.analyzers - -/** - * Represents the state of data synchronization between two DataFrames in Deequ. - * This state keeps track of the count of synchronized record count and the total record count. - * It is used to calculate a ratio of synchronization, which is a measure of how well the data - * in the two DataFrames are synchronized. - * - * @param synchronizedDataCount The count of records that are considered synchronized between the two DataFrames. - * @param totalDataCount The total count of records for check. - * - * The `sum` method allows for aggregation of this state with another, combining the counts from both states. - * This is useful in distributed computations where states from different partitions need to be aggregated. - * - * The `metricValue` method computes the synchronization ratio. It is the ratio of `synchronizedDataCount` - * to `dataCount`. - * If `dataCount` is zero, which means no data points were examined, the method returns `Double.NaN` - * to indicate the undefined state. 
- * - */ -case class DataSynchronizationState(synchronizedDataCount: Long, totalDataCount: Long) - extends DoubleValuedState[DataSynchronizationState] { - override def sum(other: DataSynchronizationState): DataSynchronizationState = { - DataSynchronizationState(synchronizedDataCount + other.synchronizedDataCount, totalDataCount + other.totalDataCount) - } - - override def metricValue(): Double = { - if (totalDataCount == 0L) Double.NaN else synchronizedDataCount.toDouble / totalDataCount.toDouble - } -} - -object DataSynchronizationState diff --git a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchAnalyzer.scala similarity index 51% rename from src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala rename to src/main/scala/com/amazon/deequ/analyzers/DatasetMatchAnalyzer.scala index 1d7e3753..cdf0e506 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/DataSynchronizationAnalyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchAnalyzer.scala @@ -18,8 +18,8 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers.metricFromFailure import com.amazon.deequ.comparison.DataSynchronization -import com.amazon.deequ.comparison.DataSynchronizationFailed -import com.amazon.deequ.comparison.DataSynchronizationSucceeded +import com.amazon.deequ.comparison.DatasetMatchFailed +import com.amazon.deequ.comparison.DatasetMatchSucceeded import com.amazon.deequ.metrics.DoubleMetric import com.amazon.deequ.metrics.Entity import org.apache.spark.sql.DataFrame @@ -29,59 +29,67 @@ import scala.util.Try /** - * An Analyzer for Deequ that performs a data synchronization check between two DataFrames. - * It evaluates the degree of synchronization based on specified column mappings and an assertion function. + * An Analyzer for Deequ that performs a dataset match check between two DataFrames. 
+ * It evaluates the degree of match based on specified column mappings and an assertion function. * - * The analyzer computes a ratio of synchronized data points to the total data points, represented as a DoubleMetric. - * Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for DataSynchronization implementation + * The analyzer computes a ratio of matched data points to the total data points, represented as a DoubleMetric. + * Refer to [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] for the dataset match implementation * * @param dfToCompare The DataFrame to compare with the primary DataFrame that is setup * during [[com.amazon.deequ.VerificationSuite.onData]] setup. * @param columnMappings A map where each key-value pair represents a column in the primary DataFrame * and its corresponding column in dfToCompare. + * @param matchColumnMappings A map defining the column correlations between this DataFrame and dfToCompare. + * These are the columns which we will check for equality, post joining. + * It's an optional value that defaults to None. * @param assertion A function that takes a Double (the match ratio) and returns a Boolean. * It defines the condition for successful synchronization. * * Usage: - * This analyzer is used in Deequ's VerificationSuite based if `isDataSynchronized` check is defined or could be used + * This analyzer is used in Deequ's VerificationSuite if a `doesDatasetMatch` check is defined, or could be used * manually as well. 
* * Example: - * val analyzer = DataSynchronizationAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8) + * val analyzer = DatasetMatchAnalyzer(dfToCompare, Map("col1" -> "col2"), _ > 0.8) * val verificationResult = VerificationSuite().onData(df).addAnalyzer(analyzer).run() * * // or could do something like below - * val verificationResult = VerificationSuite().onData(df).isDataSynchronized(dfToCompare, Map("col1" -> "col2"), + * val verificationResult = VerificationSuite().onData(df).doesDatasetMatch(dfToCompare, Map("col1" -> "col2"), * _ > 0.8).run() * * - * The computeStateFrom method calculates the synchronization state by comparing the specified columns of the two + * The computeStateFrom method calculates the dataset match state by comparing the specified columns of the two * DataFrames. - * The computeMetricFrom method then converts this state into a DoubleMetric representing the synchronization ratio. + * The computeMetricFrom method then converts this state into a DoubleMetric representing the match ratio. 
* */ -case class DataSynchronizationAnalyzer(dfToCompare: DataFrame, - columnMappings: Map[String, String], - assertion: Double => Boolean) - extends Analyzer[DataSynchronizationState, DoubleMetric] { +case class DatasetMatchAnalyzer(dfToCompare: DataFrame, + columnMappings: Map[String, String], + assertion: Double => Boolean, + matchColumnMappings: Option[Map[String, String]] = None) + extends Analyzer[DatasetMatchState, DoubleMetric] { - override def computeStateFrom(data: DataFrame): Option[DataSynchronizationState] = { + override def computeStateFrom(data: DataFrame): Option[DatasetMatchState] = { - val result = DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion) + val result = if (matchColumnMappings.isDefined) { + DataSynchronization.columnMatch(data, dfToCompare, columnMappings, matchColumnMappings.get, assertion) + } else { + DataSynchronization.columnMatch(data, dfToCompare, columnMappings, assertion) + } result match { - case succeeded: DataSynchronizationSucceeded => - Some(DataSynchronizationState(succeeded.passedCount, succeeded.totalCount)) - case failed: DataSynchronizationFailed => - Some(DataSynchronizationState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0))) + case succeeded: DatasetMatchSucceeded => + Some(DatasetMatchState(succeeded.passedCount, succeeded.totalCount)) + case failed: DatasetMatchFailed => + Some(DatasetMatchState(failed.passedCount.getOrElse(0), failed.totalCount.getOrElse(0))) case _ => None } } - override def computeMetricFrom(state: Option[DataSynchronizationState]): DoubleMetric = { + override def computeMetricFrom(state: Option[DatasetMatchState]): DoubleMetric = { val metric = state match { - case Some(s) => Try(s.synchronizedDataCount.toDouble / s.totalDataCount.toDouble) + case Some(s) => Try(s.matchedDataCount.toDouble / s.totalDataCount.toDouble) case _ => Failure(new IllegalStateException("No state available for DataSynchronizationAnalyzer")) } diff --git 
a/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchState.scala b/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchState.scala new file mode 100644 index 00000000..9e1c45e9 --- /dev/null +++ b/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchState.scala @@ -0,0 +1,46 @@ +/** + * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.analyzers + +/** + * Represents the state of a dataset match between two DataFrames in Deequ. + * This state keeps track of the matched record count and the total record count. + * It measures how well the data in the two DataFrames matches. + * + * @param matchedDataCount The count of records that are considered a match between the two DataFrames. + * @param totalDataCount The total count of records checked. + * + * The `sum` method allows for aggregation of this state with another, combining the counts from both states. + * This is useful in distributed computations where states from different partitions need to be aggregated. + * + * The `metricValue` method computes the match ratio. It is the ratio of `matchedDataCount` to `totalDataCount`. + * If `totalDataCount` is zero, which means no data points were examined, the method returns `Double.NaN` to indicate + * the undefined state. 
+ * + */ +case class DatasetMatchState(matchedDataCount: Long, totalDataCount: Long) + extends DoubleValuedState[DatasetMatchState] { + override def sum(other: DatasetMatchState): DatasetMatchState = { + DatasetMatchState(matchedDataCount + other.matchedDataCount, totalDataCount + other.totalDataCount) + } + + override def metricValue(): Double = { + if (totalDataCount == 0L) Double.NaN else matchedDataCount.toDouble / totalDataCount.toDouble + } +} + +object DatasetMatchState diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index 9f6f6ea0..88404146 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -1,5 +1,5 @@ /** - * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. 
A copy of the License @@ -19,8 +19,8 @@ package com.amazon.deequ.checks import com.amazon.deequ.analyzers.runners.AnalyzerContext import com.amazon.deequ.analyzers.Analyzer import com.amazon.deequ.analyzers.AnalyzerOptions -import com.amazon.deequ.analyzers.DataSynchronizationAnalyzer -import com.amazon.deequ.analyzers.DataSynchronizationState +import com.amazon.deequ.analyzers.DatasetMatchAnalyzer +import com.amazon.deequ.analyzers.DatasetMatchState import com.amazon.deequ.analyzers.Histogram import com.amazon.deequ.analyzers.KLLParameters import com.amazon.deequ.analyzers.Patterns @@ -351,13 +351,13 @@ case class Check( } /** - * Performs a data synchronization check between the base DataFrame supplied to + * Performs a dataset match check between the base DataFrame supplied to * [[com.amazon.deequ.VerificationSuite.onData]] and other DataFrame supplied to this check using Deequ's * [[com.amazon.deequ.comparison.DataSynchronization.columnMatch]] framework. - * This method compares specified columns of both DataFrames and assesses synchronization based on a custom assertion. + * This method compares specified columns of both DataFrames and assesses the match based on a custom assertion. * - * Utilizes [[com.amazon.deequ.analyzers.DataSynchronizationAnalyzer]] for comparing the data - * and Constraint [[com.amazon.deequ.constraints.DataSynchronizationConstraint]]. + * Utilizes [[com.amazon.deequ.analyzers.DatasetMatchAnalyzer]] for comparing the data + * and Constraint [[com.amazon.deequ.constraints.DatasetMatchConstraint]]. 
* * Usage: * To use this method, create a VerificationSuite and invoke this method as part of adding checks: @@ -368,7 +368,7 @@ case class Check( * val assertionFunction: Double => Boolean = _ > 0.7 * * val check = new Check(CheckLevel.Error, "Data Synchronization Check") - * .isDataSynchronized(otherDataFrame, columnMappings, assertionFunction) + * .doesDatasetMatch(otherDataFrame, columnMappings, assertionFunction) * * val verificationResult = VerificationSuite() * .onData(baseDataFrame) @@ -376,29 +376,33 @@ case class Check( * .run() * }}} * - * This will add a data synchronization check to the VerificationSuite, comparing the specified columns of + * This will add a dataset match check to the VerificationSuite, comparing the specified columns of * baseDataFrame and otherDataFrame based on the provided assertion function. * - * - * @param otherDf The DataFrame to be compared with the current one. Analyzed in conjunction with the - * current DataFrame to assess data synchronization. - * @param columnMappings A map defining the column correlations between the current DataFrame and otherDf. - * Keys represent column names in the current DataFrame, - * and values are corresponding column names in otherDf. - * @param assertion A function that takes a Double (result of the comparison) and returns a Boolean. - * Defines the condition under which the data in both DataFrames is considered synchronized. - * For example (_ > 0.7) denoting metric value > 0.7 or 70% of records. - * @param hint Optional. Additional context or information about the synchronization check. - * Helpful for understanding the intent or specifics of the check. Default is None. - * @return A [[com.amazon.deequ.checks.Check]] object representing the outcome - * of the synchronization check. This object can be used in Deequ's verification suite to - * assert data quality constraints. + * @param otherDataset The DataFrame to be compared with the current one. 
Analyzed in conjunction with the + * current DataFrame to assess data synchronization. + * @param keyColumnMappings A map defining the column correlations between this DataFrame and otherDataset. + * Keys represent column names in the current DataFrame, and values are corresponding + * column names in otherDataset. + * @param assertion A function that takes a Double (result of the comparison) and returns a Boolean. Defines the + * condition under which the data in both DataFrames is considered synchronized. For example + * (_ > 0.7) denoting metric value > 0.7 or 70% of records. + * @param matchColumnMappings A map defining the column correlations between this DataFrame and otherDataset. + * These are the columns which we will check for equality, post joining. It's an optional + * value that defaults to None, in which case it is derived from `keyColumnMappings`. + * @param hint Optional. Additional context or information about the synchronization check. + * Helpful for understanding the intent or specifics of the check. Default is None. + * @return A [[com.amazon.deequ.checks.Check]] object representing the outcome of the dataset match check. + * This object can be used in Deequ's verification suite to assert data quality constraints. 
* */ - def isDataSynchronized(otherDf: DataFrame, columnMappings: Map[String, String], assertion: Double => Boolean, - hint: Option[String] = None): Check = { - val dataSyncAnalyzer = DataSynchronizationAnalyzer(otherDf, columnMappings, assertion) - val constraint = AnalysisBasedConstraint[DataSynchronizationState, Double, Double](dataSyncAnalyzer, assertion, + def doesDatasetMatch(otherDataset: DataFrame, + keyColumnMappings: Map[String, String], + assertion: Double => Boolean, + matchColumnMappings: Option[Map[String, String]] = None, + hint: Option[String] = None): Check = { + val dataMatchAnalyzer = DatasetMatchAnalyzer(otherDataset, keyColumnMappings, assertion, matchColumnMappings) + val constraint = AnalysisBasedConstraint[DatasetMatchState, Double, Double](dataMatchAnalyzer, assertion, hint = hint) addConstraint(constraint) } diff --git a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala index 67b4d4b4..643fb036 100644 --- a/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala +++ b/src/main/scala/com/amazon/deequ/comparison/ComparisonResult.scala @@ -21,6 +21,6 @@ sealed trait ComparisonResult case class ComparisonFailed(errorMessage: String, ratio: Double = 0) extends ComparisonResult case class ComparisonSucceeded(ratio: Double = 0) extends ComparisonResult -case class DataSynchronizationFailed(errorMessage: String, passedCount: Option[Long] = None, - totalCount: Option[Long] = None) extends ComparisonResult -case class DataSynchronizationSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult +case class DatasetMatchFailed(errorMessage: String, passedCount: Option[Long] = None, + totalCount: Option[Long] = None) extends ComparisonResult +case class DatasetMatchSucceeded(passedCount: Long, totalCount: Long) extends ComparisonResult diff --git a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala 
b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala index 992dc48d..de207823 100644 --- a/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala +++ b/src/main/scala/com/amazon/deequ/comparison/DataSynchronization.scala @@ -102,13 +102,13 @@ object DataSynchronization extends ComparisonBase { val nonKeyColsMatch = colsDS1.forall(columnExists(ds2, _)) if (!nonKeyColsMatch) { - DataSynchronizationFailed("Non key columns in the given data frames do not match.") + DatasetMatchFailed("Non key columns in the given data frames do not match.") } else { val mergedMaps = colKeyMap ++ colsDS1.map(x => x -> x).toMap finalAssertion(ds1, ds2, mergedMaps, assertion) } } else { - DataSynchronizationFailed(columnErrors.get) + DatasetMatchFailed(columnErrors.get) } } @@ -138,17 +138,17 @@ object DataSynchronization extends ComparisonBase { val nonKeyColumns2NotInDataset = compCols.values.filterNot(columnExists(ds2, _)) if (nonKeyColumns1NotInDataset.nonEmpty) { - DataSynchronizationFailed(s"The following columns were not found in the first dataset: " + + DatasetMatchFailed(s"The following columns were not found in the first dataset: " + s"${nonKeyColumns1NotInDataset.mkString(", ")}") } else if (nonKeyColumns2NotInDataset.nonEmpty) { - DataSynchronizationFailed(s"The following columns were not found in the second dataset: " + + DatasetMatchFailed(s"The following columns were not found in the second dataset: " + s"${nonKeyColumns2NotInDataset.mkString(", ")}") } else { val mergedMaps = colKeyMap ++ compCols finalAssertion(ds1, ds2, mergedMaps, assertion) } } else { - DataSynchronizationFailed(keyColumnErrors.get) + DatasetMatchFailed(keyColumnErrors.get) } } @@ -157,23 +157,23 @@ object DataSynchronization extends ComparisonBase { colKeyMap: Map[String, String], optionalCompCols: Option[Map[String, String]] = None, optionalOutcomeColumnName: Option[String] = None): - Either[DataSynchronizationFailed, DataFrame] = { + Either[DatasetMatchFailed, 
DataFrame] = { val columnErrors = areKeyColumnsValid(ds1, ds2, colKeyMap) if (columnErrors.isEmpty) { - val compColsEither: Either[DataSynchronizationFailed, Map[String, String]] = if (optionalCompCols.isDefined) { + val compColsEither: Either[DatasetMatchFailed, Map[String, String]] = if (optionalCompCols.isDefined) { optionalCompCols.get match { - case compCols if compCols.isEmpty => Left(DataSynchronizationFailed("Empty column comparison map provided.")) + case compCols if compCols.isEmpty => Left(DatasetMatchFailed("Empty column comparison map provided.")) case compCols => val ds1CompColsNotInDataset = compCols.keys.filterNot(columnExists(ds1, _)) val ds2CompColsNotInDataset = compCols.values.filterNot(columnExists(ds2, _)) if (ds1CompColsNotInDataset.nonEmpty) { Left( - DataSynchronizationFailed(s"The following columns were not found in the first dataset: " + + DatasetMatchFailed(s"The following columns were not found in the first dataset: " + s"${ds1CompColsNotInDataset.mkString(", ")}") ) } else if (ds2CompColsNotInDataset.nonEmpty) { Left( - DataSynchronizationFailed(s"The following columns were not found in the second dataset: " + + DatasetMatchFailed(s"The following columns were not found in the second dataset: " + s"${ds2CompColsNotInDataset.mkString(", ")}") ) } else { @@ -186,7 +186,7 @@ object DataSynchronization extends ComparisonBase { val nonKeyColsMatch = ds1NonKeyCols.forall(columnExists(ds2, _)) if (!nonKeyColsMatch) { - Left(DataSynchronizationFailed("Non key columns in the given data frames do not match.")) + Left(DatasetMatchFailed("Non key columns in the given data frames do not match.")) } else { Right(ds1NonKeyCols.map { c => c -> c}.toMap) } @@ -198,11 +198,11 @@ object DataSynchronization extends ComparisonBase { case Success(df) => Right(df) case Failure(ex) => ex.printStackTrace() - Left(DataSynchronizationFailed(s"Comparison failed due to ${ex.getCause.getClass}")) + Left(DatasetMatchFailed(s"Comparison failed due to 
${ex.getCause.getClass}")) } } } else { - Left(DataSynchronizationFailed(columnErrors.get)) + Left(DatasetMatchFailed(columnErrors.get)) } } @@ -255,7 +255,7 @@ object DataSynchronization extends ComparisonBase { val ds2Count = ds2.count() if (ds1Count != ds2Count) { - DataSynchronizationFailed(s"The row counts of the two data frames do not match.") + DatasetMatchFailed(s"The row counts of the two data frames do not match.") } else { val joinExpression: Column = mergedMaps .map { case (col1, col2) => ds1(col1) === ds2(col2)} @@ -267,9 +267,9 @@ object DataSynchronization extends ComparisonBase { val ratio = passedCount.toDouble / totalCount.toDouble if (assertion(ratio)) { - DataSynchronizationSucceeded(passedCount, totalCount) + DatasetMatchSucceeded(passedCount, totalCount) } else { - DataSynchronizationFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint" + + DatasetMatchFailed(s"Data Synchronization Comparison Metric Value: $ratio does not meet the constraint" + s"requirement.", Some(passedCount), Some(totalCount)) } } diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index 5bb8d477..74070687 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -903,17 +903,17 @@ object Constraint { } /** - * Data Synchronization Constraint + * DatasetMatch Constraint * @param analyzer Data Synchronization Analyzer * @param hint hint */ -case class DataSynchronizationConstraint(analyzer: DataSynchronizationAnalyzer, hint: Option[String]) +case class DatasetMatchConstraint(analyzer: DatasetMatchAnalyzer, hint: Option[String]) extends Constraint { override def evaluate(metrics: Map[Analyzer[_, Metric[_]], Metric[_]]): ConstraintResult = { metrics.collectFirst { - case (_: DataSynchronizationAnalyzer, metric: Metric[Double]) => metric + case (_: DatasetMatchAnalyzer, 
metric: Metric[Double]) => metric } match { case Some(metric) => val result = metric.value match { diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index e260d2f1..a468b8a3 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -1,5 +1,5 @@ /** - * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. A copy of the License @@ -35,6 +35,7 @@ import com.amazon.deequ.utils.TempFileUtils import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.lit import org.apache.spark.sql.functions.when import org.scalamock.scalatest.MockFactory import org.scalatest.Matchers @@ -811,7 +812,7 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec .hasCompleteness("fake", x => x > 0) val checkHasDataInSyncTest = Check(CheckLevel.Error, "shouldSucceedForAge") - .isDataSynchronized(df, Map("age" -> "age"), _ > 0.99, Some("shouldPass")) + .doesDatasetMatch(df, Map("age" -> "age"), _ > 0.99, hint = Some("shouldPass")) val verificationResult = VerificationSuite() .onData(df) @@ -993,30 +994,30 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val dfColRenamed = df.withColumnRenamed("id", "id_renamed") val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check pass") - .isDataSynchronized(dfModified, Map("id" -> "id"), _ > 0.7, Some("shouldPass")) + .doesDatasetMatch(dfModified, Map("id" -> "id"), _ > 0.7, hint = Some("shouldPass")) val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check fail") - 
.isDataSynchronized(dfModified, Map("id" -> "id"), _ > 0.9, Some("shouldFail")) + .doesDatasetMatch(dfModified, Map("id" -> "id"), _ > 0.9, hint = Some("shouldFail")) val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema) val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame") - .isDataSynchronized(emptyDf, Map("id" -> "id"), _ < 0.5) + .doesDatasetMatch(emptyDf, Map("id" -> "id"), _ < 0.5) val dataSyncCheckColMismatchDestination = Check(CheckLevel.Error, "data synchronization check col mismatch in destination") - .isDataSynchronized(dfModified, Map("id" -> "id2"), _ < 0.5) + .doesDatasetMatch(dfModified, Map("id" -> "id2"), _ < 0.5) val dataSyncCheckColMismatchSource = Check(CheckLevel.Error, "data synchronization check col mismatch in source") - .isDataSynchronized(dfModified, Map("id2" -> "id"), _ < 0.5) + .doesDatasetMatch(dfModified, Map("id2" -> "id"), _ < 0.5) val dataSyncCheckColRenamed = Check(CheckLevel.Error, "data synchronization check col names renamed") - .isDataSynchronized(dfColRenamed, Map("id" -> "id_renamed"), _ == 1.0) + .doesDatasetMatch(dfColRenamed, Map("id" -> "id_renamed"), _ == 1.0) val dataSyncFullMatch = Check(CheckLevel.Error, "data synchronization check full match") - .isDataSynchronized(df, Map("id" -> "id"), _ == 1.0) + .doesDatasetMatch(df, Map("id" -> "id"), _ == 1.0) val verificationResult = VerificationSuite() @@ -1073,32 +1074,46 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec val dfColRenamed = df.withColumnRenamed("id", "id_renamed") val colMap = Map("id" -> "id", "product" -> "product") + // Additional DataFrames for testing matchColumnMappings + val dfWithAdditionalColumns = df.withColumn("newColumn", lit(1)) + + val matchColMap = Map("product" -> "product") + val dataSyncCheckWithMatchColumns = Check(CheckLevel.Error, + "data synchronization check with matchColumnMappings") + .doesDatasetMatch(df, colMap, _ 
> 0.7, Some(matchColMap), + hint = Some("Check with matchColumnMappings")) + + val dataSyncCheckWithAdditionalCols = Check(CheckLevel.Error, + "data synchronization check with additional columns") + .doesDatasetMatch(dfWithAdditionalColumns, colMap, _ > 0.7, Some(matchColMap), + hint = Some("Check with additional columns and matchColumnMappings")) + val dataSyncCheckPass = Check(CheckLevel.Error, "data synchronization check") - .isDataSynchronized(dfModified, colMap, _ > 0.7, Some("shouldPass")) + .doesDatasetMatch(dfModified, colMap, _ > 0.7, hint = Some("shouldPass")) val dataSyncCheckFail = Check(CheckLevel.Error, "data synchronization check") - .isDataSynchronized(dfModified, colMap, _ > 0.9, Some("shouldFail")) + .doesDatasetMatch(dfModified, colMap, _ > 0.9, hint = Some("shouldFail")) val emptyDf = sparkSession.createDataFrame(sparkSession.sparkContext.emptyRDD[Row], df.schema) val dataSyncCheckEmpty = Check(CheckLevel.Error, "data synchronization check on empty DataFrame") - .isDataSynchronized(emptyDf, colMap, _ < 0.5) + .doesDatasetMatch(emptyDf, colMap, _ < 0.5) val dataSyncCheckColMismatchDestination = Check(CheckLevel.Error, "data synchronization check col mismatch in destination") - .isDataSynchronized(dfModified, colMap, _ > 0.9) + .doesDatasetMatch(dfModified, colMap, _ > 0.9) val dataSyncCheckColMismatchSource = Check(CheckLevel.Error, "data synchronization check col mismatch in source") - .isDataSynchronized(dfModified, Map("id2" -> "id", "product" -> "product"), _ < 0.5) + .doesDatasetMatch(dfModified, Map("id2" -> "id", "product" -> "product"), _ < 0.5) val dataSyncCheckColRenamed = Check(CheckLevel.Error, "data synchronization check col names renamed") - .isDataSynchronized(dfColRenamed, Map("id" -> "id_renamed", "product" -> "product"), _ == 1.0, - Some("shouldPass")) + .doesDatasetMatch(dfColRenamed, Map("id" -> "id_renamed", "product" -> "product"), _ == 1.0, + hint = Some("shouldPass")) val dataSyncFullMatch = Check(CheckLevel.Error, "data 
synchronization check col full match") - .isDataSynchronized(df, colMap, _ == 1, Some("shouldPass")) + .doesDatasetMatch(df, colMap, _ == 1, hint = Some("shouldPass")) val verificationResult = VerificationSuite() @@ -1110,6 +1125,8 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec .addCheck(dataSyncCheckColMismatchSource) .addCheck(dataSyncCheckColRenamed) .addCheck(dataSyncFullMatch) + .addCheck(dataSyncCheckWithMatchColumns) + .addCheck(dataSyncCheckWithAdditionalCols) .run() val passResult = verificationResult.checkResults(dataSyncCheckPass) @@ -1147,6 +1164,17 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec List(None) assert(fullMatchResult.status == CheckStatus.Success) + // Assertions for the new checks + val matchColumnsResult = verificationResult.checkResults(dataSyncCheckWithMatchColumns) + matchColumnsResult.constraintResults.map(_.message) shouldBe + List(None) // or any expected result + assert(matchColumnsResult.status == CheckStatus.Success) // or expected status + + val additionalColsResult = verificationResult.checkResults(dataSyncCheckWithAdditionalCols) + additionalColsResult.constraintResults.map(_.message) shouldBe + List(None) // or any expected result + assert(additionalColsResult.status == CheckStatus.Success) // or expected status + } } diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala index 505e6d13..5a21079a 100644 --- a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala +++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala @@ -1,5 +1,5 @@ /** - * Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"). You may not * use this file except in compliance with the License. 
A copy of the License @@ -18,21 +18,30 @@ package com.amazon.deequ package checks import com.amazon.deequ.analyzers._ -import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext} -import com.amazon.deequ.anomalydetection.{Anomaly, AnomalyDetectionStrategy} -import com.amazon.deequ.constraints.{ConstrainableDataTypes, ConstraintStatus} -import com.amazon.deequ.metrics.{DoubleMetric, Entity} +import com.amazon.deequ.analyzers.runners.AnalysisRunner +import com.amazon.deequ.analyzers.runners.AnalyzerContext +import com.amazon.deequ.anomalydetection.Anomaly +import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy +import com.amazon.deequ.constraints.ConstrainableDataTypes +import com.amazon.deequ.constraints.ConstraintStatus +import com.amazon.deequ.metrics.DoubleMetric +import com.amazon.deequ.metrics.Entity import com.amazon.deequ.repository.memory.InMemoryMetricsRepository -import com.amazon.deequ.repository.{MetricsRepository, ResultKey} +import com.amazon.deequ.repository.MetricsRepository +import com.amazon.deequ.repository.ResultKey import com.amazon.deequ.utils.FixtureSupport -import org.apache.spark.sql.functions.{col, when} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.functions.when import org.apache.spark.sql.types._ -import org.apache.spark.sql.{DataFrame, Row, SparkSession} +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.Row +import org.apache.spark.sql.SparkSession import org.scalamock.scalatest.MockFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import scala.util.{Success, Try} +import scala.util.Success +import scala.util.Try class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with FixtureSupport with MockFactory { @@ -1120,10 +1129,16 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) val check = 
Check(CheckLevel.Error, "must have data in sync") - .isDataSynchronized(dfInformative, colMapAtt1, _ > 0.9, Some("show be in sync")) + .doesDatasetMatch(dfInformative, colMapAtt1, _ > 0.9, hint = Some("show be in sync")) val context = runChecks(dfInformative, check) assertSuccess(check, context) + + val check2 = Check(CheckLevel.Error, "must have data in sync") + .doesDatasetMatch(dfInformative, colMapAtt1, _ > 0.9, Some(colMapAtt1), Some("show be in sync with match col")) + val context2 = runChecks(dfInformative, check2) + + assertSuccess(check2, context2) } "yield failure when column doesnt exist in data sync test for 1 col" in withSparkSession { sparkSession => @@ -1131,10 +1146,11 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed") val check = Check(CheckLevel.Error, "must fail as columns does not exist") - .isDataSynchronized(dfInformativeRenamed, colMapAtt1, _ > 0.9, Some("must fail as columns does not exist")) + .doesDatasetMatch(dfInformativeRenamed, colMapAtt1, _ > 0.9, + hint = Some("must fail as columns does not exist")) val context = runChecks(dfInformative, check) assertEvaluatesTo(check, context, CheckStatus.Error) - println(context) + } "yield failure when row count varies in data sync test for 1 col" in withSparkSession { sparkSession => @@ -1142,7 +1158,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix val dfInformativeFiltered = dfInformative.filter("att1 > 2") val check = Check(CheckLevel.Error, "must fail as columns does not exist") - .isDataSynchronized(dfInformativeFiltered, colMapAtt1, _ > 0.9, Some("must fail as columns does not exist")) + .doesDatasetMatch(dfInformativeFiltered, colMapAtt1, _ > 0.9, + hint = Some("must fail as columns does not exist")) val context = runChecks(dfInformative, check) assertEvaluatesTo(check, context, CheckStatus.Error) } @@ -1153,7 +1170,7 @@ class CheckTest 
extends AnyWordSpec with Matchers with SparkContextSpec with Fix .otherwise(col("att1"))) val check = Check(CheckLevel.Error, "must fail as rows mismatches") - .isDataSynchronized(modifiedDf, colMapAtt1, _ > 0.9, Some("must fail as rows mismatches")) + .doesDatasetMatch(modifiedDf, colMapAtt1, _ > 0.9, hint = Some("must fail as rows mismatches")) val context = runChecks(df, check) assertEvaluatesTo(check, context, CheckStatus.Error) @@ -1165,8 +1182,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix .otherwise(col("att1"))) val check = Check(CheckLevel.Error, "must be success as rows count mismatches at assertion 0.6") - .isDataSynchronized(modifiedDf, colMapAtt1, _ > 0.6, - Some("must be success as rows count mismatches at assertion 0.6")) + .doesDatasetMatch(modifiedDf, colMapAtt1, _ > 0.6, + hint = Some("must be success as rows count mismatches at assertion 0.6")) val context = runChecks(df, check) assertSuccess(check, context) } @@ -1176,19 +1193,31 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) val check = Check(CheckLevel.Error, "must have data in sync") - .isDataSynchronized(dfInformative, colMapTwoCols, _ > 0.9, Some("show be in sync")) + .doesDatasetMatch(dfInformative, colMapTwoCols, _ > 0.9, hint = Some("show be in sync")) val context = runChecks(dfInformative, check) assertSuccess(check, context) } + "yield success for basic data sync test for multiple columns and one col match" in + withSparkSession { sparkSession => + val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) + + val check = Check(CheckLevel.Error, "must have data in sync") + .doesDatasetMatch(dfInformative, colMapTwoCols, _ > 0.9, Some(colMapAtt1), hint = Some("show be in sync")) + val context = runChecks(dfInformative, check) + + assertSuccess(check, context) + } + "yield failure when column doesnt exist in data sync test 
for multiple columns" in withSparkSession { sparkSession => val dfInformative = getDfWithConditionallyInformativeColumns(sparkSession) val dfInformativeRenamed = dfInformative.withColumnRenamed("att1", "att1_renamed") val check = Check(CheckLevel.Error, "must fail as columns does not exist") - .isDataSynchronized(dfInformativeRenamed, colMapTwoCols, _ > 0.9, Some("must fail as columns does not exist")) + .doesDatasetMatch(dfInformativeRenamed, colMapTwoCols, _ > 0.9, + hint = Some("must fail as columns does not exist")) val context = runChecks(dfInformative, check) assertEvaluatesTo(check, context, CheckStatus.Error) @@ -1199,7 +1228,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix val dfInformativeFiltered = dfInformative.filter("att1 > 2") val check = Check(CheckLevel.Error, "must fail as columns does not exist") - .isDataSynchronized(dfInformativeFiltered, colMapTwoCols, _ > 0.9, Some("must fail as columns does not exist")) + .doesDatasetMatch(dfInformativeFiltered, colMapTwoCols, _ > 0.9, + hint = Some("must fail as columns does not exist")) val context = runChecks(dfInformative, check) assertEvaluatesTo(check, context, CheckStatus.Error) @@ -1211,7 +1241,7 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix .otherwise(col("att1"))) val check = Check(CheckLevel.Error, "must fail as rows mismatches") - .isDataSynchronized(modifiedDf, colMapTwoCols, _ > 0.9, Some("must fail as rows mismatches")) + .doesDatasetMatch(modifiedDf, colMapTwoCols, _ > 0.9, hint = Some("must fail as rows mismatches")) val context = runChecks(df, check) assertEvaluatesTo(check, context, CheckStatus.Error) @@ -1224,8 +1254,8 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix .otherwise(col("att1"))) val check = Check(CheckLevel.Error, "must be success as metric value is 0.66") - .isDataSynchronized(modifiedDf, colMapTwoCols, _ > 0.6, - Some("must be success as metric value is 0.66")) + 
.doesDatasetMatch(modifiedDf, colMapTwoCols, _ > 0.6, + hint = Some("must be success as metric value is 0.66")) val context = runChecks(df, check) assertSuccess(check, context) diff --git a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala index dd3a002d..f7b7e30f 100644 --- a/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala +++ b/src/test/scala/com/amazon/deequ/comparison/DataSynchronizationTest.scala @@ -57,7 +57,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.60 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) - assert(result.isInstanceOf[DataSynchronizationSucceeded]) + assert(result.isInstanceOf[DatasetMatchSucceeded]) } "match == 0.83 when id is colKey and state is compCols" in withSparkSession { spark => @@ -88,7 +88,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.80 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) - assert(result.isInstanceOf[DataSynchronizationSucceeded]) + assert(result.isInstanceOf[DatasetMatchSucceeded]) } "return false because col name isn't unique" in withSparkSession { spark => @@ -119,7 +119,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.66 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) - assert(result.isInstanceOf[DataSynchronizationFailed]) + assert(result.isInstanceOf[DatasetMatchFailed]) } "match >= 0.66 when id is unique col, rest compCols" in withSparkSession { spark => @@ -150,7 +150,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.60 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, 
compCols, assertion) - assert(result.isInstanceOf[DataSynchronizationSucceeded]) + assert(result.isInstanceOf[DatasetMatchSucceeded]) } "match >= 0.66 (same test as above only the data sets change)" in withSparkSession{ spark => @@ -181,7 +181,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.60 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) - assert(result.isInstanceOf[DataSynchronizationSucceeded]) + assert(result.isInstanceOf[DatasetMatchSucceeded]) } "return false because the id col in ds1 isn't unique" in withSparkSession { spark => @@ -213,7 +213,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.40 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) - assert(result.asInstanceOf[DataSynchronizationFailed].errorMessage == + assert(result.asInstanceOf[DatasetMatchFailed].errorMessage == "The selected columns are not comparable due to duplicates present in the dataset." 
+ "Comparison keys must be unique, but in Dataframe 1, there are 6 unique records and 7 rows, " + "and in Dataframe 2, there are 6 unique records and 6 rows, based on the combination of keys " + @@ -249,7 +249,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.40 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion) - assert(result.isInstanceOf[DataSynchronizationFailed]) + assert(result.isInstanceOf[DatasetMatchFailed]) } "return false because col state isn't unique" in withSparkSession { spark => @@ -280,7 +280,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.66 val result = (DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compCols, assertion)) - assert(result.isInstanceOf[DataSynchronizationFailed]) + assert(result.isInstanceOf[DatasetMatchFailed]) } "check all columns and return an assertion of .66" in withSparkSession { spark => @@ -310,7 +310,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.66 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(result.isInstanceOf[DataSynchronizationSucceeded]) + assert(result.isInstanceOf[DatasetMatchSucceeded]) } "return false because state column isn't unique" in withSparkSession { spark => @@ -340,7 +340,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.66 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(result.isInstanceOf[DataSynchronizationFailed]) + assert(result.isInstanceOf[DatasetMatchFailed]) } "check all columns" in withSparkSession { spark => @@ -370,7 +370,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0.66 val result = 
DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(result.isInstanceOf[DataSynchronizationSucceeded]) + assert(result.isInstanceOf[DatasetMatchSucceeded]) } "cols exist but 0 matches" in withSparkSession { spark => import spark.implicits._ @@ -399,7 +399,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val assertion: Double => Boolean = _ >= 0 val result = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(result.isInstanceOf[DataSynchronizationSucceeded]) + assert(result.isInstanceOf[DatasetMatchSucceeded]) } } @@ -643,7 +643,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { // Overall val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(overallResult.isInstanceOf[DataSynchronizationSucceeded]) + assert(overallResult.isInstanceOf[DatasetMatchSucceeded]) // Row Level val outcomeColName = "outcome" @@ -670,7 +670,7 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { // Overall val assertion: Double => Boolean = _ >= 0.6 // 4 out of 6 rows match val overallResult = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, assertion) - assert(overallResult.isInstanceOf[DataSynchronizationSucceeded]) + assert(overallResult.isInstanceOf[DatasetMatchSucceeded]) // Row Level val outcomeColName = "outcome" @@ -700,8 +700,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap1 = Map(nonExistCol1 -> nonExistCol2) val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap1, assertion) - assert(overallResult1.isInstanceOf[DataSynchronizationFailed]) - val failedOverallResult1 = overallResult1.asInstanceOf[DataSynchronizationFailed] + assert(overallResult1.isInstanceOf[DatasetMatchFailed]) + val failedOverallResult1 = overallResult1.asInstanceOf[DatasetMatchFailed] 
assert(failedOverallResult1.errorMessage.contains("key columns were not found in the first dataset")) assert(failedOverallResult1.errorMessage.contains(nonExistCol1)) @@ -716,8 +716,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap2 = Map(nonExistCol1 -> idColumnName) val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap2, assertion) - assert(overallResult2.isInstanceOf[DataSynchronizationFailed]) - val failedOverallResult2 = overallResult2.asInstanceOf[DataSynchronizationFailed] + assert(overallResult2.isInstanceOf[DatasetMatchFailed]) + val failedOverallResult2 = overallResult2.asInstanceOf[DatasetMatchFailed] assert(failedOverallResult2.errorMessage.contains("key columns were not found in the first dataset")) assert(failedOverallResult2.errorMessage.contains(nonExistCol1)) @@ -732,8 +732,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val colKeyMap3 = Map(idColumnName -> nonExistCol2) val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap3, assertion) - assert(overallResult3.isInstanceOf[DataSynchronizationFailed]) - val failedOverallResult3 = overallResult3.asInstanceOf[DataSynchronizationFailed] + assert(overallResult3.isInstanceOf[DatasetMatchFailed]) + val failedOverallResult3 = overallResult3.asInstanceOf[DatasetMatchFailed] assert(failedOverallResult3.errorMessage.contains("key columns were not found in the second dataset")) assert(failedOverallResult3.errorMessage.contains(nonExistCol2)) @@ -759,8 +759,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compColsMap1 = Map(nonExistCol1 -> nonExistCol2) val overallResult1 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap1, assertion) - assert(overallResult1.isInstanceOf[DataSynchronizationFailed]) - val failedOverallResult1 = overallResult1.asInstanceOf[DataSynchronizationFailed] + assert(overallResult1.isInstanceOf[DatasetMatchFailed]) + val 
failedOverallResult1 = overallResult1.asInstanceOf[DatasetMatchFailed] assert(failedOverallResult1.errorMessage.contains( s"The following columns were not found in the first dataset: $nonExistCol1")) @@ -775,8 +775,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compColsMap2 = Map(nonExistCol1 -> "State") val overallResult2 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap2, assertion) - assert(overallResult2.isInstanceOf[DataSynchronizationFailed]) - val failedOverallResult2 = overallResult2.asInstanceOf[DataSynchronizationFailed] + assert(overallResult2.isInstanceOf[DatasetMatchFailed]) + val failedOverallResult2 = overallResult2.asInstanceOf[DatasetMatchFailed] assert(failedOverallResult2.errorMessage.contains( s"The following columns were not found in the first dataset: $nonExistCol1")) @@ -791,8 +791,8 @@ class DataSynchronizationTest extends AnyWordSpec with SparkContextSpec { val compColsMap3 = Map("state" -> nonExistCol2) val overallResult3 = DataSynchronization.columnMatch(ds1, ds2, colKeyMap, compColsMap3, assertion) - assert(overallResult3.isInstanceOf[DataSynchronizationFailed]) - val failedOverallResult3 = overallResult3.asInstanceOf[DataSynchronizationFailed] + assert(overallResult3.isInstanceOf[DatasetMatchFailed]) + val failedOverallResult3 = overallResult3.asInstanceOf[DatasetMatchFailed] assert(failedOverallResult3.errorMessage.contains( s"The following columns were not found in the second dataset: $nonExistCol2"))