Skip to content

Commit

Permalink
bump: Akka core 2.10.1, r2dbc 3.1.2 (#1305)
Browse files Browse the repository at this point in the history
* include slice in log
* ignore scala-lang updates
* akka-pki dependency, due to mixed versions
  • Loading branch information
patriknw authored Jan 28, 2025
1 parent 6ec48a0 commit f1342e8
Show file tree
Hide file tree
Showing 28 changed files with 103 additions and 68 deletions.
2 changes: 2 additions & 0 deletions .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ updates.ignore = [

{groupId = "com.fasterxml.jackson.core" }
{ groupId = "ch.qos.logback", artifactId = "logback-classic", version = "1.5." }

{ groupId = "org.scala-lang" }
]

updatePullRequests = false
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ class DynamoDBTimestampOffsetProjectionSpec
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand Down Expand Up @@ -635,7 +635,7 @@ class DynamoDBTimestampOffsetProjectionSpec
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand All @@ -144,7 +144,7 @@ abstract class DynamoDBTimestampOffsetStoreBaseSpec(config: Config)
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class LoadEventQuerySpec(testContainerConf: TestContainerConf)
.loadEnvelope[String](pid.id, sequenceNr = 1L)
.futureValue
env.filtered shouldBe true
env.eventMetadata shouldBe None
env.internalEventMetadata shouldBe None
env.eventOption.isEmpty shouldBe true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class EventProducerServiceSpec
EventProducerSource(entityType7, streamId7, transformation, settings)
.withReplicatedEventMetadataTransformation(
env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {
"transform low level with metadata" in {
val transformer =
Transformation.empty.registerAsyncEnvelopeMapper((env: EventEnvelope[String]) =>
Future.successful(env.eventMetadata))
Future.successful(env.metadata[String]))
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
}

Expand All @@ -63,7 +63,7 @@ class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {

"fallback low level with metadata if no transformer exist for event" in {
val transformer = Transformation.empty.registerAsyncEnvelopeOrElseMapper((env: EventEnvelope[Any]) =>
Future.successful(env.eventMetadata))
Future.successful(env.metadata[String]))
transformer(envelope("whatever", Some("meta"))).futureValue should ===(Some("meta"))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ private[akka] object EventPusherConsumerServiceImpl {
log.trace("Ignoring filtered event [{}] for pid [{}]", envelope.sequenceNr, envelope.persistenceId)
Future.successful(Done)
} else {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
envelope.metadata[ReplicatedEventMetadata] match {
case Some(replicatedEventMetadata) =>
// send event to entity in this replica
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(replicationSettings.selfReplicaId)
Expand All @@ -104,7 +104,8 @@ private[akka] object EventPusherConsumerServiceImpl {
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
replicatedEventMetadata.version,
envelope.internalEventMetadata)),
Some(replyTo)))
}

Expand All @@ -123,9 +124,9 @@ private[akka] object EventPusherConsumerServiceImpl {
error))
askResult

case unexpected =>
case None =>
throw new IllegalArgumentException(
s"Got unexpected type of event envelope metadata: ${unexpected.getClass} (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
s"Missing replication metadata (pid [${envelope.persistenceId}], seq_nr [${envelope.sequenceNr}]" +
", is the remote entity really a Replicated Event Sourced Entity?")
}
}
Expand All @@ -141,7 +142,7 @@ private[akka] object EventPusherConsumerServiceImpl {
event = envelope.eventOption.getOrElse(FilteredPayload),
isSnapshotEvent = fromSnapshot(envelope),
fillSequenceNumberGaps = fillSequenceNumberGaps,
metadata = envelope.eventMetadata,
metadata = envelope.internalEventMetadata,
tags = envelope.tags,
replyTo = replyTo))(d.settings.journalWriteTimeout, system.scheduler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ import scala.concurrent.ExecutionContext
val pid = env.persistenceId

// replicaId is used for validation of replay requests, to avoid replay for other replicas
if (replicaId.isEmpty && env.eventMetadata.exists(_.isInstanceOf[ReplicatedEventMetadata]))
if (replicaId.isEmpty && env.metadata[ReplicatedEventMetadata].isDefined)
replicaId = Some(ReplicationId.fromString(pid).replicaId)

if (producerFilter(env) && filter.matches(env)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private[akka] object ProtobufProtocolConversions {

def toEvent(transformedEvent: Any): Event = {
val protoEvent = protoAnySerialization.serialize(transformedEvent)
val metadata = env.eventMetadata.map(protoAnySerialization.serialize)
val metadata = env.internalEventMetadata.map(protoAnySerialization.serialize)
Event(
persistenceId = env.persistenceId,
seqNr = env.sequenceNr,
Expand Down Expand Up @@ -169,7 +169,7 @@ private[akka] object ProtobufProtocolConversions {
event.seqNr,
evt,
eventOffset.timestamp.toEpochMilli,
eventMetadata = metadata,
_eventMetadata = metadata,
PersistenceId.extractEntityType(event.persistenceId),
event.slice,
filtered = false,
Expand All @@ -188,7 +188,7 @@ private[akka] object ProtobufProtocolConversions {
event.seqNr,
eventOption = Some(serializedEvent),
eventOffset.timestamp.toEpochMilli,
eventMetadata = metadata,
_eventMetadata = metadata,
PersistenceId.extractEntityType(event.persistenceId),
event.slice,
filtered = false,
Expand All @@ -213,7 +213,7 @@ private[akka] object ProtobufProtocolConversions {
filtered.seqNr,
None,
eventOffset.timestamp.toEpochMilli,
eventMetadata = None,
_eventMetadata = None,
PersistenceId.extractEntityType(filtered.persistenceId),
filtered.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ import akka.projection.grpc.internal.proto.ReplicaInfo
if (envelope.eventOption.isEmpty)
true
else
envelope.eventMetadata match {
case Some(meta: ReplicatedEventMetadata) =>
envelope.metadata[ReplicatedEventMetadata] match {
case Some(meta) =>
!exclude(meta.originReplica)
case _ =>
case None =>
throw new IllegalArgumentException(
s"Got an event without replication metadata, not supported (pid: ${envelope.persistenceId}, seq_nr: ${envelope.sequenceNr})")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private[akka] object ReplicationImpl {
settings.producerFilter)
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
.withReplicatedEventMetadataTransformation(env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down Expand Up @@ -218,8 +218,8 @@ private[akka] object ReplicationImpl {
envelope.persistenceId) {
case (envelope, _) =>
if (!envelope.filtered) {
envelope.eventMetadata match {
case Some(replicatedEventMetadata: ReplicatedEventMetadata)
envelope.metadata[ReplicatedEventMetadata] match {
case Some(replicatedEventMetadata)
if replicatedEventMetadata.originReplica == settings.selfReplicaId =>
// skipping events originating from self replica (break cycle)
if (log.isTraceEnabled)
Expand All @@ -231,7 +231,7 @@ private[akka] object ReplicationImpl {
envelope.sequenceNr)
Future.successful(Done)

case Some(replicatedEventMetadata: ReplicatedEventMetadata) =>
case Some(replicatedEventMetadata) =>
val replicationId = ReplicationId.fromString(envelope.persistenceId)
val destinationReplicaId = replicationId.withReplica(settings.selfReplicaId)
val entityRef =
Expand All @@ -251,9 +251,11 @@ private[akka] object ReplicationImpl {
replicatedEventMetadata.originSequenceNr,
envelope.event,
envelope.timestamp,
Some(new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version)),
Some(
new ReplicatedPublishedEventMetaData(
replicatedEventMetadata.originReplica,
replicatedEventMetadata.version,
envelope.internalEventMetadata)),
Some(replyTo)))
askResult.failed.foreach(error =>
log.warn(
Expand Down Expand Up @@ -360,7 +362,7 @@ private[akka] object ReplicationImpl {
settings.eventProducerSettings.withAkkaSerializationOnly())
.withReplicatedEventOriginFilter(new EventOriginFilter(settings.selfReplicaId))
.withReplicatedEventMetadataTransformation(env =>
if (env.eventMetadata.isDefined) None
if (env.metadata[ReplicatedEventMetadata].isDefined) None
else {
// migrated from non-replicated, fill in metadata
Some(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ class R2dbcTimestampOffsetProjectionSpec
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand Down Expand Up @@ -484,7 +484,7 @@ class R2dbcTimestampOffsetProjectionSpec
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class R2dbcTimestampOffsetStoreSpec
env.sequenceNr,
eventOption = None,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
env.filtered,
Expand All @@ -109,7 +109,7 @@ class R2dbcTimestampOffsetStoreSpec
env.sequenceNr,
env.eventOption,
env.timestamp,
env.eventMetadata,
env.internalEventMetadata,
env.entityType,
env.slice,
filtered = true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private[projection] class PostgresOffsetStoreDao(
slice: Int): Future[immutable.IndexedSeq[R2dbcOffsetStore.RecordWithProjectionKey]] = {
r2dbcExecutor.select("read timestamp offset")(
conn => {
logger.trace("reading timestamp offset for [{}]", projectionId)
logger.trace("reading timestamp offset slice [{}] for [{}]", slice, projectionId)
conn
.createStatement(selectTimestampOffsetSql)
.bind(0, slice)
Expand Down
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ object Dependencies {
val ScalaVersions = Dependencies.Scala2Versions :+ Dependencies.Scala3

object Versions {
val Akka = sys.props.getOrElse("build.akka.version", "2.10.0")
val Akka = sys.props.getOrElse("build.akka.version", "2.10.1")
val AkkaVersionInDocs = VersionNumber(Akka).numbers match { case Seq(major, minor, _*) => s"$major.$minor" }

val Alpakka = "9.0.0"
Expand All @@ -35,7 +35,7 @@ object Dependencies {
val AkkaPersistenceCassandra = "1.3.0"
val AkkaPersistenceJdbc = "5.5.0"

val AkkaPersistenceR2dbc = "1.3.1"
val AkkaPersistenceR2dbc = "1.3.2"
val AkkaPersistenceR2dbcVersionInDocs = VersionNumber(AkkaPersistenceR2dbc).numbers match {
case Seq(major, minor, _*) => s"$major.$minor"
}
Expand Down
9 changes: 7 additions & 2 deletions samples/grpc/iot-service-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.10.0</akka.version>
<akka.version>2.10.1</akka.version>
<akka-projection.version>1.6.7</akka-projection.version>
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
<akka-management.version>1.6.0</akka-management.version>
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
<akka-grpc.version>2.5.0</akka-grpc.version>
Expand Down Expand Up @@ -85,6 +85,11 @@
<artifactId>akka-discovery_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-pki_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-persistence-r2dbc_${scala.binary.version}</artifactId>
Expand Down
6 changes: 3 additions & 3 deletions samples/grpc/iot-service-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")

resolvers += "Akka library repository".at("https://repo.akka.io/maven")

scalaVersion := "2.13.15"
scalaVersion := "2.13.16"

Compile / scalacOptions ++= Seq(
"-release:11",
Expand All @@ -28,10 +28,10 @@ run / javaOptions ++= sys.props
.fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res"))
Global / cancelable := false // ctrl-c

val AkkaVersion = "2.10.0"
val AkkaVersion = "2.10.1"
val AkkaHttpVersion = "10.7.0"
val AkkaManagementVersion = "1.6.0"
val AkkaPersistenceR2dbcVersion = "1.3.1"
val AkkaPersistenceR2dbcVersion = "1.3.2"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.6.7")
val AkkaDiagnosticsVersion = "2.2.0"
Expand Down
9 changes: 7 additions & 2 deletions samples/grpc/local-drone-control-java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<akka.version>2.10.0</akka.version>
<akka.version>2.10.1</akka.version>
<akka-projection.version>1.6.7</akka-projection.version>
<akka-persistence-r2dbc.version>1.3.1</akka-persistence-r2dbc.version>
<akka-persistence-r2dbc.version>1.3.2</akka-persistence-r2dbc.version>
<akka-management.version>1.6.0</akka-management.version>
<akka-diagnostics.version>2.2.0</akka-diagnostics.version>
<akka-http.version>10.7.0</akka-http.version>
Expand Down Expand Up @@ -90,6 +90,11 @@
<artifactId>akka-discovery_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-pki_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-persistence-r2dbc_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
scalaVersion := "2.13.15"
scalaVersion := "2.13.16"

enablePlugins(GatlingPlugin)

Expand Down
6 changes: 3 additions & 3 deletions samples/grpc/local-drone-control-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")

resolvers += "Akka library repository".at("https://repo.akka.io/maven")

scalaVersion := "2.13.15"
scalaVersion := "2.13.16"

Compile / scalacOptions ++= Seq(
"-release:11",
Expand All @@ -30,10 +30,10 @@ run / javaOptions ++= sys.props
.fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res"))
Global / cancelable := false // ctrl-c

val AkkaVersion = "2.10.0"
val AkkaVersion = "2.10.1"
val AkkaHttpVersion = "10.7.0"
val AkkaManagementVersion = "1.6.0"
val AkkaPersistenceR2dbcVersion = "1.3.1"
val AkkaPersistenceR2dbcVersion = "1.3.2"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.6.7")
val AkkaDiagnosticsVersion = "2.2.0"
Expand Down
Loading

0 comments on commit f1342e8

Please sign in to comment.