diff --git a/build.sbt b/build.sbt index ac33e64f9..8e0803b3e 100644 --- a/build.sbt +++ b/build.sbt @@ -129,6 +129,7 @@ lazy val instrumentation = (project in file("instrumentation")) `kamon-akka-http`, `kamon-play`, `kamon-okhttp`, + `kamon-armeria` ) @@ -423,6 +424,21 @@ lazy val `kamon-okhttp` = (project in file("instrumentation/kamon-okhttp")) ).dependsOn(`kamon-core`, `kamon-executors`, `kamon-testkit` % "test") +lazy val `kamon-armeria` = (project in file("instrumentation/kamon-armeria")) + .disablePlugins(AssemblyPlugin) + .enablePlugins(JavaAgent) + .settings(instrumentationSettings) + .settings( + libraryDependencies ++= Seq( + kanelaAgent % "provided", + "com.linecorp.armeria" % "armeria" % "1.1.0" % "provided", + + scalatest % "test", + okHttp % "test", + logbackClassic % "test" + ) + ).dependsOn(`kamon-instrumentation-common`, `kamon-testkit` % "test") + /** * Reporters */ @@ -637,4 +653,5 @@ val `kamon-bundle` = (project in file("bundle/kamon-bundle")) `kamon-akka-http` % "shaded", `kamon-play` % "shaded", `kamon-okhttp` % "shaded", - ) \ No newline at end of file + `kamon-armeria` % "shaded" +) diff --git a/instrumentation/kamon-armeria/src/main/resources/reference.conf b/instrumentation/kamon-armeria/src/main/resources/reference.conf new file mode 100644 index 000000000..06ea9dd1f --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/resources/reference.conf @@ -0,0 +1,254 @@ +# =================================== # +# kamon-armeria reference configuration # +# =================================== # + + +kamon.instrumentation.armeria { + + # Settings to control the HTTP Server instrumentation. + # + # IMPORTANT: Besides the "initial-operation-name" and "unhandled-operation-name" settings, the entire configuration of + # the HTTP Server Instrumentation is based on the constructs provided by the Kamon Instrumentation Common library + # which will always fallback to the settings found under the "kamon.instrumentation.http-server.default" path. The + # default settings have been included here to make them easy to find and understand in the context of this project and + # commented out so that any changes to the default settings will actually have effect. + # + http-server { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + + + } + + + # + # Configuration for HTTP server metrics collection. + # + metrics { + + # Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming + # that the instrumentation is fully compliant: + # + # - http.server.requets + # - http.server.request.active + # - http.server.request.size + # - http.server.response.size + # - http.server.connection.lifetime + # - http.server.connection.usage + # - http.server.connection.open + # + # All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests + # metric will also have a status_code tag with the status code group (1xx, 2xx and so on). + # + #enabled = yes + } + + + # + # Configuration for HTTP request tracing. + # + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests + # and finish them when the response is sent back to the clients. + #enabled = yes + + # Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used + # only if all these conditions are met: + # - the context tag is present. + # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). + # - the identifier is valid in accordance to the identity provider. + #preferred-trace-id-tag = "none" + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span crekamon.trace.sampler = alwaysated by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + # Controls writing trace and span identifiers to HTTP response headers sent by the instrumented servers. The + # configuration can be set to either "none" to disable writing the identifiers on the response headers or to + # the header name to be used when writing the identifiers. + response-headers { + + # HTTP response header name for the trace identifier, or "none" to disable it. + #trace-id = "trace-id" + + # HTTP response header name for the server span identifier, or "none" to disable it. + #span-id = none + } + + # Custom mappings between routes and operation names. + operations { + + # The default operation name to be used when creating Spans to handle the HTTP server requests. In most + # cases it is not possible to define an operation name right at the moment of starting the HTTP server Span + # and in those cases, this operation name will be initially assigned to the Span. Instrumentation authors + # should do their best effort to provide a suitable operation name or make use of the "mappings" facilities. + default = "http.server.request" + + # The operation name to be assigned when an application cannot find any route/endpoint/controller to handle + # a given request. Depending on the instrumented framework, it might be possible to apply this operation + # name automatically or not, check the frameworks' instrumentation docs for more details. + unhandled = "unhandled" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - default: Uses the set default operation name + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.armeria.instrumentation.server.KamonArmeriaOperationNameGenerator" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + + } + } + } + } + + # Settings to control the HTTP Client instrumentation + # + # IMPORTANT: The entire configuration of the HTTP Client Instrumentation is based on the constructs provided by the + # Kamon Instrumentation Common library which will always fallback to the settings found under the + # "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to + # find and understand in the context of this project and commented out so that any changes to the default settings + # will actually have effect. + # + http-client { + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + } + + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests + # and finish them when the response is received from the server. + enabled = yes + + # Enables collection of span metrics using the `span.processing-time` metric. + span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + url = span + + # Use the http.method tag. + method = metric + + # Use the http.status_code tag. + status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + operations { + + # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP + # Client instrumentation will always try to use the HTTP Operation Name Generator configured bellow to get + # a name, but if it fails to generate it then this name will be used. + default = "http.client.request" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - hostname: Uses the request Host as the operation name. + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.armeria.instrumentation.client.KamonArmeriaOperationNameGenerator" + } + } + } +} + +kanela { + show-banner = false + modules { + armeria { + name = "Armeria instrumentation" + stoppable = true + instrumentations = [ + "kamon.armeria.instrumentation.server.ArmeriaHttpServerInstrumentation", + "kamon.armeria.instrumentation.client.ArmeriaHttpClientInstrumentation" + ] + within = [ "io.netty..*", "com.linecorp.armeria..*"] + } + } +} diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/BaseKamonArmeriaOperationNameGenerator.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/BaseKamonArmeriaOperationNameGenerator.scala new file mode 100644 index 000000000..79c045610 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/BaseKamonArmeriaOperationNameGenerator.scala @@ -0,0 +1,49 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation + +import kamon.instrumentation.http.HttpMessage.Request +import kamon.instrumentation.http.HttpOperationNameGenerator + +import scala.collection.concurrent.TrieMap + +trait BaseKamonArmeriaOperationNameGenerator extends HttpOperationNameGenerator { + + private val localCache = TrieMap.empty[String, String] + private val normalizePattern = """\$([^<]+)<[^>]+>""".r + + def name(request: Request): Option[String] = + Some( + localCache.getOrElseUpdate(key(request), { + // Convert paths of form GET /foo/bar/$paramname/blah to foo.bar.paramname.blah.get + val normalisedPath = normalisePath(request.path) + name(request, normalisedPath) + }) + ) + + protected def name(request: Request, normalisedPath: String): String + + protected def key(request: Request): String + + private def normalisePath(path: String): String = { + val p = normalizePattern.replaceAllIn(path, "$1").replace('/', '.').dropWhile(_ == '.') + val normalisedPath = { + if (p.lastOption.exists(_ != '.')) s"$p." + else p + } + normalisedPath + } +} diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientInstrumentation.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientInstrumentation.scala new file mode 100644 index 000000000..add98a324 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientInstrumentation.scala @@ -0,0 +1,74 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.client + +import java.util.concurrent.Executors + +import com.linecorp.armeria.client.{ClientBuilder, ClientRequestContext, DecoratingHttpClientFunction, HttpClient} +import com.linecorp.armeria.common.{HttpRequest, HttpResponse} +import kamon.Kamon +import kamon.armeria.instrumentation.client.ArmeriaHttpClientTracing.{getRequestBuilder, toKamonResponse} +import kamon.armeria.instrumentation.client.timing.Timing.takeTimings +import kamon.armeria.instrumentation.converters.FutureConverters +import kamon.instrumentation.http.HttpClientInstrumentation +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.libs.net.bytebuddy.asm.Advice + +import scala.concurrent.ExecutionContext +import scala.util.control.NonFatal + +class ArmeriaHttpClientInstrumentation extends InstrumentationBuilder { + onType("com.linecorp.armeria.client.ClientBuilder") + .advise(isConstructor, classOf[ArmeriaHttpClientBuilderAdvisor]) +} + +class ArmeriaHttpClientBuilderAdvisor + +object ArmeriaHttpClientBuilderAdvisor { + @Advice.OnMethodExit(suppress = classOf[Throwable]) + def addKamonDecorator(@Advice.This builder: ClientBuilder): Unit = { + builder.decorator(new KamonArmeriaDecoratingFunction()) + } +} + + +class KamonArmeriaDecoratingFunction extends DecoratingHttpClientFunction with FutureConverters { + private implicit val ec = ExecutionContext.fromExecutor(Executors.newCachedThreadPool()) + + private val httpClientConfig = Kamon.config.getConfig("kamon.instrumentation.armeria.http-client") + private val instrumentation = HttpClientInstrumentation.from(httpClientConfig, "armeria-http-client") + + override def execute(delegate: HttpClient, ctx: ClientRequestContext, req: HttpRequest): HttpResponse = { + val requestHandler = instrumentation.createHandler(getRequestBuilder(req), Kamon.currentContext) + + try { + ctx.log() + .whenComplete() + .toScala + .foreach(log => { + takeTimings(log, requestHandler.span) + requestHandler.processResponse(toKamonResponse(log)) + }) + delegate.execute(ctx, requestHandler.request) + } catch { + case NonFatal(error) => + requestHandler.span.fail(error) + requestHandler.span.finish() + throw error + } + } +} + diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientTracing.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientTracing.scala new file mode 100644 index 000000000..30e815e25 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientTracing.scala @@ -0,0 +1,77 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.client + +import com.linecorp.armeria.common.HttpRequest +import com.linecorp.armeria.common.logging.RequestLog +import kamon.context.HttpPropagation.HeaderWriter +import kamon.instrumentation.http.HttpMessage + +import scala.collection.immutable.Map +import scala.collection.{JavaConverters, mutable} + +object ArmeriaHttpClientTracing { + + def getRequestBuilder(request: HttpRequest): HttpMessage.RequestBuilder[HttpRequest] = new HttpMessage.RequestBuilder[HttpRequest]() { + private val _headers = mutable.Map[String, String]() + + override def read(header: String): Option[String] = Option(request.headers().get(header)) + + override def readAll: Map[String, String] = { + JavaConverters + .asScalaIteratorConverter(request.headers().iterator()) + .asScala + .map(entry => (entry.getKey.toString, entry.getValue)) + .toMap + + } + + override def url: String = request.uri().toString + + override def path: String = request.uri().getPath + + override def method: String = request.method().name() + + override def host: String = request.uri().getHost + + override def port: Int = request.uri().getPort + + override def write(header: String, value: String): Unit = { + _headers += (header -> value) + } + + override def build: HttpRequest = { + val newHeadersMap = request.headers.toBuilder + _headers.foreach { case (key, value) => newHeadersMap.add(key, value) } + request.withHeaders(newHeadersMap) + } + } + + def toKamonResponse(reqLog: RequestLog): HttpMessage.Response = new HttpMessage.Response() { + override def statusCode: Int = reqLog.responseHeaders().status().code() + } + + trait HeaderHandler extends HeaderWriter { + private val _headers = mutable.Map[String, String]() + + override def write(header: String, value: String): Unit = { + _headers += (header -> value) + } + + def headers: mutable.Map[String, String] = _headers + } + +} diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/KamonArmeriaOperationNameGenerator.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/KamonArmeriaOperationNameGenerator.scala new file mode 100644 index 000000000..4b07dafd8 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/KamonArmeriaOperationNameGenerator.scala @@ -0,0 +1,42 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.client + +import java.util.Locale + +import kamon.armeria.instrumentation.BaseKamonArmeriaOperationNameGenerator +import kamon.instrumentation.http.HttpMessage.Request + +/** + * A GET request to https://github.com/kamon-io/Kamon will generate the following operationName + * github.com.kamon-io.Kamon.get + */ +class KamonArmeriaOperationNameGenerator extends BaseKamonArmeriaOperationNameGenerator { + + override protected def name(request: Request, normalisedPath: String): String = + s"${request.host}.$normalisedPath${request.method.toLowerCase(Locale.ENGLISH)}" + + override protected def key(request: Request): String = + s"${request.host}${request.path}${request.method}" +} + +object KamonArmeriaOperationNameGenerator { + def apply() = new KamonArmeriaOperationNameGenerator() +} + + + + diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/timing/Timing.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/timing/Timing.scala new file mode 100644 index 000000000..65b7f9073 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/client/timing/Timing.scala @@ -0,0 +1,69 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.client.timing + +import java.util.concurrent.TimeUnit + +import com.linecorp.armeria.common.logging.RequestLog +import kamon.Kamon +import kamon.trace.Span + +/** + * Based on Armeria Brave Client implementation + * https://github.com/line/armeria/blob/master/brave/src/main/java/com/linecorp/armeria/client/brave/BraveClient.java + */ +object Timing { + + def takeTimings(log: RequestLog, span: Span): Unit = { + Option(log.connectionTimings()).foreach(timings => { + + logTiming(span, "connection-acquire.start", "connection-acquire.end", + timings.connectionAcquisitionStartTimeMicros(), + timings.connectionAcquisitionDurationNanos()); + + if (timings.dnsResolutionDurationNanos() != -1) { + logTiming(span, "dns-resolve.start", "dns-resolve.end", + timings.dnsResolutionStartTimeMicros(), + timings.dnsResolutionDurationNanos()); + } + + if (timings.socketConnectDurationNanos() != -1) { + logTiming(span, "socket-connect.start", "socket-connect.end", + timings.socketConnectStartTimeMicros(), + timings.socketConnectDurationNanos()); + } + + if (timings.pendingAcquisitionDurationNanos() != -1) { + logTiming(span, "connection-reuse.start", "connection-reuse.end", + timings.pendingAcquisitionStartTimeMicros(), + timings.pendingAcquisitionDurationNanos()); + } + }) + } + + private def logTiming(span: Span, + startName: String, + endName: String, + startTimeMicros: Long, + durationNanos: Long): Unit = { + + val startTimeNanos = TimeUnit.NANOSECONDS.convert(startTimeMicros, TimeUnit.MICROSECONDS) + + span.mark(startName, Kamon.clock().toInstant(startTimeNanos)) + span.mark(endName, Kamon.clock().toInstant(startTimeNanos + durationNanos)) + } + +} diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/converters/FutureConverters.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/converters/FutureConverters.scala new file mode 100644 index 000000000..f9e18538e --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/converters/FutureConverters.scala @@ -0,0 +1,45 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.converters + +import java.util.concurrent.CompletionStage + +import scala.concurrent.Future +import scala.concurrent.FutureConvertersImpl.{CF, P} + +trait FutureConverters { + + implicit class FutureConverterOps[T](cs: CompletionStage[T]) { + /** + * Taken from https://github.com/scala/scala-java8-compat/blob/master/src/main/scala/scala/compat/java8/FutureConverters.scala + * @return + */ + def toScala: Future[T] = { + cs match { + case cf: CF[T] => cf.wrapped + case _ => + val p = new P[T](cs) + cs whenComplete p + p.future + } + } + + } + +} + + + diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/ArmeriaHttpServerInstrumentation.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/ArmeriaHttpServerInstrumentation.scala new file mode 100644 index 000000000..ed936821d --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/ArmeriaHttpServerInstrumentation.scala @@ -0,0 +1,104 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.server + +import com.linecorp.armeria.common.HttpStatus.NOT_FOUND +import com.linecorp.armeria.server +import com.linecorp.armeria.server._ +import io.netty.channel.{Channel, ChannelPipeline} +import kamon.Kamon +import kamon.instrumentation.http.HttpServerInstrumentation +import kanela.agent.api.instrumentation.InstrumentationBuilder +import kanela.agent.api.instrumentation.bridge.FieldBridge +import kanela.agent.libs.net.bytebuddy.asm.Advice + +class ArmeriaHttpServerInstrumentation extends InstrumentationBuilder { + onSubTypesOf("io.netty.channel.Channel") + .mixin(classOf[HasRequestProcessingContextMixin]) + + onType("com.linecorp.armeria.server.HttpServerPipelineConfigurator") + .bridge(classOf[HttpPipelineConfiguratorInternalState]) + .advise(method("configureHttp"), classOf[ConfigureMethodAdvisor]) + + onType("com.linecorp.armeria.server.FallbackService") + .advise(method("serve"), classOf[ServeMethodAdvisor]) + + onType("com.linecorp.armeria.server.DefaultServiceRequestContext") + .bridge(classOf[ServiceRequestContextInternalState]) + +} + +class ConfigureMethodAdvisor + +object ConfigureMethodAdvisor { + + @Advice.OnMethodExit + def around(@Advice.This configurer: Object, + @Advice.Argument(0) p: ChannelPipeline): Unit = { + val serverPort = configurer.asInstanceOf[HttpPipelineConfiguratorInternalState].getServerPort + val hostName = serverPort.localAddress().getHostName + val port = serverPort.localAddress().getPort + + lazy val httpServerConfig = Kamon.config().getConfig("kamon.instrumentation.armeria.http-server") + lazy val serverInstrumentation = HttpServerInstrumentation.from(httpServerConfig, "armeria-http-server", hostName, port) + + p.addBefore("HttpServerHandler#0", "armeria-http-server-request-handler", ArmeriaHttpServerRequestHandler(serverInstrumentation)) + p.addLast("armeria-http-server-response-handler", ArmeriaHttpServerResponseHandler(serverInstrumentation)) + } +} + +trait HttpPipelineConfiguratorInternalState { + @FieldBridge("port") + def getServerPort: ServerPort +} + +class ServeMethodAdvisor + +object ServeMethodAdvisor { + /** + * When an HttpStatusException is thrown in {@link com.linecorp.armeria.server.FallbackService.handleNotFound()} is because the route doesn't exist + * so we must set unhandled operation name and we'll do it in {@link server.ArmeriaHttpServerResponseHandler.write()} + */ + @Advice.OnMethodExit(onThrowable = classOf[HttpStatusException]) + def around(@Advice.Argument(0) ctx: ServiceRequestContext with ServiceRequestContextInternalState, + @Advice.Thrown statusException: HttpStatusException): Unit = { + val processingContext = ctx.getChannel.asInstanceOf[HasRequestProcessingContext].getRequestProcessingContext + if (statusException.httpStatus.code() == NOT_FOUND.code()) { + processingContext.requestHandler.span.name("") + } + } +} + +trait ServiceRequestContextInternalState { + @FieldBridge("ch") + def getChannel: Channel +} + +trait HasRequestProcessingContext { + def setRequestProcessingContext(requestProcessingContext: RequestProcessingContext): Unit + + def getRequestProcessingContext: RequestProcessingContext +} + +class HasRequestProcessingContextMixin extends HasRequestProcessingContext { + @volatile var _requestProcessingContext: RequestProcessingContext = _ + + override def setRequestProcessingContext(requestProcessing: RequestProcessingContext): Unit = + _requestProcessingContext = requestProcessing + + override def getRequestProcessingContext: RequestProcessingContext = + _requestProcessingContext +} diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/KamonArmeriaOperationNameGenerator.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/KamonArmeriaOperationNameGenerator.scala new file mode 100644 index 000000000..02484b714 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/KamonArmeriaOperationNameGenerator.scala @@ -0,0 +1,40 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.server + +import java.util.Locale + +import kamon.armeria.instrumentation.BaseKamonArmeriaOperationNameGenerator +import kamon.instrumentation.http.HttpMessage.Request +/** + * A GET request to https://localhost:8080/kamon-io/Kamon will generate the following operationName + * kamon-io.Kamon.get + */ +class KamonArmeriaOperationNameGenerator extends BaseKamonArmeriaOperationNameGenerator { + + override protected def name(request: Request, normalisedPath: String): String = + s"$normalisedPath${request.method.toLowerCase(Locale.ENGLISH)}" + + override protected def key(request: Request): String = + s"${request.method}${request.path}" +} + +object KamonArmeriaOperationNameGenerator { + def apply() = new KamonArmeriaOperationNameGenerator() +} + + + diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/handlers/ArmeriaHttpServerRequestHandler.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/handlers/ArmeriaHttpServerRequestHandler.scala new file mode 100644 index 000000000..1ce920e9c --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/handlers/ArmeriaHttpServerRequestHandler.scala @@ -0,0 +1,63 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package com.linecorp.armeria.server + +import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} +import kamon.Kamon +import kamon.armeria.instrumentation.server.{HasRequestProcessingContext, RequestProcessingContext} +import kamon.instrumentation.http.{HttpMessage, HttpServerInstrumentation} + +import scala.collection.JavaConverters.iterableAsScalaIterableConverter + +final class ArmeriaHttpServerRequestHandler(serverInstrumentation: HttpServerInstrumentation) extends ChannelInboundHandlerAdapter { + + override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = { + if (!msg.isInstanceOf[DecodedHttpRequest]) ctx.fireChannelRead(msg) + else { + val processingContext = ctx.channel().asInstanceOf[HasRequestProcessingContext] + val request = msg.asInstanceOf[DecodedHttpRequest] + val serverRequestHandler = serverInstrumentation.createHandler(toRequest(request, serverInstrumentation.interface(), serverInstrumentation.port())) + + processingContext.setRequestProcessingContext(RequestProcessingContext(serverRequestHandler, Kamon.storeContext(serverRequestHandler.context))) + + ctx.fireChannelRead(msg) + } + } + + private def toRequest(request: DecodedHttpRequest, serverHost: String, serverPort: Int): HttpMessage.Request = new HttpMessage.Request { + + override def url: String = request.uri().toString + + override def path: String = request.path() + + override def method: String = request.method().name() + + override def host: String = serverHost + + override def port: Int = serverPort + + override def read(header: String): Option[String] = + Option(request.headers().get(header)) + + override def readAll(): Map[String, String] = + request.headers().asScala.map(e => e.getKey.toString() -> e.getValue).toMap + } +} + +object ArmeriaHttpServerRequestHandler { + def apply(serverInstrumentation: HttpServerInstrumentation): ArmeriaHttpServerRequestHandler = + new ArmeriaHttpServerRequestHandler(serverInstrumentation) +} diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/handlers/ArmeriaHttpServerResponseHandler.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/handlers/ArmeriaHttpServerResponseHandler.scala new file mode 100644 index 000000000..4b5704d22 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/handlers/ArmeriaHttpServerResponseHandler.scala @@ -0,0 +1,65 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package com.linecorp.armeria.server + +import com.linecorp.armeria.common.HttpStatus.NOT_FOUND +import io.netty.channel.{ChannelHandlerContext, ChannelOutboundHandlerAdapter, ChannelPromise} +import io.netty.handler.codec.http.HttpResponse +import kamon.armeria.instrumentation.server.HasRequestProcessingContext +import kamon.instrumentation.http.{HttpMessage, HttpServerInstrumentation} + +final class ArmeriaHttpServerResponseHandler(serverInstrumentation: HttpServerInstrumentation) extends ChannelOutboundHandlerAdapter { + + override def write(ctx: ChannelHandlerContext, msg: Any, promise: ChannelPromise): Unit = { + if (!msg.isInstanceOf[HttpResponse]) ctx.write(msg, promise) + else { + val response = msg.asInstanceOf[HttpResponse] + val processingContext = ctx.channel().asInstanceOf[HasRequestProcessingContext].getRequestProcessingContext + + /** + * processingContext.requestHandler.span.operationName() will be empty if an HttpStatusException is thrown + * see {@link kamon.armeria.instrumentation.server.ServeMethodAdvisor.around()} + */ + if (response.status().code() == NOT_FOUND.code() && processingContext.requestHandler.span.operationName().isEmpty) { + processingContext.requestHandler.span.name(serverInstrumentation.settings.unhandledOperationName) + } + + processingContext.requestHandler.buildResponse(toResponse(response), processingContext.scope.context) + + try ctx.write(msg, promise) finally { + processingContext.requestHandler.responseSent() + processingContext.scope.close() + } + } + } + + private def toResponse(response: HttpResponse): HttpMessage.ResponseBuilder[HttpResponse] = new HttpMessage.ResponseBuilder[HttpResponse] { + override def build(): HttpResponse = + response + + override def statusCode: Int = + response.status().code() + + override def write(header: String, value: String): Unit = + response.headers().add(header, value) + } +} + +object ArmeriaHttpServerResponseHandler { + def apply(serverInstrumentation: HttpServerInstrumentation): ArmeriaHttpServerResponseHandler = + new ArmeriaHttpServerResponseHandler(serverInstrumentation) +} + diff --git a/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/package.scala b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/package.scala new file mode 100644 index 000000000..44812ab8a --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/kamon/armeria/instrumentation/server/package.scala @@ -0,0 +1,25 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation + +import kamon.context.Storage +import kamon.instrumentation.http.HttpServerInstrumentation.RequestHandler + +package object server { + + final case class RequestProcessingContext(requestHandler: RequestHandler, scope: Storage.Scope) + +} diff --git a/instrumentation/kamon-armeria/src/main/scala/scala/concurrent/FutureConvertersImpl.scala b/instrumentation/kamon-armeria/src/main/scala/scala/concurrent/FutureConvertersImpl.scala new file mode 100644 index 000000000..6e8589679 --- /dev/null +++ b/instrumentation/kamon-armeria/src/main/scala/scala/concurrent/FutureConvertersImpl.scala @@ -0,0 +1,43 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package scala.concurrent + +import java.util.concurrent.{CompletableFuture, CompletionStage} +import java.util.function.BiConsumer + +import scala.concurrent.impl.Promise.DefaultPromise +import scala.util.{Failure, Success, Try} + +/** + * Taken from https://github.com/scala/scala-java8-compat/blob/master/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala + */ +object FutureConvertersImpl { + + class CF[T](val wrapped: Future[T]) extends CompletableFuture[T] with (Try[T] => Unit) { + override def apply(t: Try[T]): Unit = t match { + case Success(v) => complete(v) + case Failure(e) => completeExceptionally(e) + } + } + + class P[T](val wrapped: CompletionStage[T]) extends DefaultPromise[T] with BiConsumer[T, Throwable] { + override def accept(v: T, e: Throwable): Unit = { + if (e == null) complete(Success(v)) + else complete(Failure(e)) + } + } + +} diff --git a/instrumentation/kamon-armeria/src/test/resources/application.conf b/instrumentation/kamon-armeria/src/test/resources/application.conf new file mode 100644 index 000000000..f6b889f36 --- /dev/null +++ b/instrumentation/kamon-armeria/src/test/resources/application.conf @@ -0,0 +1,255 @@ +# =================================== # +# kamon-armeria reference configuration # +# =================================== # + + +kamon.instrumentation.armeria { + + # Settings to control the HTTP Server instrumentation. + # + # IMPORTANT: Besides the "initial-operation-name" and "unhandled-operation-name" settings, the entire configuration of + # the HTTP Server Instrumentation is based on the constructs provided by the Kamon Instrumentation Common library + # which will always fallback to the settings found under the "kamon.instrumentation.http-server.default" path. The + # default settings have been included here to make them easy to find and understand in the context of this project and + # commented out so that any changes to the default settings will actually have effect. + # + http-server { + + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + #enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + + + } + + + # + # Configuration for HTTP server metrics collection. + # + metrics { + + # Enables collection of HTTP server metrics. When enabled the following metrics will be collected, assuming + # that the instrumentation is fully compliant: + # + # - http.server.requets + # - http.server.request.active + # - http.server.request.size + # - http.server.response.size + # - http.server.connection.lifetime + # - http.server.connection.usage + # - http.server.connection.open + # + # All metrics have at least three tags: component, interface and port. Additionally, the http.server.requests + # metric will also have a status_code tag with the status code group (1xx, 2xx and so on). + # + #enabled = yes + } + + + # + # Configuration for HTTP request tracing. + # + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for incoming requests + # and finish them when the response is sent back to the clients. + #enabled = yes + + # Select a context tag that provides a preferred trace identifier. The preferred trace identifier will be used + # only if all these conditions are met: + # - the context tag is present. + # - there is no parent Span on the incoming context (i.e. this is the first service on the trace). + # - the identifier is valid in accordance to the identity provider. + #preferred-trace-id-tag = "none" + + # Enables collection of span metrics using the `span.processing-time` metric. + #span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + #url = span + + # Use the http.method tag. + #method = metric + + # Use the http.status_code tag. + #status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span crekamon.trace.sampler = alwaysated by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + # Controls writing trace and span identifiers to HTTP response headers sent by the instrumented servers. The + # configuration can be set to either "none" to disable writing the identifiers on the response headers or to + # the header name to be used when writing the identifiers. + response-headers { + + # HTTP response header name for the trace identifier, or "none" to disable it. + #trace-id = "trace-id" + + # HTTP response header name for the server span identifier, or "none" to disable it. + #span-id = none + } + + # Custom mappings between routes and operation names. + operations { + + # The default operation name to be used when creating Spans to handle the HTTP server requests. In most + # cases it is not possible to define an operation name right at the moment of starting the HTTP server Span + # and in those cases, this operation name will be initially assigned to the Span. Instrumentation authors + # should do their best effort to provide a suitable operation name or make use of the "mappings" facilities. + default = "http.server.request" + + # The operation name to be assigned when an application cannot find any route/endpoint/controller to handle + # a given request. Depending on the instrumented framework, it might be possible to apply this operation + # name automatically or not, check the frameworks' instrumentation docs for more details. + unhandled = "unhandled" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - default: Uses the set default operation name + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.armeria.instrumentation.server.KamonArmeriaOperationNameGenerator" + + # Provides custom mappings from HTTP paths into operation names. Meant to be used in cases where the bytecode + # instrumentation is not able to provide a sensible operation name that is free of high cardinality values. + # For example, with the following configuration: + # mappings { + # "/organization/*/user/*/profile" = "/organization/:orgID/user/:userID/profile" + # "/events/*/rsvps" = "EventRSVPs" + # } + # + # Requests to "/organization/3651/user/39652/profile" and "/organization/22234/user/54543/profile" will have + # the same operation name "/organization/:orgID/user/:userID/profile". + # + # Similarly, requests to "/events/aaa-bb-ccc/rsvps" and "/events/1234/rsvps" will have the same operation + # name "EventRSVPs". + # + # The patterns are expressed as globs and the operation names are free form. + # + mappings { + "/dummy-resources/*/other-resources/*" = "dummy-resources/{}/other-resources/{}" + } + } + } + } + + # Settings to control the HTTP Client instrumentation + # + # IMPORTANT: The entire configuration of the HTTP Client Instrumentation is based on the constructs provided by the + # Kamon Instrumentation Common library which will always fallback to the settings found under the + # "kamon.instrumentation.http-client.default" path. The default settings have been included here to make them easy to + # find and understand in the context of this project and commented out so that any changes to the default settings + # will actually have effect. + # + http-client { + # + # Configuration for HTTP context propagation. + # + propagation { + + # Enables or disables HTTP context propagation on this HTTP server instrumentation. Please note that if + # propagation is disabled then some distributed tracing features will not be work as expected (e.g. Spans can + # be created and reported but will not be linked across boundaries nor take trace identifiers from tags). + enabled = yes + + # HTTP propagation channel to b used by this instrumentation. Take a look at the kamon.propagation.http.default + # configuration for more details on how to configure the detault HTTP context propagation. + channel = "default" + } + + tracing { + + # Enables HTTP request tracing. When enabled the instrumentation will create Spans for outgoing requests + # and finish them when the response is received from the server. + enabled = yes + + # Enables collection of span metrics using the `span.processing-time` metric. + span-metrics = on + + # Select which tags should be included as span and span metric tags. The possible options are: + # - span: the tag is added as a Span tag (i.e. using span.tag(...)) + # - metric: the tag is added a a Span metric tag (i.e. using span.tagMetric(...)) + # - off: the tag is not used. + # + tags { + + # Use the http.url tag. + url = span + + # Use the http.method tag. + method = metric + + # Use the http.status_code tag. + status-code = metric + + # Copy tags from the context into the Spans with the specified purpouse. For example, to copy a customer_type + # tag from the context into the HTTP Server Span created by the instrumentation, the following configuration + # should be added: + # + # from-context { + # customer_type = span + # } + # + from-context { + + } + } + + operations { + + # The default operation name to be used when creating Spans to handle the HTTP client requests. The HTTP + # Client instrumentation will always try to use the HTTP Operation Name Generator configured bellow to get + # a name, but if it fails to generate it then this name will be used. + default = "http.client.request" + + # FQCN for a HttpOperationNameGenerator implementation, or ony of the following shorthand forms: + # - hostname: Uses the request Host as the operation name. + # - method: Uses the request HTTP method as the operation name. + # + name-generator = "kamon.armeria.instrumentation.client.KamonArmeriaOperationNameGenerator" + } + } + } + +} + +kanela { + show-banner = false + modules { + armeria { + name = "Armeria instrumentation" + stoppable = true + instrumentations = [ + "kamon.armeria.instrumentation.server.ArmeriaHttpServerInstrumentation", + "kamon.armeria.instrumentation.client.ArmeriaHttpClientInstrumentation" + ] + within = [ "io.netty..*", "com.linecorp.armeria..*"] + } + } +} diff --git a/instrumentation/kamon-armeria/src/test/resources/logback.xml b/instrumentation/kamon-armeria/src/test/resources/logback.xml new file mode 100644 index 000000000..a923265e7 --- /dev/null +++ b/instrumentation/kamon-armeria/src/test/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + diff --git a/instrumentation/kamon-armeria/src/test/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientTracingSpec.scala b/instrumentation/kamon-armeria/src/test/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientTracingSpec.scala new file mode 100644 index 000000000..3a2f01c28 --- /dev/null +++ b/instrumentation/kamon-armeria/src/test/scala/kamon/armeria/instrumentation/client/ArmeriaHttpClientTracingSpec.scala @@ -0,0 +1,228 @@ +package kamon.armeria.instrumentation.client + +import com.linecorp.armeria.client.{ClientFactory, Clients, WebClient} +import com.linecorp.armeria.common.{HttpMethod, HttpRequest, RequestHeaders} +import kamon.Kamon +import kamon.context.Context +import kamon.tag.Lookups.{plain, plainBoolean, plainLong} +import kamon.testkit.TestSpanReporter +import kamon.trace.Span +import kamon.trace.SpanPropagation.B3 +import org.scalatest.concurrent.Eventually +import org.scalatest.time.SpanSugar.convertIntToGrainOfTime +import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec} +import utils.ArmeriaServerSupport.startArmeriaServer +import utils.TestSupport.getResponseHeaders + +class ArmeriaHttpClientTracingSpec extends WordSpec + with Matchers + with BeforeAndAfterAll + with Eventually + with TestSpanReporter + with OptionValues { + + val customTag = "requestId" + val customHeaderName = "X-Request-Id" + + val interface = "127.0.0.1" + val httpPort = 8081 + + val httpServer = startArmeriaServer(httpPort) + + "The Armeria http client tracing instrumentation" should { + + "propagate the current context and generate a span around an async request" in { + val path = "/dummy" + val url = s"http://$interface:$httpPort" + + val okSpan = Kamon.spanBuilder("ok-async-operation-span").start() + val client = Clients.builder(url).build(classOf[WebClient]) + val request = HttpRequest.of(RequestHeaders.of(HttpMethod.GET, path)) + + val response = Kamon.runWithContext(Context.of(Span.Key, okSpan)) { + client.execute(request).aggregate().get() + } + + val span = eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe s"$interface.dummy.get" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "armeria-http-client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + span.metricTags.get(plainBoolean("error")) shouldBe false + span.tags.get(plain("http.url")) shouldBe s"$url$path" + okSpan.id shouldBe span.parentId + + testSpanReporter().nextSpan() shouldBe None + + span + } + + val responseHeaders = getResponseHeaders(response) + + responseHeaders.get(B3.Headers.TraceIdentifier.toLowerCase).value should be(span.trace.id.string) + responseHeaders.get(B3.Headers.SpanIdentifier.toLowerCase).value should be(span.id.string) + responseHeaders.get(B3.Headers.ParentSpanIdentifier.toLowerCase).value should be(span.parentId.string) + responseHeaders.get(B3.Headers.Sampled.toLowerCase).value should be("1") + + + } + + "propagate context tags" in { + val path = "/dummy" + val url = s"http://$interface:$httpPort" + + val okSpan = Kamon.spanBuilder("ok-span-with-extra-tags").start() + val client = Clients.builder(url).build(classOf[WebClient]) + val request = HttpRequest.of(RequestHeaders.of(HttpMethod.GET, path)) + + val response = Kamon.runWithContext(Context.of(Span.Key, okSpan).withTag(customTag, "1234")) { + client.execute(request).aggregate().get() + } + + val span: Span.Finished = eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe s"$interface.dummy.get" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "armeria-http-client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + span.metricTags.get(plainBoolean("error")) shouldBe false + span.tags.get(plain("http.url")) shouldBe s"$url$path" + span.tags.get(plain(customTag)) shouldBe "1234" + + okSpan.id == span.parentId + + testSpanReporter().nextSpan() shouldBe None + + span + } + + val responseHeaders = getResponseHeaders(response) + + responseHeaders.get(B3.Headers.TraceIdentifier.toLowerCase).value should be(span.trace.id.string) + responseHeaders.get(B3.Headers.SpanIdentifier.toLowerCase).value should be(span.id.string) + responseHeaders.get(B3.Headers.ParentSpanIdentifier.toLowerCase).value should be(span.parentId.string) + responseHeaders.get(B3.Headers.Sampled.toLowerCase).value should be("1") + responseHeaders.get(customHeaderName.toLowerCase).value should be("1234") + + } + + "mark span as failed when server response with 5xx on async execution" in { + val path = "/dummy-error" + val url = s"http://$interface:$httpPort" + + val okSpan = Kamon.spanBuilder("ok-async-operation-span").start() + val client = Clients.builder(url).build(classOf[WebClient]) + val request = HttpRequest.of(RequestHeaders.of(HttpMethod.GET, path)) + + val response = Kamon.runWithContext(Context.of(Span.Key, okSpan)) { + val response = client.execute(request) + response.aggregate().get() + } + + val span: Span.Finished = eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe s"$interface.dummy-error.get" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "armeria-http-client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainBoolean("error")) shouldBe true + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + span.tags.get(plain("http.url")) shouldBe s"$url$path" + + okSpan.id == span.parentId + + testSpanReporter().nextSpan() shouldBe None + + span + } + + val responseHeaders = getResponseHeaders(response) + + responseHeaders.get(B3.Headers.TraceIdentifier.toLowerCase).value should be(span.trace.id.string) + responseHeaders.get(B3.Headers.SpanIdentifier.toLowerCase).value should be(span.id.string) + responseHeaders.get(B3.Headers.ParentSpanIdentifier.toLowerCase).value should be(span.parentId.string) + responseHeaders.get(B3.Headers.Sampled.toLowerCase).value should be("1") + } + + "add timing marks to the generated span" in { + val path = "/dummy" + val url = s"http://$interface:$httpPort" + + val okSpan = Kamon.spanBuilder("ok-async-operation-span").start() + /** + * For testing purpose we override client idle timeout property so when + * client.timing.Timing#takeTimings(RequestLog, Span) is executed this condition + * log.connectionTimings() isn't null + */ + val clientFactory = ClientFactory.builder().idleTimeoutMillis(1).build() + val webClient = Clients.builder(url).build(classOf[WebClient]) + val client = clientFactory.newClient(webClient).asInstanceOf[WebClient] + + val request = HttpRequest.of(RequestHeaders.of(HttpMethod.GET, path)) + + val response = Kamon.runWithContext(Context.of(Span.Key, okSpan)) { + client.execute(request).aggregate().get() + } + + val span = eventually(timeout(3 seconds)) { + val span = testSpanReporter().nextSpan().value + + span.operationName shouldBe s"$interface.dummy.get" + span.kind shouldBe Span.Kind.Client + span.metricTags.get(plain("component")) shouldBe "armeria-http-client" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200 + span.metricTags.get(plainBoolean("error")) shouldBe false + span.tags.get(plain("http.url")) shouldBe s"$url$path" + okSpan.id shouldBe span.parentId + + span.marks.map(_.key) should contain allOf( + "connection-acquire.start", + "connection-acquire.end", + "socket-connect.start", + "socket-connect.end" + ) + + testSpanReporter().nextSpan() shouldBe None + + span + } + + val responseHeaders = getResponseHeaders(response) + + responseHeaders.get(B3.Headers.TraceIdentifier.toLowerCase).value should be(span.trace.id.string) + responseHeaders.get(B3.Headers.SpanIdentifier.toLowerCase).value should be(span.id.string) + responseHeaders.get(B3.Headers.ParentSpanIdentifier.toLowerCase).value should be(span.parentId.string) + responseHeaders.get(B3.Headers.Sampled.toLowerCase).value should be("1") + + } + + } + + + override protected def beforeAll(): Unit = { + applyConfig( + s""" + |kamon { + | propagation.http.default.tags.mappings { + | $customTag = $customHeaderName + | } + | instrumentation.http-client.default.tracing.tags.from-context { + | $customTag = span + | } + |} + |""".stripMargin) + enableFastSpanFlushing() + sampleAlways() + } + + + override protected def afterAll(): Unit = { + httpServer.close() + } +} diff --git a/instrumentation/kamon-armeria/src/test/scala/kamon/armeria/instrumentation/server/ArmeriaHttpServerTracingSpec.scala b/instrumentation/kamon-armeria/src/test/scala/kamon/armeria/instrumentation/server/ArmeriaHttpServerTracingSpec.scala new file mode 100644 index 000000000..5062da347 --- /dev/null +++ b/instrumentation/kamon-armeria/src/test/scala/kamon/armeria/instrumentation/server/ArmeriaHttpServerTracingSpec.scala @@ -0,0 +1,143 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package kamon.armeria.instrumentation.server + +import kamon.tag.Lookups.{plain, plainBoolean, plainLong} +import kamon.testkit._ +import okhttp3.{OkHttpClient, Request} +import org.scalatest.OptionValues.convertOptionToValuable +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import utils.ArmeriaServerSupport.startArmeriaServer +import utils.TestEndpoints._ + +import scala.concurrent.duration._ + +class ArmeriaHttpServerTracingSpec extends WordSpec + with Matchers + with BeforeAndAfterAll + with Eventually + with TestSpanReporter { + + private val okHttp = new OkHttpClient.Builder().build() + + val interface = "127.0.0.1" + val httpPort = 8081 + + private val httpServer = startArmeriaServer(httpPort) + + testSuite("http", interface, httpPort) + + private def testSuite(protocol: String, interface: String, port: Int): Unit = { + + s"The Armeria $protocol server" should { + + "create a server Span when receiving requests" in { + val target = s"$protocol://$interface:$port/$dummyPath" + val expected = "dummy.get" + okHttp.newCall(new Request.Builder().url(target).get().build()).execute().close() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "armeria-http-server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainLong("http.status_code")) shouldBe 200L + } + } + + "set operation name with unhandled" when { + "request path doesn't exists" in { + val target = s"$protocol://$interface:$port/$dummyNotFoundPath" + val expected = "unhandled" + okHttp.newCall(new Request.Builder().url(target).get().build()).execute().close() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + println(span.operationName) + span.operationName shouldBe expected + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "armeria-http-server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainBoolean("error")) shouldBe false + span.metricTags.get(plainLong("http.status_code")) shouldBe 404 + } + } + } + + "set operation name with path + http method" when { + "resource doesn't exist" in { + val target = s"$protocol://$interface:$port/$dummyResourceNotFoundPath" + val expected = "dummy-resource-not-found.get" + okHttp.newCall(new Request.Builder().url(target).get().build()).execute().close() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "armeria-http-server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainBoolean("error")) shouldBe false + span.metricTags.get(plainLong("http.status_code")) shouldBe 404 + } + } + } + + "not include path variables names" in { + val target = s"$protocol://$interface:$port/$dummyMultipleResourcesPath" + val expected = "dummy-resources/{}/other-resources/{}" + okHttp.newCall(new Request.Builder().url(target).get().build()).execute().close() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + } + } + + "not fail when request url contains special regexp chars" in { + val target = s"$protocol://$interface:$port/$dummyMultipleResourcesPath**" + val expected = "dummy-resources/{}/other-resources/{}" + val response = okHttp.newCall(new Request.Builder().url(target).build()).execute() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.operationName shouldBe expected + response.code() shouldBe 200 + } + response.close() + } + + "mark spans as failed when request fails" in { + val target = s"$protocol://$interface:$port/$dummyErrorPath" + val expected = s"$dummyErrorPath.get" + okHttp.newCall(new Request.Builder().url(target).build()).execute().close() + + eventually(timeout(10 seconds)) { + val span = testSpanReporter().nextSpan().value + span.tags.get(plain("http.url")) shouldBe target + span.metricTags.get(plain("component")) shouldBe "armeria-http-server" + span.metricTags.get(plain("http.method")) shouldBe "GET" + span.metricTags.get(plainBoolean("error")) shouldBe true + span.metricTags.get(plainLong("http.status_code")) shouldBe 500 + } + } + } + } + + override protected def afterAll(): Unit = + httpServer.close() +} diff --git a/instrumentation/kamon-armeria/src/test/scala/utils/ArmeriaServerSupport.scala b/instrumentation/kamon-armeria/src/test/scala/utils/ArmeriaServerSupport.scala new file mode 100644 index 000000000..b536001f7 --- /dev/null +++ b/instrumentation/kamon-armeria/src/test/scala/utils/ArmeriaServerSupport.scala @@ -0,0 +1,40 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package utils + +import java.net.InetSocketAddress + +import com.linecorp.armeria.server.Server +import com.linecorp.armeria.server.healthcheck.HealthCheckService + +object ArmeriaServerSupport { + + def startArmeriaServer(port: Int, https: Boolean = false): Server = { + val server = Server + .builder() + .service("/health-check", HealthCheckService.of()) + .annotatedService().build(TestRoutesSupport()) + .http(InetSocketAddress.createUnresolved("localhost", port)) + .build() + + server + .start() + .join() + + server + } + +} diff --git a/instrumentation/kamon-armeria/src/test/scala/utils/TestRoutesSupport.scala b/instrumentation/kamon-armeria/src/test/scala/utils/TestRoutesSupport.scala new file mode 100644 index 000000000..e0493de73 --- /dev/null +++ b/instrumentation/kamon-armeria/src/test/scala/utils/TestRoutesSupport.scala @@ -0,0 +1,58 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package utils + +import com.linecorp.armeria.common.{HttpRequest, HttpResponse, HttpStatus, ResponseHeaders} +import com.linecorp.armeria.server.annotation.{Get, Param} + +final class TestRoutesSupport { + @Get("/dummy") + def getDummy(req: HttpRequest): HttpResponse = { + val responseHeaders = ResponseHeaders.builder(HttpStatus.OK).add(req.headers()).build() + HttpResponse.of(responseHeaders) + } + + @Get("/dummy-resources/{resource}/other-resources/{other}") + def getResource(@Param("resource") resource: String, @Param("other") other: String): HttpResponse = { + println(s"Received a request to retrieve resource $resource and $other") + HttpResponse.of(HttpStatus.OK) + } + + @Get("/dummy-error") + def getDummyError(req: HttpRequest): HttpResponse = { + val responseHeaders = ResponseHeaders.builder(HttpStatus.INTERNAL_SERVER_ERROR).add(req.headers()).build() + HttpResponse.of(responseHeaders) + } + + @Get("/dummy-resource-not-found") + def getDummyResourceNotFound(req: HttpRequest): HttpResponse = { + val responseHeaders = ResponseHeaders.builder(HttpStatus.NOT_FOUND).add(req.headers()).build() + HttpResponse.of(responseHeaders) + } + +} + +object TestRoutesSupport { + def apply(): TestRoutesSupport = new TestRoutesSupport() +} + +object TestEndpoints { + val dummyPath = "dummy" + val dummyErrorPath = "dummy-error" + val dummyMultipleResourcesPath = "dummy-resources/a/other-resources/b" + val dummyNotFoundPath = "dummy-not-found" + val dummyResourceNotFoundPath = "dummy-resource-not-found" +} diff --git a/instrumentation/kamon-armeria/src/test/scala/utils/TestSupport.scala b/instrumentation/kamon-armeria/src/test/scala/utils/TestSupport.scala new file mode 100644 index 000000000..3525b8239 --- /dev/null +++ b/instrumentation/kamon-armeria/src/test/scala/utils/TestSupport.scala @@ -0,0 +1,33 @@ +/* ========================================================================================= + * Copyright © 2013-2020 the kamon project + * + * Licensed under the Apache License, Version 2.0 (the "License") you may not use this file + * except in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + * ========================================================================================= + */ + +package utils + +import com.linecorp.armeria.common.AggregatedHttpResponse + +import scala.collection.JavaConverters + + +object TestSupport { + def getResponseHeaders(response: AggregatedHttpResponse): Map[String, String] = + JavaConverters + .asScalaIteratorConverter(response.headers().iterator()) + .asScala + .map(entry => (entry.getKey.toString, entry.getValue)) + .toMap +} + + +