diff --git a/pom.xml b/pom.xml
index e4295b6e0..2f4248a42 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,7 +149,11 @@
             <scope>test</scope>
-
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+            <version>1.12.128</version>
+        </dependency>
diff --git a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala
new file mode 100644
index 000000000..f0182df8c
--- /dev/null
+++ b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala
@@ -0,0 +1,222 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ * http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.repository.rest
+
+import com.amazon.deequ.analyzers.Analyzer
+import com.amazon.deequ.analyzers.runners.AnalyzerContext
+import com.amazon.deequ.metrics.Metric
+import com.amazon.deequ.repository._
+import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler,
+ ExecutionContext, HttpResponse, HttpResponseHandler}
+import com.amazonaws.retry.PredefinedRetryPolicies
+import com.amazonaws.{AmazonClientException, ClientConfiguration, Request}
+import com.google.common.collect.ImmutableList
+import com.google.common.io.Closeables
+import org.apache.commons.io.IOUtils
+
+import java.io.{BufferedInputStream, ByteArrayInputStream}
+
+
+/** A simple MetricsRepository implementation that uses AmazonHttpClient
+ * to read from and write to a REST API
+ *
+ * readRequest: an endpoint request that reads all of the metrics generated so far
+ * writeRequest: an endpoint request that writes analyzer metrics
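+ *
+ * A minimal usage sketch (the endpoint, resource path and any auth headers are
+ * placeholders specific to your API), mirroring how the accompanying test builds its requests:
+ * {{{
+ *   val writeRequest = new DefaultRequest[Void]("execute-api")
+ *   writeRequest.setHttpMethod(HttpMethodName.POST)
+ *   writeRequest.setEndpoint(URI.create("https://example.com"))
+ *   writeRequest.setResourcePath("/v1/api/metrics")
+ *
+ *   val readRequest = new DefaultRequest[Void]("execute-api")
+ *   readRequest.setHttpMethod(HttpMethodName.GET)
+ *   readRequest.setEndpoint(URI.create("https://example.com"))
+ *   readRequest.setResourcePath("/v1/api/metrics")
+ *
+ *   val repository = new RestMetricsRepository(readRequest, writeRequest)
+ * }}}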
+ * */
+class RestMetricsRepository(readRequest: Request[Void], writeRequest: Request[Void])
+ extends MetricsRepository {
+ /**
+ * Other implementations of RestApiHelper can be used
+ * by extending RestApiHelper and calling setApiHelper.
+ * */
+ var apiHelper: RestApiHelper = new RestApiHelperImp()
+ /**
+ * Saves Analysis results (metrics)
+ *
+ * @param resultKey A ResultKey that uniquely identifies an AnalysisResult
+ * @param analyzerContext The resulting AnalyzerContext of an Analysis
+ */
+ override def save(resultKey: ResultKey, analyzerContext: AnalyzerContext): Unit = {
+ val successfulMetrics = analyzerContext.metricMap.filter {
+ case (_, metric) => metric.value.isSuccess
+ }
+ val analyzerContextWithSuccessfulValues = AnalyzerContext(successfulMetrics)
+ val serializedResult = AnalysisResultSerde.serialize(
+ Seq(AnalysisResult(resultKey, analyzerContextWithSuccessfulValues))
+ )
+
+ writeRequest.setContent(new ByteArrayInputStream(serializedResult.getBytes("UTF-8")))
+
+ apiHelper.writeHttpRequest(writeRequest)
+ }
+
+ /**
+ * Get an AnalyzerContext saved using exactly the same resultKey, if present
+ */
+ override def loadByKey(resultKey: ResultKey): Option[AnalyzerContext] = {
+ load().get().find(_.resultKey == resultKey).map(_.analyzerContext)
+ }
+
+ /** Get a builder class to construct a loading query to get AnalysisResults */
+ override def load(): MetricsRepositoryMultipleResultsLoader = {
+ new RestMetricsRepositoryMultipleResultsLoader(apiHelper, readRequest)
+ }
+
+ /** Set a different implementation of RestApiHelper, replacing the default AmazonHttpClient-based one */
+ def setApiHelper(apiHelper: RestApiHelper): Unit = {
+ this.apiHelper = apiHelper
+ }
+}
+
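+/** Loader that fetches all AnalysisResults via the read request and applies the requested
+ * filters (time range, tags, analyzers) client-side. */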
+class RestMetricsRepositoryMultipleResultsLoader(apiHelper: RestApiHelper,
+ readRequest: Request[Void])
+ extends MetricsRepositoryMultipleResultsLoader {
+
+ private[this] var tagValues: Option[Map[String, String]] = None
+ private[this] var forAnalyzers: Option[Seq[Analyzer[_, Metric[_]]]] = None
+ private[this] var before: Option[Long] = None
+ private[this] var after: Option[Long] = None
+
+ /**
+ * Filter out results that don't have specific values for specific tags
+ *
+ * @param tagValues Map with tag names and the corresponding values to filter for
+ */
+ def withTagValues(tagValues: Map[String, String]): MetricsRepositoryMultipleResultsLoader = {
+ this.tagValues = Option(tagValues)
+ this
+ }
+
+ /**
+ * Choose all metrics that you want to load
+ *
+ * @param analyzers A sequence of analyzers whose resulting metrics you want to load
+ */
+ def forAnalyzers(analyzers: Seq[Analyzer[_, Metric[_]]])
+ : MetricsRepositoryMultipleResultsLoader = {
+
+ this.forAnalyzers = Option(analyzers)
+ this
+ }
+
+ /**
+ * Only look at AnalysisResults with a result key with a smaller value
+ *
+ * @param dateTime The maximum dateTime of AnalysisResults to look at
+ */
+ def before(dateTime: Long): MetricsRepositoryMultipleResultsLoader = {
+ this.before = Option(dateTime)
+ this
+ }
+
+ /**
+ * Only look at AnalysisResults with a result key with a greater value
+ *
+ * @param dateTime The minimum dateTime of AnalysisResults to look at
+ */
+ def after(dateTime: Long): MetricsRepositoryMultipleResultsLoader = {
+ this.after = Option(dateTime)
+ this
+ }
+
+ /** Get the matching AnalysisResults */
+ def get(): Seq[AnalysisResult] = {
+ val contentString = apiHelper.readHttpRequest(readRequest, {
+ IOUtils.toString(_, "UTF-8")
+ })
+
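+ // The endpoint returns all serialized results; the filters below are applied client-side.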
+ val allResults = contentString
+ .map { text => AnalysisResultSerde.deserialize(text) }
+ .getOrElse(Seq.empty)
+
+ val selection = allResults
+ .filter { result => after.isEmpty || after.get <= result.resultKey.dataSetDate }
+ .filter { result => before.isEmpty || result.resultKey.dataSetDate <= before.get }
+ .filter { result => tagValues.isEmpty ||
+ tagValues.get.toSet.subsetOf(result.resultKey.tags.toSet) }
+
+ selection
+ .map { analysisResult =>
+
+ val requestedMetrics = analysisResult
+ .analyzerContext
+ .metricMap
+ .filterKeys(analyzer => forAnalyzers.isEmpty || forAnalyzers.get.contains(analyzer))
+
+ val requestedAnalyzerContext = AnalyzerContext(requestedMetrics)
+
+ AnalysisResult(analysisResult.resultKey, requestedAnalyzerContext)
+ }
+ }
+}
+
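+/** Abstraction over the HTTP calls made by RestMetricsRepository; pass a custom
+ * implementation to setApiHelper to swap the HTTP client or stub calls in tests. */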
+trait RestApiHelper {
+ def writeHttpRequest(writeRequest: Request[Void]): Unit
+ def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T): Option[T]
+}
+
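+/** Default RestApiHelper backed by AmazonHttpClient using the SDK's default retry policy. */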
+class RestApiHelperImp extends RestApiHelper {
+ private val httpClient = new AmazonHttpClient(new ClientConfiguration()
+ .withRetryPolicy(PredefinedRetryPolicies.DEFAULT))
+
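+ // Sends the write request; a non-200 response is surfaced as an AmazonClientException.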
+ override def writeHttpRequest(writeRequest: Request[Void]): Unit = {
+ httpClient
+ .requestExecutionBuilder
+ .executionContext(new ExecutionContext(true))
+ .request(writeRequest)
+ .errorResponseHandler(new HttpResponseHandler[AmazonClientException] {
+ override def handle(response: HttpResponse): AmazonClientException = {
+ throw new AmazonClientException(s"ERROR writing to endpoint: " +
+ s"${writeRequest.getEndpoint}. Code: ${response.getStatusCode}")
+ }
+ override def needsConnectionLeftOpen(): Boolean = false
+ })
+ .execute(new HttpResponseHandler[Unit] {
+ override def handle(response: HttpResponse): Unit = {
+ if (response.getStatusCode != 200) {
+ throw new AmazonClientException(s"ERROR writing to endpoint: " +
+ s"${writeRequest.getEndpoint}. Code: ${response.getStatusCode}")
+ }
+ }
+ override def needsConnectionLeftOpen(): Boolean = false
+ })
+ .getAwsResponse
+ }
+
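+ // Applies readFunc to the response body when the response status is 200, otherwise yields None.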
+ override def readHttpRequest[T](readRequest: Request[Void],
+ readFunc: BufferedInputStream => T): Option[T] = {
+ httpClient
+ .requestExecutionBuilder
+ .executionContext(new ExecutionContext(true))
+ .request(readRequest)
+ .errorResponseHandler(new DefaultErrorResponseHandler(ImmutableList.of()))
+ .execute(new HttpResponseHandler[Option[T]] {
+ override def handle(response: HttpResponse): Option[T] = {
+ if (response.getStatusCode == 200) {
+ val bufferedInputStream = new BufferedInputStream(response.getContent)
+ try {
+ Option(readFunc(bufferedInputStream))
+ } finally {
+ Closeables.close(bufferedInputStream, false)
+ }
+ } else {
+ None
+ }
+ }
+ override def needsConnectionLeftOpen(): Boolean = false
+ }).getAwsResponse
+ }
+}
diff --git a/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala
new file mode 100644
index 000000000..0e84c1f49
--- /dev/null
+++ b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala
@@ -0,0 +1,336 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ * http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.repository.rest
+
+import com.amazon.deequ.SparkContextSpec
+import com.amazon.deequ.analyzers._
+import com.amazon.deequ.analyzers.runners.AnalyzerContext._
+import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
+import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
+import com.amazon.deequ.repository.{AnalysisResult, AnalysisResultSerde, MetricsRepository, ResultKey}
+import com.amazon.deequ.utils.FixtureSupport
+import com.amazonaws.{DefaultRequest, Request}
+import com.amazonaws.http.HttpMethodName
+import com.google.common.io.Closeables
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.scalatest.wordspec.AnyWordSpec
+
+import java.io.{BufferedInputStream, ByteArrayInputStream}
+import java.net.URI
+import java.time.{LocalDate, ZoneOffset}
+import java.util.concurrent.ConcurrentHashMap
+import scala.util.{Failure, Success}
+import scala.collection.JavaConverters._
+
+class RestMetricsRepositoryTest extends AnyWordSpec
+ with SparkContextSpec with FixtureSupport {
+
+ private[this] val DATE_ONE = createDate(2017, 10, 14)
+ private[this] val DATE_TWO = createDate(2017, 10, 15)
+ private[this] val DATE_THREE = createDate(2017, 10, 16)
+
+ private[this] val REGION_EU = Map("Region" -> "EU")
+ private[this] val REGION_NA = Map("Region" -> "NA")
+
+ private[this] val TOKEN = "sso_token_guid_123"
+ private[this] val ENDPOINT = "https://test.api.com"
+ private[this] val PATH = "/v1/api/metrics"
+
+ "Rest Metric Repository" should {
+
+ "save and retrieve AnalyzerContexts" in withSparkSession { session =>
+ evaluate(session) { (results, repository) =>
+
+ val resultKey = ResultKey(DATE_ONE, REGION_EU)
+ repository.save(resultKey, results)
+
+ val loadedResults = repository.loadByKey(resultKey).get
+
+ val loadedResultsAsDataFrame = successMetricsAsDataFrame(session, loadedResults)
+ val resultsAsDataFrame = successMetricsAsDataFrame(session, results)
+
+ assertSameRows(loadedResultsAsDataFrame, resultsAsDataFrame)
+ assert(results == loadedResults)
+ }
+ }
+
+ "save should ignore failed result metrics when saving" in withSparkSession { session =>
+
+ val metrics: Map[Analyzer[_, Metric[_]], Metric[_]] = Map(
+ Size() -> DoubleMetric(Entity.Column, "Size", "*", Success(5.0)),
+ Completeness("ColumnA") ->
+ DoubleMetric(Entity.Column, "Completeness", "ColumnA",
+ Failure(new RuntimeException("error"))))
+
+ val resultsWithMixedValues = AnalyzerContext(metrics)
+
+ val successMetrics = resultsWithMixedValues.metricMap
+ .filter { case (_, metric) => metric.value.isSuccess }
+
+ val resultsWithSuccessfulValues = AnalyzerContext(successMetrics)
+
+ val repository = createRepository(session)
+
+ val resultKey = ResultKey(DATE_ONE, REGION_EU)
+
+ repository.save(resultKey, resultsWithMixedValues)
+
+ val loadedAnalyzerContext = repository.loadByKey(resultKey).get
+
+ assert(resultsWithSuccessfulValues == loadedAnalyzerContext)
+ }
+
+ "saving should work for very long strings as well" in withSparkSession { session =>
+ evaluate(session) { (results, repository) =>
+
+ (1 to 200).foreach(number => repository.save(ResultKey(number, Map.empty), results))
+
+ val loadedAnalyzerContext = repository.loadByKey(ResultKey(200, Map.empty)).get
+
+ assert(results == loadedAnalyzerContext)
+ }
+ }
+
+ "save and retrieve AnalysisResults" in withSparkSession { session =>
+
+ evaluate(session) { (results, repository) =>
+
+ repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+ repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+ val analysisResultsAsDataFrame = repository.load()
+ .after(DATE_ONE)
+ .getSuccessMetricsAsDataFrame(session)
+
+ import session.implicits._
+ val expected = Seq(
+ // First analysisResult
+ ("Dataset", "*", "Size", 4.0, DATE_ONE, "EU"),
+ ("Column", "item", "Distinctness", 1.0, DATE_ONE, "EU"),
+ ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"),
+ ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"),
+ // Second analysisResult
+ ("Dataset", "*", "Size", 4.0, DATE_TWO, "NA"),
+ ("Column", "item", "Distinctness", 1.0, DATE_TWO, "NA"),
+ ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"),
+ ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA"))
+ .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+ assertSameRows(analysisResultsAsDataFrame, expected)
+ }
+ }
+
+ "only load AnalysisResults within a specific time frame if requested" in
+ withSparkSession { sparkSession =>
+
+ evaluate(sparkSession) { (results, repository) =>
+
+ repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+ repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+ repository.save(ResultKey(DATE_THREE, REGION_NA), results)
+
+ val analysisResultsAsDataFrame = repository.load()
+ .after(DATE_TWO)
+ .before(DATE_TWO)
+ .getSuccessMetricsAsDataFrame(sparkSession)
+
+ import sparkSession.implicits._
+ val expected = Seq(
+ // Second analysisResult
+ ("Dataset", "*", "Size", 4.0, DATE_TWO, "NA"),
+ ("Column", "item", "Distinctness", 1.0, DATE_TWO, "NA"),
+ ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"),
+ ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA"))
+ .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+ assertSameRows(analysisResultsAsDataFrame, expected)
+ }
+ }
+
+ "only load AnalyzerContexts with specific Tags if requested" in withSparkSession { session =>
+
+ evaluate(session) { (results, repository) =>
+
+ repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+ repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+ val analysisResultsAsDataFrame = repository.load()
+ .after(DATE_ONE)
+ .withTagValues(REGION_EU)
+ .getSuccessMetricsAsDataFrame(session)
+
+ import session.implicits._
+ val expected = Seq(
+ // First analysisResult
+ ("Dataset", "*", "Size", 4.0, DATE_ONE, "EU"),
+ ("Column", "item", "Distinctness", 1.0, DATE_ONE, "EU"),
+ ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"),
+ ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"))
+ .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+ assertSameRows(analysisResultsAsDataFrame, expected)
+ }
+ }
+
+ "only include specific metrics in loaded AnalysisResults if requested" in
+ withSparkSession { sparkSession =>
+
+ evaluate(sparkSession) { (results, repository) =>
+
+ repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+ repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+ val analysisResultsAsDataFrame = repository.load()
+ .after(DATE_ONE)
+ .forAnalyzers(Seq(Completeness("att1"), Uniqueness(Seq("att1", "att2"))))
+ .getSuccessMetricsAsDataFrame(sparkSession)
+
+ import sparkSession.implicits._
+ val expected = Seq(
+ // First analysisResult
+ ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"),
+ ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"),
+ // Second analysisResult
+ ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"),
+ ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA"))
+ .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+ assertSameRows(analysisResultsAsDataFrame, expected)
+ }
+ }
+
+ "include no metrics in loaded AnalysisResults if requested" in withSparkSession { session =>
+
+ evaluate(session) { (results, repository) =>
+
+ repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+ repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+ val analysisResultsAsDataFrame = repository.load()
+ .after(DATE_ONE)
+ .forAnalyzers(Seq.empty)
+ .getSuccessMetricsAsDataFrame(session)
+
+ import session.implicits._
+ val expected = Seq.empty[(String, String, String, Double, Long, String)]
+ .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+ assertSameRows(analysisResultsAsDataFrame, expected)
+ }
+ }
+
+ "return empty Seq if load parameters too restrictive" in withSparkSession { session =>
+
+ evaluate(session) { (results, repository) =>
+
+ repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+ repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+ val analysisResults = repository.load()
+ .after(DATE_TWO)
+ .before(DATE_ONE)
+ .get()
+
+ assert(analysisResults.isEmpty)
+ }
+ }
+ }
+
+ private[this] def evaluate(session: SparkSession)
+ (test: (AnalyzerContext, MetricsRepository) => Unit): Unit = {
+
+ val data = getDfFull(session)
+ val results = AnalysisRunner.run(data, createAnalysis())
+ val repository = createRepository(session)
+
+ test(results, repository)
+ }
+
+ private[this] def createAnalysis(): Analysis = {
+ Analysis()
+ .addAnalyzer(Size())
+ .addAnalyzer(Distinctness("item"))
+ .addAnalyzer(Completeness("att1"))
+ .addAnalyzer(Uniqueness(Seq("att1", "att2")))
+ }
+
+ private[this] def createDate(year: Int, month: Int, day: Int): Long = {
+ LocalDate.of(year, month, day).atTime(10, 10, 10).toEpochSecond(ZoneOffset.UTC)
+ }
+
+ private[this] def createRepository(sparkSession: SparkSession): MetricsRepository = {
+ val ssoJwt = "sso-jwt " + TOKEN
+ val headers = Map(
+ "Content-Type" -> "application/json",
+ "Authorization" -> ssoJwt
+ )
+
+ val writeRequest = new DefaultRequest[Void]("execute-api")
+ writeRequest.setHttpMethod(HttpMethodName.POST)
+ writeRequest.setEndpoint(URI.create(ENDPOINT))
+ writeRequest.setResourcePath(PATH)
+ writeRequest.setHeaders(headers.asJava)
+
+ val readRequest = new DefaultRequest[Void]("execute-api")
+ readRequest.setHttpMethod(HttpMethodName.GET)
+ readRequest.setEndpoint(URI.create(ENDPOINT))
+ readRequest.setResourcePath(PATH)
+ readRequest.setHeaders(headers.asJava)
+
+ val repo = new RestMetricsRepository(readRequest = readRequest, writeRequest = writeRequest)
+ repo.setApiHelper(new RestApiHelperMock())
+ repo
+ }
+
+ private[this] def assertSameRows(dataFrameA: DataFrame, dataFrameB: DataFrame): Unit = {
+ assert(dataFrameA.collect().toSet == dataFrameB.collect().toSet)
+ }
+}
+
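+/** In-memory RestApiHelper used by the tests: writes deserialize the request body into a map,
+ * reads serialize the map contents back, so no HTTP calls are made. */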
+class RestApiHelperMock extends RestApiHelper {
+ private val mapRepo = new ConcurrentHashMap[ResultKey, AnalysisResult]()
+
+ override def writeHttpRequest(writeRequest: Request[Void]): Unit = {
+ val contentString = Option(IOUtils.toString(writeRequest.getContent, "UTF-8"))
+ val allResults = contentString
+ .map { text => AnalysisResultSerde.deserialize(text) }
+ .getOrElse(Seq.empty)
+ allResults.foreach(result => mapRepo.put(result.resultKey, result))
+ }
+
+ override def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T):
+ Option[T] = {
+ val analyzerResults = mapRepo.values().asScala.map { analysisResult =>
+ val requestedMetrics = analysisResult
+ .analyzerContext
+ .metricMap
+
+ AnalysisResult(analysisResult.resultKey, AnalyzerContext(requestedMetrics))
+ }
+ .toSeq
+ val serializedResult = AnalysisResultSerde.serialize(analyzerResults)
+ val byteArrayInputStream = new ByteArrayInputStream(serializedResult.getBytes("UTF-8"))
+ val bufferedInputStream = new BufferedInputStream(byteArrayInputStream)
+ try {
+ Option(readFunc(bufferedInputStream))
+ } finally {
+ Closeables.close(bufferedInputStream, false)
+ }
+ }
+}