Skip to content

Commit

Permalink
Merge pull request #669 from ptrdom/e2e-test-fix
Browse files Browse the repository at this point in the history
Fix e2e tests
  • Loading branch information
lgajowy authored Apr 6, 2023
2 parents c722d99 + 9643759 commit 8dd2e6d
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 134 deletions.
121 changes: 65 additions & 56 deletions e2e-test/src/it/scala/io/scalac/mesmer/e2e/E2ETest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import java.nio.file.Paths
import com.dimafeng.testcontainers.DockerComposeContainer
import com.dimafeng.testcontainers.ExposedService
import io.circe.Json
import io.circe.parser._
import io.circe.yaml._
import io.circe.yaml.syntax._
import org.scalatest.EitherValues
Expand Down Expand Up @@ -55,59 +54,69 @@ trait E2ETest extends PatienceConfiguration with EitherValues with TryValues wit
"sbt" :: Nil
}

private val (projectRoot, dockerComposeFile) = {
private val constructDockerComposePath = (projectRoot: File) =>
Paths.get(projectRoot.getAbsolutePath, "examples/docker/docker-compose.yaml").toFile

private val projectRoot = {
// sbt shell needs `./`, IntelliJ run/debug configurations need `../../`
val maybeProjectRoots = Set(new File("../"), new File("./"))
val constructDockerComposePath = (projectRoot: File) =>
Paths.get(projectRoot.getAbsolutePath, "examples/docker/docker-compose.yaml").toFile
val projectRoot = maybeProjectRoots
maybeProjectRoots
.find(maybeProjectRoot => constructDockerComposePath(maybeProjectRoot).exists())
.getOrElse(sys.error("Project root not found"))

// remove `container_name` attribute from docker-compose services, because testcontainers do not support it,
// copy `examples/docker` to a temp directory where we apply the docker-compose transformation
val dockerComposeJson = Using(Source.fromFile(constructDockerComposePath(projectRoot))) { source =>
parser.parse(source.reader()).value
}.success.value.hcursor
.downField("services")
.withFocus(
_.withObject(services =>
Json.obj(
services.keys.map { serviceKey =>
(
serviceKey,
services(serviceKey).value.mapObject(_.filterKeys(_ != "container_name"))
)
}.toSeq: _*
)
)
)
.top
.value
.asYaml
.spaces2
val tmpDockerDirectory = Files.createTempDirectory("mesmer-").toFile
FileUtils.copyDirectory(Paths.get(projectRoot.toString, "examples/docker").toFile, tmpDockerDirectory)
val tmpDockerCompose = Paths.get(tmpDockerDirectory.getAbsolutePath, "docker-compose.yaml").toFile
Files.write(tmpDockerCompose.toPath, dockerComposeJson.getBytes(StandardCharsets.UTF_8))
(projectRoot, tmpDockerCompose)
}

private val containerDef = DockerComposeContainer.Def(
dockerComposeFile,
exposedServices = Seq(
ExposedService("prometheus", 9090)
)
)

implicit override def patienceConfig: PatienceConfig = PatienceConfig(
timeout = scaled(Span(60, Seconds)),
interval = scaled(Span(150, Millis))
)

protected def withExample(sbtCommand: String, startTestString: String = "Example started")(
prometheusApiBlock: PrometheusApi => Unit
protected def withExample(
sbtCommand: String,
additionalServices: Set[String] = Set.empty,
startTestString: String = "Example started"
)(
collectorApiBlock: OpenTelemetryCollectorApi => Unit
): Unit = {
val dockerComposeFile = {
// remove `container_name` attribute from docker-compose services, because testcontainers do not support it,
// copy `examples/docker` to a temp directory where we apply the docker-compose transformation
val dockerComposeJson = Using(Source.fromFile(constructDockerComposePath(projectRoot))) { source =>
parser.parse(source.reader()).value
}.success.value.hcursor
.downField("services")
.withFocus(
_.withObject(services =>
Json.obj(
services.keys
.filter((Set("otel-collector") ++ additionalServices).contains)
.map { serviceKey =>
(
serviceKey,
services(serviceKey).value.mapObject(_.filterKeys(_ != "container_name"))
)
}
.toSeq: _*
)
)
)
.top
.value
.asYaml
.spaces2
val tmpDockerDirectory = Files.createTempDirectory("mesmer-").toFile
FileUtils.copyDirectory(Paths.get(projectRoot.toString, "examples/docker").toFile, tmpDockerDirectory)
val tmpDockerCompose = Paths.get(tmpDockerDirectory.getAbsolutePath, "docker-compose.yaml").toFile
Files.write(tmpDockerCompose.toPath, dockerComposeJson.getBytes(StandardCharsets.UTF_8))
tmpDockerCompose
}

val containerDef = DockerComposeContainer.Def(
dockerComposeFile,
exposedServices = Seq(
ExposedService(OpenTelemetryCollectorApi.serviceName, OpenTelemetryCollectorApi.prometheusExporterPort)
)
)

val sbtProcessFn = (block: () => Unit) => {
val processHandlePromise = Promise[Unit]()
val sbtOptions = List(
Expand Down Expand Up @@ -176,7 +185,7 @@ trait E2ETest extends PatienceConfiguration with EitherValues with TryValues wit
container.start()

try
sbtProcessFn(() => prometheusApiBlock(new PrometheusApi(container)))
sbtProcessFn(() => collectorApiBlock(new OpenTelemetryCollectorApi(container)))
finally
container.stop()
}
Expand All @@ -185,21 +194,25 @@ trait E2ETest extends PatienceConfiguration with EitherValues with TryValues wit

object E2ETest {

class PrometheusApi(container: DockerComposeContainer) {
object OpenTelemetryCollectorApi {
val serviceName = "otel-collector"
val prometheusExporterPort = 8889
}
class OpenTelemetryCollectorApi(container: DockerComposeContainer) {
import OpenTelemetryCollectorApi._

private val logger = LoggerFactory.getLogger(getClass)

val prometheusPort = {
val port = container.getServicePort("prometheus", 9090)
logger.info(s"Prometheus service bound to http://localhost:$port")
val prometheusExporterBoundPort = {
val port = container.getServicePort(serviceName, prometheusExporterPort)
logger.info(s"OpenTelemetry Collector service Prometheus exporter bound to http://localhost:$port")
port
}

def assert(
query: String,
block: Json => Unit
def assertMetrics(
block: String => Unit
): Unit = {
val urlString = s"http://localhost:$prometheusPort/api/v1/query?query=$query"
val urlString = s"http://localhost:$prometheusExporterBoundPort/metrics"
val request = HttpRequest
.newBuilder()
.uri(URI.create(urlString))
Expand All @@ -208,11 +221,7 @@ object E2ETest {
.newBuilder()
.build()
val response = client.send(request, BodyHandlers.ofString())
parse(response.body())
.fold(
ex => sys.error(s"failed parsing response [${response.body()}] to JSON, error $ex"),
json => block(json)
)
block(response.body())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ package io.scalac.mesmer.e2e

import org.scalatest.wordspec.AnyWordSpec

class ExampleAkkaStreamTest extends AnyWordSpec with E2ETest with PrometheusMetrics {
class ExampleAkkaStreamTest extends AnyWordSpec with E2ETest with PrometheusExporterMetrics {
import PrometheusExporterMetrics.Metric._

private val akkaStreamMetrics = Seq(
"mesmer_akka_streams_running_streams",
"mesmer_akka_streams_actors",
"mesmer_akka_stream_processed_messages",
"mesmer_akka_streams_running_operators",
"mesmer_akka_streams_operator_demand"
).map("promexample_" + _)
Gauge("mesmer_akka_streams_running_streams"),
Gauge("mesmer_akka_streams_actors"),
Counter("mesmer_akka_stream_processed_messages"),
Counter("mesmer_akka_streams_running_operators"),
Counter("mesmer_akka_streams_operator_demand")
)

"Akka Stream example" should {
"produce stream metrics" in withExample("exampleAkkaStream/run") { prometheusApi =>
akkaStreamMetrics.foreach(assertMetricExists(prometheusApi)(_))
"produce stream metrics" in withExample("exampleAkkaStream/run") { collectorApiBlock =>
assertMetricsExists(collectorApiBlock, "promexample")(akkaStreamMetrics)
}
}
}
49 changes: 28 additions & 21 deletions e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleAkkaTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import java.util.UUID

import org.scalatest.wordspec.AnyWordSpec

class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusMetrics {
class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusExporterMetrics {
import PrometheusExporterMetrics.Metric._

// Akka example does not produce following metrics:
// mesmer_akka_actor_failed_messages
Expand All @@ -18,27 +19,29 @@ class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusMetrics {
// mesmer_akka_cluster_unreachable_nodes
private val akkaMetrics = {
val actorMetrics = Seq(
"mesmer_akka_actor_mailbox_size",
"mesmer_akka_actor_stashed_messages",
"mesmer_akka_actor_sent_messages",
"mesmer_akka_actor_actors_created",
"mesmer_akka_actor_actors_terminated"
) ++
prometheusHistogram("mesmer_akka_actor_message_processing_time") ++
prometheusHistogram("mesmer_akka_actor_mailbox_time")
val persistenceMetrics = Seq("mesmer_akka_persistence_snapshot") ++
prometheusHistogram("mesmer_akka_persistence_recovery_time") ++
prometheusHistogram("mesmer_akka_persistence_event_time")
Gauge("mesmer_akka_actor_mailbox_size"),
Counter("mesmer_akka_actor_stashed_messages"),
Counter("mesmer_akka_actor_sent_messages"),
Counter("mesmer_akka_actor_actors_created"),
Counter("mesmer_akka_actor_actors_terminated"),
Histogram("mesmer_akka_actor_message_processing_time"),
Histogram("mesmer_akka_actor_mailbox_time")
)
val persistenceMetrics = Seq(
Counter("mesmer_akka_persistence_snapshot"),
Histogram("mesmer_akka_persistence_recovery_time"),
Histogram("mesmer_akka_persistence_event_time")
)
val clusterMetrics = Seq(
"mesmer_akka_cluster_node_down",
"mesmer_akka_cluster_reachable_nodes",
"mesmer_akka_cluster_entities_on_node",
"mesmer_akka_cluster_shards_per_region",
"mesmer_akka_cluster_entities_per_region",
"mesmer_akka_cluster_shard_regions_on_node"
Counter("mesmer_akka_cluster_node_down"),
Gauge("mesmer_akka_cluster_reachable_nodes"),
Gauge("mesmer_akka_cluster_entities_on_node"),
Gauge("mesmer_akka_cluster_shards_per_region"),
Gauge("mesmer_akka_cluster_entities_per_region"),
Gauge("mesmer_akka_cluster_shard_regions_on_node")
)
actorMetrics ++ persistenceMetrics ++ clusterMetrics
}.map("promexample_" + _)
}

private def exampleApiRequest(accountId: UUID) = {
val request = HttpRequest
Expand All @@ -53,13 +56,17 @@ class ExampleAkkaTest extends AnyWordSpec with E2ETest with PrometheusMetrics {
}

"Akka example" should {
"produce metrics" in withExample("exampleAkka/run", startTestString = "Starting http server at") { prometheusApi =>
"produce metrics" in withExample(
"exampleAkka/run",
additionalServices = Set("postgres"),
startTestString = "Starting http server at"
) { collectorApiBlock =>
// to produce cluster and persistence metrics
val accountId = UUID.randomUUID()
// snapshots are created for every 10 events
(1 to 10).foreach(_ => exampleApiRequest(accountId))

akkaMetrics.foreach(assertMetricExists(prometheusApi)(_))
assertMetricsExists(collectorApiBlock, "promexample")(akkaMetrics)
}
}
}
32 changes: 17 additions & 15 deletions e2e-test/src/it/scala/io/scalac/mesmer/e2e/ExampleZioTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,26 @@ package io.scalac.mesmer.e2e

import org.scalatest.wordspec.AnyWordSpec

class ExampleZioTest extends AnyWordSpec with E2ETest with PrometheusMetrics {
class ExampleZioTest extends AnyWordSpec with E2ETest with PrometheusExporterMetrics {
import PrometheusExporterMetrics.Metric._

private val zioMetrics = (Seq(
"mesmer_zio_executor_size",
"mesmer_zio_executor_capacity",
"mesmer_zio_executor_concurrency",
"mesmer_zio_executor_worker_count",
"mesmer_zio_executor_dequeued_count",
"mesmer_zio_executor_enqueued_count",
"mesmer_zio_forwarded_jvm_info",
"mesmer_zio_forwarded_zio_fiber_started",
"mesmer_zio_forwarded_zio_fiber_successes",
"mesmer_zio_forwarded_zio_fiber_fork_locations"
) ++ prometheusHistogram("mesmer_zio_forwarded_zio_fiber_lifetimes")).map("promexample_" + _)
private val zioMetrics = Seq(
Gauge("mesmer_zio_executor_size"),
Gauge("mesmer_zio_executor_capacity"),
Gauge("mesmer_zio_executor_concurrency"),
Gauge("mesmer_zio_executor_worker_count"),
Gauge("mesmer_zio_executor_dequeued_count"),
Gauge("mesmer_zio_executor_enqueued_count"),
Gauge("mesmer_zio_forwarded_jvm_info"),
Counter("mesmer_zio_forwarded_zio_fiber_started"),
Counter("mesmer_zio_forwarded_zio_fiber_successes"),
Counter("mesmer_zio_forwarded_zio_fiber_fork_locations"),
Histogram("mesmer_zio_forwarded_zio_fiber_lifetimes")
)

"ZIO example" should {
"produce both runtime and executor metrics" in withExample("exampleZio/run") { prometheusApi =>
zioMetrics.foreach(assertMetricExists(prometheusApi)(_))
"produce both runtime and executor metrics" in withExample("exampleZio/run") { collectorApiBlock =>
assertMetricsExists(collectorApiBlock, "promexample")(zioMetrics)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.scalac.mesmer.e2e

import io.scalac.mesmer.e2e.E2ETest.OpenTelemetryCollectorApi
import org.scalatest.EitherValues
import org.scalatest.Suite
import org.scalatest.concurrent.Eventually
import org.scalatest.matchers.should.Matchers

import scala.jdk.CollectionConverters._

trait PrometheusExporterMetrics extends Eventually with Matchers with EitherValues { this: Suite =>
import PrometheusExporterMetrics._

def assertMetricsExists(collectorApi: OpenTelemetryCollectorApi, metricNamePrefix: String)(
metrics: Seq[Metric]
): Unit =
eventually {
collectorApi.assertMetrics { response =>
val expected = metrics.map {
case Metric.Counter(name) =>
s"# TYPE ${metricNamePrefix}_$name counter"
case Metric.Gauge(name) =>
s"# TYPE ${metricNamePrefix}_$name gauge"
case Metric.Histogram(name) =>
s"# TYPE ${metricNamePrefix}_$name histogram"
}

val actual = response
.lines()
.iterator()
.asScala
.filter(_.startsWith("# TYPE"))
.toSeq

val result = expected.diff(actual)

if (result.nonEmpty) {
fail(
s"Prometheus exporter missing expected metrics - [${result.mkString("[", ",", "]")}], \nexpected - ${expected
.mkString("[", ",", "]")}, \nactual - ${actual.mkString("[", ",", "]")}"
)
}
}
}

}

object PrometheusExporterMetrics {
sealed trait Metric {
val name: String
}
object Metric {
case class Counter(name: String) extends Metric
case class Gauge(name: String) extends Metric
case class Histogram(name: String) extends Metric
}
}
Loading

0 comments on commit 8dd2e6d

Please sign in to comment.