From 5c43d004f3468a7ea3b01cd582bfbe22dbec24c9 Mon Sep 17 00:00:00 2001 From: cmach Date: Tue, 10 Jan 2023 15:05:53 -0800 Subject: [PATCH 1/6] Adding this dockerfile to be able to build the deequ --- Dockerfile | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..5f0ea3eb9 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM maven:3.5-jdk-8 AS build + +RUN apt-get upgrade +COPY . /usr/src/app/deequ +RUN mvn -f /usr/src/app/deequ/pom.xml clean install -DargLine="-Xms128m -Xms512m -XX:MaxPermSize=300m -ea" From ecfeccb4610803d9ffaede3cafd1079b917041a9 Mon Sep 17 00:00:00 2001 From: cmach Date: Tue, 10 Jan 2023 18:19:33 -0800 Subject: [PATCH 2/6] adding RestMetricsRepository --- pom.xml | 6 +- .../rest/RestMetricsRepository.scala | 202 ++++++++++++++++++ 2 files changed, 207 insertions(+), 1 deletion(-) create mode 100644 src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala diff --git a/pom.xml b/pom.xml index e4295b6e0..2f4248a42 100644 --- a/pom.xml +++ b/pom.xml @@ -149,7 +149,11 @@ test - + + com.amazonaws + aws-java-sdk-core + 1.12.128 + diff --git a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala new file mode 100644 index 000000000..17b28e69e --- /dev/null +++ b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala @@ -0,0 +1,202 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.repository.rest + +import com.amazon.deequ.analyzers.Analyzer +import com.amazon.deequ.analyzers.runners.AnalyzerContext +import com.amazon.deequ.metrics.Metric +import com.amazon.deequ.repository._ +import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler, ExecutionContext, HttpResponse, HttpResponseHandler} +import com.amazonaws.retry.PredefinedRetryPolicies +import com.amazonaws.{AmazonClientException, ClientConfiguration, DefaultRequest} +import com.google.common.collect.ImmutableList +import com.google.common.io.Closeables +import org.apache.commons.io.IOUtils + +import java.io.{BufferedInputStream, ByteArrayInputStream} + + +/** A simple Repository implementation backed by a concurrent hash map + * + * readRequest: an endpoint request that read all of the metrics generated so far + * writeRequest: an endpoint request that write th metrics to database + * */ +class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]) extends MetricsRepository { + /** + * Saves Analysis results (metrics) + * + * @param resultKey A ResultKey that uniquely identifies a AnalysisResult + * @param analyzerContext The resulting AnalyzerContext of an Analysis + */ + override def save(resultKey: ResultKey, analyzerContext: AnalyzerContext): Unit = { + val successfulMetrics = analyzerContext.metricMap.filter { + case (_, metric) => metric.value.isSuccess + } + val analyzerContextWithSuccessfulValues = AnalyzerContext(successfulMetrics) + val serializedResult = AnalysisResultSerde.serialize(Seq(AnalysisResult(resultKey, analyzerContextWithSuccessfulValues))) + + writeRequest.setContent(new ByteArrayInputStream(serializedResult.getBytes)) + + RestMetricsRepository.writeHttpClient(writeRequest) + } + + /** + * Get a AnalyzerContext saved using exactly the same resultKey if present + */ + override def loadByKey(resultKey: ResultKey): Option[AnalyzerContext] = { + load().get().find(_.resultKey == resultKey).map(_.analyzerContext) + } + + /** Get a builder class to construct a loading query to get AnalysisResults */ + override def load(): MetricsRepositoryMultipleResultsLoader = { + new RestMetricsRepositoryMultipleResultsLoader(readRequest) + } +} + +class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Void]) extends MetricsRepositoryMultipleResultsLoader { + + private[this] var tagValues: Option[Map[String, String]] = None + private[this] var forAnalyzers: Option[Seq[Analyzer[_, Metric[_]]]] = None + private[this] var before: Option[Long] = None + private[this] var after: Option[Long] = None + + /** + * Filter out results that don't have specific values for specific tags + * + * @param tagValues Map with tag names and the corresponding values to filter for + */ + def withTagValues(tagValues: Map[String, String]): MetricsRepositoryMultipleResultsLoader = { + this.tagValues = Option(tagValues) + this + } + + /** + * Choose all metrics that you want to load + * + * @param analyzers A sequence of analyers who's resulting metrics you want to load + */ + def forAnalyzers(analyzers: Seq[Analyzer[_, Metric[_]]]) + : MetricsRepositoryMultipleResultsLoader = { + + this.forAnalyzers = Option(analyzers) + this + } + + /** + * Only look at AnalysisResults with a result key with a smaller value + * + * @param dateTime The maximum dateTime of AnalysisResults to look at + */ + def before(dateTime: Long): MetricsRepositoryMultipleResultsLoader = { + this.before = Option(dateTime) + this + } + + /** + * Only look at AnalysisResults with a result key with a greater value + * + * @param dateTime The minimum dateTime of AnalysisResults to look at + */ + def after(dateTime: Long): MetricsRepositoryMultipleResultsLoader = { + this.after = Option(dateTime) + this + } + + /** Get the AnalysisResult */ + def get(): Seq[AnalysisResult] = { + val contentString = RestMetricsRepository.readHttpClient(readRequest, { + IOUtils.toString(_, "UTF-8") + }) + + val allResults = contentString.map { text => AnalysisResultSerde.deserialize(text) }.getOrElse(Seq.empty) + + val selection = allResults + .filter { result => after.isEmpty || after.get <= result.resultKey.dataSetDate } + .filter { result => before.isEmpty || result.resultKey.dataSetDate <= before.get } + .filter { result => tagValues.isEmpty || + tagValues.get.toSet.subsetOf(result.resultKey.tags.toSet) } + + selection + .map { analysisResult => + + val requestedMetrics = analysisResult + .analyzerContext + .metricMap + .filterKeys(analyzer => forAnalyzers.isEmpty || forAnalyzers.get.contains(analyzer)) + + val requestedAnalyzerContext = AnalyzerContext(requestedMetrics) + + AnalysisResult(analysisResult.resultKey, requestedAnalyzerContext) + } + } +} + +object RestMetricsRepository { + private[api] val httpClient = new AmazonHttpClient(new ClientConfiguration() + .withRetryPolicy(PredefinedRetryPolicies.DEFAULT)) + + def apply(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]): RestMetricsRepository = { + new RestMetricsRepository(readRequest, writeRequest) + } + + /* Helper function to write to a content to provided endpoint */ + private[api] def writeHttpClient(writeRequest: DefaultRequest[Void]): Unit = { + httpClient + .requestExecutionBuilder + .executionContext(new ExecutionContext(true)) + .request(writeRequest) + .errorResponseHandler(new HttpResponseHandler[AmazonClientException] { + override def handle(response: HttpResponse): AmazonClientException = { + throw new AmazonClientException(s"ERROR writing to endpoint: ${writeRequest.getEndpoint}. Code: ${response.getStatusCode}. Message: ${response.getStatusText}") + } + override def needsConnectionLeftOpen(): Boolean = false + }) + .execute(new HttpResponseHandler[Unit] { + override def handle(response: HttpResponse): Unit = { + if (response.getStatusCode != 200) { + throw new AmazonClientException(s"ERROR writing to endpoint: ${writeRequest.getEndpoint}. Code: ${response.getStatusCode}. Message: ${response.getStatusText}") + } + } + override def needsConnectionLeftOpen(): Boolean = false + }) + .getAwsResponse + } + + /* Helper function to read from provided endpoint */ + private[api] def readHttpClient[T](readRequest: DefaultRequest[Void], readFunc: BufferedInputStream => T): Option[T] = { + httpClient + .requestExecutionBuilder + .executionContext(new ExecutionContext(true)) + .request(readRequest) + .errorResponseHandler(new DefaultErrorResponseHandler(ImmutableList.of())) + .execute(new HttpResponseHandler[Option[T]] { + override def handle(response: HttpResponse): Option[T] = { + if (response.getStatusCode == 200) { + val bufferedInputStream = new BufferedInputStream(response.getContent) + try { + Option(readFunc(bufferedInputStream)) + } finally { + Closeables.close(bufferedInputStream, false) + } + } else { + None + } + } + override def needsConnectionLeftOpen(): Boolean = false + }).getAwsResponse + } +} From 1e22904d7808347e77696c3e1e8d3a1afc556d49 Mon Sep 17 00:00:00 2001 From: cmach Date: Tue, 10 Jan 2023 18:41:49 -0800 Subject: [PATCH 3/6] fixing format --- .../rest/RestMetricsRepository.scala | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala index 17b28e69e..e7d7524df 100644 --- a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala +++ b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala @@ -20,7 +20,8 @@ import com.amazon.deequ.analyzers.Analyzer import com.amazon.deequ.analyzers.runners.AnalyzerContext import com.amazon.deequ.metrics.Metric import com.amazon.deequ.repository._ -import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler, ExecutionContext, HttpResponse, HttpResponseHandler} +import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler, ExecutionContext, HttpResponse, + HttpResponseHandler} import com.amazonaws.retry.PredefinedRetryPolicies import com.amazonaws.{AmazonClientException, ClientConfiguration, DefaultRequest} import com.google.common.collect.ImmutableList @@ -35,7 +36,8 @@ import java.io.{BufferedInputStream, ByteArrayInputStream} * readRequest: an endpoint request that read all of the metrics generated so far * writeRequest: an endpoint request that write th metrics to database * */ -class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]) extends MetricsRepository { +class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]) + extends MetricsRepository { /** * Saves Analysis results (metrics) * @@ -47,7 +49,9 @@ class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: Def case (_, metric) => metric.value.isSuccess } val analyzerContextWithSuccessfulValues = AnalyzerContext(successfulMetrics) - val serializedResult = AnalysisResultSerde.serialize(Seq(AnalysisResult(resultKey, analyzerContextWithSuccessfulValues))) + val serializedResult = AnalysisResultSerde.serialize( + Seq(AnalysisResult(resultKey, analyzerContextWithSuccessfulValues)) + ) writeRequest.setContent(new ByteArrayInputStream(serializedResult.getBytes)) @@ -67,7 +71,8 @@ class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: Def } } -class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Void]) extends MetricsRepositoryMultipleResultsLoader { +class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Void]) + extends MetricsRepositoryMultipleResultsLoader { private[this] var tagValues: Option[Map[String, String]] = None private[this] var forAnalyzers: Option[Seq[Analyzer[_, Metric[_]]]] = None @@ -122,7 +127,9 @@ class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Voi IOUtils.toString(_, "UTF-8") }) - val allResults = contentString.map { text => AnalysisResultSerde.deserialize(text) }.getOrElse(Seq.empty) + val allResults = contentString + .map { text => AnalysisResultSerde.deserialize(text) } + .getOrElse(Seq.empty) val selection = allResults .filter { result => after.isEmpty || after.get <= result.resultKey.dataSetDate } @@ -146,29 +153,32 @@ class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Voi } object RestMetricsRepository { - private[api] val httpClient = new AmazonHttpClient(new ClientConfiguration() + private[rest] val httpClient = new AmazonHttpClient(new ClientConfiguration() .withRetryPolicy(PredefinedRetryPolicies.DEFAULT)) - def apply(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]): RestMetricsRepository = { + def apply(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]): + RestMetricsRepository = { new RestMetricsRepository(readRequest, writeRequest) } /* Helper function to write to a content to provided endpoint */ - private[api] def writeHttpClient(writeRequest: DefaultRequest[Void]): Unit = { + private[rest] def writeHttpClient(writeRequest: DefaultRequest[Void]): Unit = { httpClient .requestExecutionBuilder .executionContext(new ExecutionContext(true)) .request(writeRequest) .errorResponseHandler(new HttpResponseHandler[AmazonClientException] { override def handle(response: HttpResponse): AmazonClientException = { - throw new AmazonClientException(s"ERROR writing to endpoint: ${writeRequest.getEndpoint}. Code: ${response.getStatusCode}. Message: ${response.getStatusText}") + throw new AmazonClientException(s"ERROR writing to endpoint: " + + s"${writeRequest.getEndpoint}. Code: ${response.getStatusCode}") } override def needsConnectionLeftOpen(): Boolean = false }) .execute(new HttpResponseHandler[Unit] { override def handle(response: HttpResponse): Unit = { if (response.getStatusCode != 200) { - throw new AmazonClientException(s"ERROR writing to endpoint: ${writeRequest.getEndpoint}. Code: ${response.getStatusCode}. Message: ${response.getStatusText}") + throw new AmazonClientException(s"ERROR writing to endpoint: " + + s"${writeRequest.getEndpoint}. Code: ${response.getStatusCode}") } } override def needsConnectionLeftOpen(): Boolean = false @@ -177,7 +187,8 @@ object RestMetricsRepository { } /* Helper function to read from provided endpoint */ - private[api] def readHttpClient[T](readRequest: DefaultRequest[Void], readFunc: BufferedInputStream => T): Option[T] = { + private[rest] def readHttpClient[T](readRequest: DefaultRequest[Void], + readFunc: BufferedInputStream => T): Option[T] = { httpClient .requestExecutionBuilder .executionContext(new ExecutionContext(true)) From 5ad0ab2ebd5ecb93f05ebeb77dd7a5d527838f9f Mon Sep 17 00:00:00 2001 From: cmach Date: Wed, 11 Jan 2023 21:47:41 -0800 Subject: [PATCH 4/6] finished unit tests --- .../rest/RestMetricsRepository.scala | 42 ++- .../rest/RestMetricsRepositoryTest.scala | 338 ++++++++++++++++++ 2 files changed, 361 insertions(+), 19 deletions(-) create mode 100644 src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala diff --git a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala index e7d7524df..333a1d6e8 100644 --- a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala +++ b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala @@ -20,10 +20,9 @@ import com.amazon.deequ.analyzers.Analyzer import com.amazon.deequ.analyzers.runners.AnalyzerContext import com.amazon.deequ.metrics.Metric import com.amazon.deequ.repository._ -import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler, ExecutionContext, HttpResponse, - HttpResponseHandler} +import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler, ExecutionContext, HttpResponse, HttpResponseHandler} import com.amazonaws.retry.PredefinedRetryPolicies -import com.amazonaws.{AmazonClientException, ClientConfiguration, DefaultRequest} +import com.amazonaws.{AmazonClientException, ClientConfiguration, Request} import com.google.common.collect.ImmutableList import com.google.common.io.Closeables import org.apache.commons.io.IOUtils @@ -36,8 +35,9 @@ import java.io.{BufferedInputStream, ByteArrayInputStream} * readRequest: an endpoint request that read all of the metrics generated so far * writeRequest: an endpoint request that write th metrics to database * */ -class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]) +class RestMetricsRepository(readRequest: Request[Void], writeRequest: Request[Void]) extends MetricsRepository { + var apiHelper: RestApiHelper = new RestApiHelperImp() /** * Saves Analysis results (metrics) * @@ -53,9 +53,9 @@ class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: Def Seq(AnalysisResult(resultKey, analyzerContextWithSuccessfulValues)) ) - writeRequest.setContent(new ByteArrayInputStream(serializedResult.getBytes)) + writeRequest.setContent(new ByteArrayInputStream(serializedResult.getBytes("UTF-8"))) - RestMetricsRepository.writeHttpClient(writeRequest) + apiHelper.writeHttpRequest(writeRequest) } /** @@ -67,11 +67,15 @@ class RestMetricsRepository(readRequest: DefaultRequest[Void], writeRequest: Def /** Get a builder class to construct a loading query to get AnalysisResults */ override def load(): MetricsRepositoryMultipleResultsLoader = { - new RestMetricsRepositoryMultipleResultsLoader(readRequest) + new RestMetricsRepositoryMultipleResultsLoader(apiHelper, readRequest) + } + + def setApiHelper(apiHelper: RestApiHelper): Unit = { + this.apiHelper = apiHelper } } -class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Void]) +class RestMetricsRepositoryMultipleResultsLoader(apiHelper: RestApiHelper, readRequest: Request[Void]) extends MetricsRepositoryMultipleResultsLoader { private[this] var tagValues: Option[Map[String, String]] = None @@ -123,7 +127,7 @@ class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Voi /** Get the AnalysisResult */ def get(): Seq[AnalysisResult] = { - val contentString = RestMetricsRepository.readHttpClient(readRequest, { + val contentString = apiHelper.readHttpRequest(readRequest, { IOUtils.toString(_, "UTF-8") }) @@ -152,17 +156,17 @@ class RestMetricsRepositoryMultipleResultsLoader(readRequest: DefaultRequest[Voi } } -object RestMetricsRepository { - private[rest] val httpClient = new AmazonHttpClient(new ClientConfiguration() - .withRetryPolicy(PredefinedRetryPolicies.DEFAULT)) +trait RestApiHelper { + def writeHttpRequest(writeRequest: Request[Void]): Unit + def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T): Option[T] +} - def apply(readRequest: DefaultRequest[Void], writeRequest: DefaultRequest[Void]): - RestMetricsRepository = { - new RestMetricsRepository(readRequest, writeRequest) - } +class RestApiHelperImp extends RestApiHelper { + private val httpClient = new AmazonHttpClient(new ClientConfiguration() + .withRetryPolicy(PredefinedRetryPolicies.DEFAULT)) /* Helper function to write to a content to provided endpoint */ - private[rest] def writeHttpClient(writeRequest: DefaultRequest[Void]): Unit = { + override def writeHttpRequest(writeRequest: Request[Void]): Unit = { httpClient .requestExecutionBuilder .executionContext(new ExecutionContext(true)) @@ -187,8 +191,8 @@ object RestMetricsRepository { } /* Helper function to read from provided endpoint */ - private[rest] def readHttpClient[T](readRequest: DefaultRequest[Void], - readFunc: BufferedInputStream => T): Option[T] = { + override def readHttpRequest[T](readRequest: Request[Void], + readFunc: BufferedInputStream => T): Option[T] = { httpClient .requestExecutionBuilder .executionContext(new ExecutionContext(true)) diff --git a/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala new file mode 100644 index 000000000..0cb598fc6 --- /dev/null +++ b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala @@ -0,0 +1,338 @@ +/** + * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License + * is located at + * + * http://aws.amazon.com/apache2.0/ + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + * + */ + +package com.amazon.deequ.repository.rest + +import com.amazon.deequ.SparkContextSpec +import com.amazon.deequ.analyzers._ +import com.amazon.deequ.analyzers.runners.AnalyzerContext._ +import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext} +import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric} +import com.amazon.deequ.repository.{AnalysisResult, AnalysisResultSerde, MetricsRepository, ResultKey} +import com.amazon.deequ.utils.{FixtureSupport, TempFileUtils} +import com.amazonaws.{DefaultRequest, Request} +import com.amazonaws.http.HttpMethodName +import com.google.common.io.Closeables +import org.apache.commons.io.IOUtils +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.scalamock.scalatest.MockFactory +import org.scalatest.wordspec.AnyWordSpec + +import java.io.{BufferedInputStream, ByteArrayInputStream} +import java.net.URI +import java.time.{LocalDate, ZoneOffset} +import java.util.concurrent.ConcurrentHashMap +import scala.util.{Failure, Success} +import scala.collection.JavaConverters._ +import scala.collection.JavaConversions._ + +class RestMetricsRepositoryTest extends AnyWordSpec + with SparkContextSpec with FixtureSupport { + + private[this] val DATE_ONE = createDate(2017, 10, 14) + private[this] val DATE_TWO = createDate(2017, 10, 15) + private[this] val DATE_THREE = createDate(2017, 10, 16) + + private[this] val REGION_EU = Map("Region" -> "EU") + private[this] val REGION_NA = Map("Region" -> "NA") + + private[this] val TOKEN = "asdf123" + private[this] val ENDPOINT = "https://test.api.com" + private[this] val PATH = "/v1/api/metrics" + + "Rest Metric Repository" should { + + "save and retrieve AnalyzerContexts" in withSparkSession { session => + evaluate(session) { (results, repository) => + + val resultKey = ResultKey(DATE_ONE, REGION_EU) + repository.save(resultKey, results) + + val loadedResults = repository.loadByKey(resultKey).get + + val loadedResultsAsDataFrame = successMetricsAsDataFrame(session, loadedResults) + val resultsAsDataFrame = successMetricsAsDataFrame(session, results) + + assertSameRows(loadedResultsAsDataFrame, resultsAsDataFrame) + assert(results == loadedResults) + } + } + + "save should ignore failed result metrics when saving" in withSparkSession { session => + + val metrics: Map[Analyzer[_, Metric[_]], Metric[_]] = Map( + Size() -> DoubleMetric(Entity.Column, "Size", "*", Success(5.0)), + Completeness("ColumnA") -> + DoubleMetric(Entity.Column, "Completeness", "ColumnA", + Failure(new RuntimeException("error")))) + + val resultsWithMixedValues = AnalyzerContext(metrics) + + val successMetrics = resultsWithMixedValues.metricMap + .filter { case (_, metric) => metric.value.isSuccess } + + val resultsWithSuccessfulValues = AnalyzerContext(successMetrics) + + val repository = createRepository(session) + + val resultKey = ResultKey(DATE_ONE, REGION_EU) + + repository.save(resultKey, resultsWithMixedValues) + + val loadedAnalyzerContext = repository.loadByKey(resultKey).get + + assert(resultsWithSuccessfulValues == loadedAnalyzerContext) + } + + "saving should work for very long strings as well" in withSparkSession { session => + evaluate(session) { (results, repository) => + + (1 to 200).foreach(number => repository.save(ResultKey(number, Map.empty), results)) + + val loadedAnalyzerContext = repository.loadByKey(ResultKey(200, Map.empty)).get + + assert(results == loadedAnalyzerContext) + } + } + + "save and retrieve AnalysisResults" in withSparkSession { session => + + evaluate(session) { (results, repository) => + + repository.save(ResultKey(DATE_ONE, REGION_EU), results) + repository.save(ResultKey(DATE_TWO, REGION_NA), results) + + val analysisResultsAsDataFrame = repository.load() + .after(DATE_ONE) + .getSuccessMetricsAsDataFrame(session) + + import session.implicits._ + val expected = Seq( + // First analysisResult + ("Dataset", "*", "Size", 4.0, DATE_ONE, "EU"), + ("Column", "item", "Distinctness", 1.0, DATE_ONE, "EU"), + ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"), + ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"), + // Second analysisResult + ("Dataset", "*", "Size", 4.0, DATE_TWO, "NA"), + ("Column", "item", "Distinctness", 1.0, DATE_TWO, "NA"), + ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"), + ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA")) + .toDF("entity", "instance", "name", "value", "dataset_date", "region") + + assertSameRows(analysisResultsAsDataFrame, expected) + } + } + + "only load AnalysisResults within a specific time frame if requested" in + withSparkSession { sparkSession => + + evaluate(sparkSession) { (results, repository) => + + repository.save(ResultKey(DATE_ONE, REGION_EU), results) + repository.save(ResultKey(DATE_TWO, REGION_NA), results) + repository.save(ResultKey(DATE_THREE, REGION_NA), results) + + val analysisResultsAsDataFrame = repository.load() + .after(DATE_TWO) + .before(DATE_TWO) + .getSuccessMetricsAsDataFrame(sparkSession) + + import sparkSession.implicits._ + val expected = Seq( + // Second analysisResult + ("Dataset", "*", "Size", 4.0, DATE_TWO, "NA"), + ("Column", "item", "Distinctness", 1.0, DATE_TWO, "NA"), + ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"), + ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA")) + .toDF("entity", "instance", "name", "value", "dataset_date", "region") + + assertSameRows(analysisResultsAsDataFrame, expected) + } + } + + "only load AnalyzerContexts with specific Tags if requested" in withSparkSession { session => + + evaluate(session) { (results, repository) => + + repository.save(ResultKey(DATE_ONE, REGION_EU), results) + repository.save(ResultKey(DATE_TWO, REGION_NA), results) + + val analysisResultsAsDataFrame = repository.load() + .after(DATE_ONE) + .withTagValues(REGION_EU) + .getSuccessMetricsAsDataFrame(session) + + import session.implicits._ + val expected = Seq( + // First analysisResult + ("Dataset", "*", "Size", 4.0, DATE_ONE, "EU"), + ("Column", "item", "Distinctness", 1.0, DATE_ONE, "EU"), + ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"), + ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU")) + .toDF("entity", "instance", "name", "value", "dataset_date", "region") + + assertSameRows(analysisResultsAsDataFrame, expected) + } + } + + "only include specific metrics in loaded AnalysisResults if requested" in + withSparkSession { sparkSession => + + evaluate(sparkSession) { (results, repository) => + + repository.save(ResultKey(DATE_ONE, REGION_EU), results) + repository.save(ResultKey(DATE_TWO, REGION_NA), results) + + val analysisResultsAsDataFrame = repository.load() + .after(DATE_ONE) + .forAnalyzers(Seq(Completeness("att1"), Uniqueness(Seq("att1", "att2")))) + .getSuccessMetricsAsDataFrame(sparkSession) + + import sparkSession.implicits._ + val expected = Seq( + // First analysisResult + ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"), + ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"), + // Second analysisResult + ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"), + ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA")) + .toDF("entity", "instance", "name", "value", "dataset_date", "region") + + assertSameRows(analysisResultsAsDataFrame, expected) + } + } + + "include no metrics in loaded AnalysisResults if requested" in withSparkSession { session => + + evaluate(session) { (results, repository) => + + repository.save(ResultKey(DATE_ONE, REGION_EU), results) + repository.save(ResultKey(DATE_TWO, REGION_NA), results) + + val analysisResultsAsDataFrame = repository.load() + .after(DATE_ONE) + .forAnalyzers(Seq.empty) + .getSuccessMetricsAsDataFrame(session) + + import session.implicits._ + val expected = Seq.empty[(String, String, String, Double, Long, String)] + .toDF("entity", "instance", "name", "value", "dataset_date", "region") + + assertSameRows(analysisResultsAsDataFrame, expected) + } + } + + "return empty Seq if load parameters too restrictive" in withSparkSession { session => + + evaluate(session) { (results, repository) => + + repository.save(ResultKey(DATE_ONE, REGION_EU), results) + repository.save(ResultKey(DATE_TWO, REGION_NA), results) + + val analysisResults = repository.load() + .after(DATE_TWO) + .before(DATE_ONE) + .get() + + assert(analysisResults.isEmpty) + } + } + } + + private[this] def evaluate(session: SparkSession) + (test: ( AnalyzerContext, MetricsRepository) => Unit): Unit = { + + val data = getDfFull(session) + val results = AnalysisRunner.run(data, createAnalysis()) + val repository = createRepository(session) + + test(results, repository) + } + + private[this] def createAnalysis(): Analysis = { + Analysis() + .addAnalyzer(Size()) + .addAnalyzer(Distinctness("item")) + .addAnalyzer(Completeness("att1")) + .addAnalyzer(Uniqueness(Seq("att1", "att2"))) + } + + private[this] def createDate(year: Int, month: Int, day: Int): Long = { + LocalDate.of(year, month, day).atTime(10, 10, 10).toEpochSecond(ZoneOffset.UTC) + } + + private[this] def createRepository(sparkSession: SparkSession): MetricsRepository = { + val writeRequest = new DefaultRequest[Void]("execute-api") + writeRequest.setHttpMethod(HttpMethodName.POST) + writeRequest.setEndpoint(URI.create(ENDPOINT)) + writeRequest.setResourcePath(PATH) + val ssoJwt = "sso-jwt " + TOKEN + writeRequest.setHeaders(Map( + "Content-Type" -> "application/json", + "Authorization" -> ssoJwt + ).asJava) + + val readRequest = new DefaultRequest[Void]("execute-api") + readRequest.setHttpMethod(HttpMethodName.GET) + readRequest.setEndpoint(URI.create(ENDPOINT)) + readRequest.setResourcePath(PATH) + readRequest.setHeaders(Map( + "Content-Type" -> "application/json", + "Authorization" -> ssoJwt + ).asJava) + + val repo = new RestMetricsRepository(readRequest=readRequest, writeRequest=writeRequest) + repo.setApiHelper(new RestApiHelperMock()) + repo + } + + private[this] def assertSameRows(dataFrameA: DataFrame, dataFrameB: DataFrame): Unit = { + assert(dataFrameA.collect().toSet == dataFrameB.collect().toSet) + } +} + +class RestApiHelperMock extends RestApiHelper { + private val mapRepo = new ConcurrentHashMap[ResultKey, AnalysisResult]() + + override def writeHttpRequest(writeRequest: Request[Void]): Unit = { + val contentString = Option(IOUtils.toString(writeRequest.getContent, "UTF-8")) + val allResults = contentString + .map { text => AnalysisResultSerde.deserialize(text) } + .getOrElse(Seq.empty) + allResults.foreach(result => mapRepo.put(result.resultKey, result)) + } + + override def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T): + Option[T] = { + val analyzerResults = mapRepo.values.map { analysisResult => + val requestedMetrics = analysisResult + .analyzerContext + .metricMap + + AnalysisResult(analysisResult.resultKey, AnalyzerContext(requestedMetrics)) + } + .toSeq + val serializedResult = AnalysisResultSerde.serialize(analyzerResults) + val byteArrayInputStream = new ByteArrayInputStream(serializedResult.getBytes("UTF-8")) + val bufferedInputStream = new BufferedInputStream(byteArrayInputStream) + try { + Option(readFunc(bufferedInputStream)) + } finally { + Closeables.close(bufferedInputStream, false) + } + } +} From 4b5a80dd96469351004f162fdb2910cb52c1ed56 Mon Sep 17 00:00:00 2001 From: cmach Date: Thu, 12 Jan 2023 11:57:25 -0800 Subject: [PATCH 5/6] clean up and adding comments --- .../rest/RestMetricsRepository.scala | 10 +++++---- .../rest/RestMetricsRepositoryTest.scala | 22 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala index 333a1d6e8..c5e022ffa 100644 --- a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala +++ b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala @@ -30,13 +30,16 @@ import org.apache.commons.io.IOUtils import java.io.{BufferedInputStream, ByteArrayInputStream} -/** A simple Repository implementation backed by a concurrent hash map +/** A simple Repository implementation using AmazonHttpClient to read and write to API * * readRequest: an endpoint request that read all of the metrics generated so far - * writeRequest: an endpoint request that write th metrics to database + * writeRequest: an endpoint request that write analyzer metrics * */ class RestMetricsRepository(readRequest: Request[Void], writeRequest: Request[Void]) extends MetricsRepository { + /** + * Other implementation of this RestApiHelper can be used, by extending RestApiHelper, and call setApiHelper + * */ var apiHelper: RestApiHelper = new RestApiHelperImp() /** * Saves Analysis results (metrics) @@ -70,6 +73,7 @@ class RestMetricsRepository(readRequest: Request[Void], writeRequest: Request[Vo new RestMetricsRepositoryMultipleResultsLoader(apiHelper, readRequest) } + /** Set different implementation of RestApiHelper, instead of the default AmazonHttpClient */ def setApiHelper(apiHelper: RestApiHelper): Unit = { this.apiHelper = apiHelper } @@ -165,7 +169,6 @@ class RestApiHelperImp extends RestApiHelper { private val httpClient = new AmazonHttpClient(new ClientConfiguration() .withRetryPolicy(PredefinedRetryPolicies.DEFAULT)) - /* Helper function to write to a content to provided endpoint */ override def writeHttpRequest(writeRequest: Request[Void]): Unit = { httpClient .requestExecutionBuilder @@ -190,7 +193,6 @@ class RestApiHelperImp extends RestApiHelper { .getAwsResponse } - /* Helper function to read from provided endpoint */ override def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T): Option[T] = { httpClient diff --git a/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala index 0cb598fc6..a802c95dd 100644 --- a/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala +++ b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala @@ -22,13 +22,12 @@ import com.amazon.deequ.analyzers.runners.AnalyzerContext._ import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext} import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric} import com.amazon.deequ.repository.{AnalysisResult, AnalysisResultSerde, MetricsRepository, ResultKey} -import com.amazon.deequ.utils.{FixtureSupport, TempFileUtils} +import com.amazon.deequ.utils.{FixtureSupport} import com.amazonaws.{DefaultRequest, Request} import com.amazonaws.http.HttpMethodName import com.google.common.io.Closeables import org.apache.commons.io.IOUtils import org.apache.spark.sql.{DataFrame, SparkSession} -import org.scalamock.scalatest.MockFactory import org.scalatest.wordspec.AnyWordSpec import java.io.{BufferedInputStream, ByteArrayInputStream} @@ -49,7 +48,7 @@ class RestMetricsRepositoryTest extends AnyWordSpec private[this] val REGION_EU = Map("Region" -> "EU") private[this] val REGION_NA = Map("Region" -> "NA") - private[this] val TOKEN = "asdf123" + private[this] val TOKEN = "sso_token_guid_123" private[this] val ENDPOINT = "https://test.api.com" private[this] val PATH = "/v1/api/metrics" @@ -276,24 +275,23 @@ class RestMetricsRepositoryTest extends AnyWordSpec } private[this] def createRepository(sparkSession: SparkSession): MetricsRepository = { + val ssoJwt = "sso-jwt " + TOKEN + val headers = Map( + "Content-Type" -> "application/json", + "Authorization" -> ssoJwt + ) + val writeRequest = new DefaultRequest[Void]("execute-api") writeRequest.setHttpMethod(HttpMethodName.POST) writeRequest.setEndpoint(URI.create(ENDPOINT)) writeRequest.setResourcePath(PATH) - val ssoJwt = "sso-jwt " + TOKEN - writeRequest.setHeaders(Map( - "Content-Type" -> "application/json", - "Authorization" -> ssoJwt - ).asJava) + writeRequest.setHeaders(headers.asJava) val readRequest = new DefaultRequest[Void]("execute-api") readRequest.setHttpMethod(HttpMethodName.GET) readRequest.setEndpoint(URI.create(ENDPOINT)) readRequest.setResourcePath(PATH) - readRequest.setHeaders(Map( - "Content-Type" -> "application/json", - "Authorization" -> ssoJwt - ).asJava) + readRequest.setHeaders(headers.asJava) val repo = new RestMetricsRepository(readRequest=readRequest, writeRequest=writeRequest) repo.setApiHelper(new RestApiHelperMock()) From eab065d1d2c8177dec4ce3c9ae0efd4e46be3028 Mon Sep 17 00:00:00 2001 From: cmach Date: Thu, 12 Jan 2023 13:15:48 -0800 Subject: [PATCH 6/6] fixing style --- Dockerfile | 5 ----- .../deequ/repository/rest/RestMetricsRepository.scala | 9 ++++++--- .../repository/rest/RestMetricsRepositoryTest.scala | 4 ++-- 3 files changed, 8 insertions(+), 10 deletions(-) delete mode 100644 Dockerfile diff --git a/Dockerfile b/Dockerfile deleted file mode 100644 index 5f0ea3eb9..000000000 --- a/Dockerfile +++ /dev/null @@ -1,5 +0,0 @@ -FROM maven:3.5-jdk-8 AS build - -RUN apt-get upgrade -COPY . /usr/src/app/deequ -RUN mvn -f /usr/src/app/deequ/pom.xml clean install -DargLine="-Xms128m -Xms512m -XX:MaxPermSize=300m -ea" diff --git a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala index c5e022ffa..f0182df8c 100644 --- a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala +++ b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala @@ -20,7 +20,8 @@ import com.amazon.deequ.analyzers.Analyzer import com.amazon.deequ.analyzers.runners.AnalyzerContext import com.amazon.deequ.metrics.Metric import com.amazon.deequ.repository._ -import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler, ExecutionContext, HttpResponse, HttpResponseHandler} +import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler, + ExecutionContext, HttpResponse, HttpResponseHandler} import com.amazonaws.retry.PredefinedRetryPolicies import com.amazonaws.{AmazonClientException, ClientConfiguration, Request} import com.google.common.collect.ImmutableList @@ -38,7 +39,8 @@ import java.io.{BufferedInputStream, ByteArrayInputStream} class RestMetricsRepository(readRequest: Request[Void], writeRequest: Request[Void]) extends MetricsRepository { /** - * Other implementation of this RestApiHelper can be used, by extending RestApiHelper, and call setApiHelper + * Other implementation of this RestApiHelper can be used, + * by extending RestApiHelper, and call setApiHelper * */ var apiHelper: RestApiHelper = new RestApiHelperImp() /** @@ -79,7 +81,8 @@ class RestMetricsRepository(readRequest: Request[Void], writeRequest: Request[Vo } } -class RestMetricsRepositoryMultipleResultsLoader(apiHelper: RestApiHelper, readRequest: Request[Void]) +class RestMetricsRepositoryMultipleResultsLoader(apiHelper: RestApiHelper, + readRequest: Request[Void]) extends MetricsRepositoryMultipleResultsLoader { private[this] var tagValues: Option[Map[String, String]] = None diff --git a/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala index a802c95dd..0e84c1f49 100644 --- a/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala +++ b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala @@ -293,7 +293,7 @@ class RestMetricsRepositoryTest extends AnyWordSpec readRequest.setResourcePath(PATH) readRequest.setHeaders(headers.asJava) - val repo = new RestMetricsRepository(readRequest=readRequest, writeRequest=writeRequest) + val repo = new RestMetricsRepository(readRequest = readRequest, writeRequest = writeRequest) repo.setApiHelper(new RestApiHelperMock()) repo } @@ -315,7 +315,7 @@ class RestApiHelperMock extends RestApiHelper { } override def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T): - Option[T] = { + Option[T] = { val analyzerResults = mapRepo.values.map { analysisResult => val requestedMetrics = analysisResult .analyzerContext