From 70f0100bd7bbcb60f67195b8854edc960d48a96f Mon Sep 17 00:00:00 2001
From: Peter Vlugter <59895+pvlugter@users.noreply.github.com>
Date: Wed, 8 Jan 2025 13:05:41 +1300
Subject: [PATCH] always provide logging and exception for batch writes with
 retries

---
 .../docs/javadsl/ProjectionDocExample.java    | 24 +-----
 .../docs/scaladsl/ProjectionDocExample.scala  | 16 +---
 .../offset-store-dao.excludes                 |  2 +
 .../akka/projection/dynamodb/Requests.scala   | 69 ++++++++++++++++++
 .../dynamodb/internal/OffsetStoreDao.scala    | 21 +-----
 .../javadsl/{Retry.scala => Requests.scala}   | 31 +++-----
 .../scaladsl/{Retry.scala => Requests.scala}  | 73 +++++--------------
 docs/src/main/paradox/dynamodb.md             |  4 +-
 8 files changed, 112 insertions(+), 128 deletions(-)
 create mode 100644 akka-projection-dynamodb/src/main/mima-filters/1.6.5.backwards.excludes/offset-store-dao.excludes
 create mode 100644 akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/Requests.scala
 rename akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/javadsl/{Retry.scala => Requests.scala} (79%)
 rename akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/scaladsl/{Retry.scala => Requests.scala} (60%)

diff --git a/akka-projection-dynamodb-integration/src/test/java/projection/docs/javadsl/ProjectionDocExample.java b/akka-projection-dynamodb-integration/src/test/java/projection/docs/javadsl/ProjectionDocExample.java
index fa2182578..255904e63 100644
--- a/akka-projection-dynamodb-integration/src/test/java/projection/docs/javadsl/ProjectionDocExample.java
+++ b/akka-projection-dynamodb-integration/src/test/java/projection/docs/javadsl/ProjectionDocExample.java
@@ -33,7 +33,7 @@
 // #handler

 // #grouped-handler
-import akka.projection.dynamodb.javadsl.Retry;
+import akka.projection.dynamodb.javadsl.Requests;
 import akka.projection.javadsl.Handler;
 // #grouped-handler
 // #handler
@@ -263,26 +263,8 @@ public CompletionStage process(List> env

       // batch write, retrying writes for any unprocessed items (with exponential backoff)
       CompletionStage<List<BatchWriteItemResponse>> result =
-          Retry.batchWrite(
-              client,
-              request,
-              maxRetries,
-              minBackoff,
-              maxBackoff,
-              randomFactor,
-              // called on each retry: log that unprocessed items are being retried
-              (response, retry, delay) ->
-                  logger.debug(
-                      "Retrying batch write in [{} ms], [{}/{}] retries, unprocessed items [{}]",
-                      delay.toMillis(),
-                      retry,
-                      maxRetries,
-                      response.unprocessedItems()),
-              // create the exception if max retries is reached
-              response ->
-                  new RuntimeException(
-                      "Failed to write batch, unprocessed items: " + response.unprocessedItems()),
-              system);
+          Requests.batchWriteWithRetries(
+              client, request, maxRetries, minBackoff, maxBackoff, randomFactor, system);

       return result.thenApply(__ -> Done.getInstance());
     }
diff --git a/akka-projection-dynamodb-integration/src/test/scala/projection/docs/scaladsl/ProjectionDocExample.scala b/akka-projection-dynamodb-integration/src/test/scala/projection/docs/scaladsl/ProjectionDocExample.scala
index d711a32c3..b268a78bb 100644
--- a/akka-projection-dynamodb-integration/src/test/scala/projection/docs/scaladsl/ProjectionDocExample.scala
+++ b/akka-projection-dynamodb-integration/src/test/scala/projection/docs/scaladsl/ProjectionDocExample.scala
@@ -137,7 +137,7 @@ object ProjectionDocExample {
   //#grouped-handler
   import akka.Done
   import akka.persistence.query.typed.EventEnvelope
-  import akka.projection.dynamodb.scaladsl.Retry
+  import akka.projection.dynamodb.scaladsl.Requests
   import akka.projection.scaladsl.Handler
   import software.amazon.awssdk.services.dynamodb.model.AttributeValue
   import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
@@ -180,22 +180,14 @@ object ProjectionDocExample {
           .build()

         // batch write, retrying writes for any unprocessed items (with exponential backoff)
-        Retry
-          .batchWrite(
+        Requests
+          .batchWriteWithRetries(
            client,
            request,
            maxRetries = 3,
            minBackoff = 200.millis,
            maxBackoff = 2.seconds,
-            randomFactor = 0.3,
-            onRetry = (response, retry, delay) =>
-              logger.debug(
-                "Retrying batch write in [{} ms], [{}/3] retries, unprocessed items [{}]",
-                delay.toMillis,
-                retry,
-                response.unprocessedItems),
-            failOnMaxRetries = response =>
-              new RuntimeException(s"Failed to write batch, unprocessed items [${response.unprocessedItems}]"))
+            randomFactor = 0.3)
           .map(_ => Done)
       }
     }
diff --git a/akka-projection-dynamodb/src/main/mima-filters/1.6.5.backwards.excludes/offset-store-dao.excludes b/akka-projection-dynamodb/src/main/mima-filters/1.6.5.backwards.excludes/offset-store-dao.excludes
new file mode 100644
index 000000000..8fcded533
--- /dev/null
+++ b/akka-projection-dynamodb/src/main/mima-filters/1.6.5.backwards.excludes/offset-store-dao.excludes
@@ -0,0 +1,2 @@
+# internal
+ProblemFilters.exclude[MissingClassProblem]("akka.projection.dynamodb.internal.OffsetStoreDao$BatchWriteFailed")
diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/Requests.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/Requests.scala
new file mode 100644
index 000000000..aee227384
--- /dev/null
+++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/Requests.scala
@@ -0,0 +1,69 @@
+/*
+ * Copyright (C) 2024 Lightbend Inc.
+ */
+
+package akka.projection.dynamodb
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
+
+import akka.actor.typed.ActorSystem
+import akka.pattern.BackoffSupervisor
+import akka.pattern.after
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
+
+object Requests {
+
+  final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse)
+      extends RuntimeException(
+        s"Failed to batch write all items, [${batchUnprocessedTotal(lastResponse)}] unprocessed items remaining")
+
+  private[akka] val log: Logger = LoggerFactory.getLogger(getClass)
+
+  private[akka] def batchUnprocessedTotal(response: BatchWriteItemResponse): Int =
+    response.unprocessedItems.asScala.valuesIterator.map(_.size).sum
+
+  private[akka] def retry[Request, Response](
+      request: Request,
+      attempt: Request => Future[Response],
+      decideRetry: (Request, Response) => Option[Request],
+      maxRetries: Int,
+      minBackoff: FiniteDuration,
+      maxBackoff: FiniteDuration,
+      randomFactor: Double,
+      onRetry: (Response, Int, FiniteDuration) => Unit,
+      failOnMaxRetries: Response => Throwable,
+      retries: Int = 0)(implicit system: ActorSystem[_]): Future[Seq[Response]] = {
+    import system.executionContext
+    attempt(request).flatMap { response =>
+      decideRetry(request, response) match {
+        case Some(nextRequest) =>
+          if (retries >= maxRetries) {
+            Future.failed(failOnMaxRetries(response))
+          } else { // retry after exponential backoff
+            val nextRetry = retries + 1
+            val delay = BackoffSupervisor.calculateDelay(retries, minBackoff, maxBackoff, randomFactor)
+            onRetry(response, nextRetry, delay)
+            after(delay) {
+              retry(
+                nextRequest,
+                attempt,
+                decideRetry,
+                maxRetries,
+                minBackoff,
+                maxBackoff,
+                randomFactor,
+                onRetry,
+                failOnMaxRetries,
+                nextRetry)
+            }.map { responses => response +: responses }(ExecutionContext.parasitic)
+          }
+        case None => Future.successful(Seq(response))
+      }
+    }
+  }
+}
diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
index 66454341c..b61fdb27f 100644
--- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
+++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/internal/OffsetStoreDao.scala
@@ -21,8 +21,9 @@ import akka.persistence.dynamodb.internal.InstantFactory
 import akka.persistence.query.TimestampOffset
 import akka.projection.ProjectionId
 import akka.projection.dynamodb.DynamoDBProjectionSettings
+import akka.projection.dynamodb.Requests.BatchWriteFailed
 import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Record
-import akka.projection.dynamodb.scaladsl.Retry
+import akka.projection.dynamodb.scaladsl.Requests
 import akka.projection.internal.ManagementState
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
@@ -60,8 +61,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
     val timestampBySlicePid = AttributeValue.fromS("_")
     val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
   }
-
-  final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends RuntimeException
 }

 /**
@@ -73,7 +72,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
    projectionId: ProjectionId,
    client: DynamoDbAsyncClient) {
   import OffsetStoreDao.log
-  import OffsetStoreDao.BatchWriteFailed
   import OffsetStoreDao.MaxTransactItems
   import settings.offsetBatchSize
   import system.executionContext
@@ -118,24 +116,13 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
   }

   private def writeBatchWithRetries(request: BatchWriteItemRequest): Future[Seq[BatchWriteItemResponse]] =
-    Retry.batchWrite(
+    Requests.batchWriteWithRetries(
      client,
      request,
      settings.retrySettings.maxRetries,
      settings.retrySettings.minBackoff,
      settings.retrySettings.maxBackoff,
-      settings.retrySettings.randomFactor,
-      onRetry = (response, retry, delay) =>
-        if (log.isDebugEnabled) {
-          val count = response.unprocessedItems.asScala.valuesIterator.map(_.size).sum
-          log.debug(
-            "Not all writes in batch were applied, retrying in [{} ms]: [{}] unapplied writes, [{}/{}] retries",
-            delay.toMillis,
-            count,
-            retry,
-            settings.retrySettings.maxRetries)
-        },
-      failOnMaxRetries = new BatchWriteFailed(_))(system)
+      settings.retrySettings.randomFactor)(system)

   def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
     import OffsetStoreDao.OffsetStoreAttributes._
diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/javadsl/Retry.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/javadsl/Requests.scala
similarity index 79%
rename from akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/javadsl/Retry.scala
rename to akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/javadsl/Requests.scala
index 1bfe2231d..9931e8b99 100644
--- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/javadsl/Retry.scala
+++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/javadsl/Requests.scala
@@ -25,7 +25,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

-object Retry {
+object Requests {

   /**
    * Perform a DynamoDB batch write, retrying writes for any unprocessed items (with exponential backoff).
@@ -36,35 +36,22 @@
    * @param minBackoff minimum duration to backoff between retries
    * @param maxBackoff maximum duration to backoff between retries
    * @param randomFactor adds jitter to the retry delay (use 0 for no jitter)
-   * @param onRetry called before each retry, with the response, the current retry count, and the delay for this retry
-   * @param failOnMaxRetries if max retries is reached, create a throwable for the failed future result
    * @param system the actor system (for scheduling)
-   * @return all responses from attempts (in order)
+   * @return all responses from attempts (in order) when successful, otherwise failed with
+   *         [[akka.projection.dynamodb.Requests.BatchWriteFailed]] holding the last response
    */
-  def batchWrite(
+  def batchWriteWithRetries(
      client: DynamoDbAsyncClient,
      request: BatchWriteItemRequest,
      maxRetries: Int,
      minBackoff: Duration,
      maxBackoff: Duration,
      randomFactor: Double,
-      onRetry: Procedure3[BatchWriteItemResponse, Integer, Duration],
-      failOnMaxRetries: JFunction[BatchWriteItemResponse, Throwable],
      system: ActorSystem[_]): CompletionStage[JList[BatchWriteItemResponse]] =
-    retryWithBackoff(
-      request,
-      (request: BatchWriteItemRequest) => client.batchWriteItem(request),
-      (request: BatchWriteItemRequest, response: BatchWriteItemResponse) =>
-        if (response.hasUnprocessedItems && !response.unprocessedItems.isEmpty)
-          Optional.of(request.toBuilder.requestItems(response.unprocessedItems).build())
-        else Optional.empty[BatchWriteItemRequest](),
-      maxRetries,
-      minBackoff,
-      maxBackoff,
-      randomFactor,
-      onRetry,
-      failOnMaxRetries,
-      system)
+    scaladsl.Requests
+      .batchWriteWithRetries(client, request, maxRetries, minBackoff.toScala, maxBackoff.toScala, randomFactor)(system)
+      .map(_.asJava)(ExecutionContext.parasitic)
+      .asJava

   /**
    * Retry generic requests with exponential backoff.
@@ -95,7 +82,7 @@
      onRetry: Procedure3[Response, Integer, Duration],
      failOnMaxRetries: JFunction[Response, Throwable],
      system: ActorSystem[_]): CompletionStage[JList[Response]] =
-    scaladsl.Retry
+    scaladsl.Requests
      .retryWithBackoff(
        request,
        (request: Request) => attempt.apply(request).asScala,
diff --git a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/scaladsl/Retry.scala b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/scaladsl/Requests.scala
similarity index 60%
rename from akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/scaladsl/Retry.scala
rename to akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/scaladsl/Requests.scala
index 6750c295b..57ce5733d 100644
--- a/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/scaladsl/Retry.scala
+++ b/akka-projection-dynamodb/src/main/scala/akka/projection/dynamodb/scaladsl/Requests.scala
@@ -4,19 +4,20 @@

 package akka.projection.dynamodb.scaladsl

-import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.FiniteDuration
 import scala.jdk.FutureConverters._

 import akka.actor.typed.ActorSystem
-import akka.pattern.BackoffSupervisor
-import akka.pattern.after
+import akka.projection.dynamodb.Requests.BatchWriteFailed
 import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
 import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

-object Retry {
+object Requests {
+  import akka.projection.dynamodb.Requests.batchUnprocessedTotal
+  import akka.projection.dynamodb.Requests.log
+  import akka.projection.dynamodb.Requests.retry

   /**
    * Perform a DynamoDB batch write, retrying writes for any unprocessed items (with exponential backoff).
@@ -27,20 +28,16 @@
    * @param minBackoff minimum duration to backoff between retries
    * @param maxBackoff maximum duration to backoff between retries
    * @param randomFactor adds jitter to the retry delay (use 0 for no jitter)
-   * @param onRetry called before each retry, with the response, the current retry count, and the delay for this retry
-   * @param failOnMaxRetries if max retries is reached, create a throwable for the failed future result
-   * @return all responses from attempts (in order)
+   * @return all responses from attempts (in order) when successful, otherwise failed with
+   *         [[akka.projection.dynamodb.Requests.BatchWriteFailed]] holding the last response
    */
-  def batchWrite(
+  def batchWriteWithRetries(
      client: DynamoDbAsyncClient,
      request: BatchWriteItemRequest,
      maxRetries: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
-      randomFactor: Double,
-      onRetry: (BatchWriteItemResponse, Int, FiniteDuration) => Unit,
-      failOnMaxRetries: BatchWriteItemResponse => Throwable)(
-      implicit system: ActorSystem[_]): Future[Seq[BatchWriteItemResponse]] =
+      randomFactor: Double)(implicit system: ActorSystem[_]): Future[Seq[BatchWriteItemResponse]] =
    retryWithBackoff(
      request,
      (request: BatchWriteItemRequest) => client.batchWriteItem(request).asScala,
@@ -52,8 +49,16 @@
      minBackoff,
      maxBackoff,
      randomFactor,
-      onRetry,
-      failOnMaxRetries)
+      onRetry = (response: BatchWriteItemResponse, retry: Int, delay: FiniteDuration) =>
+        if (log.isDebugEnabled) {
+          log.debug(
+            "Not all items in batch were written, [{}] unprocessed items, retrying in [{} ms], [{}/{}] retries",
+            batchUnprocessedTotal(response),
+            delay.toMillis,
+            retry,
+            maxRetries)
+        },
+      failOnMaxRetries = new BatchWriteFailed(_))

   /**
    * Retry generic requests with exponential backoff.
@@ -83,44 +88,4 @@
      onRetry: (Response, Int, FiniteDuration) => Unit,
      failOnMaxRetries: Response => Throwable)(implicit system: ActorSystem[_]): Future[Seq[Response]] =
    retry(request, attempt, decideRetry, maxRetries, minBackoff, maxBackoff, randomFactor, onRetry, failOnMaxRetries)
-
-  private[akka] def retry[Request, Response](
-      request: Request,
-      attempt: Request => Future[Response],
-      decideRetry: (Request, Response) => Option[Request],
-      maxRetries: Int,
-      minBackoff: FiniteDuration,
-      maxBackoff: FiniteDuration,
-      randomFactor: Double,
-      onRetry: (Response, Int, FiniteDuration) => Unit,
-      failOnMaxRetries: Response => Throwable,
-      retries: Int = 0)(implicit system: ActorSystem[_]): Future[Seq[Response]] = {
-    import system.executionContext
-    attempt(request).flatMap { response =>
-      decideRetry(request, response) match {
-        case Some(nextRequest) =>
-          if (retries >= maxRetries) {
-            Future.failed(failOnMaxRetries(response))
-          } else { // retry after exponential backoff
-            val nextRetry = retries + 1
-            val delay = BackoffSupervisor.calculateDelay(retries, minBackoff, maxBackoff, randomFactor)
-            onRetry(response, nextRetry, delay)
-            after(delay) {
-              retry(
-                nextRequest,
-                attempt,
-                decideRetry,
-                maxRetries,
-                minBackoff,
-                maxBackoff,
-                randomFactor,
-                onRetry,
-                failOnMaxRetries,
-                nextRetry)
-            }.map { responses => response +: responses }(ExecutionContext.parasitic)
-          }
-        case None => Future.successful(Seq(response))
-      }
-    }
-  }
 }
diff --git a/docs/src/main/paradox/dynamodb.md b/docs/src/main/paradox/dynamodb.md
index 66de78cef..0e2f29c9d 100644
--- a/docs/src/main/paradox/dynamodb.md
+++ b/docs/src/main/paradox/dynamodb.md
@@ -263,8 +263,8 @@ When using @ref:[`DynamoDBProjection.atLeastOnceGroupedWithin`](#at-least-once-g

 If a [batch write](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html) to DynamoDB
 is being used, it's possible for items in the batch to fail to be written, and the response should be checked for
-unprocessed items. A @apidoc[akka.projection.dynamodb.*.Retry$] utility is provided, to retry writes (with exponential
-backoff) for any unprocessed items.
+unprocessed items. A @apidoc[akka.projection.dynamodb.*.Requests$] utility is provided to retry batch writes (with
+exponential backoff) for any unprocessed items.

 Java
 : @@snip [grouped handler](/akka-projection-dynamodb-integration/src/test/java/projection/docs/javadsl/ProjectionDocExample.java) { #grouped-handler }
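For a quick sense of the API surface after this change, below is a minimal sketch of a scaladsl caller, mirroring the grouped handler doc example updated in this patch. The `BatchWriteExample` object, the `writeBatch` wrapper, and its `client`/`request` parameters are illustrative names, not part of the patch; the retry settings are taken from the doc example above.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import akka.Done
import akka.actor.typed.ActorSystem
import akka.projection.dynamodb.scaladsl.Requests
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest

object BatchWriteExample {

  // illustrative wrapper: `client` and `request` are assumed to be built by the handler
  def writeBatch(client: DynamoDbAsyncClient, request: BatchWriteItemRequest)(
      implicit system: ActorSystem[_]): Future[Done] = {
    import system.executionContext
    Requests
      .batchWriteWithRetries(
        client,
        request,
        maxRetries = 3,
        minBackoff = 200.millis,
        maxBackoff = 2.seconds,
        randomFactor = 0.3)
      // retries of unprocessed items are logged at debug level by the utility; if retries are
      // exhausted, the future fails with akka.projection.dynamodb.Requests.BatchWriteFailed,
      // which carries the last BatchWriteItemResponse
      .map(_ => Done)
  }
}
```

Compared with the previous `Retry.batchWrite`, the retry logging and the failure exception no longer need to be supplied by the caller.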