diff --git a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/E2ETest.scala b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/E2ETest.scala index e396f8bf5..67134bc6a 100644 --- a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/E2ETest.scala +++ b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/E2ETest.scala @@ -12,7 +12,6 @@ import java.nio.file.Paths import com.dimafeng.testcontainers.DockerComposeContainer import com.dimafeng.testcontainers.ExposedService import io.circe.Json -import io.circe.parser._ import io.circe.yaml._ import io.circe.yaml.syntax._ import org.scalatest.EitherValues @@ -55,59 +54,69 @@ trait E2ETest extends PatienceConfiguration with EitherValues with TryValues wit "sbt" :: Nil } - private val (projectRoot, dockerComposeFile) = { + private val constructDockerComposePath = (projectRoot: File) => + Paths.get(projectRoot.getAbsolutePath, "examples/docker/docker-compose.yaml").toFile + + private val projectRoot = { // sbt shell needs `./`, IntelliJ run/debug configurations need `../../` val maybeProjectRoots = Set(new File("../"), new File("./")) - val constructDockerComposePath = (projectRoot: File) => - Paths.get(projectRoot.getAbsolutePath, "examples/docker/docker-compose.yaml").toFile - val projectRoot = maybeProjectRoots + maybeProjectRoots .find(maybeProjectRoot => constructDockerComposePath(maybeProjectRoot).exists()) .getOrElse(sys.error("Project root not found")) - - // remove `container_name` attribute from docker-compose services, because testcontainers do not support it, - // copy `examples/docker` to a temp directory where we apply the docker-compose transformation - val dockerComposeJson = Using(Source.fromFile(constructDockerComposePath(projectRoot))) { source => - parser.parse(source.reader()).value - }.success.value.hcursor - .downField("services") - .withFocus( - _.withObject(services => - Json.obj( - services.keys.map { serviceKey => - ( - serviceKey, - services(serviceKey).value.mapObject(_.filterKeys(_ != "container_name")) - ) - }.toSeq: _* - ) - ) - ) - .top - .value - .asYaml - .spaces2 - val tmpDockerDirectory = Files.createTempDirectory("mesmer-").toFile - FileUtils.copyDirectory(Paths.get(projectRoot.toString, "examples/docker").toFile, tmpDockerDirectory) - val tmpDockerCompose = Paths.get(tmpDockerDirectory.getAbsolutePath, "docker-compose.yaml").toFile - Files.write(tmpDockerCompose.toPath, dockerComposeJson.getBytes(StandardCharsets.UTF_8)) - (projectRoot, tmpDockerCompose) } - private val containerDef = DockerComposeContainer.Def( - dockerComposeFile, - exposedServices = Seq( - ExposedService("prometheus", 9090) - ) - ) - implicit override def patienceConfig: PatienceConfig = PatienceConfig( timeout = scaled(Span(60, Seconds)), interval = scaled(Span(150, Millis)) ) - protected def withExample(sbtCommand: String, startTestString: String = "Example started")( - prometheusApiBlock: PrometheusApi => Unit + protected def withExample( + sbtCommand: String, + additionalServices: Set[String] = Set.empty, + startTestString: String = "Example started" + )( + collectorApiBlock: OpenTelemetryCollectorApi => Unit ): Unit = { + val dockerComposeFile = { + // remove `container_name` attribute from docker-compose services, because testcontainers do not support it, + // copy `examples/docker` to a temp directory where we apply the docker-compose transformation + val dockerComposeJson = Using(Source.fromFile(constructDockerComposePath(projectRoot))) { source => + parser.parse(source.reader()).value + }.success.value.hcursor + .downField("services") + .withFocus( + _.withObject(services => + Json.obj( + services.keys + .filter((Set("otel-collector") ++ additionalServices).contains) + .map { serviceKey => + ( + serviceKey, + services(serviceKey).value.mapObject(_.filterKeys(_ != "container_name")) + ) + } + .toSeq: _* + ) + ) + ) + .top + .value + .asYaml + .spaces2 + val tmpDockerDirectory = Files.createTempDirectory("mesmer-").toFile + FileUtils.copyDirectory(Paths.get(projectRoot.toString, "examples/docker").toFile, tmpDockerDirectory) + val tmpDockerCompose = Paths.get(tmpDockerDirectory.getAbsolutePath, "docker-compose.yaml").toFile + Files.write(tmpDockerCompose.toPath, dockerComposeJson.getBytes(StandardCharsets.UTF_8)) + tmpDockerCompose + } + + val containerDef = DockerComposeContainer.Def( + dockerComposeFile, + exposedServices = Seq( + ExposedService(OpenTelemetryCollectorApi.serviceName, OpenTelemetryCollectorApi.prometheusExporterPort) + ) + ) + val sbtProcessFn = (block: () => Unit) => { val processHandlePromise = Promise[Unit]() val sbtOptions = List( @@ -176,7 +185,7 @@ trait E2ETest extends PatienceConfiguration with EitherValues with TryValues wit container.start() try - sbtProcessFn(() => prometheusApiBlock(new PrometheusApi(container))) + sbtProcessFn(() => collectorApiBlock(new OpenTelemetryCollectorApi(container))) finally container.stop() } @@ -185,21 +194,25 @@ trait E2ETest extends PatienceConfiguration with EitherValues with TryValues wit object E2ETest { - class PrometheusApi(container: DockerComposeContainer) { + object OpenTelemetryCollectorApi { + val serviceName = "otel-collector" + val prometheusExporterPort = 8889 + } + class OpenTelemetryCollectorApi(container: DockerComposeContainer) { + import OpenTelemetryCollectorApi._ private val logger = LoggerFactory.getLogger(getClass) - val prometheusPort = { - val port = container.getServicePort("prometheus", 9090) - logger.info(s"Prometheus service bound to http://localhost:$port") + val prometheusExporterBoundPort = { + val port = container.getServicePort(serviceName, prometheusExporterPort) + logger.info(s"OpenTelemetry Collector service Prometheus exporter bound to http://localhost:$port") port } - def assert( - query: String, - block: Json => Unit + def assertMetrics( + block: String => Unit ): Unit = { - val urlString = s"http://localhost:$prometheusPort/api/v1/query?query=$query" + val urlString = s"http://localhost:$prometheusExporterBoundPort/metrics" val request = HttpRequest .newBuilder() .uri(URI.create(urlString)) @@ -208,11 +221,7 @@ object E2ETest { .newBuilder() .build() val response = client.send(request, BodyHandlers.ofString()) - parse(response.body()) - .fold( - ex => sys.error(s"failed parsing response [${response.body()}] to JSON, error $ex"), - json => block(json) - ) + block(response.body()) } } } diff --git a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaStreamTest.scala b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaStreamTest.scala index 341c3f5e8..35fb1f380 100644 --- a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaStreamTest.scala +++ b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaStreamTest.scala @@ -2,19 +2,20 @@ package io.scalac.mesmer.e2e import org.scalatest.wordspec.AnyWordSpec -class ExampleAkkaStreamTest extends AnyWordSpec with E2ETest with PrometheusMetrics { +class ExampleAkkaStreamTest extends AnyWordSpec with E2ETest with PrometheusExporterMetrics { + import PrometheusExporterMetrics.Metric._ private val akkaStreamMetrics = Seq( - "mesmer_akka_streams_running_streams", - "mesmer_akka_streams_actors", - "mesmer_akka_stream_processed_messages", - "mesmer_akka_streams_running_operators", - "mesmer_akka_streams_operator_demand" - ).map("promexample_" + _) + Gauge("mesmer_akka_streams_running_streams"), + Gauge("mesmer_akka_streams_actors"), + Counter("mesmer_akka_stream_processed_messages"), + Counter("mesmer_akka_streams_running_operators"), + Counter("mesmer_akka_streams_operator_demand") + ) "Akka Stream example" should { - "produce stream metrics" in withExample("exampleAkkaStream/run") { prometheusApi => - akkaStreamMetrics.foreach(assertMetricExists(prometheusApi)(_)) + "produce stream metrics" in withExample("exampleAkkaStream/run") { collectorApiBlock => + assertMetricsExists(collectorApiBlock, "promexample")(akkaStreamMetrics) } } } diff --git a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaTest.scala b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaTest.scala index 91b1fde84..28448a132 100644 --- a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaTest.scala +++ b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaTest.scala @@ -9,7 +9,8 @@ import java.util.UUID import org.scalatest.wordspec.AnyWordSpec -class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusMetrics { +class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusExporterMetrics { + import PrometheusExporterMetrics.Metric._ // Akka example does not produce following metrics: // mesmer_akka_actor_failed_messages @@ -18,27 +19,29 @@ class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusMetrics { // mesmer_akka_cluster_unreachable_nodes private val akkaMetrics = { val actorMetrics = Seq( - "mesmer_akka_actor_mailbox_size", - "mesmer_akka_actor_stashed_messages", - "mesmer_akka_actor_sent_messages", - "mesmer_akka_actor_actors_created", - "mesmer_akka_actor_actors_terminated" - ) ++ - prometheusHistogram("mesmer_akka_actor_message_processing_time") ++ - prometheusHistogram("mesmer_akka_actor_mailbox_time") - val persistenceMetrics = Seq("mesmer_akka_persistence_snapshot") ++ - prometheusHistogram("mesmer_akka_persistence_recovery_time") ++ - prometheusHistogram("mesmer_akka_persistence_event_time") + Gauge("mesmer_akka_actor_mailbox_size"), + Counter("mesmer_akka_actor_stashed_messages"), + Counter("mesmer_akka_actor_sent_messages"), + Counter("mesmer_akka_actor_actors_created"), + Counter("mesmer_akka_actor_actors_terminated"), + Histogram("mesmer_akka_actor_message_processing_time"), + Histogram("mesmer_akka_actor_mailbox_time") + ) + val persistenceMetrics = Seq( + Counter("mesmer_akka_persistence_snapshot"), + Histogram("mesmer_akka_persistence_recovery_time"), + Histogram("mesmer_akka_persistence_event_time") + ) val clusterMetrics = Seq( - "mesmer_akka_cluster_node_down", - "mesmer_akka_cluster_reachable_nodes", - "mesmer_akka_cluster_entities_on_node", - "mesmer_akka_cluster_shards_per_region", - "mesmer_akka_cluster_entities_per_region", - "mesmer_akka_cluster_shard_regions_on_node" + Counter("mesmer_akka_cluster_node_down"), + Gauge("mesmer_akka_cluster_reachable_nodes"), + Gauge("mesmer_akka_cluster_entities_on_node"), + Gauge("mesmer_akka_cluster_shards_per_region"), + Gauge("mesmer_akka_cluster_entities_per_region"), + Gauge("mesmer_akka_cluster_shard_regions_on_node") ) actorMetrics ++ persistenceMetrics ++ clusterMetrics - }.map("promexample_" + _) + } private def exampleApiRequest(accountId: UUID) = { val request = HttpRequest @@ -53,13 +56,17 @@ class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusMetrics { } "Akka example" should { - "produce metrics" in withExample("exampleAkka/run", startTestString = "Starting http server at") { prometheusApi => + "produce metrics" in withExample( + "exampleAkka/run", + additionalServices = Set("postgres"), + startTestString = "Starting http server at" + ) { collectorApiBlock => // to produce cluster and persistence metrics val accountId = UUID.randomUUID() // snapshots are created for every 10 events (1 to 10).foreach(_ => exampleApiRequest(accountId)) - akkaMetrics.foreach(assertMetricExists(prometheusApi)(_)) + assertMetricsExists(collectorApiBlock, "promexample")(akkaMetrics) } } } diff --git a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleZioTest.scala b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleZioTest.scala index 2955628df..dd0c81e33 100644 --- a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleZioTest.scala +++ b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleZioTest.scala @@ -2,24 +2,26 @@ package io.scalac.mesmer.e2e import org.scalatest.wordspec.AnyWordSpec -class ExampleZioTest extends AnyWordSpec with E2ETest with PrometheusMetrics { +class ExampleZioTest extends AnyWordSpec with E2ETest with PrometheusExporterMetrics { + import PrometheusExporterMetrics.Metric._ - private val zioMetrics = (Seq( - "mesmer_zio_executor_size", - "mesmer_zio_executor_capacity", - "mesmer_zio_executor_concurrency", - "mesmer_zio_executor_worker_count", - "mesmer_zio_executor_dequeued_count", - "mesmer_zio_executor_enqueued_count", - "mesmer_zio_forwarded_jvm_info", - "mesmer_zio_forwarded_zio_fiber_started", - "mesmer_zio_forwarded_zio_fiber_successes", - "mesmer_zio_forwarded_zio_fiber_fork_locations" - ) ++ prometheusHistogram("mesmer_zio_forwarded_zio_fiber_lifetimes")).map("promexample_" + _) + private val zioMetrics = Seq( + Gauge("mesmer_zio_executor_size"), + Gauge("mesmer_zio_executor_capacity"), + Gauge("mesmer_zio_executor_concurrency"), + Gauge("mesmer_zio_executor_worker_count"), + Gauge("mesmer_zio_executor_dequeued_count"), + Gauge("mesmer_zio_executor_enqueued_count"), + Gauge("mesmer_zio_forwarded_jvm_info"), + Counter("mesmer_zio_forwarded_zio_fiber_started"), + Counter("mesmer_zio_forwarded_zio_fiber_successes"), + Counter("mesmer_zio_forwarded_zio_fiber_fork_locations"), + Histogram("mesmer_zio_forwarded_zio_fiber_lifetimes") + ) "ZIO example" should { - "produce both runtime and executor metrics" in withExample("exampleZio/run") { prometheusApi => - zioMetrics.foreach(assertMetricExists(prometheusApi)(_)) + "produce both runtime and executor metrics" in withExample("exampleZio/run") { collectorApiBlock => + assertMetricsExists(collectorApiBlock, "promexample")(zioMetrics) } } } diff --git a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/PrometheusExporterMetrics.scala b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/PrometheusExporterMetrics.scala new file mode 100644 index 000000000..914608a2b --- /dev/null +++ b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/PrometheusExporterMetrics.scala @@ -0,0 +1,57 @@ +package io.scalac.mesmer.e2e + +import io.scalac.mesmer.e2e.E2ETest.OpenTelemetryCollectorApi +import org.scalatest.EitherValues +import org.scalatest.Suite +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers + +import scala.jdk.CollectionConverters._ + +trait PrometheusExporterMetrics extends Eventually with Matchers with EitherValues { this: Suite => + import PrometheusExporterMetrics._ + + def assertMetricsExists(collectorApi: OpenTelemetryCollectorApi, metricNamePrefix: String)( + metrics: Seq[Metric] + ): Unit = + eventually { + collectorApi.assertMetrics { response => + val expected = metrics.map { + case Metric.Counter(name) => + s"# TYPE ${metricNamePrefix}_$name counter" + case Metric.Gauge(name) => + s"# TYPE ${metricNamePrefix}_$name gauge" + case Metric.Histogram(name) => + s"# TYPE ${metricNamePrefix}_$name histogram" + } + + val actual = response + .lines() + .iterator() + .asScala + .filter(_.startsWith("# TYPE")) + .toSeq + + val result = expected.diff(actual) + + if (result.nonEmpty) { + fail( + s"Prometheus exporter missing expected metrics - [${result.mkString("[", ",", "]")}], \nexpected - ${expected + .mkString("[", ",", "]")}, \nactual - ${actual.mkString("[", ",", "]")}" + ) + } + } + } + +} + +object PrometheusExporterMetrics { + sealed trait Metric { + val name: String + } + object Metric { + case class Counter(name: String) extends Metric + case class Gauge(name: String) extends Metric + case class Histogram(name: String) extends Metric + } +} diff --git a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/PrometheusMetrics.scala b/e2e-test/src/it/scala/io/scalac/mesmer/e2e/PrometheusMetrics.scala deleted file mode 100644 index d13dbc5ff..000000000 --- a/e2e-test/src/it/scala/io/scalac/mesmer/e2e/PrometheusMetrics.scala +++ /dev/null @@ -1,32 +0,0 @@ -package io.scalac.mesmer.e2e - -import io.circe.Json -import io.scalac.mesmer.e2e.E2ETest.PrometheusApi -import org.scalatest.EitherValues -import org.scalatest.Suite -import org.scalatest.concurrent.Eventually -import org.scalatest.matchers.should.Matchers - -trait PrometheusMetrics extends Eventually with Matchers with EitherValues { this: Suite => - - def prometheusHistogram(metricName: String): Seq[String] = Seq( - "sum", - "count", - "bucket" - ).map(s"${metricName}_" + _) - - def assertMetricExists(prometheusApi: PrometheusApi)(metricName: String): Unit = - withClue(s"Metric [$metricName] should be produced") { - eventually { - prometheusApi.assert( - metricName, - response => - response.hcursor - .downField("data") - .downField("result") - .as[Seq[Json]] - .value should not be empty - ) - } - } -} diff --git a/examples/docker/docker-compose.yaml b/examples/docker/docker-compose.yaml index caf392baf..8e8d97a75 100644 --- a/examples/docker/docker-compose.yaml +++ b/examples/docker/docker-compose.yaml @@ -15,7 +15,7 @@ services: otel-collector: container_name: mesmer_example_otel_collector - image: otel/opentelemetry-collector-contrib-dev:latest + image: otel/opentelemetry-collector-contrib:0.75.0 command: [--config=/etc/otel-collector-config.yaml, ''] volumes: - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml