From 319a879a0fefcdaccf598bbc869d523ff2317516 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 15 Nov 2024 15:29:18 +0100 Subject: [PATCH] chore: Verbose logs after retrying the projection a number of times --- .../akka/projection/internal/ProjectionSettings.scala | 9 +++++++-- .../grpc/consumer/scaladsl/GrpcReadJournal.scala | 4 ---- .../projection/grpc/internal/ConnectionException.scala | 3 --- project/Dependencies.scala | 2 +- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala b/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala index aed0536d6..302ee0381 100644 --- a/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala +++ b/akka-projection-core/src/main/scala/akka/projection/internal/ProjectionSettings.scala @@ -7,11 +7,11 @@ package akka.projection.internal import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration._ import scala.jdk.DurationConverters._ - import akka.actor.typed.ActorSystem import akka.annotation.InternalApi import akka.projection.HandlerRecoveryStrategy import akka.projection.Projection +import akka.stream.Attributes.LogLevels import akka.stream.RestartSettings import com.typesafe.config.Config @@ -50,7 +50,12 @@ private[projection] object ProjectionSettings { val maxRestarts = restartBackoffConfig.getInt("max-restarts") if (maxRestarts >= 0) RestartSettings(minBackoff, maxBackoff, randomFactor) else RestartSettings(minBackoff, maxBackoff, randomFactor).withMaxRestarts(maxRestarts, minBackoff) - } + }.withLogSettings( + RestartSettings.LogSettings.defaultSettings + .withLogLevel(LogLevels.Warning) + // Once we have retried many times, it could still be a transient failure but is + // more likely to be a permanent problem, so increase verbosity/include full stack trace + .withVerboseLogsAfter(5)) new ProjectionSettings( restartSettings, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 686c7f325..cb52b078c 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -376,11 +376,7 @@ final class GrpcReadJournal private ( .invoke(streamIn) .recover { case ex: akka.grpc.GrpcServiceException if ex.status.getCode == Status.Code.UNAVAILABLE => - // this means we couldn't connect, will be retried, relatively common, so make it less noisy, - // Users still want to be able to figure out non-transient errors, so log with full exception details at debug val port = clientSettings.servicePortName.getOrElse(clientSettings.defaultPort.toString) - if (log.isDebugEnabled) - log.debug(s"Connection to ${clientSettings.serviceName}:$port for stream id $streamId failed or lost", ex) throw new ConnectionException(clientSettings.serviceName, port, streamId) case th: Throwable => diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala index 2cfb3c121..67318a9b7 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/ConnectionException.scala @@ -6,12 +6,9 @@ package akka.projection.grpc.internal import akka.annotation.InternalApi -import scala.util.control.NoStackTrace - /** * INTERNAL API */ @InternalApi private[akka] final class ConnectionException(host: String, port: String, streamId: String) extends RuntimeException(s"Connection to $host:$port for stream id $streamId failed or lost, will be retried") - with NoStackTrace diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9e1689c68..d77e3dea9 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -16,7 +16,7 @@ object Dependencies { val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3 object Versions { - val Akka = sys.props.getOrElse("build.akka.version", "2.10.0") + val Akka = sys.props.getOrElse("build.akka.version", "2.10.0+18-e14c0ccd+20241115-1522-SNAPSHOT") val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" } val Alpakka = "9.0.0"