Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bump scala and sbt #149

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .bsp/sbt.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"name":"sbt","version":"1.5.1","bspVersion":"2.0.0-M5","languages":["scala"],"argv":["/usr/lib/jvm/java-17-openjdk-amd64/bin/java","-Xms100m","-Xmx100m","-classpath","/home/geobos/.local/share/JetBrains/IntelliJIdea2021.2/Scala/launcher/sbt-launch.jar","xsbt.boot.Boot","-bsp","--sbt-launch-jar=/home/geobos/.local/share/JetBrains/IntelliJIdea2021.2/Scala/launcher/sbt-launch.jar"]}
56 changes: 30 additions & 26 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@

scalaVersion := "2.12.6" // just for the root
scalaVersion := "2.13.8" // just for the root

val akkaVersion = "2.5.16"
val akkaHttpVersion = "10.1.4"
val akkaInMemory = "com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.1.1"
val akkaVersion = "2.5.27"
val akkaHttpVersion = "10.1.8"
val akkaInMemory = "com.github.dnvriend" %% "akka-persistence-inmemory" % "2.5.15.2"
val assertJ = "org.assertj" % "assertj-core" % "3.2.0"

import sbtrelease._
Expand All @@ -19,7 +19,7 @@ def setVersionOnly(selectVersion: Versions => String): ReleaseStep = { st: Stat
val versionStr = (if (useGlobal) globalVersionString else versionString) format selected

reapply(Seq(
if (useGlobal) version in ThisBuild := selected
if (useGlobal) ThisBuild / version := selected
else version := selected
), st)
}
Expand All @@ -32,7 +32,7 @@ releaseVersionBump := { System.getProperty("BUMP", "default").toLowerCase match
case "bugfix" => sbtrelease.Version.Bump.Bugfix
case "default" => sbtrelease.Version.Bump.default
}}

releaseVersion := { ver => Version(ver)
.map(_.withoutQualifier)
.map(_.bump(releaseVersionBump.value).string).getOrElse(versionFormatError)
Expand All @@ -49,7 +49,7 @@ releaseProcess := Seq(
inquireVersions,
setReleaseVersion,
runClean,
runTest,
runTest,
tagRelease,
publishArtifacts,
pushChanges
Expand All @@ -58,26 +58,29 @@ releaseProcess := Seq(
lazy val projectSettings = Seq(
licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT"))),
organization := "com.tradeshift",
scalaVersion := "2.12.6",
crossScalaVersions := Seq("2.12.6"),
scalaVersion := "2.13.8",
crossScalaVersions := Seq("2.13.8"),
publishMavenStyle := true,
javacOptions ++= Seq("-source", "1.8"),
javacOptions in (Compile, Keys.compile) ++= Seq("-target", "1.8", "-Xlint", "-Xlint:-processing", "-Xlint:-serial", "-Werror"),
javacOptions in doc ++= Seq("-Xdoclint:none"),
Compile / Keys.compile / javacOptions ++= Seq("-target", "1.8", "-Xlint", "-Xlint:-processing", "-Xlint:-serial"),
doc / javacOptions ++= Seq("-Xdoclint:none"),
scalacOptions ++= Seq("-target:jvm-1.8"),
EclipseKeys.executionEnvironment := Some(EclipseExecutionEnvironment.JavaSE18),
EclipseKeys.createSrc := EclipseCreateSrc.Default + EclipseCreateSrc.ManagedClasses,
EclipseKeys.withSource := true,
javaOptions += "-Xmx128M",
fork := true,
resolvers ++= Seq(
Resolver.bintrayRepo("readytalk", "maven"),
Resolver.jcenterRepo),
"Local Maven Repository" at "file://"+Path.userHome+"/.m2/repository",
"tradeshift-public" at "https://maven.tradeshift.net/content/repositories/tradeshift-public",
"tradeshift-snapshots" at "https://maven.tradeshift.net/content/repositories/tradeshift-public-snapshots"),
dependencyOverrides += "com.google.protobuf" % "protobuf-java" % "2.6.1",
protobufRunProtoc in ProtobufConfig := { args =>
ProtobufConfig / protobufRunProtoc := { args =>
com.github.os72.protocjar.Protoc.runProtoc("-v261" +: args.toArray)
},
testOptions += Tests.Argument(TestFrameworks.JUnit, "-a"),
credentials += Credentials(Path.userHome / ".ivy2" / ".credentials"),

libraryDependencies ++= Seq(
"io.vavr" % "vavr" % "0.9.0",
"org.slf4j" % "slf4j-api" % "1.7.12",
Expand All @@ -92,7 +95,7 @@ lazy val projectSettings = Seq(
"org.forgerock.cuppa" % "cuppa" % "1.3.1" % "test",
"org.forgerock.cuppa" % "cuppa-junit" % "1.3.1" % "test",
"org.apache.cassandra" % "cassandra-all" % "3.9" % "test" exclude("ch.qos.logback", "logback-classic"),
"com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "0.83" % "test",
"com.typesafe.akka" %% "akka-persistence-cassandra-launcher" % "1.0.5" % "test",
"com.github.tomakehurst" % "wiremock" % "1.58" % "test",
"org.xmlunit" % "xmlunit-core" % "2.5.0" % "test",
"org.xmlunit" % "xmlunit-matchers" % "2.5.0" % "test"
Expand All @@ -101,10 +104,10 @@ lazy val projectSettings = Seq(
git.baseVersion := "0.1.0",
git.gitTagToVersionNumber := {
case VersionRegex(v,"") => Some(v)
case VersionRegex(v,"SNAPSHOT") => Some(s"$v-SNAPSHOT")
case VersionRegex(v,"SNAPSHOT") => Some(s"$v-SNAPSHOT")
case VersionRegex(v,s) => Some(s"$v-$s-SNAPSHOT")
case s => None
}
}
)

lazy val commonSettings = projectSettings ++ Seq(
Expand All @@ -119,18 +122,19 @@ lazy val commonSettings = projectSettings ++ Seq(
"com.typesafe.akka" %% "akka-persistence-query" % akkaVersion,
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-http-jackson" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-persistence-cassandra" % "0.83",
"com.typesafe.akka" %% "akka-persistence-cassandra" % "1.0.5",
"org.quicktheories" % "quicktheories" % "0.26" % "test",
"org.slf4j" % "slf4j-log4j12" % "1.7.12"
"org.slf4j" % "slf4j-log4j12" % "1.7.12",
"javax.xml.bind" % "jaxb-api" % "2.3.1"
)
}
}
)

lazy val kamonSettings = Seq(
libraryDependencies ++= {
Seq(
"io.kamon" %% "kamon-core" % "1.1.3",
"io.kamon" %% "kamon-system-metrics" % "1.0.0",
"io.kamon" %% "kamon-core" % "2.5.1",
"io.kamon" %% "kamon-system-metrics" % "2.5.0",
"org.aspectj" % "aspectjweaver" % "1.8.13",
"com.readytalk" % "metrics3-statsd" % "4.1.0" // to log cassandra (codahale / dropwizard) metrics into statsd
)
Expand Down Expand Up @@ -214,7 +218,7 @@ lazy val `ts-reaktive-marshal-xerces` = project
.settings(javaSettings: _*)
.settings(
libraryDependencies ++= Seq(
"xerces" % "xercesImpl" % "2.11.0"
"xerces" % "xercesImpl" % "2.11.0"
)
)
.dependsOn(`ts-reaktive-marshal-akka`, `ts-reaktive-testkit` % "test", `ts-reaktive-testkit-assertj` % "test")
Expand Down Expand Up @@ -243,7 +247,7 @@ lazy val `ts-reaktive-marshal-scala` = project
lazy val `ts-reaktive-xsd` = project
.settings(projectSettings: _*)
.settings(libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % "3.0.1" % "test"
"org.scalatest" %% "scalatest" % "3.2.11" % "test"
))
.dependsOn(
`ts-reaktive-marshal-scala`,
Expand All @@ -258,7 +262,7 @@ lazy val `ts-reaktive-actors` = project
.settings(kamonSettings: _*)
.settings(
// the .proto files of this project are supposed to be included by others, so they're added to the .jar
unmanagedResourceDirectories in Compile += (sourceDirectory in ProtobufConfig).value
Compile / unmanagedResourceDirectories += (ProtobufConfig / sourceDirectory).value
)
.dependsOn(`ts-reaktive-java`, `ts-reaktive-akka`, `ts-reaktive-testkit` % "test")

Expand All @@ -276,7 +280,7 @@ lazy val `ts-reaktive-backup` = project
.settings(javaSettings: _*)
.settings(
libraryDependencies ++= Seq(
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % "0.11"
"com.lightbend.akka" %% "akka-stream-alpakka-s3" % "3.0.4"
)
)
.dependsOn(`ts-reaktive-replication`, `ts-reaktive-actors` % ProtobufConfig.name, `ts-reaktive-marshal-akka`, `ts-reaktive-testkit` % "test")
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.0.2
sbt.version=1.5.1
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected MaterializerActor(Map<String, String> additionalMetricTags) {
public void postRestart(Throwable reason) throws Exception {
super.postRestart(reason);
metrics.getRestarts().increment();
metrics.getStreams().set(0);
metrics.getStreams().update(0);
}

@Override
Expand Down Expand Up @@ -345,7 +345,7 @@ private void reimport(Set<String> entityIds) {
.conflate((t1, t2) -> (t1.isAfter(t2)) ? t1 : t2)
.throttle(1, Duration.ofSeconds(1))
.map(t -> {
metrics.getReimportRemaining().set(ChronoUnit.MILLIS.between(t, maxTimestamp));
metrics.getReimportRemaining().update(ChronoUnit.MILLIS.between(t, maxTimestamp));
return Done.getInstance();
})
.toMat(Sink.onComplete(result -> self.tell("reimportComplete", self())), Keep.left())
Expand Down Expand Up @@ -375,13 +375,13 @@ private void cancelReimport() {

private void recordOffsetMetric() {
Seq<UUID> ids = workers.getIds();
metrics.getWorkers().set(workers.getIds().size());
metrics.getWorkers().update(workers.getIds().size());
for (int i = 0; i < ids.size(); i++) {
Instant offset = workers.getTimestamp(ids.apply(i));
metrics.getOffset(i).set(offset.toEpochMilli());
metrics.getDelay(i).set(Duration.between(offset, Instant.now()).toMillis());
metrics.getOffset(i).update(offset.toEpochMilli());
metrics.getDelay(i).update(Duration.between(offset, Instant.now()).toMillis());
for (Instant end: workers.getEndTimestamp(workers.getIds().apply(i))) {
metrics.getRemaining(i).set(Duration.between(offset, end).toMillis());
metrics.getRemaining(i).update(Duration.between(offset, end).toMillis());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,67 +1,68 @@
package com.tradeshift.reaktive.materialize;

import io.vavr.collection.Map;
import java.util.HashMap;
import java.util.Map;

import kamon.Kamon;
import kamon.metric.Counter;
import kamon.metric.CounterMetric;
import kamon.metric.Gauge;
import kamon.metric.GaugeMetric;
import kamon.metric.Histogram;
import kamon.metric.HistogramMetric;
import kamon.metric.MeasurementUnit;
import kamon.tag.TagSet;

public class MaterializerMetrics {
private final Map<String, String> baseTags;
private final CounterMetric events;
private final Map<String, Object> baseTags;
private final Counter events;
private final Counter restarts;
private final Gauge reimportRemaining;
/** The current timestamp for each worker, in milliseconds since the epoch */
private final GaugeMetric offset;
private final Gauge offset;
/** The delay between the timestamp of each worker and now(), in milliseconds */
private final GaugeMetric delay;
private final Gauge delay;
/** The time remaining for each worker, in milliseconds (if there is an end timestamp) */
private final GaugeMetric remaining;
private final Gauge remaining;
/** The duration, milliseconds, of materializing a single event */
private final HistogramMetric materializationDuration;
private final Histogram materializationDuration;
private final Gauge workers;
private final Gauge streams;

public MaterializerMetrics(String name, Map<String, String> additionalTags) {
baseTags = additionalTags.put("journal-materializer", name);
java.util.Map<String, String> tags = baseTags.toJavaMap();
this.events = Kamon.counter("journal-materializer.events");
this.restarts = Kamon.counter("journal-materializer.restarts").refine(tags);
this.reimportRemaining = Kamon.gauge("journal-materializer.reimport-remaining", MeasurementUnit.time().milliseconds()).refine(tags);
this.offset = Kamon.gauge("journal-materializer.offset", MeasurementUnit.time().milliseconds());
this.delay = Kamon.gauge("journal-materializer.delay", MeasurementUnit.time().milliseconds());
this.remaining = Kamon.gauge("journal-materializer.remaining", MeasurementUnit.time().milliseconds());
this.materializationDuration = Kamon.histogram("journal-materializer.materialization-duration", MeasurementUnit.time().milliseconds());
this.workers = Kamon.gauge("journal-materializer.workers").refine(tags);
this.streams = Kamon.gauge("journal-materializer.streams").refine(tags);
public MaterializerMetrics(String name, io.vavr.collection.Map<String, String> additionalTags) {
additionalTags.put("journal-materializer", name);
baseTags = new HashMap<>(additionalTags.toJavaMap());
TagSet tagSet = TagSet.from(baseTags);
this.events = Kamon.counter("journal-materializer.events").withoutTags();
this.restarts = Kamon.counter("journal-materializer.restarts").withTags(tagSet);
this.reimportRemaining = Kamon.gauge("journal-materializer.reimport-remaining", MeasurementUnit.time().milliseconds()).withTags(tagSet);
this.offset = Kamon.gauge("journal-materializer.offset", MeasurementUnit.time().milliseconds()).withoutTags();
this.delay = Kamon.gauge("journal-materializer.delay", MeasurementUnit.time().milliseconds()).withoutTags();
this.remaining = Kamon.gauge("journal-materializer.remaining", MeasurementUnit.time().milliseconds()).withoutTags();
this.materializationDuration = Kamon.histogram("journal-materializer.materialization-duration", MeasurementUnit.time().milliseconds()).withoutTags();
this.workers = Kamon.gauge("journal-materializer.workers").withTags(tagSet);
this.streams = Kamon.gauge("journal-materializer.streams").withTags(tagSet);
}

public Counter getEvents(int index) {
return events.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
return events.withTags(TagSet.from(baseTags).withTag("index", String.valueOf(index)));
}

public Counter getRestarts() {
return restarts;
}

public Gauge getOffset(int index) {
return offset.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
return offset.withTags(TagSet.from(baseTags).withTag("index", String.valueOf(index)));
}

public Gauge getDelay(int index) {
return delay.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
return delay.withTags(TagSet.from(baseTags).withTag("index", String.valueOf(index)));
}

public Gauge getRemaining(int index) {
return remaining.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
return remaining.withTags(TagSet.from(baseTags).withTag("index", String.valueOf(index)));
}

public Histogram getMaterializationDuration(int index) {
return materializationDuration.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
return materializationDuration.withTags(TagSet.from(baseTags).withTag("index", String.valueOf(index)));
}

public Gauge getReimportRemaining() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public FlowShape<ByteString, ByteString> shape() {
@Override
public GraphStageLogic createLogic(Attributes attr) {
return new GraphStageLogic(shape) {
ByteString buf = ByteString.empty();
ByteString buf = ByteString.emptyByteString();
List<ByteString> deframed = new ArrayList<>();
{
setHandler(in, new AbstractInHandler() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ object AsyncUnmarshallers {

def entityToStream(): Unmarshaller[HttpEntity, Source[ByteString, NotUsed]] =
Unmarshaller.sync(new java.util.function.Function[HttpEntity, Source[ByteString, NotUsed]] {
override def apply(entity: HttpEntity) = entity.getDataBytes().asScala.mapMaterializedValue(obj => NotUsed).asJava
override def apply(entity: HttpEntity) = entity.getDataBytes().asScala.mapMaterializedValue(obj => NotUsed.notUsed()).asJava
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import java.util.concurrent.TimeUnit;

import akka.actor.ActorSystem;
import akka.persistence.cassandra.CassandraMetricsRegistry;

import akka.stream.alpakka.cassandra.CassandraMetricsRegistry;
import com.codahale.metrics.MetricRegistry;
import com.readytalk.metrics.StatsDReporter;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,50 +1,46 @@
package com.tradeshift.reaktive.cassandra;

import static com.tradeshift.reaktive.ListenableFutures.toJava;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.persistence.cassandra.ConfigSessionProvider;
import akka.persistence.cassandra.session.CassandraSessionSettings;
import akka.stream.alpakka.cassandra.DefaultSessionProvider;
import akka.stream.javadsl.Source;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.Runnables;
import io.vavr.collection.Seq;
import io.vavr.control.Option;
/**
* Provides asynchronous, non-blocking access to a cassandra session.
*/
public class CassandraSession {
private final akka.persistence.cassandra.session.javadsl.CassandraSession delegate;
private final akka.stream.alpakka.cassandra.javadsl.CassandraSession delegate;

public CassandraSession(ActorSystem system, String metricsCategory, Function<Session,CompletionStage<Done>> init) {
this.delegate = new akka.persistence.cassandra.session.javadsl.CassandraSession(system,
new ConfigSessionProvider(system, system.settings().config().getConfig("cassandra-journal")),
new CassandraSessionSettings(system.settings().config().getConfig("cassandra-journal")),
system.dispatcher(), system.log(), metricsCategory, init);
public CassandraSession(ActorSystem system, String metricsCategory, Function<CqlSession,CompletionStage<Done>> init) {
this.delegate = new akka.stream.alpakka.cassandra.javadsl.CassandraSession(system,
new DefaultSessionProvider(system, system.settings().config().getConfig("cassandra-journal")),
system.dispatcher(), system.log(), metricsCategory, init, Runnables.doNothing());
}

public CassandraSession(ActorSystem system, String metricsCategory, Seq<String> initializationStatements) {
this(system, metricsCategory, s -> executeInitStatements(s, initializationStatements));
}

public static CompletionStage<Done> executeInitStatements(Session session, Seq<String> statements) {
public static CompletionStage<Done> executeInitStatements(CqlSession session, Seq<String> statements) {
if (statements.isEmpty()) {
return CompletableFuture.completedFuture(Done.getInstance());
}

return toJava(session.executeAsync(statements.head())).thenCompose(rs -> executeInitStatements(session, statements.tail()));
return session.executeAsync(statements.head()).thenCompose(rs -> executeInitStatements(session, statements.tail()));
}

public CompletionStage<Session> getUnderlying() {
public CompletionStage<CqlSession> getUnderlying() {
return delegate.underlying();
}

Expand Down
Loading