Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
updating anomaly check bounds to not have defaults and require inputs…
Browse files Browse the repository at this point in the history
… for the bound value and isThresholdInclusive, also adding anomaly detection with extended results README, and adding anomaly detection test with 2 anomaly checks on the same suite
arsenalgunnershubert777 committed Nov 4, 2024
1 parent 0f81982 commit fdebce5
Showing 22 changed files with 584 additions and 337 deletions.
Original file line number Diff line number Diff line change
@@ -131,7 +131,8 @@ trait BaseChangeStrategy
(None, false)
}
(outputSequenceIndex, AnomalyDetectionDataPoint(value, change,
Threshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
BoundedRange(lowerBound = Bound(lowerBound, inclusive = true),
upperBound = Bound(upperBound, inclusive = true)), isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
@@ -113,7 +113,8 @@ case class BatchNormalStrategy(
(None, false)
}
(index, AnomalyDetectionDataPoint(value, value,
Threshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)), isAnomaly, 1.0, detail))
BoundedRange(lowerBound = Bound(lowerBound, inclusive = true),
upperBound = Bound(upperBound, inclusive = true)), isAnomaly, 1.0, detail))
}
}

Original file line number Diff line number Diff line change
@@ -26,26 +26,26 @@ package com.amazon.deequ.anomalydetection
* Anomaly Detection Data Point class
* This class is different from the Anomaly Class in that this class
* wraps around all data points, not just anomalies, and provides extended results including
* if the data point is an anomaly, and the thresholds used in the anomaly calculation.
* if the data point is an anomaly, and the range with bounds used in the anomaly calculation.
*
* @param dataMetricValue The metric value that is the data point.
* @param anomalyMetricValue The metric value that is being used in the anomaly calculation.
* This usually aligns with dataMetricValue but not always,
* like in a rate of change strategy where the rate of change is the anomaly metric
* which may not equal the actual data point value.
* @param anomalyThreshold The thresholds used in the anomaly check, the anomalyMetricValue is
* compared to this threshold.
* @param anomalyCheckRange The range of bounds used in the anomaly check, the anomalyMetricValue is
* compared to this range.
* @param isAnomaly If the data point is an anomaly.
* @param confidence Confidence of anomaly detection.
* @param detail Detailed error message.
*/
class AnomalyDetectionDataPoint(
val dataMetricValue: Double,
val anomalyMetricValue: Double,
val anomalyThreshold: Threshold,
val isAnomaly: Boolean,
val confidence: Double,
val detail: Option[String])
val dataMetricValue: Double,
val anomalyMetricValue: Double,
val anomalyCheckRange: BoundedRange,
val isAnomaly: Boolean,
val confidence: Double,
val detail: Option[String])
{

def canEqual(that: Any): Boolean = {
@@ -64,7 +64,7 @@ class AnomalyDetectionDataPoint(
case anomaly: AnomalyDetectionDataPoint =>
anomaly.dataMetricValue == dataMetricValue &&
anomaly.anomalyMetricValue == anomalyMetricValue &&
anomaly.anomalyThreshold == anomalyThreshold &&
anomaly.anomalyCheckRange == anomalyCheckRange &&
anomaly.isAnomaly == isAnomaly &&
anomaly.confidence == confidence
case _ => false
@@ -76,7 +76,7 @@ class AnomalyDetectionDataPoint(
var result = 1
result = prime * result + dataMetricValue.hashCode()
result = prime * result + anomalyMetricValue.hashCode()
result = prime * result + anomalyThreshold.hashCode()
result = prime * result + anomalyCheckRange.hashCode()
result = prime * result + isAnomaly.hashCode()
result = prime * result + confidence.hashCode()
result
@@ -86,29 +86,29 @@ class AnomalyDetectionDataPoint(

object AnomalyDetectionDataPoint {
def apply(dataMetricValue: Double, anomalyMetricValue: Double,
anomalyThreshold: Threshold = Threshold(), isAnomaly: Boolean = false,
anomalyCheckRange: BoundedRange, isAnomaly: Boolean,
confidence: Double, detail: Option[String] = None
): AnomalyDetectionDataPoint = {
new AnomalyDetectionDataPoint(dataMetricValue, anomalyMetricValue, anomalyThreshold, isAnomaly, confidence, detail)
new AnomalyDetectionDataPoint(dataMetricValue, anomalyMetricValue, anomalyCheckRange, isAnomaly, confidence, detail)
}
}


/**
* Threshold class
* Defines threshold for the anomaly detection, defaults to inclusive bounds of Double.Min and Double.Max.
* BoundedRange class
* Defines range for the anomaly detection.
* @param upperBound The upper bound or threshold.
* @param lowerBound The lower bound or threshold.
*/
case class Threshold(lowerBound: Bound = Bound(Double.MinValue), upperBound: Bound = Bound(Double.MaxValue))
case class BoundedRange(lowerBound: Bound, upperBound: Bound)

/**
* Bound Class
* Class representing a threshold/bound, with value and inclusive/exclusive boolean/
* @param value The value of the bound as a Double.
* @param inclusive Boolean indicating if the Bound is inclusive or not.
*/
case class Bound(value: Double, inclusive: Boolean = true)
case class Bound(value: Double, inclusive: Boolean)



@@ -123,23 +123,21 @@ case class ExtendedDetectionResult(anomalyDetectionDataPointSequence:

/**
* AnomalyDetectionExtendedResult Class
* This class contains anomaly detection extended results through a Sequence of AnomalyDetectionDataPoints.
* This class contains anomaly detection extended results through an AnomalyDetectionDataPoint.
* This is currently an optional field in the ConstraintResult class that is exposed to users.
*
* Currently, anomaly detection only runs on "newest" data point (referring to the dataframe being
* run on by the verification suite) and not multiple data points, so the returned sequence will contain
* run on by the verification suite) and not multiple data points, so this will contain that
* one AnomalyDetectionDataPoint.
* In the future, if we allow the anomaly check to detect multiple points, the returned sequence
* may be more than one AnomalyDetectionDataPoints.
* @param anomalyDetectionDataPoints Sequence of AnomalyDetectionDataPoints.
* @param anomalyDetectionDataPoint AnomalyDetectionDataPoint of newest data point generated from check.
*/
case class AnomalyDetectionExtendedResult(anomalyDetectionDataPoints: Seq[AnomalyDetectionDataPoint])
case class AnomalyDetectionExtendedResult(anomalyDetectionDataPoint: AnomalyDetectionDataPoint)

/**
* AnomalyDetectionAssertionResult Class
* This class is returned by the assertion function Check.isNewestPointNonAnomalousWithExtendedResults.
* @param hasNoAnomaly Boolean indicating if there was no anomaly detected.
* @param hasAnomaly Boolean indicating if there was an anomaly detected.
* @param anomalyDetectionExtendedResult AnomalyDetectionExtendedResults class.
*/
case class AnomalyDetectionAssertionResult(hasNoAnomaly: Boolean,
case class AnomalyDetectionAssertionResult(hasAnomaly: Boolean,
anomalyDetectionExtendedResult: AnomalyDetectionExtendedResult)
Original file line number Diff line number Diff line change
@@ -173,8 +173,8 @@ case class OnlineNormalStrategy(
val value = dataSeries(index)

(index, AnomalyDetectionDataPoint(value, value,
Threshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)),
calcRes.isAnomaly, 1.0, detail))
BoundedRange(lowerBound = Bound(lowerBound, inclusive = true),
upperBound = Bound(upperBound, inclusive = true)), calcRes.isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
@@ -78,8 +78,8 @@ case class SimpleThresholdStrategy(
}

(index, AnomalyDetectionDataPoint(value, value,
Threshold(lowerBound = Bound(lowerBound), upperBound = Bound(upperBound)),
isAnomaly, 1.0, detail))
BoundedRange(lowerBound = Bound(lowerBound, inclusive = true),
upperBound = Bound(upperBound, inclusive = true)), isAnomaly, 1.0, detail))
}
}
}
Original file line number Diff line number Diff line change
@@ -17,8 +17,15 @@
package com.amazon.deequ.anomalydetection.seasonal

import breeze.linalg.DenseVector
import breeze.optimize.{ApproximateGradientFunction, DiffFunction, LBFGSB}
import com.amazon.deequ.anomalydetection.{Anomaly, AnomalyDetectionDataPoint, AnomalyDetectionStrategy, AnomalyDetectionStrategyWithExtendedResults, Threshold, Bound}
import breeze.optimize.ApproximateGradientFunction
import breeze.optimize.DiffFunction
import breeze.optimize.LBFGSB
import com.amazon.deequ.anomalydetection.Anomaly
import com.amazon.deequ.anomalydetection.AnomalyDetectionDataPoint
import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy
import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategyWithExtendedResults
import com.amazon.deequ.anomalydetection.BoundedRange
import com.amazon.deequ.anomalydetection.Bound

import collection.mutable.ListBuffer

@@ -202,7 +209,8 @@ class HoltWinters(seriesPeriodicity: Int)
detectionIndex + startIndex -> AnomalyDetectionDataPoint(
dataMetricValue = inputValue,
anomalyMetricValue = anomalyMetricValue,
anomalyThreshold = Threshold(upperBound = Bound(upperBound)),
anomalyCheckRange = BoundedRange(lowerBound = Bound(Double.MinValue, inclusive = true),
upperBound = Bound(upperBound, inclusive = true)),
isAnomaly = isAnomaly,
confidence = 1.0,
detail = detail
24 changes: 16 additions & 8 deletions src/main/scala/com/amazon/deequ/checks/Check.scala
Original file line number Diff line number Diff line change
@@ -25,7 +25,15 @@ import com.amazon.deequ.analyzers.Histogram
import com.amazon.deequ.analyzers.KLLParameters
import com.amazon.deequ.analyzers.Patterns
import com.amazon.deequ.analyzers.State
import com.amazon.deequ.anomalydetection.{AnomalyDetectionAssertionResult, AnomalyDetectionExtendedResult, ExtendedDetectionResult, AnomalyDetectionStrategy, AnomalyDetectionStrategyWithExtendedResults, AnomalyDetector, AnomalyDetectorWithExtendedResults, DataPoint, HistoryUtils}
import com.amazon.deequ.anomalydetection.AnomalyDetectionAssertionResult
import com.amazon.deequ.anomalydetection.AnomalyDetectionExtendedResult
import com.amazon.deequ.anomalydetection.ExtendedDetectionResult
import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategy
import com.amazon.deequ.anomalydetection.AnomalyDetectionStrategyWithExtendedResults
import com.amazon.deequ.anomalydetection.AnomalyDetector
import com.amazon.deequ.anomalydetection.AnomalyDetectorWithExtendedResults
import com.amazon.deequ.anomalydetection.DataPoint
import com.amazon.deequ.anomalydetection.HistoryUtils
import com.amazon.deequ.checks.ColumnCondition.isAnyNotNull
import com.amazon.deequ.checks.ColumnCondition.isEachNotNull
import com.amazon.deequ.constraints.Constraint._
@@ -1487,21 +1495,21 @@ object Check {
*/
private[deequ] def getNewestPointAnomalyResults(extendedDetectionResult: ExtendedDetectionResult):
AnomalyDetectionAssertionResult = {
val (hasNoAnomaly, anomalyDetectionExtendedResults): (Boolean, AnomalyDetectionExtendedResult) = {
val (hasAnomaly, anomalyDetectionExtendedResults): (Boolean, AnomalyDetectionExtendedResult) = {

// Based on upstream code, this anomaly detection data point sequence should never be empty
// Based on upstream code, this anomaly detection data point sequence should never be empty.
require(extendedDetectionResult.anomalyDetectionDataPointSequence != Nil,
"anomalyDetectionDataPoints from AnomalyDetectionExtendedResult cannot be empty")

// get the last anomaly detection data point of sequence (there should only be one element for now)
// and check the isAnomaly boolean, also return the last anomaly detection data point
// wrapped in the anomaly detection extended result class
// Get the last anomaly detection data point of sequence (there should only be one element for now).
// Check the isAnomaly boolean, also return the last anomaly detection data point
// wrapped in the anomaly detection extended result class.
extendedDetectionResult.anomalyDetectionDataPointSequence match {
case _ :+ lastAnomalyDataPointPair =>
(!lastAnomalyDataPointPair._2.isAnomaly, AnomalyDetectionExtendedResult(Seq(lastAnomalyDataPointPair._2)))
(lastAnomalyDataPointPair._2.isAnomaly, AnomalyDetectionExtendedResult(lastAnomalyDataPointPair._2))
}
}
AnomalyDetectionAssertionResult(
hasNoAnomaly = hasNoAnomaly, anomalyDetectionExtendedResult = anomalyDetectionExtendedResults)
hasAnomaly = hasAnomaly, anomalyDetectionExtendedResult = anomalyDetectionExtendedResults)
}
}
Original file line number Diff line number Diff line change
@@ -16,12 +16,14 @@

package com.amazon.deequ.constraints

import com.amazon.deequ.analyzers.{Analyzer, State}
import com.amazon.deequ.analyzers.Analyzer
import com.amazon.deequ.analyzers.State
import com.amazon.deequ.anomalydetection.AnomalyDetectionAssertionResult
import com.amazon.deequ.metrics.Metric
import org.apache.spark.sql.DataFrame

import scala.util.{Failure, Success}
import scala.util.Success
import scala.util.Success

/**
* Case class for anomaly with extended results constraints that provides unified way to access
@@ -76,7 +78,7 @@ private[deequ] case class AnomalyExtendedResultsConstraint[S <: State[S], M, V](
val assertOn = runPickerOnMetric(metricValue)
val anomalyAssertionResult = runAssertion(assertOn)

if (anomalyAssertionResult.hasNoAnomaly) {
if (!anomalyAssertionResult.hasAnomaly) {
ConstraintResult(this, ConstraintStatus.Success, metric = Some(metric),
anomalyDetectionExtendedResultOption = Some(anomalyAssertionResult.anomalyDetectionExtendedResult))
} else {
Original file line number Diff line number Diff line change
@@ -20,7 +20,8 @@ import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.analyzers.Size
import com.amazon.deequ.anomalydetection.RelativeRateOfChangeStrategy
import com.amazon.deequ.checks.CheckStatus._
import com.amazon.deequ.examples.ExampleUtils.{itemsAsDataframe, withSpark}
import com.amazon.deequ.examples.ExampleUtils.itemsAsDataframe
import com.amazon.deequ.examples.ExampleUtils.withSpark
import com.amazon.deequ.repository.ResultKey
import com.amazon.deequ.repository.memory.InMemoryMetricsRepository

@@ -82,9 +83,9 @@ private[examples] object AnomalyDetectionWithExtendedResultsExample extends App
if (verificationResult.status != Success) {
println("Anomaly detected in the Size() metric!")
val anomalyDetectionDataPoint = verificationResult.checkResults.head._2.constraintResults.
head.anomalyDetectionExtendedResultOption.get.anomalyDetectionDataPoints.head
head.anomalyDetectionExtendedResultOption.get.anomalyDetectionDataPoint
println(s"Rate of change of ${anomalyDetectionDataPoint.anomalyMetricValue} was not in " +
s"${anomalyDetectionDataPoint.anomalyThreshold}")
s"${anomalyDetectionDataPoint.anomalyCheckRange}")

/* Lets have a look at the actual metrics. */
metricsRepository
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Anomaly detection

*After reading this page, check out [anomaly checks with extended results](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/anomaly_detection_with_extended_results_example.md) for how to access more details about the anomaly check such as the upper and lower bounds used in the check. This requires using a different method that has the same signature.*

Very often, it is hard to exactly define what constraints we want to evaluate on our data. However, we often have a better understanding of how much change we expect in certain metrics of our data. Therefore, **deequ** supports anomaly detection for data quality metrics. The idea is that we regularly store the metrics of our data in a [MetricsRepository](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/metrics_repository_example.md). Once we do that, we can run anomaly checks that compare the current value of the metric to its values in the past and allow us to detect anomalous changes.

In this simple example, we assume that we compute the size of a dataset every day and we want to ensure that it does not change drastically: the number of rows on a given day should not be more than double of what we have seen on the day before.
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Anomaly detection with extended results

Using the `addAnomalyCheckWithExtendedResults` method instead of the original `addAnomalyCheck`method, you can get more
detailed results about the anomaly detection result from the newly created metric. You can get details such as:

- dataMetricValue: The metric value that is the data point.
- anomalyMetricValue: The value of the metric that is being checked, which isn't always equal to the dataMetricValue.
- anomalyCheckRange: The range of bounds used in the anomaly check, the anomalyMetricValue is compared to this range.
- isAnomaly: If the anomalyMetricValue is outside the anomalyCheckRange, this is true.
- confidence: The confidence of the anomaly detection.
- detail: An optional detail message.

These are contained within the AnomalyDetectionDataPoint class.
```scala
class AnomalyDetectionDataPoint(
val dataMetricValue: Double,
val anomalyMetricValue: Double,
val anomalyCheckRange: BoundedRange,
val isAnomaly: Boolean,
val confidence: Double,
val detail: Option[String])

case class BoundedRange(lowerBound: Bound, upperBound: Bound)

case class Bound(value: Double, inclusive: Boolean)
```

In terms of accessing the result, the AnomalyDetectionDataPoint is wrapped in an AnomalyDetectionExtendedResult class
that is an optional field in the ConstraintResult class. The ConstraintResult class is a class that contains the
results of a constraint check.

```scala
case class ConstraintResult(
constraint: Constraint,
status: ConstraintStatus.Value,
message: Option[String] = None,
metric: Option[Metric[_]] = None,
anomalyDetectionExtendedResultOption: Option[AnomalyDetectionExtendedResult] = None)

case class AnomalyDetectionExtendedResult(anomalyDetectionDataPoint: AnomalyDetectionDataPoint)
```


In order to get extended results you need to run your verification suite with
the `addAnomalyCheckWithExtendedResults` method, which has the same method signature as the original `addAnomalyCheck`
method.

```scala
val result = VerificationSuite()
.onData(yesterdaysDataset)
.useRepository(metricsRepository)
.saveOrAppendResult(yesterdaysKey)
.addAnomalyCheckWithExtendedResults(
RelativeRateOfChangeStrategy(maxRateIncrease = Some(2.0)),
Size())
.run()

val anomalyDetectionExtendedResult: AnomalyDetectionExtendedResult = result.checkResults.head._2.constraintResults.head
.anomalyDetectionExtendedResultOption.getOrElse("placeholder to do something else")

val anomalyDetectionDataPoint: AnomalyDetectionDataPoint = anomalyDetectionExtendedResult.anomalyDetectionDataPoint
```

You can access the values of the anomaly detection extended results like the anomalyMetricValue and anomalyCheckRange.
```scala
println(s"Anomaly check range: ${anomalyDetectionDataPoint.anomalyCheckRange}")
println(s"Anomaly metric value: ${anomalyDetectionDataPoint.anomalyMetricValue}")
```

```
Anomaly check range: BoundedRange(Bound(-2.0,true),Bound(2.0,true))
Anomaly metric value: 4.5
```

An [executable version of this example with extended results](https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/examples/AnomalyDetectionWithExtendedResultsExample.scala) is available as part of our code base.
Loading

0 comments on commit fdebce5

Please sign in to comment.