Skip to content

Commit

Permalink
Merge branch 'main' into update/akka-actor-2.6.15
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Czuchnowski authored Jun 21, 2021
2 parents 52413b1 + 874e08b commit 7eb4660
Show file tree
Hide file tree
Showing 25 changed files with 1,385 additions and 513 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ trait InstrumentModuleFactory {
protected def supportedModules: SupportedModules

protected def instrument(tpe: Type): TypeInstrumentation = TypeInstrumentation(TypeTarget(tpe, supportedModules))

}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ class AkkaActorAgentTest
val check: classic.ActorContext => Any = ctx => {
val metrics = ActorCellDecorator.get(ctx).flatMap(_.mailboxTimeAgg.metrics).value
metrics.count should be(messages)
metrics.avg should be(((waitingMessages * idle.toNanos) / messages) +- ToleranceNanos)
metrics.sum should be((waitingMessages * idle.toNanos) +- ToleranceNanos)
metrics.min should be(0L +- ToleranceNanos)
metrics.max should be(idle.toNanos +- ToleranceNanos)
Expand All @@ -141,7 +140,6 @@ class AkkaActorAgentTest
val check: classic.ActorContext => Any = ctx => {
val metrics = ActorCellDecorator.get(ctx).flatMap(_.processingTimeAgg.metrics).value
metrics.count should be(messages)
metrics.avg should be(((workingMessages * processing.toNanos) / messages) +- ToleranceNanos)
metrics.sum should be((workingMessages * processing.toNanos) +- ToleranceNanos)
metrics.min should be(0L +- ToleranceNanos)
metrics.max should be(processing.toNanos +- ToleranceNanos)
Expand Down Expand Up @@ -214,7 +212,6 @@ class AkkaActorAgentTest
expectStashSize(StashMessageCount * 3)
}

// it should "record the amount of received messages" in testWithContextAndActor[String](_ => Behaviors.ignore) {
it should "record the amount of received messages" in {
testWithoutEffect[Unit]((), (), ())(
(0, check(_.receivedMessages)(_.get() should be(0))),
Expand All @@ -224,7 +221,6 @@ class AkkaActorAgentTest
)
}

// it should "record the amount of failed messages without supervision" in testWithContextAndActor[String](_ =>
it should "record the amount of failed messages without supervision" in {

testEffect[String](
Expand Down Expand Up @@ -317,7 +313,6 @@ class AkkaActorAgentTest
receiver ! PoisonPill
}

// it should "record the amount of sent messages properly in typed akka" in testWithContextAndActor[String] { ctx =>
it should "record the amount of sent messages properly in typed akka" in {

testWithChecks(
Expand Down
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import sbt.Package.{ MainClass, ManifestAttributes }

inThisBuild(
List(
scalaVersion := "2.13.4",
scalaVersion := "2.13.6",
organization := "io.scalac",
homepage := Some(url("https://github.com/ScalaConsultants/mesmer-akka-agent")),
licenses := List("Apache-2.0" -> url("http://www.apache.org/licenses/LICENSE-2.0")),
Expand All @@ -23,7 +23,6 @@ inThisBuild(
),
scalacOptions ++= Seq("-deprecation", "-feature"),
semanticdbEnabled := true,
semanticdbVersion := scalafixSemanticdb.revision,
scalacOptions += "-Wunused:imports",
scalafixDependencies += "com.github.liancheng" %% "organize-imports" % "0.5.0",
scalafixScalaBinaryVersion := "2.13"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ package object model {
* Command signalling that actor should send accumulated metrics in reply
*/
private[scalac] case object PushMetrics

}
23 changes: 23 additions & 0 deletions core/src/main/scala/io/scalac/mesmer/core/akka/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.scalac.mesmer.core

import scala.math.PartialOrdering

import io.scalac.mesmer.core.model.ActorPath

package object akka {

implicit val actorPathPartialOrdering: PartialOrdering[ActorPath] = new PartialOrdering[ActorPath] {
def tryCompare(x: ActorPath, y: ActorPath): Option[Int] =
(x, y) match {
case (xPath, yPath) if xPath == yPath => Some(0)
case (xPath, yPath) if xPath.startsWith(yPath) => Some(1)
case (xPath, yPath) if yPath.startsWith(xPath) => Some(-1)
case _ => None
}

def lteq(x: ActorPath, y: ActorPath): Boolean = actorLevel(x) <= actorLevel(y)

private def actorLevel(path: ActorPath): Int = path.count(_ == '/')
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ private[scalac] final case class ActorRefDetails(
ref: classic.ActorRef,
tags: Set[Tag],
configuration: ActorConfiguration
)
) {
def withTag(tag: Tag): ActorRefDetails = copy(tags = tags + tag)
}
9 changes: 7 additions & 2 deletions core/src/main/scala/io/scalac/mesmer/core/model/Tag.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ sealed trait Tag extends Any {
}

object Tag {
val stream: Tag = StreamTag
val all: Tag = All
val stream: Tag = StreamTag
val terminated: Tag = TerminatedTag
val all: Tag = All

private case object All extends Tag {
override def serialize: Seq[(String, String)] = Seq.empty
Expand All @@ -23,6 +24,10 @@ object Tag {
lazy val serialize: Seq[(String, String)] = Seq(("stream", "true"))
}

private case object TerminatedTag extends Tag {
lazy val serialize: Seq[(String, String)] = Seq.empty
}

sealed trait StageName extends Any with Tag {
def name: String
def nameOnly: StageName
Expand Down
37 changes: 23 additions & 14 deletions core/src/main/scala/io/scalac/mesmer/core/util/AggMetric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package io.scalac.mesmer.core.util

import io.scalac.mesmer.core.util.AggMetric.LongValueAggMetric.fromTimeSeries

sealed trait AggMetric[@specialized(Long) T, @specialized(Long) Avg] {
sealed trait AggMetric[@specialized(Long) T] {
def min: T
def max: T
def sum: T
def avg: Avg
def count: Int
}

Expand All @@ -20,32 +19,42 @@ object AggMetric {
* @param sum
* @param count
*/
final case class LongValueAggMetric(min: Long, max: Long, avg: Long, sum: Long, count: Int)
extends AggMetric[Long, Long] {

def combine(timeSeries: TimeSeries[Long, Long]): LongValueAggMetric =
combine(fromTimeSeries(timeSeries))

def combine(other: LongValueAggMetric): LongValueAggMetric = {
final case class LongValueAggMetric(min: Long, max: Long, sum: Long, count: Int) extends AggMetric[Long] {

def sum(timeSeries: TimeSeries[Long, Long]): LongValueAggMetric =
sum(fromTimeSeries(timeSeries))

/**
* Sums all monotonically increasing values from this and other aggregation and
* compute values for min and max
* @param other
* @return
*/
def sum(other: LongValueAggMetric): LongValueAggMetric = {
val count = this.count + other.count
val sum = this.sum + other.sum
val avg = if (count == 0) 0L else Math.floorDiv(sum, count)

LongValueAggMetric(
min = if (this.min < other.min) this.min else other.min,
max = if (this.max > other.min) this.max else other.max,
avg = avg,
max = if (this.max > other.max) this.max else other.max,
sum = sum,
count = count
)
}

def sum(next: LongValueAggMetric): LongValueAggMetric =
/**
* Adds this aggregation monotonically increasing counters to other
* and leave it's min and max untouched
* @param next aggregations which min and max will be preserved
* @return
*/
def addTo(next: LongValueAggMetric): LongValueAggMetric =
next.copy(sum = next.sum + this.sum, count = next.count + this.count)
}

final object LongValueAggMetric {
def fromTimeSeries(ts: TimeSeries[Long, Long]): LongValueAggMetric =
LongValueAggMetric(ts.min, ts.max, ts.avg, ts.sum, ts.count)
LongValueAggMetric(ts.min, ts.max, ts.sum, ts.count)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ final class LongNoLockAggregator(val maxSize: Int = 100, val compactionRemaining
val ts = new LongTimeSeries(listBuffer.toSeq)
aggRef
.get()
.fold(aggRef.set(Some(LongValueAggMetric.fromTimeSeries(ts))))(agg => aggRef.set(Some(agg.combine(ts))))
.fold(aggRef.set(Some(LongValueAggMetric.fromTimeSeries(ts))))(agg => aggRef.set(Some(agg.sum(ts))))
true
} finally reentrantLock.unlock()
} else false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class LongNoLockAggregatorTest extends AsyncFlatSpec with Matchers {
result.sum should be((for (i <- 1 to ps) yield ns * i * factor).sum)
result.min should be(factor)
result.max should be(ps * factor)
result.avg should be(math.round(((ps + 1) / 2.0) * factor))
}
}

Expand Down
Loading

0 comments on commit 7eb4660

Please sign in to comment.