Skip to content

Commit

Permalink
Merge pull request #141 from ScalaConsultants/fix_aggregation
Browse files Browse the repository at this point in the history
Fix aggregation
  • Loading branch information
Jakub Czuchnowski authored Jun 21, 2021
2 parents 0db667d + bb856a8 commit 874e08b
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 195 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ class AkkaActorAgentTest
val check: classic.ActorContext => Any = ctx => {
val metrics = ActorCellDecorator.get(ctx).flatMap(_.mailboxTimeAgg.metrics).value
metrics.count should be(messages)
metrics.avg should be(((waitingMessages * idle.toNanos) / messages) +- ToleranceNanos)
metrics.sum should be((waitingMessages * idle.toNanos) +- ToleranceNanos)
metrics.min should be(0L +- ToleranceNanos)
metrics.max should be(idle.toNanos +- ToleranceNanos)
Expand All @@ -141,7 +140,6 @@ class AkkaActorAgentTest
val check: classic.ActorContext => Any = ctx => {
val metrics = ActorCellDecorator.get(ctx).flatMap(_.processingTimeAgg.metrics).value
metrics.count should be(messages)
metrics.avg should be(((workingMessages * processing.toNanos) / messages) +- ToleranceNanos)
metrics.sum should be((workingMessages * processing.toNanos) +- ToleranceNanos)
metrics.min should be(0L +- ToleranceNanos)
metrics.max should be(processing.toNanos +- ToleranceNanos)
Expand Down Expand Up @@ -214,7 +212,6 @@ class AkkaActorAgentTest
expectStashSize(StashMessageCount * 3)
}

// it should "record the amount of received messages" in testWithContextAndActor[String](_ => Behaviors.ignore) {
it should "record the amount of received messages" in {
testWithoutEffect[Unit]((), (), ())(
(0, check(_.receivedMessages)(_.get() should be(0))),
Expand All @@ -224,7 +221,6 @@ class AkkaActorAgentTest
)
}

// it should "record the amount of failed messages without supervision" in testWithContextAndActor[String](_ =>
it should "record the amount of failed messages without supervision" in {

testEffect[String](
Expand Down Expand Up @@ -317,7 +313,6 @@ class AkkaActorAgentTest
receiver ! PoisonPill
}

// it should "record the amount of sent messages properly in typed akka" in testWithContextAndActor[String] { ctx =>
it should "record the amount of sent messages properly in typed akka" in {

testWithChecks(
Expand Down
34 changes: 21 additions & 13 deletions core/src/main/scala/io/scalac/mesmer/core/util/AggMetric.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ package io.scalac.mesmer.core.util

import io.scalac.mesmer.core.util.AggMetric.LongValueAggMetric.fromTimeSeries

sealed trait AggMetric[@specialized(Long) T, @specialized(Long) Avg] {
sealed trait AggMetric[@specialized(Long) T] {
def min: T
def max: T
def sum: T
def avg: Avg
def count: Int
}

Expand All @@ -20,33 +19,42 @@ object AggMetric {
* @param sum
* @param count
*/
final case class LongValueAggMetric(min: Long, max: Long, avg: Long, sum: Long, count: Int)
extends AggMetric[Long, Long] {

def combine(timeSeries: TimeSeries[Long, Long]): LongValueAggMetric =
combine(fromTimeSeries(timeSeries))

def combine(other: LongValueAggMetric): LongValueAggMetric = {
final case class LongValueAggMetric(min: Long, max: Long, sum: Long, count: Int) extends AggMetric[Long] {

def sum(timeSeries: TimeSeries[Long, Long]): LongValueAggMetric =
sum(fromTimeSeries(timeSeries))

/**
* Sums all monotonically increasing values from this and other aggregation and
* compute values for min and max
* @param other
* @return
*/
def sum(other: LongValueAggMetric): LongValueAggMetric = {
val count = this.count + other.count
val sum = this.sum + other.sum
val avg = if (count == 0) 0L else Math.floorDiv(sum, count)

LongValueAggMetric(
min = if (this.min < other.min) this.min else other.min,
max = if (this.max > other.max) this.max else other.max,
avg = avg,
sum = sum,
count = count
)
}

def sum(next: LongValueAggMetric): LongValueAggMetric =
/**
* Adds this aggregation monotonically increasing counters to other
* and leave it's min and max untouched
* @param next aggregations which min and max will be preserved
* @return
*/
def addTo(next: LongValueAggMetric): LongValueAggMetric =
next.copy(sum = next.sum + this.sum, count = next.count + this.count)
}

final object LongValueAggMetric {
def fromTimeSeries(ts: TimeSeries[Long, Long]): LongValueAggMetric =
LongValueAggMetric(ts.min, ts.max, ts.avg, ts.sum, ts.count)
LongValueAggMetric(ts.min, ts.max, ts.sum, ts.count)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ final class LongNoLockAggregator(val maxSize: Int = 100, val compactionRemaining
val ts = new LongTimeSeries(listBuffer.toSeq)
aggRef
.get()
.fold(aggRef.set(Some(LongValueAggMetric.fromTimeSeries(ts))))(agg => aggRef.set(Some(agg.combine(ts))))
.fold(aggRef.set(Some(LongValueAggMetric.fromTimeSeries(ts))))(agg => aggRef.set(Some(agg.sum(ts))))
true
} finally reentrantLock.unlock()
} else false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ class LongNoLockAggregatorTest extends AsyncFlatSpec with Matchers {
result.sum should be((for (i <- 1 to ps) yield ns * i * factor).sum)
result.min should be(factor)
result.max should be(ps * factor)
result.avg should be(math.round(((ps + 1) / 2.0) * factor))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.scalac.mesmer.extension

import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

import akka.Done
Expand All @@ -24,15 +25,13 @@ import io.scalac.mesmer.core.model.Tag
import io.scalac.mesmer.core.util.ActorCellOps
import io.scalac.mesmer.core.util.ActorPathOps
import io.scalac.mesmer.core.util.ActorRefOps
import io.scalac.mesmer.core.util.Timestamp
import io.scalac.mesmer.extension.ActorEventsMonitorActor._
import io.scalac.mesmer.extension.actor.ActorCellDecorator
import io.scalac.mesmer.extension.actor.ActorMetrics
import io.scalac.mesmer.extension.actor.MetricStorageFactory
import io.scalac.mesmer.extension.metric.ActorMetricsMonitor
import io.scalac.mesmer.extension.metric.ActorMetricsMonitor.Labels
import io.scalac.mesmer.extension.metric.MetricObserver.Result
import io.scalac.mesmer.extension.metric.SyncWith
import io.scalac.mesmer.extension.service.ActorTreeService
import io.scalac.mesmer.extension.service.ActorTreeService.Command.GetActorTree
import io.scalac.mesmer.extension.service.ActorTreeService.Command.TagSubscribe
Expand Down Expand Up @@ -62,8 +61,7 @@ object ActorEventsMonitorActor {
node: Option[Node],
pingOffset: FiniteDuration,
storageFactory: MetricStorageFactory[ActorKey],
actorMetricsReader: ActorMetricsReader = ReflectiveActorMetricsReader,
timestampFactory: () => Timestamp
actorMetricsReader: ActorMetricsReader = ReflectiveActorMetricsReader
)(implicit askTimeout: Timeout): Behavior[Command] =
Behaviors.setup[Command] { ctx =>
GenericBehaviors
Expand All @@ -76,8 +74,7 @@ object ActorEventsMonitorActor {
pingOffset,
storageFactory,
scheduler,
actorMetricsReader,
timestampFactory
actorMetricsReader
).start(ref, loop = true)
}
}
Expand Down Expand Up @@ -131,8 +128,7 @@ private[extension] class ActorEventsMonitorActor private[extension] (
pingOffset: FiniteDuration,
storageFactory: MetricStorageFactory[ActorKey],
scheduler: TimerScheduler[Command],
actorMetricsReader: ActorMetricsReader = ReflectiveActorMetricsReader,
timestampFactory: () => Timestamp
actorMetricsReader: ActorMetricsReader = ReflectiveActorMetricsReader
)(implicit val askTimeout: Timeout) {

import context._
Expand All @@ -147,8 +143,7 @@ private[extension] class ActorEventsMonitorActor private[extension] (

private[this] val treeSnapshot = new AtomicReference[Option[Vector[(Labels, ActorMetrics)]]](None)

@volatile
private var lastCollectionTimestamp: Timestamp = timestampFactory()
private[this] val exported = new AtomicBoolean(false)

private def updateMetric(extractor: ActorMetrics => Option[Long])(result: Result[Long, Labels]): Unit = {
val state = treeSnapshot.get()
Expand All @@ -162,25 +157,22 @@ private[extension] class ActorEventsMonitorActor private[extension] (
private def registerUpdaters(): Unit = {

import boundMonitor._
SyncWith()
.`with`(mailboxSize)(updateMetric(_.mailboxSize))
.`with`(failedMessages)(updateMetric(_.failedMessages))
.`with`(processedMessages)(updateMetric(_.processedMessages))
.`with`(receivedMessages)(updateMetric(_.receivedMessages))
.`with`(mailboxTimeAvg)(updateMetric(_.mailboxTime.map(_.avg)))
.`with`(mailboxTimeMax)(updateMetric(_.mailboxTime.map(_.max)))
.`with`(mailboxTimeMin)(updateMetric(_.mailboxTime.map(_.min)))
.`with`(mailboxTimeSum)(updateMetric(_.mailboxTime.map(_.sum)))
.`with`(processingTimeAvg)(updateMetric(_.processingTime.map(_.avg)))
.`with`(processingTimeMin)(updateMetric(_.processingTime.map(_.min)))
.`with`(processingTimeMax)(updateMetric(_.processingTime.map(_.max)))
.`with`(processingTimeSum)(updateMetric(_.processingTime.map(_.sum)))
.`with`(sentMessages)(updateMetric(_.sentMessages))
.`with`(stashSize)(updateMetric(_.stashSize))
.`with`(droppedMessages)(updateMetric(_.droppedMessages))
.afterAll {
lastCollectionTimestamp = timestampFactory()
}

mailboxSize.setUpdater(updateMetric(_.mailboxSize))
failedMessages.setUpdater(updateMetric(_.failedMessages))
processedMessages.setUpdater(updateMetric(_.processedMessages))
receivedMessages.setUpdater(updateMetric(_.receivedMessages))
mailboxTimeCount.setUpdater(updateMetric(_.mailboxTime.map(_.count)))
mailboxTimeMax.setUpdater(updateMetric(_.mailboxTime.map(_.max)))
mailboxTimeMin.setUpdater(updateMetric(_.mailboxTime.map(_.min)))
mailboxTimeSum.setUpdater(updateMetric(_.mailboxTime.map(_.sum)))
processingTimeCount.setUpdater(updateMetric(_.processingTime.map(_.count)))
processingTimeMin.setUpdater(updateMetric(_.processingTime.map(_.min)))
processingTimeMax.setUpdater(updateMetric(_.processingTime.map(_.max)))
processingTimeSum.setUpdater(updateMetric(_.processingTime.map(_.sum)))
sentMessages.setUpdater(updateMetric(_.sentMessages))
stashSize.setUpdater(updateMetric(_.stashSize))
droppedMessages.setUpdater(updateMetric(_.droppedMessages))

}

Expand Down Expand Up @@ -268,12 +260,13 @@ private[extension] class ActorEventsMonitorActor private[extension] (
log.debug("Capturing current actor tree state")

val currentSnapshot = treeSnapshot.get().getOrElse(Vector.empty)

val metrics = storage.iterable.map { case (key, metrics) =>
currentSnapshot.find { case (labels, _) =>
labels.actorPath == key
}.fold((Labels(key, node), metrics)) { case (labels, existingMetrics) =>
(labels, existingMetrics.combine(metrics))
}
currentSnapshot
.find(_._1.actorPath == key) // finds if metric already exists
.fold((Labels(key, node), metrics)) { case (labels, existingMetrics) =>
(labels, existingMetrics.addTo(metrics))
}
}.toVector

treeSnapshot.set(Some(metrics))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import io.scalac.mesmer.core.model._
import io.scalac.mesmer.core.support.ModulesSupport
import io.scalac.mesmer.core.util.ModuleInfo
import io.scalac.mesmer.core.util.ModuleInfo.Modules
import io.scalac.mesmer.core.util.Timestamp
import io.scalac.mesmer.extension.ActorEventsMonitorActor.ReflectiveActorMetricsReader
import io.scalac.mesmer.extension.actor.MutableActorMetricStorageFactory
import io.scalac.mesmer.extension.config.AkkaMonitoringConfig
Expand Down Expand Up @@ -180,8 +179,7 @@ final class AkkaMonitoring(private val system: ActorSystem[_], val config: AkkaM
clusterNodeName,
ExportInterval,
new MutableActorMetricStorageFactory,
ReflectiveActorMetricsReader,
() => Timestamp.create()
ReflectiveActorMetricsReader
)
)
.onFailure(SupervisorStrategy.restart),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,31 @@ final case class ActorMetrics(
unhandled <- unhandledMessages
} yield received - unhandled

def combine(other: ActorMetrics): ActorMetrics = ActorMetrics(
/**
* Adds this ActorMetrics monotonically increasing counter to other.
* This will leave other metrics untouched - aggregations, gauges etc
* @param other
* @return
*/
def addTo(other: ActorMetrics): ActorMetrics = ActorMetrics(
mailboxSize = combineLong(mailboxSize, other.mailboxSize),
mailboxTime = addToAggregation(mailboxTime, other.mailboxTime),
receivedMessages = combineLong(receivedMessages, other.receivedMessages),
unhandledMessages = combineLong(unhandledMessages, other.unhandledMessages),
failedMessages = combineLong(failedMessages, other.failedMessages),
processingTime = addToAggregation(processingTime, other.processingTime),
sentMessages = combineLong(sentMessages, other.sentMessages),
stashSize = combineLong(stashSize, other.stashSize),
droppedMessages = combineLong(droppedMessages, other.droppedMessages)
)

/**
* Sums this ActorMetrics with other - this means than all counters - monotonically and nonmonotinically increasing
* are add up. Max and min are computed normally.
* @param other ActorMetrics to be summed up with this one
* @return
*/
def sum(other: ActorMetrics): ActorMetrics = ActorMetrics(
mailboxSize = combineLong(mailboxSize, other.mailboxSize),
mailboxTime = combineAggregation(mailboxTime, other.mailboxTime),
receivedMessages = combineLong(receivedMessages, other.receivedMessages),
Expand All @@ -37,7 +61,12 @@ final case class ActorMetrics(
private def combineAggregation(
first: Option[LongValueAggMetric],
second: Option[LongValueAggMetric]
): Option[LongValueAggMetric] = combineOption(first, second)(_.combine(_))
): Option[LongValueAggMetric] = combineOption(first, second)(_.sum(_))

private def addToAggregation(
first: Option[LongValueAggMetric],
second: Option[LongValueAggMetric]
): Option[LongValueAggMetric] = combineOption(first, second)(_.addTo(_))

private def combineOption[T](first: Option[T], second: Option[T])(combine: (T, T) => T): Option[T] =
(first, second) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final class MutableActorMetricStorageFactory[K] extends MetricStorageFactory[K]
def iterable: Iterable[(K, ActorMetrics)] = persistentBuffer.toVector

def compute(key: K): this.type = {
val result = buffer.values.fold(ActorMetrics.empty)(_.combine(_))
val result = buffer.values.fold(ActorMetrics.empty)(_.sum(_))
buffer.clear()
save(key, result, true)
this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ object ActorMetricsMonitor {
trait BoundMonitor extends Bound {
def mailboxSize: MetricObserver[Long, Labels]
// TODO Create an abstraction to aggregate multiple metrics (e.g: mailboxTimeAgg: MetricObserverAgg[Long])
def mailboxTimeAvg: MetricObserver[Long, Labels]
def mailboxTimeMin: MetricObserver[Long, Labels]
def mailboxTimeMax: MetricObserver[Long, Labels]
def mailboxTimeSum: MetricObserver[Long, Labels]
def mailboxTimeCount: MetricObserver[Long, Labels]
def stashSize: MetricObserver[Long, Labels]
def receivedMessages: MetricObserver[Long, Labels]
def processedMessages: MetricObserver[Long, Labels]
def failedMessages: MetricObserver[Long, Labels]
def processingTimeAvg: MetricObserver[Long, Labels]
def processingTimeMin: MetricObserver[Long, Labels]
def processingTimeMax: MetricObserver[Long, Labels]
def processingTimeSum: MetricObserver[Long, Labels]
def processingTimeCount: MetricObserver[Long, Labels]
def sentMessages: MetricObserver[Long, Labels]
def droppedMessages: MetricObserver[Long, Labels]

Expand Down
Loading

0 comments on commit 874e08b

Please sign in to comment.