diff --git a/pom.xml b/pom.xml
index e4295b6e0..2f4248a42 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,7 +149,11 @@
       <scope>test</scope>
     </dependency>
-
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-core</artifactId>
+      <version>1.12.128</version>
+    </dependency>
diff --git a/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala
new file mode 100644
index 000000000..f0182df8c
--- /dev/null
+++ b/src/main/scala/com/amazon/deequ/repository/rest/RestMetricsRepository.scala
@@ -0,0 +1,222 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.repository.rest
+
+import com.amazon.deequ.analyzers.Analyzer
+import com.amazon.deequ.analyzers.runners.AnalyzerContext
+import com.amazon.deequ.metrics.Metric
+import com.amazon.deequ.repository._
+import com.amazonaws.http.{AmazonHttpClient, DefaultErrorResponseHandler,
+  ExecutionContext, HttpResponse, HttpResponseHandler}
+import com.amazonaws.retry.PredefinedRetryPolicies
+import com.amazonaws.{AmazonClientException, ClientConfiguration, Request}
+import com.google.common.collect.ImmutableList
+import com.google.common.io.Closeables
+import org.apache.commons.io.IOUtils
+
+import java.io.{BufferedInputStream, ByteArrayInputStream}
+
+
+/** A simple MetricsRepository implementation that uses AmazonHttpClient to read from and
+ * write to a REST API
+ *
+ * readRequest: an endpoint request that reads all of the metrics generated so far
+ * writeRequest: an endpoint request that writes analyzer metrics
+ * */
+class RestMetricsRepository(readRequest: Request[Void], writeRequest: Request[Void])
+  extends MetricsRepository {
+
+  /**
+   * A different implementation of RestApiHelper can be used
+   * by extending RestApiHelper and calling setApiHelper.
+   * */
+  var apiHelper: RestApiHelper = new RestApiHelperImp()
+
+  /**
+   * Saves Analysis results (metrics)
+   *
+   * @param resultKey A ResultKey that uniquely identifies an AnalysisResult
+   * @param analyzerContext The resulting AnalyzerContext of an Analysis
+   */
+  override def save(resultKey: ResultKey, analyzerContext: AnalyzerContext): Unit = {
+    val successfulMetrics = analyzerContext.metricMap.filter {
+      case (_, metric) => metric.value.isSuccess
+    }
+    val analyzerContextWithSuccessfulValues = AnalyzerContext(successfulMetrics)
+    val serializedResult = AnalysisResultSerde.serialize(
+      Seq(AnalysisResult(resultKey, analyzerContextWithSuccessfulValues))
+    )
+
+    writeRequest.setContent(new ByteArrayInputStream(serializedResult.getBytes("UTF-8")))
+
+    apiHelper.writeHttpRequest(writeRequest)
+  }
+
+  /**
+   * Get an AnalyzerContext saved using exactly the same resultKey, if present
+   */
+  override def loadByKey(resultKey: ResultKey): Option[AnalyzerContext] = {
+    load().get().find(_.resultKey == resultKey).map(_.analyzerContext)
+  }
+
+  /** Get a builder class to construct a loading query to get AnalysisResults */
+  override def load(): MetricsRepositoryMultipleResultsLoader = {
+    new RestMetricsRepositoryMultipleResultsLoader(apiHelper, readRequest)
+  }
+
+  /** Set a different implementation of RestApiHelper instead of the default AmazonHttpClient one */
+  def setApiHelper(apiHelper: RestApiHelper): Unit = {
+    this.apiHelper = apiHelper
+  }
+}
+
+class RestMetricsRepositoryMultipleResultsLoader(apiHelper: RestApiHelper,
+                                                 readRequest: Request[Void])
+  extends MetricsRepositoryMultipleResultsLoader {
+
+  private[this] var tagValues: Option[Map[String, String]] = None
+  private[this] var forAnalyzers: Option[Seq[Analyzer[_, Metric[_]]]] = None
+  private[this] var before: Option[Long] = None
+  private[this] var after: Option[Long] = None
+
+  /**
+   * Filter out results that don't have specific values for specific tags
+   *
+   * @param tagValues Map with tag names and the corresponding values to filter for
+   */
+  def withTagValues(tagValues: Map[String, String]): MetricsRepositoryMultipleResultsLoader = {
+    this.tagValues = Option(tagValues)
+    this
+  }
+
+  /**
+   * Choose all metrics that you want to load
+   *
+   * @param analyzers A sequence of analyzers whose resulting metrics you want to load
+   */
+  def forAnalyzers(analyzers: Seq[Analyzer[_, Metric[_]]])
+    : MetricsRepositoryMultipleResultsLoader = {
+
+    this.forAnalyzers = Option(analyzers)
+    this
+  }
+
+  /**
+   * Only look at AnalysisResults with a result key with a smaller value
+   *
+   * @param dateTime The maximum dateTime of AnalysisResults to look at
+   */
+  def before(dateTime: Long): MetricsRepositoryMultipleResultsLoader = {
+    this.before = Option(dateTime)
+    this
+  }
+
+  /**
+   * Only look at AnalysisResults with a result key with a greater value
+   *
+   * @param dateTime The minimum dateTime of AnalysisResults to look at
+   */
+  def after(dateTime: Long): MetricsRepositoryMultipleResultsLoader = {
+    this.after = Option(dateTime)
+    this
+  }
+
+  /** Get the AnalysisResult */
+  def get(): Seq[AnalysisResult] = {
+    val contentString = apiHelper.readHttpRequest(readRequest, {
+      IOUtils.toString(_, "UTF-8")
+    })
+
+    val allResults = contentString
+      .map { text => AnalysisResultSerde.deserialize(text) }
+      .getOrElse(Seq.empty)
+
+    val selection = allResults
+      .filter { result => after.isEmpty || after.get <= result.resultKey.dataSetDate }
+      .filter { result => before.isEmpty || result.resultKey.dataSetDate <= before.get }
+      .filter { result => tagValues.isEmpty ||
+        tagValues.get.toSet.subsetOf(result.resultKey.tags.toSet) }
+
+    selection
+      .map { analysisResult =>
+
+        val requestedMetrics = analysisResult
+          .analyzerContext
+          .metricMap
+          .filterKeys(analyzer => forAnalyzers.isEmpty || forAnalyzers.get.contains(analyzer))
+
+        val requestedAnalyzerContext = AnalyzerContext(requestedMetrics)
+
+        AnalysisResult(analysisResult.resultKey, requestedAnalyzerContext)
+      }
+  }
+}
+
+trait RestApiHelper {
+  def writeHttpRequest(writeRequest: Request[Void]): Unit
+  def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T): Option[T]
+}
+
+class RestApiHelperImp extends RestApiHelper {
+  private val httpClient = new AmazonHttpClient(new ClientConfiguration()
+    .withRetryPolicy(PredefinedRetryPolicies.DEFAULT))
+
+  override def writeHttpRequest(writeRequest: Request[Void]): Unit = {
+    httpClient
+      .requestExecutionBuilder
+      .executionContext(new ExecutionContext(true))
+      .request(writeRequest)
+      .errorResponseHandler(new HttpResponseHandler[AmazonClientException] {
+        override def handle(response: HttpResponse): AmazonClientException = {
+          throw new AmazonClientException(s"ERROR writing to endpoint: " +
+            s"${writeRequest.getEndpoint}. Code: ${response.getStatusCode}")
+        }
+        override def needsConnectionLeftOpen(): Boolean = false
+      })
+      .execute(new HttpResponseHandler[Unit] {
+        override def handle(response: HttpResponse): Unit = {
+          if (response.getStatusCode != 200) {
+            throw new AmazonClientException(s"ERROR writing to endpoint: " +
+              s"${writeRequest.getEndpoint}. Code: ${response.getStatusCode}")
+          }
+        }
+        override def needsConnectionLeftOpen(): Boolean = false
+      })
+      .getAwsResponse
+  }
+
+  override def readHttpRequest[T](readRequest: Request[Void],
+                                  readFunc: BufferedInputStream => T): Option[T] = {
+    httpClient
+      .requestExecutionBuilder
+      .executionContext(new ExecutionContext(true))
+      .request(readRequest)
+      .errorResponseHandler(new DefaultErrorResponseHandler(ImmutableList.of()))
+      .execute(new HttpResponseHandler[Option[T]] {
+        override def handle(response: HttpResponse): Option[T] = {
+          if (response.getStatusCode == 200) {
+            val bufferedInputStream = new BufferedInputStream(response.getContent)
+            try {
+              Option(readFunc(bufferedInputStream))
+            } finally {
+              Closeables.close(bufferedInputStream, false)
+            }
+          } else {
+            None
+          }
+        }
+        override def needsConnectionLeftOpen(): Boolean = false
+      }).getAwsResponse
+  }
+}
diff --git a/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala
new file mode 100644
index 000000000..0e84c1f49
--- /dev/null
+++ b/src/test/scala/com/amazon/deequ/repository/rest/RestMetricsRepositoryTest.scala
@@ -0,0 +1,336 @@
+/**
+ * Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"). You may not
+ * use this file except in compliance with the License. A copy of the License
+ * is located at
+ *
+ *     http://aws.amazon.com/apache2.0/
+ *
+ * or in the "license" file accompanying this file. This file is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ *
+ */
+
+package com.amazon.deequ.repository.rest
+
+import com.amazon.deequ.SparkContextSpec
+import com.amazon.deequ.analyzers._
+import com.amazon.deequ.analyzers.runners.AnalyzerContext._
+import com.amazon.deequ.analyzers.runners.{AnalysisRunner, AnalyzerContext}
+import com.amazon.deequ.metrics.{DoubleMetric, Entity, Metric}
+import com.amazon.deequ.repository.{AnalysisResult, AnalysisResultSerde, MetricsRepository, ResultKey}
+import com.amazon.deequ.utils.FixtureSupport
+import com.amazonaws.{DefaultRequest, Request}
+import com.amazonaws.http.HttpMethodName
+import com.google.common.io.Closeables
+import org.apache.commons.io.IOUtils
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.scalatest.wordspec.AnyWordSpec
+
+import java.io.{BufferedInputStream, ByteArrayInputStream}
+import java.net.URI
+import java.time.{LocalDate, ZoneOffset}
+import java.util.concurrent.ConcurrentHashMap
+import scala.util.{Failure, Success}
+import scala.collection.JavaConverters._
+import scala.collection.JavaConversions._
+
+class RestMetricsRepositoryTest extends AnyWordSpec
+  with SparkContextSpec with FixtureSupport {
+
+  private[this] val DATE_ONE = createDate(2017, 10, 14)
+  private[this] val DATE_TWO = createDate(2017, 10, 15)
+  private[this] val DATE_THREE = createDate(2017, 10, 16)
+
+  private[this] val REGION_EU = Map("Region" -> "EU")
+  private[this] val REGION_NA = Map("Region" -> "NA")
+
+  private[this] val TOKEN = "sso_token_guid_123"
+  private[this] val ENDPOINT = "https://test.api.com"
+  private[this] val PATH = "/v1/api/metrics"
+
+  "Rest Metric Repository" should {
+
+    "save and retrieve AnalyzerContexts" in withSparkSession { session =>
+      evaluate(session) { (results, repository) =>
+
+        val resultKey = ResultKey(DATE_ONE, REGION_EU)
+        repository.save(resultKey, results)
+
+        val loadedResults = repository.loadByKey(resultKey).get
+
+        val loadedResultsAsDataFrame = successMetricsAsDataFrame(session, loadedResults)
+        val resultsAsDataFrame = successMetricsAsDataFrame(session, results)
+
+        assertSameRows(loadedResultsAsDataFrame, resultsAsDataFrame)
+        assert(results == loadedResults)
+      }
+    }
+
+    "save should ignore failed result metrics when saving" in withSparkSession { session =>
+
+      val metrics: Map[Analyzer[_, Metric[_]], Metric[_]] = Map(
+        Size() -> DoubleMetric(Entity.Column, "Size", "*", Success(5.0)),
+        Completeness("ColumnA") ->
+          DoubleMetric(Entity.Column, "Completeness", "ColumnA",
+            Failure(new RuntimeException("error"))))
+
+      val resultsWithMixedValues = AnalyzerContext(metrics)
+
+      val successMetrics = resultsWithMixedValues.metricMap
+        .filter { case (_, metric) => metric.value.isSuccess }
+
+      val resultsWithSuccessfulValues = AnalyzerContext(successMetrics)
+
+      val repository = createRepository(session)
+
+      val resultKey = ResultKey(DATE_ONE, REGION_EU)
+
+      repository.save(resultKey, resultsWithMixedValues)
+
+      val loadedAnalyzerContext = repository.loadByKey(resultKey).get
+
+      assert(resultsWithSuccessfulValues == loadedAnalyzerContext)
+    }
+
+    "saving should work for very long strings as well" in withSparkSession { session =>
+      evaluate(session) { (results, repository) =>
+
+        (1 to 200).foreach(number => repository.save(ResultKey(number, Map.empty), results))
+
+        val loadedAnalyzerContext = repository.loadByKey(ResultKey(200, Map.empty)).get
+
+        assert(results == loadedAnalyzerContext)
+      }
+    }
+
+    "save and retrieve AnalysisResults" in withSparkSession { session =>
+
+      evaluate(session) { (results, repository) =>
+
+        repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+        repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+        val analysisResultsAsDataFrame = repository.load()
+          .after(DATE_ONE)
+          .getSuccessMetricsAsDataFrame(session)
+
+        import session.implicits._
+        val expected = Seq(
+          // First analysisResult
+          ("Dataset", "*", "Size", 4.0, DATE_ONE, "EU"),
+          ("Column", "item", "Distinctness", 1.0, DATE_ONE, "EU"),
+          ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"),
+          ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"),
+          // Second analysisResult
+          ("Dataset", "*", "Size", 4.0, DATE_TWO, "NA"),
+          ("Column", "item", "Distinctness", 1.0, DATE_TWO, "NA"),
+          ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"),
+          ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA"))
+          .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+        assertSameRows(analysisResultsAsDataFrame, expected)
+      }
+    }
+
+    "only load AnalysisResults within a specific time frame if requested" in
+      withSparkSession { sparkSession =>
+
+        evaluate(sparkSession) { (results, repository) =>
+
+          repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+          repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+          repository.save(ResultKey(DATE_THREE, REGION_NA), results)
+
+          val analysisResultsAsDataFrame = repository.load()
+            .after(DATE_TWO)
+            .before(DATE_TWO)
+            .getSuccessMetricsAsDataFrame(sparkSession)
+
+          import sparkSession.implicits._
+          val expected = Seq(
+            // Second analysisResult
+            ("Dataset", "*", "Size", 4.0, DATE_TWO, "NA"),
+            ("Column", "item", "Distinctness", 1.0, DATE_TWO, "NA"),
+            ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"),
+            ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA"))
+            .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+          assertSameRows(analysisResultsAsDataFrame, expected)
+        }
+      }
+
+    "only load AnalyzerContexts with specific Tags if requested" in withSparkSession { session =>
+
+      evaluate(session) { (results, repository) =>
+
+        repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+        repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+        val analysisResultsAsDataFrame = repository.load()
+          .after(DATE_ONE)
+          .withTagValues(REGION_EU)
+          .getSuccessMetricsAsDataFrame(session)
+
+        import session.implicits._
+        val expected = Seq(
+          // First analysisResult
+          ("Dataset", "*", "Size", 4.0, DATE_ONE, "EU"),
+          ("Column", "item", "Distinctness", 1.0, DATE_ONE, "EU"),
+          ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"),
+          ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"))
+          .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+        assertSameRows(analysisResultsAsDataFrame, expected)
+      }
+    }
+
+    "only include specific metrics in loaded AnalysisResults if requested" in
+      withSparkSession { sparkSession =>
+
+        evaluate(sparkSession) { (results, repository) =>
+
+          repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+          repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+          val analysisResultsAsDataFrame = repository.load()
+            .after(DATE_ONE)
+            .forAnalyzers(Seq(Completeness("att1"), Uniqueness(Seq("att1", "att2"))))
+            .getSuccessMetricsAsDataFrame(sparkSession)
+
+          import sparkSession.implicits._
+          val expected = Seq(
+            // First analysisResult
+            ("Column", "att1", "Completeness", 1.0, DATE_ONE, "EU"),
+            ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_ONE, "EU"),
+            // Second analysisResult
+            ("Column", "att1", "Completeness", 1.0, DATE_TWO, "NA"),
+            ("Mutlicolumn", "att1,att2", "Uniqueness", 0.25, DATE_TWO, "NA"))
+            .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+          assertSameRows(analysisResultsAsDataFrame, expected)
+        }
+      }
+
+    "include no metrics in loaded AnalysisResults if requested" in withSparkSession { session =>
+
+      evaluate(session) { (results, repository) =>
+
+        repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+        repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+        val analysisResultsAsDataFrame = repository.load()
+          .after(DATE_ONE)
+          .forAnalyzers(Seq.empty)
+          .getSuccessMetricsAsDataFrame(session)
+
+        import session.implicits._
+        val expected = Seq.empty[(String, String, String, Double, Long, String)]
+          .toDF("entity", "instance", "name", "value", "dataset_date", "region")
+
+        assertSameRows(analysisResultsAsDataFrame, expected)
+      }
+    }
+
+    "return empty Seq if load parameters too restrictive" in withSparkSession { session =>
+
+      evaluate(session) { (results, repository) =>
+
+        repository.save(ResultKey(DATE_ONE, REGION_EU), results)
+        repository.save(ResultKey(DATE_TWO, REGION_NA), results)
+
+        val analysisResults = repository.load()
+          .after(DATE_TWO)
+          .before(DATE_ONE)
+          .get()
+
+        assert(analysisResults.isEmpty)
+      }
+    }
+  }
+
+  private[this] def evaluate(session: SparkSession)
+    (test: (AnalyzerContext, MetricsRepository) => Unit): Unit = {
+
+    val data = getDfFull(session)
+    val results = AnalysisRunner.run(data, createAnalysis())
+    val repository = createRepository(session)
+
+    test(results, repository)
+  }
+
+  private[this] def createAnalysis(): Analysis = {
+    Analysis()
+      .addAnalyzer(Size())
+      .addAnalyzer(Distinctness("item"))
+      .addAnalyzer(Completeness("att1"))
+      .addAnalyzer(Uniqueness(Seq("att1", "att2")))
+  }
+
+  private[this] def createDate(year: Int, month: Int, day: Int): Long = {
+    LocalDate.of(year, month, day).atTime(10, 10, 10).toEpochSecond(ZoneOffset.UTC)
+  }
+
+  private[this] def createRepository(sparkSession: SparkSession): MetricsRepository = {
+    val ssoJwt = "sso-jwt " + TOKEN
+    val headers = Map(
+      "Content-Type" -> "application/json",
+      "Authorization" -> ssoJwt
+    )
+
+    val writeRequest = new DefaultRequest[Void]("execute-api")
+    writeRequest.setHttpMethod(HttpMethodName.POST)
+    writeRequest.setEndpoint(URI.create(ENDPOINT))
+    writeRequest.setResourcePath(PATH)
+    writeRequest.setHeaders(headers.asJava)
+
+    val readRequest = new DefaultRequest[Void]("execute-api")
+    readRequest.setHttpMethod(HttpMethodName.GET)
+    readRequest.setEndpoint(URI.create(ENDPOINT))
+    readRequest.setResourcePath(PATH)
+    readRequest.setHeaders(headers.asJava)
+
+    val repo = new RestMetricsRepository(readRequest = readRequest, writeRequest = writeRequest)
+    repo.setApiHelper(new RestApiHelperMock())
+    repo
+  }
+
+  private[this] def assertSameRows(dataFrameA: DataFrame, dataFrameB: DataFrame): Unit = {
+    assert(dataFrameA.collect().toSet == dataFrameB.collect().toSet)
+  }
+}
+
+class RestApiHelperMock extends RestApiHelper {
+  private val mapRepo = new ConcurrentHashMap[ResultKey, AnalysisResult]()
+
+  override def writeHttpRequest(writeRequest: Request[Void]): Unit = {
+    val contentString = Option(IOUtils.toString(writeRequest.getContent, "UTF-8"))
+    val allResults = contentString
+      .map { text => AnalysisResultSerde.deserialize(text) }
+      .getOrElse(Seq.empty)
+    allResults.foreach(result => mapRepo.put(result.resultKey, result))
+  }
+
+  override def readHttpRequest[T](readRequest: Request[Void], readFunc: BufferedInputStream => T):
+    Option[T] = {
+    val analyzerResults = mapRepo.values.map { analysisResult =>
+      val requestedMetrics = analysisResult
+        .analyzerContext
+        .metricMap
+
+      AnalysisResult(analysisResult.resultKey, AnalyzerContext(requestedMetrics))
+    }
+      .toSeq
+    val serializedResult = AnalysisResultSerde.serialize(analyzerResults)
+    val byteArrayInputStream = new ByteArrayInputStream(serializedResult.getBytes("UTF-8"))
+    val bufferedInputStream = new BufferedInputStream(byteArrayInputStream)
+    try {
+      Option(readFunc(bufferedInputStream))
+    } finally {
+      Closeables.close(bufferedInputStream, false)
+    }
+  }
+}
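
Usage sketch: the test's createRepository shows the intended wiring. As a minimal, hypothetical sketch of the same idea (the RestRepositoryExample object, the https://example.com endpoint, the path, and the headers are placeholders, not part of this change), the two AWS SDK Request objects carry the HTTP method, endpoint, resource path and headers, and RestMetricsRepository then serializes AnalysisResults into the write request's content stream and deserializes them from the read request's response:

object RestRepositoryExample {

  import com.amazon.deequ.analyzers.runners.AnalyzerContext
  import com.amazon.deequ.repository.ResultKey
  import com.amazon.deequ.repository.rest.RestMetricsRepository
  import com.amazonaws.DefaultRequest
  import com.amazonaws.http.HttpMethodName
  import java.net.URI
  import scala.collection.JavaConverters._

  // Builds one request template; endpoint, path and headers are placeholders.
  private def request(method: HttpMethodName): DefaultRequest[Void] = {
    val req = new DefaultRequest[Void]("execute-api")
    req.setHttpMethod(method)
    req.setEndpoint(URI.create("https://example.com"))
    req.setResourcePath("/v1/api/metrics")
    req.setHeaders(Map("Content-Type" -> "application/json").asJava)
    req
  }

  // Saves the metrics of one run via the write request and reads them back via the read request.
  // The analyzerContext would typically come from AnalysisRunner, as in the tests above.
  def saveAndReload(analyzerContext: AnalyzerContext): Option[AnalyzerContext] = {
    val repository = new RestMetricsRepository(
      readRequest = request(HttpMethodName.GET),
      writeRequest = request(HttpMethodName.POST))

    val key = ResultKey(System.currentTimeMillis(), Map("Region" -> "EU"))
    repository.save(key, analyzerContext)
    repository.loadByKey(key)
  }
}

A different transport can be substituted by implementing RestApiHelper and calling setApiHelper, which is exactly what RestApiHelperMock does in the test.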