Skip to content

Commit

Permalink
Merge pull request #471 from ScalaConsultants/reinstate-actors-create…
Browse files Browse the repository at this point in the history
…d-metric

Reinstate actors created metric
  • Loading branch information
lgajowy authored Aug 18, 2022
2 parents 10e139c + 4b6ea09 commit 46810c1
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import akka.actor.{ ActorSystem, PoisonPill }
import akka.dispatch.{ BoundedPriorityMailbox, BoundedStablePriorityMailbox, Envelope }
import com.typesafe.config.{ Config, ConfigFactory }
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.metrics.data.MetricDataType
import io.scalac.mesmer.agent.utils.{ OtelAgentTest, SafeLoadSystem }
import io.scalac.mesmer.core.akka.model.AttributeNames
import io.scalac.mesmer.core.config.MesmerPatienceConfig
Expand Down Expand Up @@ -128,7 +127,7 @@ class ActorMailboxTest
props
)

assertMetric("mesmer_akka_dropped_total") { data =>
assertMetric("mesmer_akka_actor_dropped_messages_total") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.actor.typed.{ ActorRef, Behavior, SupervisorStrategy }
import akka.actor.{ PoisonPill, Props }
import akka.{ actor => classic }
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.metrics.data.{ LongPointData, MetricDataType }
import io.scalac.mesmer.agent.utils.{ OtelAgentTest, SafeLoadSystem }
import io.scalac.mesmer.core.actor.{ ActorCellDecorator, ActorCellMetrics }
import io.scalac.mesmer.core.akka.model.AttributeNames
Expand Down Expand Up @@ -35,14 +34,12 @@ final class AkkaActorTest

private val Tolerance: Double = scaled(25.millis).toMillis.toDouble

override protected def beforeEach() {
override protected def beforeEach(): Unit =
super.beforeEach()
}

import AkkaActorAgentTest._

private final val StashMessageCount = 10
private final val ToleranceNanos = 20_000_000

private def publishActorContext[T](
contextRef: ActorRef[classic.ActorContext]
Expand Down Expand Up @@ -97,7 +94,7 @@ final class AkkaActorTest
private def testBehavior[T](behavior: T => Behavior[T])(messages: T*)(
checks: (Int, classic.ActorContext => Any)*
): Any =
testWithChecks(_ => Behaviors.receiveMessage[T](behavior), false)(messages: _*)(checks: _*)
testWithChecks(_ => Behaviors.receiveMessage[T](behavior), probes = false)(messages: _*)(checks: _*)

private def testEffect[T](effect: T => Unit, probes: Boolean = true)(messages: T*)(
checks: (Int, classic.ActorContext => Any)*
Expand All @@ -113,14 +110,25 @@ final class AkkaActorTest
private def check[T](extr: ActorCellMetrics => T)(checkFunc: T => Any): classic.ActorContext => Any = ctx =>
checkFunc(ActorCellDecorator.getMetrics(ctx).map(extr).value)

"AkkaActorAgent" should "record actors created total count" in {
system.systemActorOf(Behaviors.ignore, createUniqueId)

assertMetric("mesmer_akka_actor_actors_created_total") { data =>
val totalCount = data.getLongSumData.getPoints.asScala.map {
_.getValue
}.sum
totalCount should be > 0L
}
}

"AkkaActorAgent" should "record mailbox time properly" in {
val idle = 100.milliseconds
val messages = 3
val waitingMessages = messages - 1
val expectedValue = idle.toMillis.toDouble

val check: classic.ActorContext => Any = ctx =>
assertMetric("mesmer_akka_mailbox_time") { data =>
assertMetric("mesmer_akka_actor_mailbox_time") { data =>
val points = data.getHistogramData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
Expand Down Expand Up @@ -152,7 +160,7 @@ final class AkkaActorTest
val expectedValue = processing.toMillis

val check: classic.ActorContext => Any = ctx =>
assertMetric("mesmer_akka_message_processing_time") { data =>
assertMetric("mesmer_akka_actor_message_processing_time") { data =>
val points = data.getHistogramData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
Expand Down Expand Up @@ -181,7 +189,7 @@ final class AkkaActorTest
List.fill(count)(Message).foreach(m => stashActor ! m)

def expectStashSize(size: Int): Unit =
assertMetric("mesmer_akka_stashed_messages_total") { data =>
assertMetric("mesmer_akka_actor_stashed_messages_total") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
Expand All @@ -205,14 +213,13 @@ final class AkkaActorTest
}

it should "record stash operation from actors beginning for typed actors" in {
val stashActor = system.systemActorOf(TypedStash(StashMessageCount * 4), "typedStashActor")
val inspectionProbe = createTestProbe[StashSize]
val stashActor = system.systemActorOf(TypedStash(StashMessageCount * 4), "typedStashActor")

def sendMessage(count: Int): Unit =
List.fill(count)(Message).foreach(stashActor.tell)

def expectStashSize(size: Int): Unit =
assertMetric("mesmer_akka_stashed_messages_total") { data =>
assertMetric("mesmer_akka_actor_stashed_messages_total") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
Expand Down Expand Up @@ -248,7 +255,7 @@ final class AkkaActorTest
it should "record the amount of failed messages without supervision" in {

def expect(assert: Vector[Long] => Any)(context: classic.ActorContext): Any =
assertMetric("mesmer_akka_failed_messages_total") { data =>
assertMetric("mesmer_akka_actor_failed_messages_total") { data =>
// if no data is found it's OK too
if (!data.isEmpty) {
val points = data.getLongSumData.getPoints.asScala
Expand Down Expand Up @@ -280,7 +287,7 @@ final class AkkaActorTest
it should "record the amount of failed messages with supervision" in {

def expect(assert: Vector[Long] => Any)(context: classic.ActorContext): Any =
assertMetric("mesmer_akka_failed_messages_total") { data =>
assertMetric("mesmer_akka_actor_failed_messages_total") { data =>
// if no data is found it's OK too
if (!data.isEmpty) {
val points = data.getLongSumData.getPoints.asScala
Expand Down Expand Up @@ -319,7 +326,7 @@ final class AkkaActorTest
def expectEmpty(
context: classic.ActorContext
): Any =
assertMetric("mesmer_akka_unhandled_messages_total") { data =>
assertMetric("mesmer_akka_actor_unhandled_messages_total") { data =>
if (!data.isEmpty) {
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Expand All @@ -335,7 +342,7 @@ final class AkkaActorTest
def expectNum(num: Int)(
context: classic.ActorContext
): Any =
assertMetric("mesmer_akka_unhandled_messages_total") { data =>
assertMetric("mesmer_akka_actor_unhandled_messages_total") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
Expand Down Expand Up @@ -377,7 +384,7 @@ final class AkkaActorTest
val receiver = classicSystem.actorOf(classic.Props(new Receiver), createUniqueId)
val sender = system.classicSystem.actorOf(classic.Props(new Sender(receiver)), createUniqueId)

def expectedSendMessages(num: Int): Any = assertMetric("mesmer_akka_sent_messages_total") { data =>
def expectedSendMessages(num: Int): Any = assertMetric("mesmer_akka_actor_sent_messages_total") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
Expand All @@ -402,14 +409,15 @@ final class AkkaActorTest

it should "record the amount of sent messages properly in typed akka" in {

def expectedEmpty(context: classic.ActorContext): Any = assertMetric("mesmer_akka_sent_messages_total") { data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
.contains(context.self.path.toStringWithoutAddress)
)
.toVector
points.map(_.getValue) should be(empty)
def expectedEmpty(context: classic.ActorContext): Any = assertMetric("mesmer_akka_actor_sent_messages_total") {
data =>
val points = data.getLongSumData.getPoints.asScala
.filter(point =>
Option(point.getAttributes.get(AttributeKey.stringKey(AttributeNames.ActorPath)))
.contains(context.self.path.toStringWithoutAddress)
)
.toVector
points.map(_.getValue) should be(empty)
}

testWithChecks(
Expand All @@ -424,7 +432,7 @@ final class AkkaActorTest
case _ => Behaviors.same
}
},
false
probes = false
)("forward", "", "forward", "forward")(
(0, expectedEmpty),
(1, expectedEmpty),
Expand All @@ -441,15 +449,20 @@ object AkkaActorAgentTest {
type Fixture = TestProbe[ActorEvent]

sealed trait Command
final case object Open extends Command
final case object Close extends Command

final case object Open extends Command

final case object Close extends Command

final case object Message extends Command

// replies
final case class StashSize(stash: Option[Long])

object ClassicStashActor {
def props(): Props = Props(new ClassicStashActor)
}

class ClassicStashActor extends classic.Actor with classic.Stash with classic.ActorLogging {
def receive: Receive = closed

Expand Down Expand Up @@ -493,6 +506,7 @@ object AkkaActorAgentTest {
buffer.stash(m)
Behaviors.same
}

private def open(): Behavior[Command] =
Behaviors.receiveMessagePartial {
case Close =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package akka.actor.impl;

import io.scalac.mesmer.otelextension.instrumentations.akka.actor.InstrumentsProvider;
import net.bytebuddy.asm.Advice;

public class LocalActorRefProviderAdvice {

@Advice.OnMethodExit
public static void actorOfExit() {
InstrumentsProvider.instance().actorsCreated().add(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public List<TypeInstrumentation> typeInstrumentations() {
AkkaActorAgent.boundedQueueBasedMessageQueueConstructorAdvice(),
AkkaActorAgent.boundedQueueBasedMessageQueueQueueAdvice(),
AkkaActorAgent.boundedMessageQueueSemanticsEnqueueAdvice(),
AkkaActorAgent.actorCellReceived());
AkkaActorAgent.actorCellReceived(),
AkkaActorAgent.actorCreatedAdvice());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,4 +150,10 @@ object AkkaActorAgent {
)
)

val actorCreatedAdvice: TypeInstrumentation = Instrumentation(
matchers.named("akka.actor.LocalActorRefProvider")
).`with`(
Advice(named("actorOf"), "akka.actor.impl.LocalActorRefProviderAdvice")
)

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,53 @@ trait Instruments {
def stashedMessages: LongCounter

def sentMessages: LongCounter

def actorsCreated: LongCounter
}

object Instruments {
def apply(provider: MeterProvider): Instruments = new Instruments {

lazy val failedMessages: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_failed_messages_total")
.counterBuilder("mesmer_akka_actor_failed_messages_total")
.build()

lazy val processingTime: LongHistogram = provider
.get("mesmer")
.histogramBuilder("mesmer_akka_message_processing_time")
.histogramBuilder("mesmer_akka_actor_message_processing_time")
.ofLongs()
.build()

lazy val unhandledMessages: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_unhandled_messages_total")
.counterBuilder("mesmer_akka_actor_unhandled_messages_total")
.build()

lazy val droppedMessages: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_dropped_total")
.counterBuilder("mesmer_akka_actor_dropped_messages_total")
.build()

lazy val mailboxTime: LongHistogram = provider
.get("mesmer")
.histogramBuilder("mesmer_akka_mailbox_time")
.histogramBuilder("mesmer_akka_actor_mailbox_time")
.ofLongs()
.build()

lazy val stashedMessages: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_stashed_messages_total")
.counterBuilder("mesmer_akka_actor_stashed_messages_total")
.build()

lazy val sentMessages: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_sent_messages_total")
.counterBuilder("mesmer_akka_actor_sent_messages_total")
.build()

lazy val actorsCreated: LongCounter = provider
.get("mesmer")
.counterBuilder("mesmer_akka_actor_actors_created_total")
.build()
}
}
Expand Down

0 comments on commit 46810c1

Please sign in to comment.