Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trying to integrate with zipkin at actor level #8

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<brave.version>5.13.2</brave.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
Expand Down Expand Up @@ -107,6 +108,15 @@
<artifactId>micrometer-registry-jmx</artifactId>
<version>1.0.6</version>
</dependency>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-instrumentation-okhttp3</artifactId>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-sender-okhttp3</artifactId>
<version>2.16.2</version>
</dependency>
</dependencies>

<repositories>
Expand Down Expand Up @@ -139,4 +149,16 @@
<url>https://oss.jfrog.org/artifactory/oss-snapshot-local</url>
</snapshotRepository>
</distributionManagement>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.zipkin.brave</groupId>
<artifactId>brave-bom</artifactId>
<version>${brave.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
</project>
195 changes: 195 additions & 0 deletions src/main/java/brave/RealSpan.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* Copyright 2013-2020 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package brave;

import brave.handler.MutableSpan;
import brave.internal.recorder.PendingSpans;
import brave.propagation.TraceContext;

/** This wraps the public api and guards access to a mutable span. */
public final class RealSpan extends Span {
final TraceContext context;
final PendingSpans pendingSpans;
final MutableSpan state;
final Clock clock;

RealSpan(TraceContext context,
PendingSpans pendingSpans,
MutableSpan state,
Clock clock
) {
this.context = context;
this.pendingSpans = pendingSpans;
this.state = state;
this.clock = clock;
}

@Override public boolean isNoop() {
return false;
}

@Override public TraceContext context() {
return context;
}

@Override public SpanCustomizer customizer() {
return new SpanCustomizerShield(this);
}

@Override public Span start() {
return start(clock.currentTimeMicroseconds());
}

@Override public Span start(long timestamp) {
synchronized (state) {
state.startTimestamp(timestamp);
}
return this;
}

@Override public Span name(String name) {
synchronized (state) {
state.name(name);
}
return this;
}

@Override public Span kind(Kind kind) {
synchronized (state) {
state.kind(kind);
}
return this;
}

@Override public Span annotate(String value) {
return annotate(clock.currentTimeMicroseconds(), value);
}

@Override public Span annotate(long timestamp, String value) {
// Modern instrumentation should not send annotations such as this, but we leniently
// accept them rather than fail. This for example allows old bridges like to Brave v3 to work
if ("cs".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.CLIENT);
state.startTimestamp(timestamp);
}
} else if ("sr".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.SERVER);
state.startTimestamp(timestamp);
}
} else if ("cr".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.CLIENT);
}
finish(timestamp);
} else if ("ss".equals(value)) {
synchronized (state) {
state.kind(Span.Kind.SERVER);
}
finish(timestamp);
} else {
synchronized (state) {
state.annotate(timestamp, value);
}
}
return this;
}

@Override public Span tag(String key, String value) {
synchronized (state) {
state.tag(key, value);
}
return this;
}

@Override public Span error(Throwable throwable) {
synchronized (state) {
state.error(throwable);
}
return this;
}

public Span localServiceName(String localServiceName) {
synchronized (state) {
state.localServiceName(localServiceName);
}
return this;
}

public String remoteServiceName() {
synchronized (state) {
return state.remoteServiceName();
}
}

@Override public Span remoteServiceName(String remoteServiceName) {
synchronized (state) {
state.remoteServiceName(remoteServiceName);
}
return this;
}

@Override public boolean remoteIpAndPort(String remoteIp, int remotePort) {
synchronized (state) {
return state.remoteIpAndPort(remoteIp, remotePort);
}
}

@Override public void finish() {
finish(0L);
}

@Override public void finish(long timestamp) {
synchronized (state) {
pendingSpans.finish(context, timestamp);
}
}

@Override public void abandon() {
pendingSpans.abandon(context);
}

@Override public void flush() {
pendingSpans.flush(context);
}

@Override public String toString() {
return "RealSpan(" + context + ")";
}

/**
* This also matches equals against a lazy span. The rationale is least surprise to the user, as
* code should not act differently given an instance of lazy or {@link RealSpan}.
*/
@Override public boolean equals(Object o) {
if (o == this) return true;
return isEqualToRealOrLazySpan(context, o);
}

// We don't compare a RealSpan vs a NoopSpan as they can never equal each other.
// RealSpan's are always locally sampled and Noop ones are always not.
static boolean isEqualToRealOrLazySpan(TraceContext context, Object o) {
if (o instanceof LazySpan) {
return context.equals(((LazySpan) o).context);
} else if (o instanceof RealSpan) {
return context.equals(((RealSpan) o).context);
}
return false;
}

@Override public int hashCode() {
return context.hashCode();
}
}
58 changes: 58 additions & 0 deletions src/main/java/com/github/Test.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.github;

import io.vlingo.actors.Actor;
import io.vlingo.actors.Definition;
import io.vlingo.actors.World;

public class Test {
public interface Ping {
void ping();
}

public static class PingActor extends Actor implements Ping {
@Override
public void ping() {
System.out.println("Ping!");

stage().actorOf(Pong.class, stage().addressFactory().from("100"))
.andFinallyConsume(Pong::pong);
}
}

public interface Pong {
void pong();
}

public static class PongActor extends Actor implements Pong {
@Override
public void pong() {
System.out.println("Pong!");

stage().actorOf(Foo.class, stage().addressFactory().from("200"))
.andFinallyConsume(Foo::foo);
}
}

public interface Foo {
void foo();
}

public static class FooActor extends Actor implements Foo {
@Override
public void foo() {
System.out.println("Foo!");
}
}


public static void main(String[] args) {
World world = World.start("test");

Ping ping = world.actorFor(Ping.class, PingActor.class);

world.stage().actorFor(Pong.class, Definition.has(PongActor.class, Definition.NoParameters), world.addressFactory().from("100"));
world.stage().actorFor(Foo.class, Definition.has(FooActor.class, Definition.NoParameters), world.addressFactory().from("200"));

ping.ping();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,19 @@

import java.util.Properties;

import brave.Tracer;
import brave.Tracing;
import io.vlingo.actors.Configuration;
import io.vlingo.actors.Registrar;
import io.vlingo.actors.plugin.Plugin;
import io.vlingo.actors.plugin.PluginConfiguration;
import io.vlingo.actors.plugin.PluginProperties;
import io.vlingo.actors.plugin.mailbox.DefaultMailboxProviderKeeper;
import io.vlingo.telemetry.Telemetry;
import io.vlingo.telemetry.tracing.TracingBaggage;
import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
import zipkin2.reporter.brave.ZipkinSpanHandler;
import zipkin2.reporter.okhttp3.OkHttpSender;

public class MailboxTelemetryPlugin implements Plugin {
public static class MailboxTelemetryPluginConfiguration implements PluginConfiguration {
Expand Down Expand Up @@ -65,7 +71,21 @@ public int pass() {
@Override
public void start(final Registrar registrar) {
Telemetry<?> telemetry = Telemetry.from(registrar.world());
registrar.registerMailboxProviderKeeper(new TelemetryMailboxProviderKeeper(new DefaultMailboxProviderKeeper(), new DefaultMailboxTelemetry(telemetry)));
OkHttpSender sender = OkHttpSender.create("http://127.0.0.1:9411/api/v2/spans");
// (this dependency is io.zipkin.reporter2:zipkin-reporter-brave)
ZipkinSpanHandler zipkinSpanHandler = AsyncZipkinSpanHandler.create(sender);

// Create a tracing component with the service name you want to see in Zipkin.
Tracing tracing = Tracing.newBuilder()
.traceId128Bit(true)
.addSpanHandler(zipkinSpanHandler)
.build();

// Tracing exposes objects you might need, most importantly the tracer
Tracer tracer = tracing.tracer();
TracingBaggage baggage = TracingBaggage.withTracer(tracer);

registrar.registerMailboxProviderKeeper(new TelemetryMailboxProviderKeeper(new DefaultMailboxProviderKeeper(), new DefaultMailboxTelemetry(telemetry), baggage));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
import io.vlingo.actors.Message;
import io.vlingo.actors.Returns;
import io.vlingo.common.SerializableConsumer;
import io.vlingo.telemetry.tracing.TracingBaggage;

public class TelemetryMailbox implements Mailbox {
private final MailboxTelemetry telemetry;
private final Mailbox delegate;
private final TracingBaggage baggage;

public TelemetryMailbox(final MailboxTelemetry telemetry, final Mailbox delegate) {
public TelemetryMailbox(final MailboxTelemetry telemetry, final Mailbox delegate, final TracingBaggage baggage) {
this.telemetry = telemetry;
this.delegate = delegate;
this.baggage = baggage;
}

@Override
Expand Down Expand Up @@ -50,7 +53,7 @@ public void resume(final String name) {
@Override
public void send(final Message message) {
try {
delegate.send(new TelemetryMessage(message, telemetry));
delegate.send(new TelemetryMessage(message, telemetry, baggage));
telemetry.onSendMessage(message.actor());
} catch (Exception e) {
telemetry.onSendMessageFailed(message.actor(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@
import io.vlingo.actors.Dispatcher;
import io.vlingo.actors.Mailbox;
import io.vlingo.actors.MailboxProvider;
import io.vlingo.telemetry.tracing.TracingBaggage;

public class TelemetryMailboxProvider implements MailboxProvider {
private final MailboxTelemetry telemetry;
private final MailboxProvider delegate;
private final TracingBaggage baggage;

public TelemetryMailboxProvider(final MailboxTelemetry telemetry, final MailboxProvider delegate) {
public TelemetryMailboxProvider(final MailboxTelemetry telemetry, final MailboxProvider delegate, final TracingBaggage baggage) {
this.telemetry = telemetry;
this.delegate = delegate;
this.baggage = baggage;
}

@Override
Expand All @@ -27,11 +30,11 @@ public void close() {

@Override
public Mailbox provideMailboxFor(final int hashCode) {
return new TelemetryMailbox(telemetry, delegate.provideMailboxFor(hashCode));
return new TelemetryMailbox(telemetry, delegate.provideMailboxFor(hashCode), baggage);
}

@Override
public Mailbox provideMailboxFor(final int hashCode, final Dispatcher dispatcher) {
return new TelemetryMailbox(telemetry, delegate.provideMailboxFor(hashCode, dispatcher));
return new TelemetryMailbox(telemetry, delegate.provideMailboxFor(hashCode, dispatcher), baggage);
}
}
Loading