Skip to content

Commit

Permalink
Merge pull request #233 from ScalaConsultants/ot-update-1.7
Browse files Browse the repository at this point in the history
Bump up opentelemetry metrics to 1.7
  • Loading branch information
Jakub Czuchnowski authored Dec 16, 2021
2 parents 8d7511c + b9bb955 commit 0b9207f
Show file tree
Hide file tree
Showing 68 changed files with 770 additions and 725 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class ProxiedQueue {
@OnMethodExit
public static void constructor(
@FieldValue(value = queueFieldName, readOnly = false) BlockingQueue<Envelope> queue,
@This BlockingQueue<Envelope> self
@This BlockingQueue<Envelope> self
) {
queue = new BoundedQueueProxy<>(self);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package io.scalac.mesmer.core

import io.scalac.mesmer.core.model.RawAttributes

trait AttributeSerializer[T] extends (T => RawAttributes) {
final def apply(value: T): RawAttributes = serialize(value)
def serialize(value: T): RawAttributes
}

object AttributeSerializer {
implicit def fromAttributesSerializable[T <: AttributesSerializable]: AttributeSerializer[T] = _.serialize
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.scalac.mesmer.core

import io.scalac.mesmer.core.model.RawAttributes

trait AttributesSerializable {
def serialize: RawAttributes
}

This file was deleted.

12 changes: 0 additions & 12 deletions core/src/main/scala/io/scalac/mesmer/core/LabelSerializer.scala

This file was deleted.

10 changes: 5 additions & 5 deletions core/src/main/scala/io/scalac/mesmer/core/model/Tag.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package io.scalac.mesmer.core.model

import io.scalac.mesmer.core.model.Tag.StreamName.StreamNameLabel
import io.scalac.mesmer.core.model.Tag.StreamName.StreamNameAttributeKey

sealed trait Tag extends Any {
def serialize: Seq[(String, String)]

override def toString: String =
this.serialize.map { case (label, value) =>
s"$label -> $value"
this.serialize.map { case (attributeKey, attributeValue) =>
s"$attributeKey -> $attributeValue"
}.mkString("[", ", ", "]")
}

Expand Down Expand Up @@ -75,7 +75,7 @@ object Tag {
sealed trait StreamName extends Any with Tag {
def name: String

def serialize: Seq[(String, String)] = Seq(StreamNameLabel -> name)
def serialize: Seq[(String, String)] = Seq(StreamNameAttributeKey -> name)
}

object StreamName {
Expand All @@ -84,7 +84,7 @@ object Tag {
def apply(matName: StreamName, terminalStage: StageName): StreamName =
TerminalOperatorStreamName(matName, terminalStage)

private final val StreamNameLabel = "stream_name"
private final val StreamNameAttributeKey = "stream_name"

final class StreamNameImpl(val name: String) extends AnyVal with StreamName

Expand Down
29 changes: 16 additions & 13 deletions core/src/main/scala/io/scalac/mesmer/core/model/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,21 @@ package object model {
type PersistenceId = String @@ PersistenceIdTag
type ActorPath = String @@ ActorPathTag
type ActorKey = ActorPath
type RawLabels = Seq[(String, String)]

implicit val nodeLabelSerializer: LabelSerializer[Node] = node => Seq("node" -> node.unwrap)
implicit val interfaceLabelSerializer: LabelSerializer[Interface] = interface => Seq("interface" -> interface.unwrap)
implicit val portLabelSerializer: LabelSerializer[Port] = port => Seq("port" -> port.unwrap.toString)
implicit val regionLabelSerializer: LabelSerializer[Region] = region => Seq("region" -> region.unwrap)
implicit val pathLabelSerializer: LabelSerializer[Path] = path => Seq("path" -> path.unwrap)
implicit val statusLabelSerializer: LabelSerializer[Status] = status =>
type RawAttributes = Seq[(String, String)]

implicit val nodeAttributeSerializer: AttributeSerializer[Node] = node => Seq("node" -> node.unwrap)
implicit val interfaceAttributeSerializer: AttributeSerializer[Interface] = interface =>
Seq("interface" -> interface.unwrap)
implicit val portAttributeSerializer: AttributeSerializer[Port] = port => Seq("port" -> port.unwrap.toString)
implicit val regionAttributeSerializer: AttributeSerializer[Region] = region => Seq("region" -> region.unwrap)
implicit val pathAttributeSerializer: AttributeSerializer[Path] = path => Seq("path" -> path.unwrap)
implicit val statusAttributeSerializer: AttributeSerializer[Status] = status =>
Seq("status" -> status.unwrap, "status_group" -> s"${status.charAt(0)}xx")

implicit val methodLabelSerializer: LabelSerializer[Method] = method => Seq("method" -> method.unwrap)
implicit val actorPathLabelSerializer: LabelSerializer[ActorPath] = actorPath => Seq("actor_path" -> actorPath.unwrap)
implicit val persistenceIdLabelSerializer: LabelSerializer[PersistenceId] = persistenceId =>
implicit val methodAttributeSerializer: AttributeSerializer[Method] = method => Seq("method" -> method.unwrap)
implicit val actorPathAttributeSerializer: AttributeSerializer[ActorPath] = actorPath =>
Seq("actor_path" -> actorPath.unwrap)
implicit val persistenceIdAttributeSerializer: AttributeSerializer[PersistenceId] = persistenceId =>
Seq("persistence_id" -> persistenceId.asInstanceOf[String])

/**
Expand Down Expand Up @@ -78,11 +80,12 @@ package object model {
}

implicit class SerializationOps[T](private val tag: T) extends AnyVal {
def serialize(implicit ls: LabelSerializer[T]): RawLabels = ls.serialize(tag)
def serialize(implicit ls: AttributeSerializer[T]): RawAttributes = ls.serialize(tag)
}

implicit class OptionSerializationOps[T](private val optTag: Option[T]) extends AnyVal {
def serialize(implicit ls: LabelSerializer[T]): RawLabels = optTag.fold[RawLabels](Seq.empty)(ls.serialize)
def serialize(implicit ls: AttributeSerializer[T]): RawAttributes =
optTag.fold[RawAttributes](Seq.empty)(ls.serialize)
}

}
2 changes: 0 additions & 2 deletions example/docker/otel-collector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ exporters:
prometheus:
endpoint: "0.0.0.0:8889"
namespace: promexample
const_labels:
label1: value1
logging:
loglevel: info

Expand Down
26 changes: 12 additions & 14 deletions example/src/main/scala/example/Boot.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package example

import java.util.Collections

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
Expand All @@ -19,14 +17,15 @@ import example.domain.AccountStateActor
import example.domain.JsonCodecs
import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter
import io.opentelemetry.sdk.metrics.SdkMeterProvider
import io.opentelemetry.sdk.metrics.export.IntervalMetricReader
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader
import org.slf4j.Logger
import org.slf4j.LoggerFactory

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.io.StdIn
import scala.jdk.CollectionConverters._
import scala.jdk.DurationConverters._
import scala.language.postfixOps
import scala.util.Failure
import scala.util.Success
Expand All @@ -43,17 +42,18 @@ object Boot extends App with FailFastCirceSupport with JsonCodecs {
)
.resolve

def initOpenTelemetryMetrics(exportInterval: Long): IntervalMetricReader = {
val metricExporter: OtlpGrpcMetricExporter = OtlpGrpcMetricExporter.getDefault()
def initOpenTelemetryMetrics(exportInterval: FiniteDuration): Unit = {

val metricExporter: OtlpGrpcMetricExporter = OtlpGrpcMetricExporter.getDefault

val meterProvider: SdkMeterProvider = SdkMeterProvider.builder().buildAndRegisterGlobal()
val factory = PeriodicMetricReader.create(metricExporter, exportInterval.toJava)

IntervalMetricReader
val meterProvider: SdkMeterProvider = SdkMeterProvider
.builder()
.setMetricExporter(metricExporter)
.setMetricProducers(Collections.singleton(meterProvider))
.setExportIntervalMillis(exportInterval)
.buildAndStart()
.registerMetricReader(factory)
.buildAndRegisterGlobal()

sys.addShutdownHook(meterProvider.shutdown())
}

def startUp(local: Boolean): Unit = {
Expand All @@ -74,7 +74,7 @@ object Boot extends App with FailFastCirceSupport with JsonCodecs {
// this is important to initialize metric exporting before actor system when starting mesmer from configuration
// mesmer on initialization gets hook to OTel metricProvider and if it's not initialized before it defaults to noop
logger.info("Starting metric exporter")
val metricReader = initOpenTelemetryMetrics(exportInterval)
initOpenTelemetryMetrics(exportInterval.millis)

implicit val system: ActorSystem[Nothing] =
ActorSystem[Nothing](Behaviors.empty, systemName, config)
Expand Down Expand Up @@ -110,8 +110,6 @@ object Boot extends App with FailFastCirceSupport with JsonCodecs {

StdIn.readLine()

sys.addShutdownHook(metricReader.shutdown())

sys.addShutdownHook {
binding
.flatMap(_.unbind())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import io.scalac.mesmer.extension.ActorEventsMonitorActor._
import io.scalac.mesmer.extension.actor.ActorMetrics
import io.scalac.mesmer.extension.actor.MetricStorageFactory
import io.scalac.mesmer.extension.metric.ActorMetricsMonitor
import io.scalac.mesmer.extension.metric.ActorMetricsMonitor.Labels
import io.scalac.mesmer.extension.metric.ActorMetricsMonitor.Attributes
import io.scalac.mesmer.extension.metric.MetricObserver.Result
import io.scalac.mesmer.extension.service.ActorTreeService
import io.scalac.mesmer.extension.service.ActorTreeService.Command.GetActorTree
Expand Down Expand Up @@ -140,13 +140,13 @@ private[extension] class ActorEventsMonitorActor private[extension] (

private[this] val boundMonitor = monitor.bind()

private[this] val treeSnapshot = new AtomicReference[Option[Vector[(Labels, ActorMetrics)]]](None)
private[this] val treeSnapshot = new AtomicReference[Option[Vector[(Attributes, ActorMetrics)]]](None)

private def updateMetric(extractor: ActorMetrics => Option[Long])(result: Result[Long, Labels]): Unit = {
private def updateMetric(extractor: ActorMetrics => Option[Long])(result: Result[Long, Attributes]): Unit = {
val state = treeSnapshot.get()
state
.foreach(_.foreach { case (labels, metrics) =>
extractor(metrics).foreach(value => result.observe(value, labels))
.foreach(_.foreach { case (attributes, metrics) =>
extractor(metrics).foreach(value => result.observe(value, attributes))
})
}

Expand Down Expand Up @@ -261,8 +261,8 @@ private[extension] class ActorEventsMonitorActor private[extension] (
val metrics = storage.iterable.map { case (key, metrics) =>
currentSnapshot
.find(_._1.actorPath == key) // finds if metric already exists
.fold((Labels(key, node), metrics)) { case (labels, existingMetrics) =>
(labels, existingMetrics.addTo(metrics))
.fold((Attributes(key, node), metrics)) { case (attributes, existingMetrics) =>
(attributes, existingMetrics.addTo(metrics))
}
}.toVector

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ import io.scalac.mesmer.extension.config.BufferConfig
import io.scalac.mesmer.extension.config.CachingConfig
import io.scalac.mesmer.extension.metric.MetricObserver.Result
import io.scalac.mesmer.extension.metric.StreamMetricsMonitor
import io.scalac.mesmer.extension.metric.StreamMetricsMonitor.EagerLabels
import io.scalac.mesmer.extension.metric.StreamMetricsMonitor.{ Labels => GlobalLabels }
import io.scalac.mesmer.extension.metric.StreamMetricsMonitor.EagerAttributes
import io.scalac.mesmer.extension.metric.StreamMetricsMonitor.{ Attributes => GlobalAttributes }
import io.scalac.mesmer.extension.metric.StreamOperatorMetricsMonitor
import io.scalac.mesmer.extension.metric.StreamOperatorMetricsMonitor.Labels
import io.scalac.mesmer.extension.metric.StreamOperatorMetricsMonitor.Attributes
import io.scalac.mesmer.extension.service.ActorTreeService
import io.scalac.mesmer.extension.service.ActorTreeService.Command.GetActors
import io.scalac.mesmer.extension.service.actorTreeServiceKey
Expand Down Expand Up @@ -242,7 +242,7 @@ final class AkkaStreamMonitoring(
private val bufferConfig = BufferConfig.fromConfig(context.system.settings.config, AkkaStreamModule)
private val indexCache = ConnectionsIndexCache.bounded(cachingConfig.maxEntries)
private val operationsBoundMonitor = streamOperatorMonitor.bind()
private val boundStreamMonitor = streamMonitor.bind(EagerLabels(node))
private val boundStreamMonitor = streamMonitor.bind(EagerAttributes(node))

import context._

Expand All @@ -265,8 +265,8 @@ final class AkkaStreamMonitoring(
val streams = globalProcessedSnapshot.get()
streams.foreach { statsSeq =>
for (stats <- statsSeq) {
val labels = GlobalLabels(node, stats.streamName)
result.observe(stats.processesMessages, labels)
val attributes = GlobalAttributes(node, stats.streamName)
result.observe(stats.processesMessages, attributes)
}
}
}
Expand Down Expand Up @@ -470,26 +470,26 @@ final class AkkaStreamMonitoring(
localDemandSnapshot.clear()
}

private def observeSnapshot(result: Result[Long, Labels], snapshot: Option[Seq[SnapshotEntry]]): Unit =
private def observeSnapshot(result: Result[Long, Attributes], snapshot: Option[Seq[SnapshotEntry]]): Unit =
snapshot.foreach(_.foreach {
case SnapshotEntry(stageInfo, Some(StageData(value, connectedWith))) =>
val labels =
Labels(
val attributes =
Attributes(
stageInfo.stageName,
stageInfo.subStreamName.streamName,
stageInfo.terminal,
node,
Some(connectedWith)
)
result.observe(value, labels)
result.observe(value, attributes)
case _ => // ignore metrics without data
})

private def observeOperators(result: Result[Long, Labels], snapshot: Option[Seq[SnapshotEntry]]): Unit =
private def observeOperators(result: Result[Long, Attributes], snapshot: Option[Seq[SnapshotEntry]]): Unit =
snapshot.foreach(_.groupBy(_.stage.subStreamName.streamName).foreach { case (streamName, snapshots) =>
snapshots.groupBy(_.stage.stageName.nameOnly).foreach { case (stageName, elems) =>
val labels = Labels(stageName, streamName, terminal = false, node, None)
result.observe(elems.size, labels)
val attributes = Attributes(stageName, streamName, terminal = false, node, None)
result.observe(elems.size, attributes)
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.cluster.typed.Subscribe
import io.scalac.mesmer.core.model._
import io.scalac.mesmer.extension.ClusterEventsMonitor.Command.MemberEventWrapper
import io.scalac.mesmer.extension.metric.ClusterMetricsMonitor
import io.scalac.mesmer.extension.metric.ClusterMetricsMonitor.Labels
import io.scalac.mesmer.extension.metric.ClusterMetricsMonitor.Attributes

object ClusterEventsMonitor extends ClusterMonitorActor {

Expand All @@ -29,7 +29,7 @@ object ClusterEventsMonitor extends ClusterMonitorActor {
classOf[MemberEvent]
)

val boundMonitor = clusterMonitor.bind(Labels(selfMember.uniqueAddress.toNode))
val boundMonitor = clusterMonitor.bind(Attributes(selfMember.uniqueAddress.toNode))

boundMonitor.nodeDown.incValue(0L)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import io.scalac.mesmer.core.config.ConfigurationUtils._
import io.scalac.mesmer.core.model._
import io.scalac.mesmer.core.util.CachedQueryResult
import io.scalac.mesmer.extension.metric.ClusterMetricsMonitor
import io.scalac.mesmer.extension.metric.ClusterMetricsMonitor.Labels
import io.scalac.mesmer.extension.metric.ClusterMetricsMonitor.Attributes

class ClusterRegionsMonitorActor
object ClusterRegionsMonitorActor extends ClusterMonitorActor {
Expand All @@ -41,8 +41,8 @@ object ClusterRegionsMonitorActor extends ClusterMonitorActor {
import system.executionContext

val node = selfMember.uniqueAddress.toNode
val labels = Labels(node)
val boundMonitor = monitor.bind(labels)
val attributes = Attributes(node)
val boundMonitor = monitor.bind(attributes)

val regions = new Regions(
system,
Expand All @@ -52,7 +52,7 @@ object ClusterRegionsMonitorActor extends ClusterMonitorActor {
.setUpdater(result =>
entry.get.foreach { regionStats =>
val entities = regionStats.values.sum
result.observe(entities, Labels(node, Some(region)))
result.observe(entities, Attributes(node, Some(region)))
logger.trace("Recorded amount of entities per region {}", entities)
}
)
Expand All @@ -61,7 +61,7 @@ object ClusterRegionsMonitorActor extends ClusterMonitorActor {
.setUpdater(result =>
entry.get.foreach { regionStats =>
val shards = regionStats.size
result.observe(shards, Labels(node, Some(region)))
result.observe(shards, Attributes(node, Some(region)))
logger.trace("Recorded amount of shards per region {}", shards)
}
)
Expand All @@ -71,13 +71,13 @@ object ClusterRegionsMonitorActor extends ClusterMonitorActor {
boundMonitor.entitiesOnNode.setUpdater { result =>
regions.regionStats.map { regionsStats =>
val entities = regionsStats.view.values.flatMap(_.values).sum
result.observe(entities, labels)
result.observe(entities, attributes)
logger.trace("Recorded amount of entities on node {}", entities)
}
}

boundMonitor.shardRegionsOnNode.setUpdater { result =>
result.observe(regions.size, labels)
result.observe(regions.size, attributes)
logger.trace("Recorded amount of regions on node {}", regions)
}

Expand Down
Loading

0 comments on commit 0b9207f

Please sign in to comment.