From 35e134e4ecb4c1b15453925e1d93f6ed09a238e3 Mon Sep 17 00:00:00 2001 From: James Roper Date: Tue, 27 Aug 2019 13:38:21 +1000 Subject: [PATCH] Added fixes and allowed building both native and non native images --- build.sbt | 86 ++++++++++++++----- node-support/src/crdt-support.js | 3 +- operator/cloudstate.yaml | 6 +- operator/deploy/02-operator-config.yaml | 6 +- proxy/cassandra/src/graal/reflect-config.json | 14 ++- proxy/core/src/graal/reflect-config.json | 14 ++- .../io/cloudstate/proxy/crdt/CrdtEntity.scala | 38 +++++++- .../proxy/crdt/CrdtEntitySpec.scala | 10 ++- 8 files changed, 144 insertions(+), 33 deletions(-) diff --git a/build.sbt b/build.sbt index a8dbf29ff..e68c617f2 100644 --- a/build.sbt +++ b/build.sbt @@ -1,3 +1,4 @@ +import java.io.File import java.util.Date import com.typesafe.sbt.packager.docker.DockerChmodType @@ -86,7 +87,8 @@ lazy val root = (project in file(".")) .aggregate(`proxy-core`, `proxy-cassandra`, `java-support`, `java-shopping-cart`,`akka-client`, operator, `tck`) .settings(common) -lazy val proxyDockerBuild = settingKey[Option[(String, String)]]("Docker artifact name and configuration file which gets overridden by the buildProxy command") +lazy val proxyDockerBuild = settingKey[Option[(String, Option[String])]]("Docker artifact name and configuration file which gets overridden by the buildProxy command") +lazy val nativeImageDockerBuild = settingKey[Boolean]("Whether the docker image should be based on the native image or not.") val dockerTagVersion = !sys.props.get("docker.tag.version").forall(_ == "false") @@ -119,50 +121,92 @@ def dockerSettings: Seq[Setting[_]] = Seq( def buildProxyHelp(commandName: String, name: String) = Help((s"$commandName ", s"Execute the given docker scoped task (eg, publishLocal or publish) for the the $name build of the proxy.")) -def buildProxyCommand(commandName: String, project: => Project, name: String, configResource: String): Command = +def buildProxyCommand(commandName: String, project: => Project, name: String, configResource: Option[String], native: Boolean): Command = { + val cn = if (native) s"dockerBuildNative$commandName" + else s"dockerBuild$commandName" + val imageName = if (native) s"native-$name" + else name + val configResourceSetting = configResource match { + case Some(resource) => "Some(\"" + resource + "\")" + case None => "None" + } Command.single( - commandName, - buildProxyHelp(commandName, name) + cn, + buildProxyHelp(cn, name) ) { (state, command) => List( s"project ${project.id}", - s"""set proxyDockerBuild := Some(("cloudstate-proxy-$name", "$configResource"))""", + s"""set proxyDockerBuild := Some(("cloudstate-proxy-$imageName", $configResourceSetting))""", + s"""set nativeImageDockerBuild := $native""", s"docker:$command", "set proxyDockerBuild := None", "project root" ) ::: state } - -def dockerBuildCassandraCommand = - Command.single("dockerBuildCassandra", buildProxyHelp("dockerBuildCassandra", "cassandra")) { (state, command) => - s"proxy-cassandra/docker:$command" :: state - } +} commands ++= Seq( - buildProxyCommand("dockerBuildDevMode", `proxy-core`, "dev-mode", "dev-mode.conf"), - buildProxyCommand("dockerBuildNoJournal", `proxy-core`, "no-journal", "no-journal.conf"), - buildProxyCommand("dockerBuildInMemory", `proxy-core`, "in-memory", "in-memory.conf"), - dockerBuildCassandraCommand + buildProxyCommand("DevMode", `proxy-core`, "dev-mode", Some("dev-mode.conf"), true), + buildProxyCommand("DevMode", `proxy-core`, "dev-mode", Some("dev-mode.conf"), false), + buildProxyCommand("NoJournal", `proxy-core`, "no-journal", Some("no-journal.conf"), true), + buildProxyCommand("NoJournal", `proxy-core`, "no-journal", Some("no-journal.conf"), false), + buildProxyCommand("InMemory", `proxy-core`, "in-memory", Some("in-memory.conf"), true), + buildProxyCommand("InMemory", `proxy-core`, "in-memory", Some("in-memory.conf"), false), + buildProxyCommand("Cassandra", `proxy-cassandra`, "cassandra", None, true), + buildProxyCommand("Cassandra", `proxy-cassandra`, "cassandra", None, false), + Command.single("dockerBuildAll", buildProxyHelp("dockerBuildAll", "all")) { (state, command) => + List("DevMode", "NoJournal", "InMemory", "Cassandra") + .flatMap(c => List(c, s"Native$c")) + .map(c => s"dockerBuild$c $command") ::: state + } ) // Shared settings for native image and docker builds def nativeImageDockerSettings: Seq[Setting[_]] = dockerSettings ++ Seq( + nativeImageDockerBuild := false, graalVMVersion := Some("19.1.1"), graalVMNativeImageOptions ++= sharedNativeImageSettings, - (mappings in Universal) := Seq( - (packageBin in GraalVMNativeImage).value -> s"bin/${executableScriptName.value}" - ), + (mappings in Docker) := Def.taskDyn { + if (nativeImageDockerBuild.value) { + Def.task { + Seq( + (packageBin in GraalVMNativeImage).value -> s"${(defaultLinuxInstallLocation in Docker).value}/bin/${executableScriptName.value}" + ) + } + } else { + Def.task { + // This is copied from the native packager DockerPlugin, because I don't think a dynamic task can reuse the + // old value of itself in the dynamic part. + def renameDests(from: Seq[(File, String)], dest: String) = + for { + (f, path) <- from + newPath = "%s/%s" format (dest, path) + } yield (f, newPath) + + renameDests((mappings in Universal).value, (defaultLinuxInstallLocation in Docker).value) + } + } + }.value, dockerBaseImage := "bitnami/java:11-prod", // Need to make sure it has group execute permission // Note I think this is leading to quite large docker images :( - dockerChmodType := DockerChmodType.Custom("u+x,g+x"), + dockerChmodType := { + val old = dockerChmodType.value + if (nativeImageDockerBuild.value) { + DockerChmodType.Custom("u+x,g+x") + } else { + old + } + }, dockerEntrypoint := { val old = dockerEntrypoint.value - val withLibraryPath = old :+ "-Djava.library.path=/opt/bitnami/java/lib" + val withLibraryPath = if (nativeImageDockerBuild.value) { + old :+ "-Djava.library.path=/opt/bitnami/java/lib" + } else old proxyDockerBuild.value match { - case Some((_, configResource)) => withLibraryPath :+ s"-Dconfig.resource=$configResource" - case None => withLibraryPath + case Some((_, Some(configResource))) => withLibraryPath :+ s"-Dconfig.resource=$configResource" + case _ => withLibraryPath } } ) diff --git a/node-support/src/crdt-support.js b/node-support/src/crdt-support.js index 726118071..5049fc5b9 100644 --- a/node-support/src/crdt-support.js +++ b/node-support/src/crdt-support.js @@ -395,6 +395,7 @@ class CrdtHandler { handleCrdtStreamIn(crdtStreamIn) { if (crdtStreamIn.state) { + this.streamDebug("Received state for CRDT type %s", crdtStreamIn.changed.delta); this.handleState(crdtStreamIn.state); this.handleStateChange(); } else if (crdtStreamIn.changed) { @@ -402,7 +403,7 @@ class CrdtHandler { this.currentState.applyDelta(crdtStreamIn.changed, this.entity.anySupport, crdts.createCrdtForState); this.handleStateChange(); } else if (crdtStreamIn.deleted) { - this.streamDebug("CRDT deleted"); + this.streamDebug("Received CRDT deleted"); this.currentState = null; this.handleStateChange(); } else if (crdtStreamIn.command) { diff --git a/operator/cloudstate.yaml b/operator/cloudstate.yaml index 84a8471e9..cfbccc883 100644 --- a/operator/cloudstate.yaml +++ b/operator/cloudstate.yaml @@ -72,9 +72,9 @@ data: # Proxy configuration proxy { image { - cassandra = "cloudstateio/cloudstate-proxy-cassandra:latest" - no-journal = "cloudstateio/cloudstate-proxy-no-journal:latest" - in-memory = "cloudstateio/cloudstate-proxy-in-memory:latest" + cassandra = "cloudstateio/cloudstate-proxy-native-cassandra:latest" + no-journal = "cloudstateio/cloudstate-proxy-native-no-journal:latest" + in-memory = "cloudstateio/cloudstate-proxy-native-in-memory:latest" } } } diff --git a/operator/deploy/02-operator-config.yaml b/operator/deploy/02-operator-config.yaml index d53eba6b8..c868f6787 100644 --- a/operator/deploy/02-operator-config.yaml +++ b/operator/deploy/02-operator-config.yaml @@ -16,9 +16,9 @@ data: # Proxy configuration proxy { image { - cassandra = "cloudstateio/cloudstate-proxy-cassandra:latest" - no-journal = "cloudstateio/cloudstate-proxy-no-journal:latest" - in-memory = "cloudstateio/cloudstate-proxy-in-memory:latest" + cassandra = "cloudstateio/cloudstate-proxy-native-cassandra:latest" + no-journal = "cloudstateio/cloudstate-proxy-native-no-journal:latest" + in-memory = "cloudstateio/cloudstate-proxy-native-in-memory:latest" } } } diff --git a/proxy/cassandra/src/graal/reflect-config.json b/proxy/cassandra/src/graal/reflect-config.json index e6a386450..32e2031ee 100644 --- a/proxy/cassandra/src/graal/reflect-config.json +++ b/proxy/cassandra/src/graal/reflect-config.json @@ -132,7 +132,7 @@ { "name":"akka.cluster.ClusterHeartbeatReceiver", "allDeclaredFields":true, - "methods":[{"name":"","parameterTypes":[] }] + "methods":[{"name":"","parameterTypes":["scala.Function0"] }] }, { "name":"akka.cluster.ClusterHeartbeatSender", @@ -629,6 +629,10 @@ { "name":"akka.routing.TailChoppingPool" }, +{ + "name":"akka.serialization.BooleanSerializer", + "methods":[{"name":"","parameterTypes":["akka.actor.ExtendedActorSystem"] }] +}, { "name":"akka.serialization.ByteArraySerializer", "methods":[{"name":"","parameterTypes":["akka.actor.ExtendedActorSystem"] }] @@ -961,6 +965,14 @@ "name":"io.cloudstate.proxy.autoscaler.NoScaler", "allDeclaredFields":true }, +{ + "name":"io.cloudstate.proxy.crdt.CrdtEntity", + "fields" : [ + { "name" : "context", "allowWrite" : true }, + { "name" : "self", "allowWrite" : true } + ], + "allDeclaredFields":true +}, { "name":"io.cloudstate.proxy.eventsourced.EventSourcedEntity", "fields" : [ diff --git a/proxy/core/src/graal/reflect-config.json b/proxy/core/src/graal/reflect-config.json index 2db9bf063..e3e91b129 100644 --- a/proxy/core/src/graal/reflect-config.json +++ b/proxy/core/src/graal/reflect-config.json @@ -121,7 +121,7 @@ { "name":"akka.cluster.ClusterHeartbeatReceiver", "allDeclaredFields":true, - "methods":[{"name":"","parameterTypes":[] }] + "methods":[{"name":"","parameterTypes":["scala.Function0"] }] }, { "name":"akka.cluster.ClusterHeartbeatSender", @@ -599,6 +599,10 @@ { "name":"akka.routing.TailChoppingPool" }, +{ + "name":"akka.serialization.BooleanSerializer", + "methods":[{"name":"","parameterTypes":["akka.actor.ExtendedActorSystem"] }] +}, { "name":"akka.serialization.ByteArraySerializer", "methods":[{"name":"","parameterTypes":["akka.actor.ExtendedActorSystem"] }] @@ -883,6 +887,14 @@ "name":"io.cloudstate.proxy.autoscaler.NoScaler", "allDeclaredFields":true }, +{ + "name":"io.cloudstate.proxy.crdt.CrdtEntity", + "fields" : [ + { "name" : "context", "allowWrite" : true }, + { "name" : "self", "allowWrite" : true } + ], + "allDeclaredFields":true +}, { "name":"io.cloudstate.proxy.eventsourced.EventSourcedEntity", "fields" : [ diff --git a/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala b/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala index a30ade98c..a7f9cfd5d 100644 --- a/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala +++ b/proxy/core/src/main/scala/io/cloudstate/proxy/crdt/CrdtEntity.scala @@ -61,6 +61,37 @@ object CrdtEntity { * the rest, and whenever update or mergeDelta is called, keep track of the changes in a shared delta tracking * object. That object should get set by this actor, and once present, all calls to merge/mergeDelta/update etc * will add changes to the delta tracking object. + * + * So here's the general principle of how this actor works. + * + * - The actor first establishes a stream to the user function as well as fetches the current state of the entity. + * Until both of those are returned, it stashes commands, and once it has both, it unstashes. + * - When a command is received, if the command is streamed, we need to respond with a Source that materializes to + * an actor ref that we can send replies to, so that gets done first, otherwise we go straight to command handling + * logic. + * - The actor seeks to keep its state in sync with the user functions state. The user functions state is not a CRDT, + * so at any one time, only one of them may be allowed to update their state, otherwise concurrent updates won't be + * able to be reconciled. There are two times that the user function is allowed to update its state, one is while + * it's handling a command, the other is while it's handling a stream cancelled. If it is not currently handling a + * command or a stream cancelled, then the actor is free to update the state, and push deltas to the user function + * to keep it in sync. The outstandingMutatingRequests variable is used to track which mode we are in, if greater + * than zero, we are not allowed to update our state except on direction by the user function. + * - We use a replicator subscription to receive state updates. If outstandingMutatingRequests is not zero, we ignore + * any change events from that subscription, otherwise, we convert them to deltas and forward them to the user + * function. + * - When we receive a command, we do the following: + * - Increment outstanding mutating operations + * - Forward the command to the user function + * - When we receive a reply from the user function, we do the following: + * - Perform any update as required by the user function + * - Send a reply back to the stream/initiator of the command + * - If there is more than one outstanding mutating operation, we just decrement it, and we're done. + * - Otherwise, we don't decrement yet. Instead, because we may have ignored some updates while the operations were + * underway, we do a local get on the replicator. + * - When we get the response (either success or not found) we decrement outstanding mutating operations, and then + * check if it's zero (a command have have arrived while we were doing the read), and if it is, then we calculate + * and send any deltas found, and we're done. + * - Similar logic is also used for stream cancelled messages. */ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, entityDiscovery: EntityDiscovery)(implicit mat: Materializer) extends Actor with Stash with ActorLogging { @@ -154,7 +185,7 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en private def maybeStart() = { if (relay != null && state != null) { - log.debug("Received relay and state, starting.") + log.debug("{} - Received relay and state, starting.", entityId) val wireState = state.map(WireTransformer.toWireState) @@ -179,7 +210,7 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en case CrdtChange.NoChange => // Nothing to do case CrdtChange.IncompatibleChange => - throw new RuntimeException(s"Incompatible CRDT change from $value to $data") + throw new RuntimeException(s"Incompatible CRDT change from $value to $data for entity $entityId") case CrdtChange.Updated(delta) => sendToRelay(CrdtStreamIn.Message.Changed(delta)) } @@ -301,6 +332,9 @@ final class CrdtEntity(client: Crdt, configuration: CrdtEntity.Configuration, en maybeSendAndUpdateState(success.dataValue) } + case NotFound(_, _) => + outstandingMutatingOperations -= 1 + case UpdateTimeout(_, Some(InitiatorReply(commandId, _, _))) => failCommandAndCrash(commandId, "Failed to update CRDT at requested write consistency", None) diff --git a/proxy/core/src/test/scala/io/cloudstate/proxy/crdt/CrdtEntitySpec.scala b/proxy/core/src/test/scala/io/cloudstate/proxy/crdt/CrdtEntitySpec.scala index 9d34853e2..bae986524 100644 --- a/proxy/core/src/test/scala/io/cloudstate/proxy/crdt/CrdtEntitySpec.scala +++ b/proxy/core/src/test/scala/io/cloudstate/proxy/crdt/CrdtEntitySpec.scala @@ -55,7 +55,7 @@ class CrdtEntitySpec extends AbstractCrdtEntitySpec { expectDelta().change shouldBe 9 } - "send missed updates once a command has been handled with more than local consitency" in { + "send missed updates once a command has been handled with more than local consistency" in { update(_ :+ 5) createAndExpectInit() val cid = sendAndExpectCommand("cmd", command) @@ -67,6 +67,14 @@ class CrdtEntitySpec extends AbstractCrdtEntitySpec { expectDelta().change shouldBe 9 } + "send updates when the entity still doesn't exist after a command has been handled" in { + createAndExpectInit() + val cid = sendAndExpectCommand("cmd", command) + sendAndExpectReply(cid, CrdtStateAction.Action.Empty, CrdtWriteConsistency.LOCAL) + update(_ :+ 3) + expectState().value == 3 + } + "not send missed updates if there is still another command being handled" in { update(_ :+ 5) createAndExpectInit()