diff --git a/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala b/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala
index a4ee45f6b..40caa4092 100644
--- a/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala
+++ b/src/main/scala/com/amazon/deequ/VerificationRunBuilder.scala
@@ -25,7 +25,7 @@ import com.amazon.deequ.repository._
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 /** A class to build a VerificationRun using a fluent API */
-class VerificationRunBuilder(val data: DataFrame) {
+class VerificationRunBuilder(val data: DataFrame)  {
 
   protected var requiredAnalyzers: Seq[Analyzer[_, Metric[_]]] = Seq.empty
 
@@ -159,7 +159,6 @@ class VerificationRunBuilder(val data: DataFrame) {
     new VerificationRunBuilderWithSparkSession(this, Option(sparkSession))
   }
 
-
   def run(): VerificationResult = {
     VerificationSuite().doVerificationRun(
       data,
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
index a80405825..bc241fe72 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Analyzer.scala
@@ -17,6 +17,7 @@
 package com.amazon.deequ.analyzers
 
 import com.amazon.deequ.analyzers.Analyzers._
+import com.amazon.deequ.analyzers.FilteredRow.FilteredRow
 import com.amazon.deequ.analyzers.NullBehavior.NullBehavior
 import com.amazon.deequ.analyzers.runners._
 import com.amazon.deequ.metrics.DoubleMetric
@@ -69,7 +70,7 @@ trait Analyzer[S <: State[_], +M <: Metric[_]] extends Serializable {
     * @param data data frame
     * @return
     */
-  def computeStateFrom(data: DataFrame): Option[S]
+  def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None): Option[S]
 
   /**
     * Compute the metric from the state (sufficient statistics)
@@ -97,13 +98,14 @@ trait Analyzer[S <: State[_], +M <: Metric[_]] extends Serializable {
   def calculate(
       data: DataFrame,
       aggregateWith: Option[StateLoader] = None,
-      saveStatesWith: Option[StatePersister] = None)
+      saveStatesWith: Option[StatePersister] = None,
+      filterCondition: Option[String] = None)
     : M = {
 
     try {
       preconditions.foreach { condition => condition(data.schema) }
 
-      val state = computeStateFrom(data)
+      val state = computeStateFrom(data, filterCondition)
 
       calculateMetric(state, aggregateWith, saveStatesWith)
     } catch {
@@ -170,7 +172,6 @@ trait Analyzer[S <: State[_], +M <: Metric[_]] extends Serializable {
   private[deequ] def copyStateTo(source: StateLoader, target: StatePersister): Unit = {
     source.load[S](this).foreach { state => target.persist(this, state) }
   }
-
 }
 
 /** An analyzer that runs a set of aggregation functions over the data,
@@ -184,7 +185,7 @@ trait ScanShareableAnalyzer[S <: State[_], +M <: Metric[_]] extends Analyzer[S,
   private[deequ] def fromAggregationResult(result: Row, offset: Int): Option[S]
 
   /** Runs aggregation functions directly, without scan sharing */
-  override def computeStateFrom(data: DataFrame): Option[S] = {
+  override def computeStateFrom(data: DataFrame, where: Option[String] = None): Option[S] = {
     val aggregations = aggregationFunctions()
     val result = data.agg(aggregations.head, aggregations.tail: _*).collect().head
     fromAggregationResult(result, 0)
@@ -255,12 +256,18 @@ case class NumMatchesAndCount(numMatches: Long, count: Long, override val fullCo
   }
 }
 
-case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore)
+case class AnalyzerOptions(nullBehavior: NullBehavior = NullBehavior.Ignore,
+                           filteredRow: FilteredRow = FilteredRow.TRUE)
 object NullBehavior extends Enumeration {
   type NullBehavior = Value
   val Ignore, EmptyString, Fail = Value
 }
 
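+/** How rows excluded by a where filter appear in row-level results:
+  * TRUE reports them as passing, NULL reports them as null. */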
+object FilteredRow extends Enumeration {
+  type FilteredRow = Value
+  val NULL, TRUE = Value
+}
+
 /** Base class for analyzers that compute ratios of matching predicates */
 abstract class PredicateMatchingAnalyzer(
     name: String,
@@ -490,6 +497,18 @@ private[deequ] object Analyzers {
     conditionalSelectionFromColumns(selection, conditionColumn)
   }
 
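+  /** Like conditionalSelection, but rows excluded by the condition are mapped to the
+    * given filter-treatment expression instead of being dropped from the result. */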
+  def conditionalSelectionFilteredFromColumns(
+      selection: Column,
+      conditionColumn: Option[Column],
+      filterTreatment: String)
+    : Column = {
+    conditionColumn
+      .map { condition =>
+        when(not(condition), expr(filterTreatment)).when(condition, selection)
+      }
+      .getOrElse(selection)
+  }
+
   private[this] def conditionalSelectionFromColumns(
       selection: Column,
       conditionColumn: Option[Column])
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala b/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala
index 5e80e2f6e..399cbb06a 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Completeness.scala
@@ -20,19 +20,21 @@ import com.amazon.deequ.analyzers.Preconditions.{hasColumn, isNotNested}
 import org.apache.spark.sql.functions.sum
 import org.apache.spark.sql.types.{IntegerType, StructType}
 import Analyzers._
+import com.amazon.deequ.analyzers.FilteredRow.FilteredRow
 import com.google.common.annotations.VisibleForTesting
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.{Column, Row}
 
 /** Completeness is the fraction of non-null values in a column of a DataFrame. */
-case class Completeness(column: String, where: Option[String] = None) extends
+case class Completeness(column: String, where: Option[String] = None,
+                        analyzerOptions: Option[AnalyzerOptions] = None) extends
   StandardScanShareableAnalyzer[NumMatchesAndCount]("Completeness", column) with
   FilterableAnalyzer {
 
   override def fromAggregationResult(result: Row, offset: Int): Option[NumMatchesAndCount] = {
-
     ifNoNullsIn(result, offset, howMany = 2) { _ =>
-      NumMatchesAndCount(result.getLong(offset), result.getLong(offset + 1), Some(criterion))
+      NumMatchesAndCount(result.getLong(offset), result.getLong(offset + 1), Some(rowLevelResults))
     }
   }
 
@@ -51,4 +53,16 @@ case class Completeness(column: String, where: Option[String] = None) extends
 
   @VisibleForTesting // required by some tests that compare analyzer results to an expected state
   private[deequ] def criterion: Column = conditionalSelection(column, where).isNotNull
+
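+  // Row-level results: null-check on the column, with rows excluded by the where filter
+  // mapped to the configured FilteredRow treatment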
+  @VisibleForTesting
+  private[deequ] def rowLevelResults: Column = {
+    val whereCondition = where.map { expression => expr(expression) }
+    conditionalSelectionFilteredFromColumns(col(column).isNotNull, whereCondition, getRowLevelFilterTreatment.toString)
+  }
+
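+  // Defaults to FilteredRow.TRUE (filtered rows reported as passing) when no options are given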
+  private def getRowLevelFilterTreatment: FilteredRow = {
+    analyzerOptions
+      .map { options => options.filteredRow }
+      .getOrElse(FilteredRow.TRUE)
+  }
 }
diff --git a/src/main/scala/com/amazon/deequ/analyzers/CustomSql.scala b/src/main/scala/com/amazon/deequ/analyzers/CustomSql.scala
index b8dc2692a..e07e2d11f 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/CustomSql.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/CustomSql.scala
@@ -33,7 +33,7 @@ case class CustomSql(expression: String) extends Analyzer[CustomSqlState, Double
    * @param data data frame
    * @return
    */
-  override def computeStateFrom(data: DataFrame): Option[CustomSqlState] = {
+  override def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None): Option[CustomSqlState] = {
 
     Try {
       data.sqlContext.sql(expression)
diff --git a/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchAnalyzer.scala b/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchAnalyzer.scala
index cdf0e5061..f2aefb57f 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchAnalyzer.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/DatasetMatchAnalyzer.scala
@@ -69,7 +69,7 @@ case class DatasetMatchAnalyzer(dfToCompare: DataFrame,
                                 matchColumnMappings: Option[Map[String, String]] = None)
   extends Analyzer[DatasetMatchState, DoubleMetric] {
 
-  override def computeStateFrom(data: DataFrame): Option[DatasetMatchState] = {
+  override def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None): Option[DatasetMatchState] = {
 
     val result = if (matchColumnMappings.isDefined) {
       DataSynchronization.columnMatch(data, dfToCompare, columnMappings, matchColumnMappings.get, assertion)
diff --git a/src/main/scala/com/amazon/deequ/analyzers/GroupingAnalyzers.scala b/src/main/scala/com/amazon/deequ/analyzers/GroupingAnalyzers.scala
index 2090d8231..30bd89621 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/GroupingAnalyzers.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/GroupingAnalyzers.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.functions.when
 
 /** Base class for all analyzers that operate the frequencies of groups in the data */
 abstract class FrequencyBasedAnalyzer(columnsToGroupOn: Seq[String])
@@ -39,8 +40,9 @@ abstract class FrequencyBasedAnalyzer(columnsToGroupOn: Seq[String])
 
   override def groupingColumns(): Seq[String] = { columnsToGroupOn }
 
-  override def computeStateFrom(data: DataFrame): Option[FrequenciesAndNumRows] = {
-    Some(FrequencyBasedAnalyzer.computeFrequencies(data, groupingColumns()))
+  override def computeStateFrom(data: DataFrame,
+                                filterCondition: Option[String] = None): Option[FrequenciesAndNumRows] = {
+    Some(FrequencyBasedAnalyzer.computeFrequencies(data, groupingColumns(), filterCondition))
   }
 
   /** We need at least one grouping column, and all specified columns must exist */
@@ -88,7 +90,15 @@ object FrequencyBasedAnalyzer {
       .count()
 
     // Set rows with value count 1 to true, and otherwise false
-    val fullColumn: Column = count(UNIQUENESS_ID).over(Window.partitionBy(columnsToGroupBy: _*))
+    val fullColumn: Column = {
+      val window = Window.partitionBy(columnsToGroupBy: _*)
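+      // When a filter is given, only rows matching it contribute to the per-group count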
+      where.map {
+        condition => {
+          count(when(expr(condition), UNIQUENESS_ID)).over(window)
+        }
+      }.getOrElse(count(UNIQUENESS_ID).over(window))
+    }
+
     FrequenciesAndNumRows(frequencies, numRows, Option(fullColumn))
   }
 
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala b/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala
index 42a7e72e5..742b2ba68 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Histogram.scala
@@ -59,7 +59,8 @@ case class Histogram(
     }
   }
 
-  override def computeStateFrom(data: DataFrame): Option[FrequenciesAndNumRows] = {
+  override def computeStateFrom(data: DataFrame,
+                                filterCondition: Option[String] = None): Option[FrequenciesAndNumRows] = {
 
     // TODO figure out a way to pass this in if its known before hand
     val totalCount = if (computeFrequenciesAsRatio) {
diff --git a/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala b/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala
index d3c8aeb68..c2fce1f14 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/UniqueValueRatio.scala
@@ -17,13 +17,17 @@
 package com.amazon.deequ.analyzers
 
 import com.amazon.deequ.analyzers.Analyzers.COUNT_COL
+import com.amazon.deequ.analyzers.FilteredRow.FilteredRow
 import com.amazon.deequ.metrics.DoubleMetric
+import org.apache.spark.sql.functions.expr
+import org.apache.spark.sql.functions.not
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.{Column, Row}
 import org.apache.spark.sql.functions.{col, count, lit, sum}
 import org.apache.spark.sql.types.DoubleType
 
-case class UniqueValueRatio(columns: Seq[String], where: Option[String] = None)
+case class UniqueValueRatio(columns: Seq[String], where: Option[String] = None,
+                            analyzerOptions: Option[AnalyzerOptions] = None)
   extends ScanShareableFrequencyBasedAnalyzer("UniqueValueRatio", columns)
   with FilterableAnalyzer {
 
@@ -34,11 +38,27 @@ case class UniqueValueRatio(columns: Seq[String], where: Option[String] = None)
   override def fromAggregationResult(result: Row, offset: Int, fullColumn: Option[Column] = None): DoubleMetric = {
     val numUniqueValues = result.getDouble(offset)
     val numDistinctValues = result.getLong(offset + 1).toDouble
-    val fullColumnUniqueness = when((fullColumn.getOrElse(null)).equalTo(1), true).otherwise(false)
-    toSuccessMetric(numUniqueValues / numDistinctValues, Option(fullColumnUniqueness))
+    val conditionColumn = where.map { expression => expr(expression) }
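+    // Rows excluded by the filter get the configured treatment; remaining rows are true iff unique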
+    val fullColumnUniqueness = fullColumn.map {
+      rowLevelColumn => {
+        conditionColumn.map {
+          condition => {
+            when(not(condition), expr(getRowLevelFilterTreatment.toString))
+              .when(rowLevelColumn.equalTo(1), true).otherwise(false)
+          }
+        }.getOrElse(when(rowLevelColumn.equalTo(1), true).otherwise(false))
+      }
+    }
+    toSuccessMetric(numUniqueValues / numDistinctValues, fullColumnUniqueness)
   }
 
   override def filterCondition: Option[String] = where
+
+  private def getRowLevelFilterTreatment: FilteredRow = {
+    analyzerOptions
+      .map { options => options.filteredRow }
+      .getOrElse(FilteredRow.TRUE)
+  }
 }
 
 object UniqueValueRatio {
diff --git a/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala b/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala
index 959f4734c..78ba4c418 100644
--- a/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala
+++ b/src/main/scala/com/amazon/deequ/analyzers/Uniqueness.scala
@@ -17,31 +17,52 @@
 package com.amazon.deequ.analyzers
 
 import com.amazon.deequ.analyzers.Analyzers.COUNT_COL
+import com.amazon.deequ.analyzers.FilteredRow.FilteredRow
 import com.amazon.deequ.metrics.DoubleMetric
+import com.google.common.annotations.VisibleForTesting
 import org.apache.spark.sql.Column
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.functions.not
+import org.apache.spark.sql.functions.expr
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.functions.sum
 import org.apache.spark.sql.types.DoubleType
 
 /** Uniqueness is the fraction of unique values of a column(s), i.e.,
   * values that occur exactly once. */
-case class Uniqueness(columns: Seq[String], where: Option[String] = None)
+case class Uniqueness(columns: Seq[String], where: Option[String] = None,
+                      analyzerOptions: Option[AnalyzerOptions] = None)
   extends ScanShareableFrequencyBasedAnalyzer("Uniqueness", columns)
   with FilterableAnalyzer {
 
   override def aggregationFunctions(numRows: Long): Seq[Column] = {
-    (sum(col(COUNT_COL).equalTo(lit(1)).cast(DoubleType)) / numRows) :: Nil
+   (sum(col(COUNT_COL).equalTo(lit(1)).cast(DoubleType)) / numRows) :: Nil
   }
 
   override def fromAggregationResult(result: Row, offset: Int, fullColumn: Option[Column]): DoubleMetric = {
-    val fullColumnUniqueness = when((fullColumn.getOrElse(null)).equalTo(1), true).otherwise(false)
-    super.fromAggregationResult(result, offset, Option(fullColumnUniqueness))
+    val conditionColumn = where.map { expression => expr(expression) }
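+    // Rows excluded by the filter get the configured treatment; remaining rows are true iff unique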
+    val fullColumnUniqueness = fullColumn.map {
+      rowLevelColumn => {
+        conditionColumn.map {
+          condition => {
+            when(not(condition), expr(getRowLevelFilterTreatment.toString))
+              .when(rowLevelColumn.equalTo(1), true).otherwise(false)
+          }
+        }.getOrElse(when(rowLevelColumn.equalTo(1), true).otherwise(false))
+      }
+    }
+    super.fromAggregationResult(result, offset, fullColumnUniqueness)
   }
 
   override def filterCondition: Option[String] = where
+
+  private def getRowLevelFilterTreatment: FilteredRow = {
+    analyzerOptions
+      .map { options => options.filteredRow }
+      .getOrElse(FilteredRow.TRUE)
+  }
 }
 
 object Uniqueness {
diff --git a/src/main/scala/com/amazon/deequ/checks/Check.scala b/src/main/scala/com/amazon/deequ/checks/Check.scala
index 884041469..9921e7f8e 100644
--- a/src/main/scala/com/amazon/deequ/checks/Check.scala
+++ b/src/main/scala/com/amazon/deequ/checks/Check.scala
@@ -132,10 +132,12 @@ case class Check(
     *
     * @param column Column to run the assertion on
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     * @return
     */
-  def isComplete(column: String, hint: Option[String] = None): CheckWithLastConstraintFilterable = {
-    addFilterableConstraint { filter => completenessConstraint(column, Check.IsOne, filter, hint) }
+  def isComplete(column: String, hint: Option[String] = None,
+                 analyzerOptions: Option[AnalyzerOptions] = None): CheckWithLastConstraintFilterable = {
+    addFilterableConstraint { filter => completenessConstraint(column, Check.IsOne, filter, hint, analyzerOptions) }
   }
 
   /**
@@ -146,14 +148,16 @@ case class Check(
     * @param column    Column to run the assertion on
     * @param assertion Function that receives a double input parameter and returns a boolean
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     * @return
     */
   def hasCompleteness(
       column: String,
       assertion: Double => Boolean,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
     : CheckWithLastConstraintFilterable = {
-    addFilterableConstraint { filter => completenessConstraint(column, assertion, filter, hint) }
+    addFilterableConstraint { filter => completenessConstraint(column, assertion, filter, hint, analyzerOptions) }
   }
 
   /**
@@ -221,11 +225,13 @@ case class Check(
     *
     * @param column Column to run the assertion on
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     * @return
     */
-  def isUnique(column: String, hint: Option[String] = None): CheckWithLastConstraintFilterable = {
+  def isUnique(column: String, hint: Option[String] = None,
+               analyzerOptions: Option[AnalyzerOptions] = None): CheckWithLastConstraintFilterable = {
     addFilterableConstraint { filter =>
-      uniquenessConstraint(Seq(column), Check.IsOne, filter, hint) }
+      uniquenessConstraint(Seq(column), Check.IsOne, filter, hint, analyzerOptions) }
   }
 
   /**
@@ -269,6 +275,24 @@ case class Check(
     addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter) }
   }
 
+  /**
+   * Creates a constraint that asserts on uniqueness in a single or combined set of key columns.
+   *
+   * @param columns         Key columns
+   * @param assertion       Function that receives a double input parameter and returns a boolean.
+   *                        Refers to the fraction of unique values
+   * @param hint            A hint to provide additional context why a constraint could have failed
+   * @return
+   */
+  def hasUniqueness(
+      columns: Seq[String],
+      assertion: Double => Boolean,
+      hint: Option[String])
+    : CheckWithLastConstraintFilterable = {
+
+    addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter, hint) }
+  }
+
   /**
     * Creates a constraint that asserts on uniqueness in a single or combined set of key columns.
     *
@@ -276,15 +300,17 @@ case class Check(
     * @param assertion Function that receives a double input parameter and returns a boolean.
     *                  Refers to the fraction of unique values
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     * @return
     */
   def hasUniqueness(
       columns: Seq[String],
       assertion: Double => Boolean,
-      hint: Option[String])
+      hint: Option[String],
+      analyzerOptions: Option[AnalyzerOptions])
     : CheckWithLastConstraintFilterable = {
 
-    addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter, hint) }
+    addFilterableConstraint { filter => uniquenessConstraint(columns, assertion, filter, hint, analyzerOptions) }
   }
 
   /**
@@ -314,6 +340,22 @@ case class Check(
     hasUniqueness(Seq(column), assertion, hint)
   }
 
+  /**
+   * Creates a constraint that asserts on the uniqueness of a key column.
+   *
+   * @param column          Key column
+   * @param assertion       Function that receives a double input parameter and returns a boolean.
+   *                        Refers to the fraction of unique values.
+   * @param hint            A hint to provide additional context why a constraint could have failed
+   * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
+   * @return
+   */
+  def hasUniqueness(column: String, assertion: Double => Boolean, hint: Option[String],
+                    analyzerOptions: Option[AnalyzerOptions])
+    : CheckWithLastConstraintFilterable = {
+    hasUniqueness(Seq(column), assertion, hint, analyzerOptions)
+  }
+
   /**
     * Creates a constraint on the distinctness in a single or combined set of key columns.
     *
@@ -601,6 +643,7 @@ case class Check(
     * @param column Column to run the assertion on
     * @param assertion Function that receives a double input parameter and returns a boolean
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     * @return
     */
   def hasMinLength(
@@ -619,6 +662,7 @@ case class Check(
     * @param column Column to run the assertion on
     * @param assertion Function that receives a double input parameter and returns a boolean
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     * @return
     */
   def hasMaxLength(
diff --git a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
index 74070687e..d17ee9abe 100644
--- a/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
+++ b/src/main/scala/com/amazon/deequ/constraints/Constraint.scala
@@ -192,15 +192,17 @@ object Constraint {
     * @param assertion Function that receives a double input parameter (since the metric is
     *                  double metric) and returns a boolean
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     */
   def completenessConstraint(
       column: String,
       assertion: Double => Boolean,
       where: Option[String] = None,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
     : Constraint = {
 
-    val completeness = Completeness(column, where)
+    val completeness = Completeness(column, where, analyzerOptions)
 
     this.fromAnalyzer(completeness, assertion, hint)
   }
@@ -242,15 +244,17 @@ object Constraint {
     *                  (since the metric is double metric) and returns a boolean
     * @param where Additional filter to apply before the analyzer is run.
     * @param hint A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     */
   def uniquenessConstraint(
       columns: Seq[String],
       assertion: Double => Boolean,
       where: Option[String] = None,
-      hint: Option[String] = None)
+      hint: Option[String] = None,
+      analyzerOptions: Option[AnalyzerOptions] = None)
     : Constraint = {
 
-    val uniqueness = Uniqueness(columns, where)
+    val uniqueness = Uniqueness(columns, where, analyzerOptions)
 
     fromAnalyzer(uniqueness, assertion, hint)
   }
@@ -528,6 +532,7 @@ object Constraint {
     * @param column Column to run the assertion on
     * @param assertion Function that receives a double input parameter and returns a boolean
     * @param hint    A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     */
   def maxLengthConstraint(
       column: String,
@@ -562,6 +567,7 @@ object Constraint {
     * @param column Column to run the assertion on
     * @param assertion Function that receives a double input parameter and returns a boolean
     * @param hint    A hint to provide additional context why a constraint could have failed
+    * @param analyzerOptions Options to configure analyzer behavior (NullBehavior, FilteredRow)
     */
   def minLengthConstraint(
       column: String,
diff --git a/src/test/scala/com/amazon/deequ/VerificationResultTest.scala b/src/test/scala/com/amazon/deequ/VerificationResultTest.scala
index 93aa73201..0a90c8f77 100644
--- a/src/test/scala/com/amazon/deequ/VerificationResultTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationResultTest.scala
@@ -78,6 +78,13 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe
 
           val successMetricsResultsJson = VerificationResult.successMetricsAsJson(results)
 
+          val expectedJsonSet = Set("""{"entity":"Column","instance":"item","name":"Distinctness","value":1.0}""",
+            """{"entity": "Column", "instance":"att2","name":"Completeness","value":1.0}""",
+            """{"entity":"Column","instance":"att1","name":"Completeness","value":1.0}""",
+            """{"entity":"Multicolumn","instance":"att1,att2",
+              "name":"Uniqueness","value":0.25}""",
+            """{"entity":"Dataset","instance":"*","name":"Size","value":4.0}""")
+
           val expectedJson =
             """[{"entity":"Column","instance":"item","name":"Distinctness","value":1.0},
               |{"entity": "Column", "instance":"att2","name":"Completeness","value":1.0},
@@ -123,11 +130,11 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe
 
         import session.implicits._
         val expected = Seq(
-          ("group-1", "Error", "Success", "CompletenessConstraint(Completeness(att1,None))",
+          ("group-1", "Error", "Success", "CompletenessConstraint(Completeness(att1,None,None))",
             "Success", ""),
           ("group-2-E", "Error", "Error", "SizeConstraint(Size(None))", "Failure",
             "Value: 4 does not meet the constraint requirement! Should be greater than 5!"),
-          ("group-2-E", "Error", "Error", "CompletenessConstraint(Completeness(att2,None))",
+          ("group-2-E", "Error", "Error", "CompletenessConstraint(Completeness(att2,None,None))",
             "Success", ""),
           ("group-2-W", "Warning", "Warning",
             "DistinctnessConstraint(Distinctness(List(item),None))",
@@ -150,7 +157,7 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe
 
           val expectedJson =
             """[{"check":"group-1","check_level":"Error","check_status":"Success",
-              |"constraint":"CompletenessConstraint(Completeness(att1,None))",
+              |"constraint":"CompletenessConstraint(Completeness(att1,None,None))",
               |"constraint_status":"Success","constraint_message":""},
               |
               |{"check":"group-2-E","check_level":"Error","check_status":"Error",
@@ -159,7 +166,7 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe
               | Should be greater than 5!"},
               |
               |{"check":"group-2-E","check_level":"Error","check_status":"Error",
-              |"constraint":"CompletenessConstraint(Completeness(att2,None))",
+              |"constraint":"CompletenessConstraint(Completeness(att2,None,None))",
               |"constraint_status":"Success","constraint_message":""},
               |
               |{"check":"group-2-W","check_level":"Warning","check_status":"Warning",
@@ -214,7 +221,6 @@ class VerificationResultTest extends WordSpec with Matchers with SparkContextSpe
   }
 
   private[this] def assertSameResultsJson(jsonA: String, jsonB: String): Unit = {
-    assert(SimpleResultSerde.deserialize(jsonA) ==
-      SimpleResultSerde.deserialize(jsonB))
+    assert(SimpleResultSerde.deserialize(jsonA).toSet.sameElements(SimpleResultSerde.deserialize(jsonB).toSet))
   }
 }
diff --git a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
index a468b8a34..932c82988 100644
--- a/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
+++ b/src/test/scala/com/amazon/deequ/VerificationSuiteTest.scala
@@ -304,6 +304,91 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       assert(Seq(true, true, true, false, false, false).sameElements(rowLevel8))
     }
 
+    "generate a result that contains row-level results with true for filtered rows" in withSparkSession { session =>
+      val data = getDfCompleteAndInCompleteColumns(session)
+
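+      // Default FilteredRow.TRUE: rows excluded by the where clauses are reported as true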
+      val completeness = new Check(CheckLevel.Error, "rule1")
+        .hasCompleteness("att2", _ > 0.7, None)
+        .where("att1 = \"a\"")
+      val uniqueness = new Check(CheckLevel.Error, "rule2")
+        .hasUniqueness("att1", _ > 0.5, None)
+      val uniquenessWhere = new Check(CheckLevel.Error, "rule3")
+        .isUnique("att1")
+        .where("item < 3")
+      val expectedColumn1 = completeness.description
+      val expectedColumn2 = uniqueness.description
+      val expectedColumn3 = uniquenessWhere.description
+
+      val suite = new VerificationSuite().onData(data)
+        .addCheck(completeness)
+        .addCheck(uniqueness)
+        .addCheck(uniquenessWhere)
+
+      val result: VerificationResult = suite.run()
+
+      assert(result.status == CheckStatus.Error)
+
+      val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data).orderBy("item")
+      resultData.show(false)
+      val expectedColumns: Set[String] =
+        data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3
+      assert(resultData.columns.toSet == expectedColumns)
+
+      val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getAs[Any](0))
+      assert(Seq(true, true, false, true, true, true).sameElements(rowLevel1))
+
+      val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getAs[Any](0))
+      assert(Seq(false, false, false, false, false, false).sameElements(rowLevel2))
+
+      val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getAs[Any](0))
+      assert(Seq(true, true, true, true, true, true).sameElements(rowLevel3))
+
+    }
+
+    "generate a result that contains row-level results with null for filtered rows" in withSparkSession { session =>
+      val data = getDfCompleteAndInCompleteColumns(session)
+
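+      // With FilteredRow.NULL, rows excluded by the where clauses surface as null in row-level results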
+      val analyzerOptions = Option(AnalyzerOptions(filteredRow = FilteredRow.NULL))
+
+      val completeness = new Check(CheckLevel.Error, "rule1")
+        .hasCompleteness("att2", _ > 0.7, None, analyzerOptions)
+        .where("att1 = \"a\"")
+      val uniqueness = new Check(CheckLevel.Error, "rule2")
+        .hasUniqueness("att1", _ > 0.5, None, analyzerOptions)
+      val uniquenessWhere = new Check(CheckLevel.Error, "rule3")
+        .isUnique("att1", None, analyzerOptions)
+        .where("item < 3")
+      val expectedColumn1 = completeness.description
+      val expectedColumn2 = uniqueness.description
+      val expectedColumn3 = uniquenessWhere.description
+
+      val suite = new VerificationSuite().onData(data)
+        .addCheck(completeness)
+        .addCheck(uniqueness)
+        .addCheck(uniquenessWhere)
+
+      val result: VerificationResult = suite.run()
+
+      assert(result.status == CheckStatus.Error)
+
+      val resultData = VerificationResult.rowLevelResultsAsDataFrame(session, result, data).orderBy("item")
+      resultData.show(false)
+      val expectedColumns: Set[String] =
+        data.columns.toSet + expectedColumn1 + expectedColumn2 + expectedColumn3
+      assert(resultData.columns.toSet == expectedColumns)
+
+      val rowLevel1 = resultData.select(expectedColumn1).collect().map(r => r.getAs[Any](0))
+      assert(Seq(true, null, false, true, null, true).sameElements(rowLevel1))
+
+      val rowLevel2 = resultData.select(expectedColumn2).collect().map(r => r.getAs[Any](0))
+      assert(Seq(false, false, false, false, false, false).sameElements(rowLevel2))
+
+      val rowLevel3 = resultData.select(expectedColumn3).collect().map(r => r.getAs[Any](0))
+      assert(Seq(true, true, null, null, null, null).sameElements(rowLevel3))
+
+    }
+
     "generate a result that contains row-level results for null column values" in withSparkSession { session =>
       val data = getDfCompleteAndInCompleteColumnsAndVarLengthStrings(session)
 
@@ -459,6 +544,38 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
 
     }
 
+    "accept analysis config for mandatory analysis for checks with filters" in withSparkSession { sparkSession =>
+
+      import sparkSession.implicits._
+      val df = getDfCompleteAndInCompleteColumns(sparkSession)
+
+      val result = {
+        val checkToSucceed = Check(CheckLevel.Error, "group-1")
+          .hasCompleteness("att2", _ > 0.7, None) // 0.75
+          .where("att1 = \"a\"")
+        val uniquenessCheck = Check(CheckLevel.Error, "group-2")
+          .isUnique("att1")
+          .where("item < 3")
+
+        VerificationSuite().onData(df).addCheck(checkToSucceed).addCheck(uniquenessCheck).run()
+      }
+
+      assert(result.status == CheckStatus.Success)
+
+      val analysisDf = AnalyzerContext.successMetricsAsDataFrame(sparkSession,
+        AnalyzerContext(result.metrics))
+
+      val expected = Seq(
+        ("Column", "att2", "Completeness (where: att1 = \"a\")", 0.75),
+        ("Column", "att1", "Uniqueness (where: item < 3)", 1.0))
+        .toDF("entity", "instance", "name", "value")
+
+      assertSameRows(analysisDf, expected)
+
+    }
+
     "run the analysis even there are no constraints" in withSparkSession { sparkSession =>
 
       import sparkSession.implicits._
@@ -786,6 +903,33 @@ class VerificationSuiteTest extends WordSpec with Matchers with SparkContextSpec
       }
     }
 
+    "A well-defined check should pass even if an ill-defined check is also configured quotes" in withSparkSession {
+      sparkSession =>
+        val df = getDfWithDistinctValuesQuotes(sparkSession)
+
+        val rangeCheck = Check(CheckLevel.Error, "a")
+          .isContainedIn("att2", Array("can't", "help", "but", "wouldn't"))
+
+        val reasonCheck = Check(CheckLevel.Error, "a")
+          .isContainedIn("reason", Array("Already Has ", " Can't Proceed"))
+
+        val verificationResult = VerificationSuite()
+          .onData(df)
+          .addCheck(rangeCheck)
+          .addCheck(reasonCheck)
+          .run()
+
+        val checkSuccessResult = verificationResult.checkResults(rangeCheck)
+        checkSuccessResult.constraintResults.map(_.message) shouldBe List(None)
+        println(checkSuccessResult.constraintResults.map(_.message))
+        assert(checkSuccessResult.status == CheckStatus.Success)
+
+        val reasonResult = verificationResult.checkResults(reasonCheck)
+        reasonResult.constraintResults.map(_.message) shouldBe List(None)
+        println(reasonResult.constraintResults.map(_.message))
+        assert(reasonResult.status == CheckStatus.Success)
+    }
+
     "A well-defined check should pass even if an ill-defined check is also configured" in withSparkSession {
       sparkSession =>
         val df = getDfWithNameAndAge(sparkSession)
diff --git a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
index 03787b886..1c0b28d1a 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/AnalyzerTests.scala
@@ -63,7 +63,9 @@ class AnalyzerTests extends AnyWordSpec with Matchers with SparkContextSpec with
       val result2 = Completeness("att2").calculate(dfMissing)
       assert(result2 == DoubleMetric(Entity.Column,
         "Completeness", "att2", Success(0.75), result2.fullColumn))
-
+      val result3 = Completeness("att2", Option("att1 is NOT NULL")).calculate(dfMissing)
+      assert(result3 == DoubleMetric(Entity.Column,
+      "Completeness", "att2", Success(4.0/6.0), result3.fullColumn))
     }
 
     "fail on wrong column input" in withSparkSession { sparkSession =>
diff --git a/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala b/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala
index b1cdf3014..54e26f867 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/CompletenessTest.scala
@@ -23,6 +23,8 @@ import com.amazon.deequ.utils.FixtureSupport
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
 
+import scala.util.Success
+
 class CompletenessTest extends AnyWordSpec with Matchers with SparkContextSpec with FixtureSupport {
 
   "Completeness" should {
@@ -37,5 +39,36 @@ class CompletenessTest extends AnyWordSpec with Matchers with SparkContextSpec w
       data.withColumn("new", metric.fullColumn.get).collect().map(_.getAs[Boolean]("new")) shouldBe
         Seq(true, true, true, true, false, true, true, false)
     }
+
+    "return row-level results for columns filtered as null" in withSparkSession { session =>
+
+      val data = getDfCompleteAndInCompleteColumns(session)
+
+      // Explicitly setting RowLevelFilterTreatment for test purposes; normally this is set via the VerificationRunBuilder
+      val completenessAtt2 = Completeness("att2", Option("att1 = \"a\""),
+                              Option(AnalyzerOptions(filteredRow = FilteredRow.NULL)))
+      val state = completenessAtt2.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = completenessAtt2.computeMetricFrom(state)
+
+      val df = data.withColumn("new", metric.fullColumn.get)
+      df.show(false)
+      df.collect().map(_.getAs[Any]("new")).toSeq shouldBe
+        Seq(true, null, false, true, null, true)
+    }
+
+    "return row-level results for columns filtered as true" in withSparkSession { session =>
+
+      val data = getDfCompleteAndInCompleteColumns(session)
+
+      // Explicitly setting RowLevelFilterTreatment for test purposes; normally this is set via the VerificationRunBuilder
+      val completenessAtt2 = Completeness("att2", Option("att1 = \"a\""))
+      val state = completenessAtt2.computeStateFrom(data)
+      val metric: DoubleMetric with FullColumn = completenessAtt2.computeMetricFrom(state)
+
+      val df = data.withColumn("new", metric.fullColumn.get)
+      df.show(false)
+      df.collect().map(_.getAs[Any]("new")).toSeq shouldBe
+        Seq(true, true, false, true, true, true)
+    }
   }
 }
diff --git a/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala b/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala
index 5d6d6808f..d50995b55 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/UniquenessTest.scala
@@ -117,4 +117,68 @@ class UniquenessTest extends AnyWordSpec with Matchers with SparkContextSpec wit
       .withColumn("new", metric.fullColumn.get).orderBy("unique")
       .collect().map(_.getAs[Boolean]("new")) shouldBe Seq(true, true, true, true, true, true)
   }
+
+  "return filtered row-level results for uniqueness with null" in withSparkSession { session =>
+
+    val data = getDfWithUniqueColumns(session)
+
+    val uniqueness = Uniqueness(Seq("onlyUniqueWithOtherNonUnique"), Option("unique < 4"),
+      Option(AnalyzerOptions(filteredRow = FilteredRow.NULL)))
+    val state: Option[FrequenciesAndNumRows] = uniqueness.computeStateFrom(data, Option("unique < 4"))
+    val metric: DoubleMetric with FullColumn = uniqueness.computeMetricFrom(state)
+
+    // Adding column with UNIQUENESS_ID, since it's only added in VerificationResult.getRowLevelResults
+    val resultDf = data.withColumn(UNIQUENESS_ID, monotonically_increasing_id())
+      .withColumn("new", metric.fullColumn.get).orderBy("unique")
+    resultDf
+      .collect().map(_.getAs[Any]("new")) shouldBe Seq(true, true, true, null, null, null)
+  }
+
+  "return filtered row-level results for uniqueness with null on multiple columns" in withSparkSession { session =>
+
+    val data = getDfWithUniqueColumns(session)
+
+    val uniqueness = Uniqueness(Seq("halfUniqueCombinedWithNonUnique", "nonUnique"), Option("unique > 2"),
+      Option(AnalyzerOptions(filteredRow = FilteredRow.NULL)))
+    val state: Option[FrequenciesAndNumRows] = uniqueness.computeStateFrom(data, Option("unique > 2"))
+    val metric: DoubleMetric with FullColumn = uniqueness.computeMetricFrom(state)
+
+    // Adding column with UNIQUENESS_ID, since it's only added in VerificationResult.getRowLevelResults
+    val resultDf = data.withColumn(UNIQUENESS_ID, monotonically_increasing_id())
+      .withColumn("new", metric.fullColumn.get).orderBy("unique")
+    resultDf
+      .collect().map(_.getAs[Any]("new")) shouldBe Seq(null, null, true, true, true, true)
+  }
+
+  "return filtered row-level results for uniqueness true null" in withSparkSession { session =>
+
+    val data = getDfWithUniqueColumns(session)
+
+    // Explicitly setting RowLevelFilterTreatment for test purposes; normally this is set via the VerificationRunBuilder
+    val uniqueness = Uniqueness(Seq("onlyUniqueWithOtherNonUnique"), Option("unique < 4"))
+    val state: Option[FrequenciesAndNumRows] = uniqueness.computeStateFrom(data, Option("unique < 4"))
+    val metric: DoubleMetric with FullColumn = uniqueness.computeMetricFrom(state)
+
+    // Adding column with UNIQUENESS_ID, since it's only added in VerificationResult.getRowLevelResults
+    val resultDf = data.withColumn(UNIQUENESS_ID, monotonically_increasing_id())
+      .withColumn("new", metric.fullColumn.get).orderBy("unique")
+    resultDf
+      .collect().map(_.getAs[Any]("new")) shouldBe Seq(true, true, true, true, true, true)
+  }
+
+  "return filtered row-level results for uniqueness with true on multiple columns" in withSparkSession { session =>
+
+    val data = getDfWithUniqueColumns(session)
+
+    // Explicitly setting RowLevelFilterTreatment for test purposes; normally this is set via the VerificationRunBuilder
+    val uniqueness = Uniqueness(Seq("halfUniqueCombinedWithNonUnique", "nonUnique"), Option("unique > 2"))
+    val state: Option[FrequenciesAndNumRows] = uniqueness.computeStateFrom(data, Option("unique > 2"))
+    val metric: DoubleMetric with FullColumn = uniqueness.computeMetricFrom(state)
+
+    // Adding column with UNIQUENESS_ID, since it's only added in VerificationResult.getRowLevelResults
+    val resultDf = data.withColumn(UNIQUENESS_ID, monotonically_increasing_id())
+      .withColumn("new", metric.fullColumn.get).orderBy("unique")
+    resultDf
+      .collect().map(_.getAs[Any]("new")) shouldBe Seq(true, true, true, true, true, true)
+  }
 }
diff --git a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala
index 4ffc9eeb9..ce9bda69b 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalysisRunnerTests.scala
@@ -137,7 +137,8 @@ class AnalysisRunnerTests extends AnyWordSpec
           UniqueValueRatio(Seq("att1"), Some("att3 > 0")) :: Nil
 
         val (separateResults, numSeparateJobs) = sparkMonitor.withMonitoringSession { stat =>
-          val results = analyzers.map { _.calculate(df) }.toSet
+          val results = analyzers.map { analyzer =>
+            analyzer.calculate(df, filterCondition = analyzer.filterCondition)
+          }.toSet
           (results, stat.jobCount)
         }
 
@@ -160,7 +161,9 @@ class AnalysisRunnerTests extends AnyWordSpec
           UniqueValueRatio(Seq("att1", "att2"), Some("att3 > 0")) :: Nil
 
         val (separateResults, numSeparateJobs) = sparkMonitor.withMonitoringSession { stat =>
-          val results = analyzers.map { _.calculate(df) }.toSet
+          val results = analyzers.map { analyzer =>
+            analyzer.calculate(df, filterCondition = analyzer.filterCondition)
+          }.toSet
           (results, stat.jobCount)
         }
 
@@ -184,7 +187,9 @@ class AnalysisRunnerTests extends AnyWordSpec
           Uniqueness("att1", Some("att3 = 0")) :: Nil
 
         val (separateResults, numSeparateJobs) = sparkMonitor.withMonitoringSession { stat =>
-          val results = analyzers.map { _.calculate(df) }.toSet
+          val results = analyzers.map { analyzer =>
+            analyzer.calculate(df, filterCondition = analyzer.filterCondition)
+          }.toSet
           (results, stat.jobCount)
         }
 
@@ -195,7 +200,14 @@ class AnalysisRunnerTests extends AnyWordSpec
 
         assert(numSeparateJobs == analyzers.length * 2)
         assert(numCombinedJobs == analyzers.length * 2)
-        assert(separateResults.toString == runnerResults.toString)
+        // Previously asserted that separateResults.toString == runnerResults.toString, but
+        // adding filters changed the order of the results, so compare sizes and membership instead.
+        assert(separateResults.asInstanceOf[Set[DoubleMetric]].size ==
+          runnerResults.asInstanceOf[Set[DoubleMetric]].size)
+        separateResults.asInstanceOf[Set[DoubleMetric]].foreach { result =>
+          assert(runnerResults.toString.contains(result.toString))
+        }
       }
 
     "reuse existing results" in
@@ -272,7 +284,7 @@ class AnalysisRunnerTests extends AnyWordSpec
 
         assert(exception.getMessage == "Could not find all necessary results in the " +
           "MetricsRepository, the calculation of the metrics for these analyzers " +
-          "would be needed: Uniqueness(List(item, att2),None), Size(None)")
+          "would be needed: Uniqueness(List(item, att2),None,None), Size(None)")
       }
 
     "save results if specified" in
diff --git a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala
index 254fac9b4..9133d5ae4 100644
--- a/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala
+++ b/src/test/scala/com/amazon/deequ/analyzers/runners/AnalyzerContextTest.scala
@@ -145,7 +145,8 @@ class AnalyzerContextTest extends AnyWordSpec
   }
 
   private[this] def assertSameJson(jsonA: String, jsonB: String): Unit = {
-    assert(SimpleResultSerde.deserialize(jsonA) ==
-      SimpleResultSerde.deserialize(jsonB))
+    assert(SimpleResultSerde.deserialize(jsonA).toSet.sameElements(SimpleResultSerde.deserialize(jsonB).toSet))
   }
 }
diff --git a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
index 5a21079ae..bc20b0954 100644
--- a/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
+++ b/src/test/scala/com/amazon/deequ/checks/CheckTest.scala
@@ -62,18 +62,39 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
       val check3 = Check(CheckLevel.Warning, "group-2-W")
         .hasCompleteness("att2", _ > 0.8) // 0.75
 
+      val check4 = Check(CheckLevel.Error, "group-3")
+        .isComplete("att2", None) // 1.0 with filter
+        .where("att2 is NOT NULL")
+        .hasCompleteness("att2", _ == 1.0, None) // 1.0 with filter
+        .where("att2 is NOT NULL")
+
       val context = runChecks(getDfCompleteAndInCompleteColumns(sparkSession),
-        check1, check2, check3)
+        check1, check2, check3, check4)
 
       context.metricMap.foreach { println }
 
       assertEvaluatesTo(check1, context, CheckStatus.Success)
       assertEvaluatesTo(check2, context, CheckStatus.Error)
       assertEvaluatesTo(check3, context, CheckStatus.Warning)
+      assertEvaluatesTo(check4, context, CheckStatus.Success)
 
       assert(check1.getRowLevelConstraintColumnNames() == Seq("Completeness-att1", "Completeness-att1"))
       assert(check2.getRowLevelConstraintColumnNames() == Seq("Completeness-att2"))
       assert(check3.getRowLevelConstraintColumnNames() == Seq("Completeness-att2"))
+      assert(check4.getRowLevelConstraintColumnNames() == Seq("Completeness-att2", "Completeness-att2"))
+    }
+
+    "return the correct check status for completeness with where filter" in withSparkSession { sparkSession =>
+
+      val check = Check(CheckLevel.Error, "group-3")
+        .hasCompleteness("ZipCode", _ > 0.6, None) // 1.0 with filter
+        .where("City is NOT NULL")
+
+      val context = runChecks(getDfForWhereClause(sparkSession), check)
+
+      assertEvaluatesTo(check, context, CheckStatus.Success)
+
+      assert(check.getRowLevelConstraintColumnNames() == Seq("Completeness-ZipCode"))
     }
 
     "return the correct check status for combined completeness" in
@@ -164,7 +185,6 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
       assert(constraintStatuses.head == ConstraintStatus.Success)
       assert(constraintStatuses(1) == ConstraintStatus.Success)
       assert(constraintStatuses(2) == ConstraintStatus.Success)
-
       assert(constraintStatuses(3) == ConstraintStatus.Failure)
       assert(constraintStatuses(4) == ConstraintStatus.Failure)
     }
@@ -515,6 +535,14 @@ class CheckTest extends AnyWordSpec with Matchers with SparkContextSpec with Fix
       assertEvaluatesTo(numericRangeCheck9, numericRangeResults, CheckStatus.Success)
     }
 
+    "correctly evaluate range constraints when values have single quote in string" in withSparkSession { sparkSession =>
+      val rangeCheck = Check(CheckLevel.Error, "a")
+        .isContainedIn("att2", Array("can't", "help", "but", "wouldn't"))
+
+      val rangeResults = runChecks(getDfWithDistinctValuesQuotes(sparkSession), rangeCheck)
+      assertEvaluatesTo(rangeCheck, rangeResults, CheckStatus.Success)
+    }
+
     "return the correct check status for histogram constraints" in
       withSparkSession { sparkSession =>
 
diff --git a/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala b/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala
index f8188165c..a7efbe180 100644
--- a/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala
+++ b/src/test/scala/com/amazon/deequ/constraints/AnalysisBasedConstraintTest.scala
@@ -58,7 +58,8 @@ class AnalysisBasedConstraintTest extends WordSpec with Matchers with SparkConte
     override def calculate(
         data: DataFrame,
         stateLoader: Option[StateLoader],
-        statePersister: Option[StatePersister])
+        statePersister: Option[StatePersister],
+        filterCondition: Option[String])
       : DoubleMetric = {
       val value: Try[Double] = Try {
         require(data.columns.contains(column), s"Missing column $column")
@@ -67,11 +68,10 @@ class AnalysisBasedConstraintTest extends WordSpec with Matchers with SparkConte
       DoubleMetric(Entity.Column, "sample", column, value)
     }
 
-    override def computeStateFrom(data: DataFrame): Option[NumMatches] = {
+    override def computeStateFrom(data: DataFrame, filterCondition: Option[String] = None): Option[NumMatches] = {
       throw new NotImplementedError()
     }
 
-
     override def computeMetricFrom(state: Option[NumMatches]): DoubleMetric = {
       throw new NotImplementedError()
     }
diff --git a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
index 6f1fa1874..05f4d47bd 100644
--- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
+++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultSerdeTest.scala
@@ -363,7 +363,7 @@ class SimpleResultSerdeTest extends WordSpec with Matchers with SparkContextSpec
             .stripMargin.replaceAll("\n", "")
 
       // ordering of map entries is not guaranteed, so comparing strings is not an option
-      assert(SimpleResultSerde.deserialize(sucessMetricsResultJson) ==
-        SimpleResultSerde.deserialize(expected))
+      assert(SimpleResultSerde.deserialize(sucessMetricsResultJson).toSet.sameElements(
+        SimpleResultSerde.deserialize(expected).toSet))
     }
 }
diff --git a/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala b/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala
index 97d7a3c49..d4ce97fcb 100644
--- a/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala
+++ b/src/test/scala/com/amazon/deequ/repository/AnalysisResultTest.scala
@@ -344,7 +344,8 @@ class AnalysisResultTest extends AnyWordSpec
   }
 
   private[this] def assertSameJson(jsonA: String, jsonB: String): Unit = {
-    assert(SimpleResultSerde.deserialize(jsonA) ==
-      SimpleResultSerde.deserialize(jsonB))
+    assert(SimpleResultSerde.deserialize(jsonA).toSet.sameElements(SimpleResultSerde.deserialize(jsonB).toSet))
   }
 }
diff --git a/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala b/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala
index 6e61b9385..592f27b0e 100644
--- a/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala
+++ b/src/test/scala/com/amazon/deequ/repository/MetricsRepositoryMultipleResultsLoaderTest.scala
@@ -264,7 +264,8 @@ class MetricsRepositoryMultipleResultsLoaderTest extends AnyWordSpec with Matche
   }
 
   private[this] def assertSameJson(jsonA: String, jsonB: String): Unit = {
-    assert(SimpleResultSerde.deserialize(jsonA) ==
-      SimpleResultSerde.deserialize(jsonB))
+    assert(SimpleResultSerde.deserialize(jsonA).toSet.sameElements(SimpleResultSerde.deserialize(jsonB).toSet))
   }
 }
diff --git a/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala b/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala
index 6a98bf3c6..9a82903e8 100644
--- a/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala
+++ b/src/test/scala/com/amazon/deequ/suggestions/ConstraintSuggestionResultTest.scala
@@ -212,7 +212,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
             """{
               |  "constraint_suggestions": [
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(att2,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))",
               |      "column_name": "att2",
               |      "current_value": "Completeness: 1.0",
               |      "description": "'att2' is not null",
@@ -222,7 +222,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "code_for_constraint": ".isComplete(\"att2\")"
               |    },
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(att1,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))",
               |      "column_name": "att1",
               |      "current_value": "Completeness: 1.0",
               |      "description": "'att1' is not null",
@@ -232,7 +232,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "code_for_constraint": ".isComplete(\"att1\")"
               |    },
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(item,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(item,None,None))",
               |      "column_name": "item",
               |      "current_value": "Completeness: 1.0",
               |      "description": "'item' is not null",
@@ -265,7 +265,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "code_for_constraint": ".isNonNegative(\"item\")"
               |    },
               |    {
-              |      "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None))",
+              |      "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None,None))",
               |      "column_name": "item",
               |      "current_value": "ApproxDistinctness: 1.0",
               |      "description": "'item' is unique",
@@ -294,7 +294,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
             """{
               |  "constraint_suggestions": [
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(att2,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))",
               |      "column_name": "att2",
               |      "current_value": "Completeness: 1.0",
               |      "description": "\u0027att2\u0027 is not null",
@@ -305,7 +305,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "constraint_result_on_test_set": "Failure"
               |    },
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(att1,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))",
               |      "column_name": "att1",
               |      "current_value": "Completeness: 1.0",
               |      "description": "\u0027att1\u0027 is not null",
@@ -316,7 +316,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "constraint_result_on_test_set": "Failure"
               |    },
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(item,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(item,None,None))",
               |      "column_name": "item",
               |      "current_value": "Completeness: 1.0",
               |      "description": "\u0027item\u0027 is not null",
@@ -352,7 +352,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "constraint_result_on_test_set": "Failure"
               |    },
               |    {
-              |      "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None))",
+              |      "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None,None))",
               |      "column_name": "item",
               |      "current_value": "ApproxDistinctness: 1.0",
               |      "description": "\u0027item\u0027 is unique",
@@ -381,7 +381,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
             """{
               |  "constraint_suggestions": [
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(att2,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))",
               |      "column_name": "att2",
               |      "current_value": "Completeness: 1.0",
               |      "description": "\u0027att2\u0027 is not null",
@@ -392,7 +392,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "constraint_result_on_test_set": "Unknown"
               |    },
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(att1,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))",
               |      "column_name": "att1",
               |      "current_value": "Completeness: 1.0",
               |      "description": "\u0027att1\u0027 is not null",
@@ -403,7 +403,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "constraint_result_on_test_set": "Unknown"
               |    },
               |    {
-              |      "constraint_name": "CompletenessConstraint(Completeness(item,None))",
+              |      "constraint_name": "CompletenessConstraint(Completeness(item,None,None))",
               |      "column_name": "item",
               |      "current_value": "Completeness: 1.0",
               |      "description": "\u0027item\u0027 is not null",
@@ -439,7 +439,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
               |      "constraint_result_on_test_set": "Unknown"
               |    },
               |    {
-              |      "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None))",
+              |      "constraint_name": "UniquenessConstraint(Uniqueness(List(item),None,None))",
               |      "column_name": "item",
               |      "current_value": "ApproxDistinctness: 1.0",
               |      "description": "\u0027item\u0027 is unique",
@@ -471,7 +471,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
           """{
             |  "constraint_suggestions": [
             |  {
-            |      "constraint_name": "CompletenessConstraint(Completeness(`item.one`,None))",
+            |      "constraint_name": "CompletenessConstraint(Completeness(`item.one`,None,None))",
             |      "column_name": "`item.one`",
             |      "current_value": "Completeness: 1.0",
             |      "description": "'`item.one`' is not null",
@@ -504,7 +504,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
             |      "code_for_constraint": ".isNonNegative(\"`item.one`\")"
             |    },
             |    {
-            |      "constraint_name": "UniquenessConstraint(Uniqueness(List(`item.one`),None))",
+            |      "constraint_name": "UniquenessConstraint(Uniqueness(List(`item.one`),None,None))",
             |      "column_name": "`item.one`",
             |      "current_value": "ApproxDistinctness: 1.0",
             |      "description": "'`item.one`' is unique",
@@ -515,7 +515,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
             |      "code_for_constraint": ".isUnique(\"`item.one`\")"
             |    },
             |    {
-            |      "constraint_name": "CompletenessConstraint(Completeness(att2,None))",
+            |      "constraint_name": "CompletenessConstraint(Completeness(att2,None,None))",
             |      "column_name": "att2",
             |      "current_value": "Completeness: 1.0",
             |      "description": "'att2' is not null",
@@ -525,7 +525,7 @@ class ConstraintSuggestionResultTest extends WordSpec with Matchers with SparkCo
             |      "code_for_constraint": ".isComplete(\"att2\")"
             |    },
             |    {
-            |      "constraint_name": "CompletenessConstraint(Completeness(att1,None))",
+            |      "constraint_name": "CompletenessConstraint(Completeness(att1,None,None))",
             |      "column_name": "att1",
             |      "current_value": "Completeness: 1.0",
             |      "description": "'att1' is not null",
diff --git a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
index 9b6ad9d4e..601134a53 100644
--- a/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
+++ b/src/test/scala/com/amazon/deequ/utils/FixtureSupport.scala
@@ -338,6 +338,20 @@ trait FixtureSupport {
       .toDF("att1", "att2")
   }
 
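+  // Values containing apostrophes and leading/trailing whitespace, e.g. for distinct-value and quoting edge cases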
+  def getDfWithDistinctValuesQuotes(sparkSession: SparkSession): DataFrame = {
+    import sparkSession.implicits._
+
+    Seq(
+      ("a", null, "Already Has "),
+      ("a", null, " Can't Proceed"),
+      (null, "can't", "Already Has "),
+      ("b", "help", " Can't Proceed"),
+      ("b", "but", "Already Has "),
+      ("c", "wouldn't", " Can't Proceed"))
+      .toDF("att1", "att2", "reason")
+  }
+
   def getDfWithConditionallyUninformativeColumns(sparkSession: SparkSession): DataFrame = {
     import sparkSession.implicits._
     Seq(
@@ -409,6 +423,18 @@
     ).toDF("item.one", "att1", "att2")
   }
 
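+  // Company/location rows with null entries, e.g. for exercising analyzers restricted by a where clause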
+  def getDfForWhereClause(sparkSession: SparkSession): DataFrame = {
+    import sparkSession.implicits._
+
+    Seq(
+      ("Acme", "90210", "CA", "Los Angeles"),
+      ("Acme", "90211", "CA", "Los Angeles"),
+      ("Robocorp", null, "NJ", null),
+      ("Robocorp", null, "NY", "New York")
+    ).toDF("Company", "ZipCode", "State", "City")
+  }
+
   def getDfCompleteAndInCompleteColumnsWithPeriod(sparkSession: SparkSession): DataFrame = {
     import sparkSession.implicits._