diff --git a/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala b/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala index fb31651c8..40caa4092 100644 --- a/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala +++ b/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala @@ -22,9 +22,6 @@ import com.amazon.deequ.analyzers.{State, _} import com.amazon.deequ.checks.{Check, CheckLevel} import com.amazon.deequ.metrics.Metric import com.amazon.deequ.repository._ -import com.amazon.deequ.utilities.FilteredRow.FilteredRow -import com.amazon.deequ.utilities.RowLevelFilterTreatment -import com.amazon.deequ.utilities.RowLevelFilterTreatmentImpl import org.apache.spark.sql.{DataFrame, SparkSession} /** A class to build a VerificationRun using a fluent API */ @@ -47,7 +44,6 @@ class VerificationRunBuilder(val data: DataFrame) { protected var statePersister: Option[StatePersister] = None protected var stateLoader: Option[StateLoader] = None - protected var rowLevelFilterTreatment: RowLevelFilterTreatment = RowLevelFilterTreatment.sharedInstance protected def this(verificationRunBuilder: VerificationRunBuilder) { @@ -70,7 +66,6 @@ class VerificationRunBuilder(val data: DataFrame) { stateLoader = verificationRunBuilder.stateLoader statePersister = verificationRunBuilder.statePersister - rowLevelFilterTreatment = verificationRunBuilder.rowLevelFilterTreatment } /** @@ -140,17 +135,6 @@ class VerificationRunBuilder(val data: DataFrame) { this } - /** - * Sets how row level results will be treated for the Verification run - * - * @param filteredRow enum to determine how filtered row level results are labeled (TRUE | NULL) - */ - def withRowLevelFilterTreatment(filteredRow: FilteredRow): this.type = { - RowLevelFilterTreatment.setSharedInstance(new RowLevelFilterTreatmentImpl(filteredRow)) - rowLevelFilterTreatment = RowLevelFilterTreatment.sharedInstance - this - } - /** * Set a metrics repository associated with the current data to enable features like reusing * previously computed results and storing the results of the current run. 
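With the builder-level hook removed, the NULL treatment for filtered rows is requested per check rather than globally. A hedged migration sketch (the DataFrame `df` is assumed; `AnalyzerOptions` and `FilteredRow` come from the analyzer changes below):

    import com.amazon.deequ.VerificationSuite
    import com.amazon.deequ.analyzers.{AnalyzerOptions, FilteredRow}
    import com.amazon.deequ.checks.{Check, CheckLevel}

    // Before: new VerificationSuite().onData(df).withRowLevelFilterTreatment(FilteredRow.NULL)
    // After: the treatment travels with the individual check via AnalyzerOptions.
    val markFilteredAsNull = Option(AnalyzerOptions(filteredRow = FilteredRow.NULL))
    val check = new Check(CheckLevel.Error, "completeness with filter")
      .hasCompleteness("att2", _ > 0.7, None, markFilteredAsNull)
      .where("att1 = \"a\"")
    val result = new VerificationSuite().onData(df).addCheck(check).run()
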
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala index 028579426..108c23613 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala @@ -17,6 +17,7 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers._ +import com.amazon.deequ.analyzers.FilteredRow.FilteredRow import com.amazon.deequ.analyzers.NullBehavior.NullBehavior import com.amazon.deequ.analyzers.runners._ import com.amazon.deequ.metrics.DoubleMetric @@ -24,11 +25,6 @@ import com.amazon.deequ.metrics.Entity import com.amazon.deequ.metrics.FullColumn import com.amazon.deequ.metrics.Metric import com.amazon.deequ.utilities.ColumnUtil.removeEscapeColumn -import com.amazon.deequ.utilities.FilteredRow -import com.amazon.deequ.utilities.FilteredRow.FilteredRow -import com.amazon.deequ.utilities.RowLevelFilterTreatment -import com.amazon.deequ.utilities.RowLevelFilterTreatmentImpl -import com.google.common.annotations.VisibleForTesting import org.apache.spark.sql.Column import org.apache.spark.sql.DataFrame import org.apache.spark.sql.Row @@ -266,12 +262,17 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo } } -case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore) +case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore, filteredRow: FilteredRow = FilteredRow.TRUE) object NullBehavior extends Enumeration { type NullBehavior = Value val Ignore, EmptyString, Fail = Value } +object FilteredRow extends Enumeration { + type FilteredRow = Value + val NULL, TRUE = Value +} + /** Base class for analyzers that compute ratios of matching predicates */ abstract class PredicateMatchingAnalyzer( name: String, diff --git a/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala b/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala index f385da45d..61bffb8e0 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala @@ -20,19 +20,16 @@ import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNotNested} import org.apache.spark.sql.functions.sum import org.apache.spark.sql.types.{IntegerType, StructType} import Analyzers._ -import com.amazon.deequ.utilities.FilteredRow.FilteredRow -import com.amazon.deequ.utilities.RowLevelAnalyzer -import com.amazon.deequ.utilities.RowLevelFilterTreatment -import com.amazon.deequ.utilities.RowLevelFilterTreatmentImpl +import com.amazon.deequ.analyzers.FilteredRow.FilteredRow import com.google.common.annotations.VisibleForTesting import org.apache.spark.sql.functions.col import org.apache.spark.sql.functions.expr import org.apache.spark.sql.{Column, Row} /** Completeness is the fraction of non-null values in a column of a DataFrame. 
*/ -case class Completeness(column: String, where: Option[String] = None) extends +case class Completeness(column: String, where: Option[String] = None, analyzerOptions: Option[AnalyzerOptions] = None) extends StandardScanShareableAnalyzer[NumMatchesAndCount]("Completeness", column) with - FilterableAnalyzer with RowLevelAnalyzer { + FilterableAnalyzer { override def fromAggregationResult(result: Row, offset: Int): Option[NumMatchesAndCount] = { ifNoNullsIn(result, offset, howMany = 2) { _ => @@ -59,12 +56,12 @@ case class Completeness(column: String, where: Option[String] = None) extends @VisibleForTesting private[deequ] def rowLevelResults: Column = { val whereCondition = where.map { expression => expr(expression)} - conditionalSelectionFilteredFromColumns(col(column).isNotNull, whereCondition, rowLevelFilterTreatment.toString) + conditionalSelectionFilteredFromColumns(col(column).isNotNull, whereCondition, getRowLevelFilterTreatment.toString) } - @VisibleForTesting - private[deequ] def withRowLevelFilterTreatment(filteredRow: FilteredRow): this.type = { - RowLevelFilterTreatment.setSharedInstance(new RowLevelFilterTreatmentImpl(filteredRow)) - this + private def getRowLevelFilterTreatment: FilteredRow = { + analyzerOptions + .map { options => options.filteredRow } + .getOrElse(FilteredRow.TRUE) } } diff --git a/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala b/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala index b3d1d7011..c2fce1f14 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala @@ -17,8 +17,8 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers.COUNT_COL +import com.amazon.deequ.analyzers.FilteredRow.FilteredRow import com.amazon.deequ.metrics.DoubleMetric -import com.amazon.deequ.utilities.RowLevelAnalyzer import org.apache.spark.sql.functions.expr import org.apache.spark.sql.functions.not import org.apache.spark.sql.functions.when @@ -26,9 +26,10 @@ import org.apache.spark.sql.{Column, Row} import org.apache.spark.sql.functions.{col, count, lit, sum} import org.apache.spark.sql.types.DoubleType -case class UniqueValueRatio(columns: Seq[String], where: Option[String] = None) +case class UniqueValueRatio(columns: Seq[String], where: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None) extends ScanShareableFrequencyBasedAnalyzer("UniqueValueRatio", columns) - with FilterableAnalyzer with RowLevelAnalyzer { + with FilterableAnalyzer { override def aggregationFunctions(numRows: Long): Seq[Column] = { sum(col(COUNT_COL).equalTo(lit(1)).cast(DoubleType)) :: count("*") :: Nil @@ -38,17 +39,26 @@ case class UniqueValueRatio(columns: Seq[String], where: Option[String] = None) val numUniqueValues = result.getDouble(offset) val numDistinctValues = result.getLong(offset + 1).toDouble val conditionColumn = where.map { expression => expr(expression) } - val fullColumnUniqueness = conditionColumn.map { - condition => { - when(not(condition), expr(rowLevelFilterTreatment.toString)) - .when((fullColumn.getOrElse(null)).equalTo(1), true) - .otherwise(false) + val fullColumnUniqueness = fullColumn.map { + rowLevelColumn => { + conditionColumn.map { + condition => { + when(not(condition), expr(getRowLevelFilterTreatment.toString)) + .when(rowLevelColumn.equalTo(1), true).otherwise(false) + } + }.getOrElse(when(rowLevelColumn.equalTo(1), true).otherwise(false)) } - }.getOrElse(when((fullColumn.getOrElse(null)).equalTo(1), 
true).otherwise(false)) - toSuccessMetric(numUniqueValues / numDistinctValues, Option(fullColumnUniqueness)) + } + toSuccessMetric(numUniqueValues / numDistinctValues, fullColumnUniqueness) } override def filterCondition: Option[String] = where + + private def getRowLevelFilterTreatment: FilteredRow = { + analyzerOptions + .map { options => options.filteredRow } + .getOrElse(FilteredRow.TRUE) + } } object UniqueValueRatio { diff --git a/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala b/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala index 16ec6d7b1..78ba4c418 100644 --- a/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala +++ b/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala @@ -17,12 +17,8 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.analyzers.Analyzers.COUNT_COL +import com.amazon.deequ.analyzers.FilteredRow.FilteredRow import com.amazon.deequ.metrics.DoubleMetric -import com.amazon.deequ.utilities.FilteredRow -import com.amazon.deequ.utilities.FilteredRow.FilteredRow -import com.amazon.deequ.utilities.RowLevelAnalyzer -import com.amazon.deequ.utilities.RowLevelFilterTreatment -import com.amazon.deequ.utilities.RowLevelFilterTreatmentImpl import com.google.common.annotations.VisibleForTesting import org.apache.spark.sql.Column import org.apache.spark.sql.Row @@ -36,9 +32,10 @@ import org.apache.spark.sql.types.DoubleType /** Uniqueness is the fraction of unique values of a column(s), i.e., * values that occur exactly once. */ -case class Uniqueness(columns: Seq[String], where: Option[String] = None) +case class Uniqueness(columns: Seq[String], where: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None) extends ScanShareableFrequencyBasedAnalyzer("Uniqueness", columns) - with FilterableAnalyzer with RowLevelAnalyzer { + with FilterableAnalyzer { override def aggregationFunctions(numRows: Long): Seq[Column] = { (sum(col(COUNT_COL).equalTo(lit(1)).cast(DoubleType)) / numRows) :: Nil @@ -46,22 +43,25 @@ case class Uniqueness(columns: Seq[String], where: Option[String] = None) override def fromAggregationResult(result: Row, offset: Int, fullColumn: Option[Column]): DoubleMetric = { val conditionColumn = where.map { expression => expr(expression) } - val fullColumnUniqueness = conditionColumn.map { - condition => { - when(not(condition), expr(rowLevelFilterTreatment.toString)) - .when(fullColumn.getOrElse(null).equalTo(1), true).otherwise(false) + val fullColumnUniqueness = fullColumn.map { + rowLevelColumn => { + conditionColumn.map { + condition => { + when(not(condition), expr(getRowLevelFilterTreatment.toString)) + .when(rowLevelColumn.equalTo(1), true).otherwise(false) + } + }.getOrElse(when(rowLevelColumn.equalTo(1), true).otherwise(false)) } - }.getOrElse(when((fullColumn.getOrElse(null)).equalTo(1), true).otherwise(false)) - super.fromAggregationResult(result, offset, Option(fullColumnUniqueness)) + } + super.fromAggregationResult(result, offset, fullColumnUniqueness) } override def filterCondition: Option[String] = where - - @VisibleForTesting - private[deequ] def withRowLevelFilterTreatment(filteredRow: FilteredRow): this.type = { - RowLevelFilterTreatment.setSharedInstance(new RowLevelFilterTreatmentImpl(filteredRow)) - this + private def getRowLevelFilterTreatment: FilteredRow = { + analyzerOptions + .map { options => options.filteredRow } + .getOrElse(FilteredRow.TRUE) } } diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala index 
884041469..9921e7f8e 100644 --- a/src/main/scala/com/amazon/deequ/checks/Check.scala +++ b/src/main/scala/com/amazon/deequ/checks/Check.scala @@ -132,10 +132,12 @@ case class Check( * * @param column Column to run the assertion on * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) * @return */ - def isComplete(column: String, hint: Option[String] = None): CheckWithLastConstraintFilterable = { - addFilterableConstraint { filter => completenessConstraint(column, Check.IsOne, filter, hint) } + def isComplete(column: String, hint: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None): CheckWithLastConstraintFilterable = { + addFilterableConstraint { filter => completenessConstraint(column, Check.IsOne, filter, hint, analyzerOptions) } } /** @@ -146,14 +148,16 @@ case class Check( * @param column Column to run the assertion on * @param assertion Function that receives a double input parameter and returns a boolean * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) * @return */ def hasCompleteness( column: String, assertion: Double => Boolean, - hint: Option[String] = None) + hint: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None) : CheckWithLastConstraintFilterable = { - addFilterableConstraint { filter => completenessConstraint(column, assertion, filter, hint) } + addFilterableConstraint { filter => completenessConstraint(column, assertion, filter, hint, analyzerOptions) } } /** @@ -221,11 +225,13 @@ case class Check( * * @param column Column to run the assertion on * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) * @return */ - def isUnique(column: String, hint: Option[String] = None): CheckWithLastConstraintFilterable = { + def isUnique(column: String, hint: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None): CheckWithLastConstraintFilterable = { addFilterableConstraint { filter => - uniquenessConstraint(Seq(column), Check.IsOne, filter, hint) } + uniquenessConstraint(Seq(column), Check.IsOne, filter, hint, analyzerOptions) } } /** @@ -269,6 +275,24 @@ case class Check( addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter) } } + /** + * Creates a constraint that asserts on uniqueness in a single or combined set of key columns. + * + * @param columns Key columns + * @param assertion Function that receives a double input parameter and returns a boolean. + * Refers to the fraction of unique values + * @param hint A hint to provide additional context why a constraint could have failed + * @return + */ + def hasUniqueness( + columns: Seq[String], + assertion: Double => Boolean, + hint: Option[String]) + : CheckWithLastConstraintFilterable = { + + addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter, hint) } + } + /** * Creates a constraint that asserts on uniqueness in a single or combined set of key columns. * * @param columns Key columns * @param assertion Function that receives a double input parameter and returns a boolean.
* Refers to the fraction of unique values * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) * @return */ def hasUniqueness( columns: Seq[String], assertion: Double => Boolean, - hint: Option[String]) + hint: Option[String], + analyzerOptions: Option[AnalyzerOptions]) : CheckWithLastConstraintFilterable = { - addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter, hint) } + addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter, hint, analyzerOptions) } } /** @@ -314,6 +340,22 @@ case class Check( hasUniqueness(Seq(column), assertion, hint) } + /** + * Creates a constraint that asserts on the uniqueness of a key column. + * + * @param column Key column + * @param assertion Function that receives a double input parameter and returns a boolean. + * Refers to the fraction of unique values. + * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) + * @return + */ + def hasUniqueness(column: String, assertion: Double => Boolean, hint: Option[String], + analyzerOptions: Option[AnalyzerOptions]) + : CheckWithLastConstraintFilterable = { + hasUniqueness(Seq(column), assertion, hint, analyzerOptions) + } + /** * Creates a constraint on the distinctness in a single or combined set of key columns. * @@ -601,6 +643,7 @@ case class Check( * @param column Column to run the assertion on * @param assertion Function that receives a double input parameter and returns a boolean * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) * @return */ def hasMinLength( @@ -619,6 +662,7 @@ case class Check( * @param column Column to run the assertion on * @param assertion Function that receives a double input parameter and returns a boolean * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) * @return */ def hasMaxLength( diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala index 74070687e..d17ee9abe 100644 --- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala +++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala @@ -192,15 +192,17 @@ object Constraint { * @param assertion Function that receives a double input parameter (since the metric is * double metric) and returns a boolean * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) */ def completenessConstraint( column: String, assertion: Double => Boolean, where: Option[String] = None, - hint: Option[String] = None) + hint: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None) : Constraint = { - val completeness = Completeness(column, where) + val completeness = Completeness(column, where, analyzerOptions) this.fromAnalyzer(completeness, assertion, hint) } @@ -242,15 +244,17 @@ object Constraint { * (since the metric is double metric) and returns a boolean * @param where Additional filter to apply before the analyzer is run.
* @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) */ def uniquenessConstraint( columns: Seq[String], assertion: Double => Boolean, where: Option[String] = None, - hint: Option[String] = None) + hint: Option[String] = None, + analyzerOptions: Option[AnalyzerOptions] = None) : Constraint = { - val uniqueness = Uniqueness(columns, where) + val uniqueness = Uniqueness(columns, where, analyzerOptions) fromAnalyzer(uniqueness, assertion, hint) } @@ -528,6 +532,7 @@ object Constraint { * @param column Column to run the assertion on * @param assertion Function that receives a double input parameter and returns a boolean * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) */ def maxLengthConstraint( column: String, @@ -562,6 +567,7 @@ object Constraint { * @param column Column to run the assertion on * @param assertion Function that receives a double input parameter and returns a boolean * @param hint A hint to provide additional context why a constraint could have failed + * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow) */ def minLengthConstraint( column: String, diff --git a/src/main/scala/com/amazon/deequ/utilities/RowLevelFilterTreatement.scala b/src/main/scala/com/amazon/deequ/utilities/RowLevelFilterTreatement.scala deleted file mode 100644 index c37e72e61..000000000 --- a/src/main/scala/com/amazon/deequ/utilities/RowLevelFilterTreatement.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright 2024 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). You may not - * use this file except in compliance with the License. A copy of the License - * is located at - * - * http://aws.amazon.com/apache2.0/ - * - * or in the "license" file accompanying this file. This file is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License.
- * - */ - -package com.amazon.deequ.utilities -import com.amazon.deequ.utilities.FilteredRow.FilteredRow - -/** - * Trait that defines how row level results will be treated when a filter is applied to an analyzer - */ -trait RowLevelFilterTreatment { - def rowLevelFilterTreatment: FilteredRow -} - -/** - * Companion object for RowLevelFilterTreatment - * Defines a sharedInstance that can be used throughout the VerificationRunBuilder - */ -object RowLevelFilterTreatment { - private var _sharedInstance: RowLevelFilterTreatment = new RowLevelFilterTreatmentImpl(FilteredRow.TRUE) - - def sharedInstance: RowLevelFilterTreatment = _sharedInstance - - def setSharedInstance(instance: RowLevelFilterTreatment): Unit = { - _sharedInstance = instance - } -} - -class RowLevelFilterTreatmentImpl(initialFilterTreatment: FilteredRow) extends RowLevelFilterTreatment { - override val rowLevelFilterTreatment: FilteredRow = initialFilterTreatment -} - -object FilteredRow extends Enumeration { - type FilteredRow = Value - val NULL, TRUE = Value -} - -trait RowLevelAnalyzer extends RowLevelFilterTreatment { - def rowLevelFilterTreatment: FilteredRow.Value = RowLevelFilterTreatment.sharedInstance.rowLevelFilterTreatment -} diff --git a/src/test/scala/com/amazon/deequ/VerificationResultTest.scala b/src/test/scala/com/amazon/deequ/VerificationResultTest.scala index 93aa73201..0a90c8f77 100644 --- a/src/test/scala/com/amazon/deequ/VerificationResultTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationResultTest.scala @@ -78,6 +78,13 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe val successMetricsResultsJson = VerificationResult.successMetricsAsJson(results) + val expectedJsonSet = Set("""{"entity":"Column","instance":"item","name":"Distinctness","value":1.0}""", + """{"entity": "Column", "instance":"att2","name":"Completeness","value":1.0}""", + """{"entity":"Column","instance":"att1","name":"Completeness","value":1.0}""", + """{"entity":"Multicolumn","instance":"att1,att2", + "name":"Uniqueness","value":0.25}""", + """{"entity":"Dataset","instance":"*","name":"Size","value":4.0}""") + val expectedJson = """[{"entity":"Column","instance":"item","name":"Distinctness","value":1.0}, |{"entity": "Column", "instance":"att2","name":"Completeness","value":1.0}, @@ -123,11 +130,11 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe import session.implicits._ val expected = Seq( - ("group-1", "Error", "Success", "CompletenessConstraint(Completeness(att1,None))", + ("group-1", "Error", "Success", "CompletenessConstraint(Completeness(att1,None,None))", "Success", ""), ("group-2-E", "Error", "Error", "SizeConstraint(Size(None))", "Failure", "Value: 4 does not meet the constraint requirement! 
Should be greater than 5!"), - ("group-2-E", "Error", "Error", "CompletenessConstraint(Completeness(att2,None))", + ("group-2-E", "Error", "Error", "CompletenessConstraint(Completeness(att2,None,None))", "Success", ""), ("group-2-W", "Warning", "Warning", "DistinctnessConstraint(Distinctness(List(item),None))", @@ -150,7 +157,7 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe val expectedJson = """[{"check":"group-1","check_level":"Error","check_status":"Success", - |"constraint":"CompletenessConstraint(Completeness(att1,None))", + |"constraint":"CompletenessConstraint(Completeness(att1,None,None))", |"constraint_status":"Success","constraint_message":""}, | |{"check":"group-2-E","check_level":"Error","check_status":"Error", @@ -159,7 +166,7 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe | Should be greater than 5!"}, | |{"check":"group-2-E","check_level":"Error","check_status":"Error", - |"constraint":"CompletenessConstraint(Completeness(att2,None))", + |"constraint":"CompletenessConstraint(Completeness(att2,None,None))", |"constraint_status":"Success","constraint_message":""}, | |{"check":"group-2-W","check_level":"Warning","check_status":"Warning", @@ -214,7 +221,6 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe } private[this] def assertSameResultsJson(jsonA: String, jsonB: String): Unit = { - assert(SimpleResultSerde.deserialize(jsonA) == - SimpleResultSerde.deserialize(jsonB)) + assert(SimpleResultSerde.deserialize(jsonA).toSet == SimpleResultSerde.deserialize(jsonB).toSet) } } diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala index 5701b456b..932c82988 100644 --- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala +++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala @@ -29,7 +29,6 @@ import com.amazon.deequ.metrics.Entity import com.amazon.deequ.repository.MetricsRepository import com.amazon.deequ.repository.ResultKey import com.amazon.deequ.repository.memory.InMemoryMetricsRepository -import com.amazon.deequ.utilities.FilteredRow import com.amazon.deequ.utils.CollectionUtils.SeqExtensions import com.amazon.deequ.utils.FixtureSupport import com.amazon.deequ.utils.TempFileUtils @@ -350,20 +349,21 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec "generate a result that contains row-level results with null for filtered rows" in withSparkSession { session => val data = getDfCompleteAndInCompleteColumns(session) + val analyzerOptions = Option(AnalyzerOptions(filteredRow = FilteredRow.NULL)) + val completeness = new Check(CheckLevel.Error, "rule1") - .hasCompleteness("att2", _ > 0.7, None) + .hasCompleteness("att2", _ > 0.7, None, analyzerOptions) .where("att1 = \"a\"") val uniqueness = new Check(CheckLevel.Error, "rule2") - .hasUniqueness("att1", _ > 0.5, None) + .hasUniqueness("att1", _ > 0.5, None, analyzerOptions) val uniquenessWhere = new Check(CheckLevel.Error, "rule3") - .isUnique("att1") + .isUnique("att1", None, analyzerOptions) .where("item < 3") val expectedColumn1 = completeness.description val expectedColumn2 = uniqueness.description val expectedColumn3 = uniquenessWhere.description val suite = new VerificationSuite().onData(data) - .withRowLevelFilterTreatment(FilteredRow.NULL) .addCheck(completeness) .addCheck(uniqueness) .addCheck(uniquenessWhere)
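Row-level outcomes can then be pulled out of the run result. A hedged sketch continuing the test above (`rowLevelResultsAsDataFrame` is assumed to be the existing row-level helper on `VerificationResult`; `session` and `data` come from the fixture):

    val result = suite.run()
    val rowLevel = VerificationResult.rowLevelResultsAsDataFrame(session, result, data)
    // With FilteredRow.NULL, rows excluded by a check's where-filter carry null
    // in that check's outcome column; under the default FilteredRow.TRUE they
    // would carry true.

diff --git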
a/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala b/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala index cb2778a1b..54e26f867 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala @@ -19,7 +19,6 @@ package com.amazon.deequ.analyzers import com.amazon.deequ.SparkContextSpec import com.amazon.deequ.metrics.DoubleMetric import com.amazon.deequ.metrics.FullColumn -import com.amazon.deequ.utilities.FilteredRow import com.amazon.deequ.utils.FixtureSupport import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -46,7 +45,8 @@ class CompletenessTest extends AnyWordSpec with Matchers with SparkContextSpec w val data = getDfCompleteAndInCompleteColumns(session) // Explicitly setting RowLevelFilterTreatment for test purposes, this should be set at the VerificationRunBuilder - val completenessAtt2 = Completeness("att2", Option("att1 = \"a\"")).withRowLevelFilterTreatment(FilteredRow.NULL) + val completenessAtt2 = Completeness("att2", Option("att1 = \"a\""), + Option(AnalyzerOptions(filteredRow = FilteredRow.NULL))) val state = completenessAtt2.computeStateFrom(data) val metric: DoubleMetric with FullColumn = completenessAtt2.computeMetricFrom(state) @@ -61,7 +61,7 @@ class CompletenessTest extends AnyWordSpec with Matchers with SparkContextSpec w val data = getDfCompleteAndInCompleteColumns(session) // Explicitly setting RowLevelFilterTreatment for test purposes, this should be set at the VerificationRunBuilder - val completenessAtt2 = Completeness("att2", Option("att1 = \"a\"")).withRowLevelFilterTreatment(FilteredRow.TRUE) + val completenessAtt2 = Completeness("att2", Option("att1 = \"a\"")) val state = completenessAtt2.computeStateFrom(data) val metric: DoubleMetric with FullColumn = completenessAtt2.computeMetricFrom(state) diff --git a/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala b/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala index 7be9b4b35..d50995b55 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala @@ -21,7 +21,6 @@ import com.amazon.deequ.VerificationResult.UNIQUENESS_ID import com.amazon.deequ.analyzers.runners.AnalysisRunner import com.amazon.deequ.metrics.DoubleMetric import com.amazon.deequ.metrics.FullColumn -import com.amazon.deequ.utilities.FilteredRow import com.amazon.deequ.utils.FixtureSupport import org.apache.spark.sql.DataFrame import org.apache.spark.sql.SparkSession @@ -123,8 +122,8 @@ class UniquenessTest extends AnyWordSpec with Matchers with SparkContextSpec wit val data = getDfWithUniqueColumns(session) - val addressLength = Uniqueness(Seq("onlyUniqueWithOtherNonUnique"), Option("unique < 4")) - .withRowLevelFilterTreatment(FilteredRow.NULL) + val addressLength = Uniqueness(Seq("onlyUniqueWithOtherNonUnique"), Option("unique < 4"), + Option(AnalyzerOptions(filteredRow = FilteredRow.NULL))) val state: Option[FrequenciesAndNumRows] = addressLength.computeStateFrom(data, Option("unique < 4")) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) @@ -139,8 +138,8 @@ class UniquenessTest extends AnyWordSpec with Matchers with SparkContextSpec wit val data = getDfWithUniqueColumns(session) - val addressLength = Uniqueness(Seq("halfUniqueCombinedWithNonUnique", "nonUnique"), Option("unique > 2")) - .withRowLevelFilterTreatment(FilteredRow.NULL) + val 
addressLength = Uniqueness(Seq("halfUniqueCombinedWithNonUnique", "nonUnique"), Option("unique > 2"), + Option(AnalyzerOptions(filteredRow = FilteredRow.NULL))) val state: Option[FrequenciesAndNumRows] = addressLength.computeStateFrom(data, Option("unique > 2")) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) @@ -157,7 +156,6 @@ class UniquenessTest extends AnyWordSpec with Matchers with SparkContextSpec wit // Explicitly setting RowLevelFilterTreatment for test purposes, this should be set at the VerificationRunBuilder val addressLength = Uniqueness(Seq("onlyUniqueWithOtherNonUnique"), Option("unique < 4")) - .withRowLevelFilterTreatment(FilteredRow.TRUE) val state: Option[FrequenciesAndNumRows] = addressLength.computeStateFrom(data, Option("unique < 4")) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) @@ -174,7 +172,6 @@ class UniquenessTest extends AnyWordSpec with Matchers with SparkContextSpec wit // Explicitly setting RowLevelFilterTreatment for test purposes, this should be set at the VerificationRunBuilder val addressLength = Uniqueness(Seq("halfUniqueCombinedWithNonUnique", "nonUnique"), Option("unique > 2")) - .withRowLevelFilterTreatment(FilteredRow.TRUE) val state: Option[FrequenciesAndNumRows] = addressLength.computeStateFrom(data, Option("unique > 2")) val metric: DoubleMetric with FullColumn = addressLength.computeMetricFrom(state) diff --git a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala index 31b7365ad..ce9bda69b 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala @@ -284,7 +284,7 @@ class AnalysisRunnerTests extends AnyWordSpec assert(exception.getMessage == "Could not find all necessary results in the " + "MetricsRepository, the calculation of the metrics for these analyzers " + - "would be needed: Uniqueness(List(item, att2),None), Size(None)") + "would be needed: Uniqueness(List(item, att2),None,None), Size(None)") } "save results if specified" in diff --git a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala index 254fac9b4..9133d5ae4 100644 --- a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala +++ b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala @@ -145,7 +145,6 @@ class AnalyzerContextTest extends AnyWordSpec } private[this] def assertSameJson(jsonA: String, jsonB: String): Unit = { - assert(SimpleResultSerde.deserialize(jsonA) == - SimpleResultSerde.deserialize(jsonB)) + assert(SimpleResultSerde.deserialize(jsonA).toSet == SimpleResultSerde.deserialize(jsonB).toSet) } } diff --git a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala index 6f1fa1874..05f4d47bd 100644 --- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala +++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala @@ -363,7 +363,7 @@ class SimpleResultSerdeTest extends WordSpec with Matchers with SparkContextSpec .stripMargin.replaceAll("\n", "") // ordering of map entries is not guaranteed, so comparing
strings is not an option - assert(SimpleResultSerde.deserialize(sucessMetricsResultJson) == - SimpleResultSerde.deserialize(expected)) + assert(SimpleResultSerde.deserialize(sucessMetricsResultJson).toSet == + SimpleResultSerde.deserialize(expected).toSet) } } diff --git a/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala index 97d7a3c49..d4ce97fcb 100644 --- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala +++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala @@ -344,7 +344,6 @@ class AnalysisResultTest extends AnyWordSpec } private[this] def assertSameJson(jsonA: String, jsonB: String): Unit = { - assert(SimpleResultSerde.deserialize(jsonA) == - SimpleResultSerde.deserialize(jsonB)) + assert(SimpleResultSerde.deserialize(jsonA).toSet == SimpleResultSerde.deserialize(jsonB).toSet) } } diff --git a/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala b/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala index 6e61b9385..592f27b0e 100644 --- a/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala +++ b/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala @@ -264,7 +264,6 @@ class MetricsRepositoryMultipleResultsLoaderTest extends AnyWordSpec with Matche } private[this] def assertSameJson(jsonA: String, jsonB: String): Unit = { - assert(SimpleResultSerde.deserialize(jsonA) == - SimpleResultSerde.deserialize(jsonB)) + assert(SimpleResultSerde.deserialize(jsonA).toSet == SimpleResultSerde.deserialize(jsonB).toSet) } } diff --git a/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala b/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala index 6a98bf3c6..9a82903e8 100644 --- a/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala +++ b/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala @@ -212,7 +212,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo """{ | "constraint_suggestions": [ | { - | "constraint_name": "CompletenessConstraint(Completeness(att2,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))", | "column_name": "att2", | "current_value": "Completeness: 1.0", | "description": "'att2' is not null", @@ -222,7 +222,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "code_for_constraint": ".isComplete(\"att2\")" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(att1,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))", | "column_name": "att1", | "current_value": "Completeness: 1.0", | "description": "'att1' is not null", @@ -232,7 +232,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "code_for_constraint": ".isComplete(\"att1\")" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(item,None))", + | "constraint_name": "CompletenessConstraint(Completeness(item,None,None))", | "column_name": "item", | "current_value": "Completeness: 1.0", | "description": "'item' is not
null", @@ -265,7 +265,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "code_for_constraint": ".isNonNegative(\"item\")" | }, | { - | "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None))", + | "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None,None))", | "column_name": "item", | "current_value": "ApproxDistinctness: 1.0", | "description": "'item' is unique", @@ -294,7 +294,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo """{ | "constraint_suggestions": [ | { - | "constraint_name": "CompletenessConstraint(Completeness(att2,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))", | "column_name": "att2", | "current_value": "Completeness: 1.0", | "description": "\u0027att2\u0027 is not null", @@ -305,7 +305,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "constraint_result_on_test_set": "Failure" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(att1,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))", | "column_name": "att1", | "current_value": "Completeness: 1.0", | "description": "\u0027att1\u0027 is not null", @@ -316,7 +316,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "constraint_result_on_test_set": "Failure" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(item,None))", + | "constraint_name": "CompletenessConstraint(Completeness(item,None,None))", | "column_name": "item", | "current_value": "Completeness: 1.0", | "description": "\u0027item\u0027 is not null", @@ -352,7 +352,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "constraint_result_on_test_set": "Failure" | }, | { - | "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None))", + | "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None,None))", | "column_name": "item", | "current_value": "ApproxDistinctness: 1.0", | "description": "\u0027item\u0027 is unique", @@ -381,7 +381,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo """{ | "constraint_suggestions": [ | { - | "constraint_name": "CompletenessConstraint(Completeness(att2,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))", | "column_name": "att2", | "current_value": "Completeness: 1.0", | "description": "\u0027att2\u0027 is not null", @@ -392,7 +392,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "constraint_result_on_test_set": "Unknown" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(att1,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))", | "column_name": "att1", | "current_value": "Completeness: 1.0", | "description": "\u0027att1\u0027 is not null", @@ -403,7 +403,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "constraint_result_on_test_set": "Unknown" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(item,None))", + | "constraint_name": "CompletenessConstraint(Completeness(item,None,None))", | "column_name": "item", | "current_value": "Completeness: 1.0", | "description": "\u0027item\u0027 is not null", @@ -439,7 +439,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "constraint_result_on_test_set": "Unknown" | }, | { - | "constraint_name": 
"UniquenessConstraint(Uniqueness(List(item),None))", + | "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None,None))", | "column_name": "item", | "current_value": "ApproxDistinctness: 1.0", | "description": "\u0027item\u0027 is unique", @@ -471,7 +471,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo """{ | "constraint_suggestions": [ | { - | "constraint_name": "CompletenessConstraint(Completeness(`item.one`,None))", + | "constraint_name": "CompletenessConstraint(Completeness(`item.one`,None,None))", | "column_name": "`item.one`", | "current_value": "Completeness: 1.0", | "description": "'`item.one`' is not null", @@ -504,7 +504,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "code_for_constraint": ".isNonNegative(\"`item.one`\")" | }, | { - | "constraint_name": "UniquenessConstraint(Uniqueness(List(`item.one`),None))", + | "constraint_name": "UniquenessConstraint(Uniqueness(List(`item.one`),None,None))", | "column_name": "`item.one`", | "current_value": "ApproxDistinctness: 1.0", | "description": "'`item.one`' is unique", @@ -515,7 +515,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "code_for_constraint": ".isUnique(\"`item.one`\")" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(att2,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))", | "column_name": "att2", | "current_value": "Completeness: 1.0", | "description": "'att2' is not null", @@ -525,7 +525,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo | "code_for_constraint": ".isComplete(\"att2\")" | }, | { - | "constraint_name": "CompletenessConstraint(Completeness(att1,None))", + | "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))", | "column_name": "att1", | "current_value": "Completeness: 1.0", | "description": "'att1' is not null",