Skip to content

Commit

Permalink
always provide logging and exception for batch writes with retries
Browse files Browse the repository at this point in the history
  • Loading branch information
pvlugter committed Jan 8, 2025
1 parent 264c4aa commit 70f0100
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

// #handler
// #grouped-handler
import akka.projection.dynamodb.javadsl.Retry;
import akka.projection.dynamodb.javadsl.Requests;
import akka.projection.javadsl.Handler;
// #grouped-handler
// #handler
Expand Down Expand Up @@ -263,26 +263,8 @@ public CompletionStage<Done> process(List<EventEnvelope<ShoppingCart.Event>> env

// batch write, retrying writes for any unprocessed items (with exponential backoff)
CompletionStage<List<BatchWriteItemResponse>> result =
Retry.batchWrite(
client,
request,
maxRetries,
minBackoff,
maxBackoff,
randomFactor,
// called on each retry: log that unprocessed items are being retried
(response, retry, delay) ->
logger.debug(
"Retrying batch write in [{} ms], [{}/{}] retries, unprocessed items [{}]",
delay.toMillis(),
retry,
maxRetries,
response.unprocessedItems()),
// create the exception if max retries is reached
response ->
new RuntimeException(
"Failed to write batch, unprocessed items: " + response.unprocessedItems()),
system);
Requests.batchWriteWithRetries(
client, request, maxRetries, minBackoff, maxBackoff, randomFactor, system);

return result.thenApply(__ -> Done.getInstance());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ object ProjectionDocExample {
//#grouped-handler
import akka.Done
import akka.persistence.query.typed.EventEnvelope
import akka.projection.dynamodb.scaladsl.Retry
import akka.projection.dynamodb.scaladsl.Requests
import akka.projection.scaladsl.Handler
import software.amazon.awssdk.services.dynamodb.model.AttributeValue
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
Expand Down Expand Up @@ -180,22 +180,14 @@ object ProjectionDocExample {
.build()

// batch write, retrying writes for any unprocessed items (with exponential backoff)
Retry
.batchWrite(
Requests
.batchWriteWithRetries(
client,
request,
maxRetries = 3,
minBackoff = 200.millis,
maxBackoff = 2.seconds,
randomFactor = 0.3,
onRetry = (response, retry, delay) =>
logger.debug(
"Retrying batch write in [{} ms], [{}/3] retries, unprocessed items [{}]",
delay.toMillis,
retry,
response.unprocessedItems),
failOnMaxRetries = response =>
new RuntimeException(s"Failed to write batch, unprocessed items [${response.unprocessedItems}]"))
randomFactor = 0.3)
.map(_ => Done)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# internal
ProblemFilters.exclude[MissingClassProblem]("akka.projection.dynamodb.internal.OffsetStoreDao$BatchWriteFailed")
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (C) 2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.dynamodb

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.CollectionConverters._

import akka.actor.typed.ActorSystem
import akka.pattern.BackoffSupervisor
import akka.pattern.after
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

/**
 * Internal utilities for performing DynamoDB requests with retries.
 *
 * Holds the shared retry loop and helpers used by the `javadsl` and `scaladsl`
 * `Requests` objects for batch writes with retries.
 */
object Requests {

  /**
   * Failure result when a batch write still has unprocessed items after the
   * maximum number of retries has been reached.
   *
   * @param lastResponse the final `BatchWriteItemResponse`, retained so callers
   *                     can inspect the items that were never processed
   */
  final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse)
      extends RuntimeException(
        s"Failed to batch write all items, [${batchUnprocessedTotal(lastResponse)}] unprocessed items remaining")

  // shared logger for retry progress/debug messages
  private[akka] val log: Logger = LoggerFactory.getLogger(getClass)

  // total number of unprocessed items across all tables in a batch write response
  private[akka] def batchUnprocessedTotal(response: BatchWriteItemResponse): Int =
    response.unprocessedItems.asScala.valuesIterator.map(_.size).sum

  /**
   * Generic retry of asynchronous requests with exponential backoff.
   *
   * Runs `attempt(request)`, then asks `decideRetry` whether to retry: it returns
   * the next request to attempt, or `None` to stop successfully. Each retry is
   * delayed by an exponentially increasing, jittered backoff.
   *
   * @tparam Request  the request type
   * @tparam Response the response type
   * @param request the initial request
   * @param attempt performs a request, asynchronously producing its response
   * @param decideRetry given the last request and its response, returns the next
   *                    request to retry with, or `None` to complete successfully
   * @param maxRetries maximum number of retries before the future is failed
   * @param minBackoff minimum delay between retries
   * @param maxBackoff maximum delay between retries
   * @param randomFactor jitter added to the retry delay (0 for no jitter)
   * @param onRetry called before each retry with (response, retry count, delay)
   * @param failOnMaxRetries creates the throwable used to fail the future when
   *                         retries are exhausted
   * @param retries current retry count — internal accumulator, callers use the default 0
   * @param system actor system, used for scheduling the backoff delays
   * @return all responses from all attempts (in order), or a future failed with
   *         the throwable from `failOnMaxRetries`
   */
  private[akka] def retry[Request, Response](
      request: Request,
      attempt: Request => Future[Response],
      decideRetry: (Request, Response) => Option[Request],
      maxRetries: Int,
      minBackoff: FiniteDuration,
      maxBackoff: FiniteDuration,
      randomFactor: Double,
      onRetry: (Response, Int, FiniteDuration) => Unit,
      failOnMaxRetries: Response => Throwable,
      retries: Int = 0)(implicit system: ActorSystem[_]): Future[Seq[Response]] = {
    import system.executionContext
    attempt(request).flatMap { response =>
      decideRetry(request, response) match {
        case Some(nextRequest) =>
          if (retries >= maxRetries) {
            Future.failed(failOnMaxRetries(response))
          } else { // retry after exponential backoff
            val nextRetry = retries + 1
            // note: delay is calculated from the number of *completed* retries
            val delay = BackoffSupervisor.calculateDelay(retries, minBackoff, maxBackoff, randomFactor)
            // invoked before scheduling, so callers can log the upcoming retry
            onRetry(response, nextRetry, delay)
            after(delay) {
              retry(
                nextRequest,
                attempt,
                decideRetry,
                maxRetries,
                minBackoff,
                maxBackoff,
                randomFactor,
                onRetry,
                failOnMaxRetries,
                nextRetry)
              // parasitic EC: prepending a response is cheap, no dispatch needed
            }.map { responses => response +: responses }(ExecutionContext.parasitic)
          }
        case None => Future.successful(Seq(response))
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import akka.persistence.dynamodb.internal.InstantFactory
import akka.persistence.query.TimestampOffset
import akka.projection.ProjectionId
import akka.projection.dynamodb.DynamoDBProjectionSettings
import akka.projection.dynamodb.Requests.BatchWriteFailed
import akka.projection.dynamodb.internal.DynamoDBOffsetStore.Record
import akka.projection.dynamodb.scaladsl.Retry
import akka.projection.dynamodb.scaladsl.Requests
import akka.projection.internal.ManagementState
import org.slf4j.Logger
import org.slf4j.LoggerFactory
Expand Down Expand Up @@ -60,8 +61,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
val timestampBySlicePid = AttributeValue.fromS("_")
val managementStateBySlicePid = AttributeValue.fromS("_mgmt")
}

final class BatchWriteFailed(val lastResponse: BatchWriteItemResponse) extends RuntimeException
}

/**
Expand All @@ -73,7 +72,6 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
projectionId: ProjectionId,
client: DynamoDbAsyncClient) {
import OffsetStoreDao.log
import OffsetStoreDao.BatchWriteFailed
import OffsetStoreDao.MaxTransactItems
import settings.offsetBatchSize
import system.executionContext
Expand Down Expand Up @@ -118,24 +116,13 @@ import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse
}

private def writeBatchWithRetries(request: BatchWriteItemRequest): Future[Seq[BatchWriteItemResponse]] =
Retry.batchWrite(
Requests.batchWriteWithRetries(
client,
request,
settings.retrySettings.maxRetries,
settings.retrySettings.minBackoff,
settings.retrySettings.maxBackoff,
settings.retrySettings.randomFactor,
onRetry = (response, retry, delay) =>
if (log.isDebugEnabled) {
val count = response.unprocessedItems.asScala.valuesIterator.map(_.size).sum
log.debug(
"Not all writes in batch were applied, retrying in [{} ms]: [{}] unapplied writes, [{}/{}] retries",
delay.toMillis,
count,
retry,
settings.retrySettings.maxRetries)
},
failOnMaxRetries = new BatchWriteFailed(_))(system)
settings.retrySettings.randomFactor)(system)

def storeTimestampOffsets(offsetsBySlice: Map[Int, TimestampOffset]): Future[Done] = {
import OffsetStoreDao.OffsetStoreAttributes._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

object Retry {
object Requests {

/**
* Perform a DynamoDB batch write, retrying writes for any unprocessed items (with exponential backoff).
Expand All @@ -36,35 +36,22 @@ object Retry {
* @param minBackoff minimum duration to backoff between retries
* @param maxBackoff maximum duration to backoff between retries
* @param randomFactor adds jitter to the retry delay (use 0 for no jitter)
* @param onRetry called before each retry, with the response, the current retry count, and the delay for this retry
* @param failOnMaxRetries if max retries is reached, create a throwable for the failed future result
* @param system the actor system (for scheduling)
* @return all responses from attempts (in order)
* @return when successful, all responses from the attempts (in order); otherwise a failed
*         result with [[akka.projection.dynamodb.Requests.BatchWriteFailed]] carrying the last response
*/
def batchWrite(
def batchWriteWithRetries(
client: DynamoDbAsyncClient,
request: BatchWriteItemRequest,
maxRetries: Int,
minBackoff: Duration,
maxBackoff: Duration,
randomFactor: Double,
onRetry: Procedure3[BatchWriteItemResponse, Integer, Duration],
failOnMaxRetries: JFunction[BatchWriteItemResponse, Throwable],
system: ActorSystem[_]): CompletionStage[JList[BatchWriteItemResponse]] =
retryWithBackoff(
request,
(request: BatchWriteItemRequest) => client.batchWriteItem(request),
(request: BatchWriteItemRequest, response: BatchWriteItemResponse) =>
if (response.hasUnprocessedItems && !response.unprocessedItems.isEmpty)
Optional.of(request.toBuilder.requestItems(response.unprocessedItems).build())
else Optional.empty[BatchWriteItemRequest](),
maxRetries,
minBackoff,
maxBackoff,
randomFactor,
onRetry,
failOnMaxRetries,
system)
scaladsl.Requests
.batchWriteWithRetries(client, request, maxRetries, minBackoff.toScala, maxBackoff.toScala, randomFactor)(system)
.map(_.asJava)(ExecutionContext.parasitic)
.asJava

/**
* Retry generic requests with exponential backoff.
Expand Down Expand Up @@ -95,7 +82,7 @@ object Retry {
onRetry: Procedure3[Response, Integer, Duration],
failOnMaxRetries: JFunction[Response, Throwable],
system: ActorSystem[_]): CompletionStage[JList[Response]] =
scaladsl.Retry
scaladsl.Requests
.retryWithBackoff(
request,
(request: Request) => attempt.apply(request).asScala,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,20 @@

package akka.projection.dynamodb.scaladsl

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.jdk.FutureConverters._

import akka.actor.typed.ActorSystem
import akka.pattern.BackoffSupervisor
import akka.pattern.after
import akka.projection.dynamodb.Requests.BatchWriteFailed
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse

object Retry {
object Requests {
import akka.projection.dynamodb.Requests.batchUnprocessedTotal
import akka.projection.dynamodb.Requests.log
import akka.projection.dynamodb.Requests.retry

/**
* Perform a DynamoDB batch write, retrying writes for any unprocessed items (with exponential backoff).
Expand All @@ -27,20 +28,16 @@ object Retry {
* @param minBackoff minimum duration to backoff between retries
* @param maxBackoff maximum duration to backoff between retries
* @param randomFactor adds jitter to the retry delay (use 0 for no jitter)
* @param onRetry called before each retry, with the response, the current retry count, and the delay for this retry
* @param failOnMaxRetries if max retries is reached, create a throwable for the failed future result
* @return all responses from attempts (in order)
* @return when successful, all responses from the attempts (in order); otherwise a failed
*         future with [[akka.projection.dynamodb.Requests.BatchWriteFailed]] carrying the last response
*/
def batchWrite(
def batchWriteWithRetries(
client: DynamoDbAsyncClient,
request: BatchWriteItemRequest,
maxRetries: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
onRetry: (BatchWriteItemResponse, Int, FiniteDuration) => Unit,
failOnMaxRetries: BatchWriteItemResponse => Throwable)(
implicit system: ActorSystem[_]): Future[Seq[BatchWriteItemResponse]] =
randomFactor: Double)(implicit system: ActorSystem[_]): Future[Seq[BatchWriteItemResponse]] =
retryWithBackoff(
request,
(request: BatchWriteItemRequest) => client.batchWriteItem(request).asScala,
Expand All @@ -52,8 +49,16 @@ object Retry {
minBackoff,
maxBackoff,
randomFactor,
onRetry,
failOnMaxRetries)
onRetry = (response: BatchWriteItemResponse, retry: Int, delay: FiniteDuration) =>
if (log.isDebugEnabled) {
log.debug(
"Not all items in batch were written, [{}] unprocessed items, retrying in [{} ms], [{}/{}] retries",
batchUnprocessedTotal(response),
delay.toMillis,
retry,
maxRetries)
},
failOnMaxRetries = new BatchWriteFailed(_))

/**
* Retry generic requests with exponential backoff.
Expand Down Expand Up @@ -83,44 +88,4 @@ object Retry {
onRetry: (Response, Int, FiniteDuration) => Unit,
failOnMaxRetries: Response => Throwable)(implicit system: ActorSystem[_]): Future[Seq[Response]] =
retry(request, attempt, decideRetry, maxRetries, minBackoff, maxBackoff, randomFactor, onRetry, failOnMaxRetries)

private[akka] def retry[Request, Response](
request: Request,
attempt: Request => Future[Response],
decideRetry: (Request, Response) => Option[Request],
maxRetries: Int,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
onRetry: (Response, Int, FiniteDuration) => Unit,
failOnMaxRetries: Response => Throwable,
retries: Int = 0)(implicit system: ActorSystem[_]): Future[Seq[Response]] = {
import system.executionContext
attempt(request).flatMap { response =>
decideRetry(request, response) match {
case Some(nextRequest) =>
if (retries >= maxRetries) {
Future.failed(failOnMaxRetries(response))
} else { // retry after exponential backoff
val nextRetry = retries + 1
val delay = BackoffSupervisor.calculateDelay(retries, minBackoff, maxBackoff, randomFactor)
onRetry(response, nextRetry, delay)
after(delay) {
retry(
nextRequest,
attempt,
decideRetry,
maxRetries,
minBackoff,
maxBackoff,
randomFactor,
onRetry,
failOnMaxRetries,
nextRetry)
}.map { responses => response +: responses }(ExecutionContext.parasitic)
}
case None => Future.successful(Seq(response))
}
}
}
}
4 changes: 2 additions & 2 deletions docs/src/main/paradox/dynamodb.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ When using @ref:[`DynamoDBProjection.atLeastOnceGroupedWithin`](#at-least-once-g

If a [batch write](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html) to DynamoDB
is being used, it's possible for items in the batch to fail to be written, and the response should be checked for
unprocessed items. A @apidoc[akka.projection.dynamodb.*.Retry$] utility is provided, to retry writes (with exponential
backoff) for any unprocessed items.
unprocessed items. A @apidoc[akka.projection.dynamodb.*.Requests$] utility is provided, to retry batch writes (with
exponential backoff) for any unprocessed items.

Java
: @@snip [grouped handler](/akka-projection-dynamodb-integration/src/test/java/projection/docs/javadsl/ProjectionDocExample.java) { #grouped-handler }
Expand Down

0 comments on commit 70f0100

Please sign in to comment.