From e82c6278b558da3c832432917f1257e0163fc9e7 Mon Sep 17 00:00:00 2001 From: Doug Hilpipre Date: Fri, 1 Sep 2023 09:40:08 -0500 Subject: [PATCH] reinstate DT --- reactor-core-2.x/build.gradle | 26 ++++ .../reactor/NRRunnableWrapper.java | 35 +++++ .../dispatch/AbstractLifecycleDispatcher.java | 9 ++ reactor-core-3.1/build.gradle | 12 +- .../reactor/NRReactorHeaders.java | 62 ++++++++ .../reactor/NRRunnableWrapper.java | 23 +-- .../reactor/NRSubscriberWrapper.java | 126 +++++++++++++++++ .../reactor/ReactorDispatcher.java | 63 +++++++++ .../instrumentation/reactor/ReactorUtils.java | 77 ++++++++++ .../core/publisher/FluxSubscribeOn.java | 23 +++ .../core/publisher/Flux_instrumentation.java | 18 +++ .../core/publisher/Mono_instrumentation.java | 17 +++ .../reactor/core/scheduler/Scheduler.java | 35 +++-- .../reactor/test/MonoCoreSubscriber.java | 35 +++++ .../reactor/test/TestApplication.java | 111 ++++++++++----- reactor-core-3.3/build.gradle | 26 ++++ .../reactor/NRReactorHeaders.java | 62 ++++++++ .../reactor/NRReactorPremain.java | 17 +++ .../reactor/NRRunnableWrapper.java | 42 ++++++ .../reactor/NRSubscriberWrapper.java | 132 ++++++++++++++++++ .../reactor/ReactorDispatcher.java | 79 +++++++++++ .../instrumentation/reactor/ReactorUtils.java | 91 ++++++++++++ .../core/publisher/Flux_instrumentation.java | 18 +++ .../core/publisher/Mono_instrumentation.java | 17 +++ .../reactor/core/scheduler/Scheduler.java | 62 ++++++++ reactor-finder/build.gradle | 18 ++- .../reactor/ReactorClassMatcher.java | 18 ++- .../reactor/ReactorClassTransformer.java | 65 +++++++++ .../frameworks/reactor/ReactorFactory.java | 30 ++++ .../frameworks/reactor/ReactorPreMain.java | 74 ++++++++++ .../example/reactor/test/MonoReturning.java | 39 ++++++ .../labs/reactor/ReactorTest.java | 34 +++++ settings.gradle | 1 + 33 files changed, 1418 insertions(+), 79 deletions(-) create mode 100644 reactor-core-2.x/build.gradle create mode 100644 reactor-core-2.x/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java create mode 100644 reactor-core-2.x/src/main/java/reactor/core/dispatch/AbstractLifecycleDispatcher.java create mode 100644 reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java create mode 100644 reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java create mode 100644 reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java create mode 100644 reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java create mode 100644 reactor-core-3.1/src/main/java/reactor/core/publisher/FluxSubscribeOn.java create mode 100644 reactor-core-3.1/src/main/java/reactor/core/publisher/Flux_instrumentation.java create mode 100644 reactor-core-3.1/src/main/java/reactor/core/publisher/Mono_instrumentation.java create mode 100644 reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/MonoCoreSubscriber.java create mode 100644 reactor-core-3.3/build.gradle create mode 100644 reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java create mode 100644 reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorPremain.java create mode 100644 reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java create mode 100644 reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java create mode 100644 reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java create mode 100644 reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java create mode 100644 reactor-core-3.3/src/main/java/reactor/core/publisher/Flux_instrumentation.java create mode 100644 reactor-core-3.3/src/main/java/reactor/core/publisher/Mono_instrumentation.java create mode 100644 reactor-core-3.3/src/main/java/reactor/core/scheduler/Scheduler.java create mode 100644 reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassTransformer.java create mode 100644 reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorFactory.java create mode 100644 reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorPreMain.java create mode 100644 reactor-finder/src/test/java/com/example/reactor/test/MonoReturning.java create mode 100644 reactor-finder/src/test/java/com/newrelic/instrumentation/labs/reactor/ReactorTest.java diff --git a/reactor-core-2.x/build.gradle b/reactor-core-2.x/build.gradle new file mode 100644 index 0000000..691aa6a --- /dev/null +++ b/reactor-core-2.x/build.gradle @@ -0,0 +1,26 @@ + +// Build.gradle generated for instrumentation module reactor-core-2.x + +apply plugin: 'java' + +dependencies { + implementation 'io.projectreactor:reactor-core:2.0.1.RELEASE' + + // New Relic Java Agent dependencies + implementation 'com.newrelic.agent.java:newrelic-agent:6.0.0' + implementation 'com.newrelic.agent.java:newrelic-api:6.0.0' + implementation fileTree(include: ['*.jar'], dir: '../libs') +} + +jar { + manifest { + attributes 'Implementation-Title': 'com.newrelic.instrumentation.labs.reactor-core-2.x' + attributes 'Implementation-Vendor': 'New Relic Labs' + attributes 'Implementation-Vendor-Id': 'com.newrelic.labs' + attributes 'Implementation-Version': 1.0 + } +} + +verifyInstrumentation { + passes 'io.projectreactor:reactor-core:[2.0.1.RELEASE]' +} diff --git a/reactor-core-2.x/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java b/reactor-core-2.x/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java new file mode 100644 index 0000000..ffb4a47 --- /dev/null +++ b/reactor-core-2.x/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java @@ -0,0 +1,35 @@ +package com.nr.instrumentation.reactor; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.Trace; + +public class NRRunnableWrapper implements Runnable { + + private Runnable delegate = null; + + private Token token = null; + private static boolean isTransformed = false; + + public NRRunnableWrapper(Runnable r, Token t) { + delegate = r; + token = t; + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + @Trace(async=true) + public void run() { + if(token != null) { + token.linkAndExpire(); + token = null; + } + if(delegate != null) { + delegate.run(); + } + } + +} diff --git a/reactor-core-2.x/src/main/java/reactor/core/dispatch/AbstractLifecycleDispatcher.java b/reactor-core-2.x/src/main/java/reactor/core/dispatch/AbstractLifecycleDispatcher.java new file mode 100644 index 0000000..530fc6d --- /dev/null +++ b/reactor-core-2.x/src/main/java/reactor/core/dispatch/AbstractLifecycleDispatcher.java @@ -0,0 +1,9 @@ +package reactor.core.dispatch; + +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; + +@Weave(type=MatchType.BaseClass) +public abstract class AbstractLifecycleDispatcher { + +} diff --git a/reactor-core-3.1/build.gradle b/reactor-core-3.1/build.gradle index 0a2b1f3..7f682ce 100644 --- a/reactor-core-3.1/build.gradle +++ b/reactor-core-3.1/build.gradle @@ -4,7 +4,7 @@ apply plugin: 'java' dependencies { - implementation 'io.projectreactor:reactor-core:3.2.11.RELEASE' + implementation 'io.projectreactor:reactor-core:3.1.0.RELEASE' // New Relic Java Agent dependencies implementation 'com.newrelic.agent.java:newrelic-agent:6.4.0' @@ -19,13 +19,13 @@ dependencies { jar { manifest { - attributes 'Implementation-Title': 'com.newrelic.instrumentation.reactor-core-3.1' - attributes 'Implementation-Vendor': 'New Relic' - attributes 'Implementation-Vendor-Id': 'com.newrelic' + attributes 'Implementation-Title': 'com.newrelic.instrumentation.labs.reactor-core-3.1' + attributes 'Implementation-Vendor': 'New Relic Labs' + attributes 'Implementation-Vendor-Id': 'com.newrelic.labs' attributes 'Implementation-Version': 1.0 } } verifyInstrumentation { - passes 'io.projectreactor:reactor-core:[3.1.0.RELEASE,)' -} \ No newline at end of file + passes 'io.projectreactor:reactor-core:[3.1.0.RELEASE,3.3.0.RELEASE)' +} diff --git a/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java new file mode 100644 index 0000000..526549f --- /dev/null +++ b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java @@ -0,0 +1,62 @@ +package com.nr.instrumentation.reactor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.Headers; + +public class NRReactorHeaders implements Headers { + + private HashMap headers = new HashMap(); + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + + @Override + public String getHeader(String name) { + return headers.get(name); + } + + @Override + public Collection getHeaders(String name) { + String value = headers.get(name); + List list = new ArrayList(); + if(value != null) { + list.add(value); + } + return list; + } + + @Override + public void setHeader(String name, String value) { + headers.put(name, value); + } + + @Override + public void addHeader(String name, String value) { + headers.put(name, value); + } + + @Override + public Collection getHeaderNames() { + return headers.keySet(); + } + + @Override + public boolean containsHeader(String name) { + return headers.containsKey(name); + } + + public boolean isEmpty() { + return headers.isEmpty(); + } + + public void clear() { + headers.clear(); + } +} diff --git a/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java index ffb4a47..15fb4da 100644 --- a/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java +++ b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java @@ -1,19 +1,20 @@ package com.nr.instrumentation.reactor; import com.newrelic.agent.bridge.AgentBridge; -import com.newrelic.api.agent.Token; +import com.newrelic.api.agent.NewRelic; import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.TransportType; public class NRRunnableWrapper implements Runnable { private Runnable delegate = null; - private Token token = null; + private NRReactorHeaders headers; private static boolean isTransformed = false; - public NRRunnableWrapper(Runnable r, Token t) { + public NRRunnableWrapper(Runnable r, NRReactorHeaders h) { delegate = r; - token = t; + headers = h; if(!isTransformed) { isTransformed = true; AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); @@ -21,11 +22,17 @@ public NRRunnableWrapper(Runnable r, Token t) { } @Override - @Trace(async=true) + @Trace(dispatcher=true) public void run() { - if(token != null) { - token.linkAndExpire(); - token = null; + boolean ignore = true; + if(headers != null) { + if(!headers.isEmpty()) { + NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, headers); + ignore = false; + } + } + if(ignore) { + NewRelic.getAgent().getTransaction().ignore(); } if(delegate != null) { delegate.run(); diff --git a/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java new file mode 100644 index 0000000..f2b7907 --- /dev/null +++ b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java @@ -0,0 +1,126 @@ +package com.nr.instrumentation.reactor; + +import java.util.logging.Level; + +import org.reactivestreams.Subscription; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.Transaction; + +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; +import reactor.core.Fuseable.QueueSubscription; +import reactor.core.Scannable; + +public class NRSubscriberWrapper implements CoreSubscriber, QueueSubscription { + + private CoreSubscriber actual = null; + + private static boolean isTransformed = false; + + private Subscription subscription = null; + + private NRReactorHeaders headers = null; + + private String name = null; + + public NRSubscriberWrapper(CoreSubscriber sub, Scannable s) { + if(!isTransformed) { + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + isTransformed = true; + } + actual = sub; + name = s.name(); + if(name == null || name.isEmpty()) name = "Scannable"; + headers = new NRReactorHeaders(); + Transaction transaction = NewRelic.getAgent().getTransaction(); + NewRelic.getAgent().getLogger().log(Level.FINE, "In NRSubscriberWrapper., transaction is {0}", transaction); + if (transaction != null) { + transaction.insertDistributedTraceHeaders(headers); + } + + } + + @Override + public void onNext(T t) { + if(!ReactorUtils.activeTransaction()) { + ReactorDispatcher.startOnNextTransaction(name, actual, t, headers); + } else { + actual.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + if(!ReactorUtils.activeTransaction()) { + ReactorDispatcher.startOnErrorTransaction(name, actual, headers,t); + } else { + ReactorUtils.deActivate(); + actual.onError(t); + } + } + + @Override + public void onComplete() { + if(!ReactorUtils.activeTransaction()) { + ReactorDispatcher.startOnCompleteTransaction(name, actual, headers); + } else { + ReactorUtils.deActivate(); + actual.onComplete(); + } + } + + @Override + public void onSubscribe(Subscription s) { + if(headers == null) { + headers = new NRReactorHeaders(); + } + if(headers.isEmpty()) { + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers); + } + subscription = s; + actual.onSubscribe(this); + } + + @Override + @Trace + public void request(long n) { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"request"); + subscription.request(n); + } + + @Override + @Trace + public void cancel() { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"cancel"); + subscription.cancel(); + } + + @Override + public T poll() { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void clear() { + + } + + @Override + public int requestFusion(int requestedMode) { + return Fuseable.NONE; + } + +} diff --git a/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java new file mode 100644 index 0000000..42d5924 --- /dev/null +++ b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java @@ -0,0 +1,63 @@ +package com.nr.instrumentation.reactor; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.Transaction; +import com.newrelic.api.agent.TransportType; + +import reactor.core.CoreSubscriber; + +public class ReactorDispatcher { + + private static volatile ReactorDispatcher instance = null; + + public static ReactorDispatcher get() { + if(instance == null) { + instance = new ReactorDispatcher(); + } + return instance; + } + + private ReactorDispatcher() { + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + + @Trace(dispatcher = true) + public static void startOnNextTransaction(String name,CoreSubscriber sub, T t, NRReactorHeaders headers) { + ReactorUtils.setActive(); + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"onNext"); + Transaction transaction = NewRelic.getAgent().getTransaction(); + if (transaction != null) { + if (headers != null && !headers.isEmpty()) { + transaction.acceptDistributedTraceHeaders(TransportType.Other, headers); + } + } + sub.onNext(t); + } + + @Trace(dispatcher = true) + public static void startOnCompleteTransaction(String name,CoreSubscriber sub, NRReactorHeaders headers) { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"onComplete"); + Transaction transaction = NewRelic.getAgent().getTransaction(); + if (transaction != null) { + if (headers != null && !headers.isEmpty()) { + transaction.acceptDistributedTraceHeaders(TransportType.Other, headers); + } + } + sub.onComplete(); + } + + @Trace(dispatcher = true) + public static void startOnErrorTransaction(String name,CoreSubscriber sub, NRReactorHeaders headers, Throwable t) { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"onError"); + Transaction transaction = NewRelic.getAgent().getTransaction(); + if (transaction != null) { + if (headers != null && !headers.isEmpty()) { + transaction.acceptDistributedTraceHeaders(TransportType.Other, headers); + } + } + sub.onError(t); + } + +} diff --git a/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java new file mode 100644 index 0000000..96ed7fd --- /dev/null +++ b/reactor-core-3.1/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java @@ -0,0 +1,77 @@ +package com.nr.instrumentation.reactor; + +import java.util.function.BiFunction; +import java.util.function.Function; + +import org.reactivestreams.Publisher; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Transaction; + +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Operators; + +public class ReactorUtils { + + + private static ThreadLocal transactionActive = new ThreadLocal() { + + @Override + protected Boolean initialValue() { + return false; + } + + }; + + public static boolean initialized = false; + + public static final String NRHEADERS = "NEWRELIC_HEADERS"; + + + public static void initialize() { + initialized = true; + //Hooks.onEachOperator("NewRelicWrapper",asOperator()); + Hooks.onLastOperator("NewRelicWrapper",asOperator()); + ReactorDispatcher.get(); + + } + + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private static Function, ? extends Publisher> asOperator() { + + return Operators.lift(new BiFunction, CoreSubscriber>() { + + @Override + public CoreSubscriber apply(Scannable t, CoreSubscriber u) { + return new NRSubscriberWrapper(u,t); + } + }); + + } + + public static boolean activeTransaction() { + Boolean active = transactionActive.get(); + Transaction txn = NewRelic.getAgent().getTransaction(); + // if transaction is NoOp then there is no active transaction + boolean isNoOp = txn.getClass().getName().toLowerCase().contains("noop"); + if(!active && !isNoOp) { + setActive(); + active = true; + } else if(active && isNoOp) { + deActivate(); + active = false; + } + return active; + } + + public static void setActive() { + transactionActive.set(true); + } + + public static void deActivate() { + transactionActive.set(false); + } +} diff --git a/reactor-core-3.1/src/main/java/reactor/core/publisher/FluxSubscribeOn.java b/reactor-core-3.1/src/main/java/reactor/core/publisher/FluxSubscribeOn.java new file mode 100644 index 0000000..48106cd --- /dev/null +++ b/reactor-core-3.1/src/main/java/reactor/core/publisher/FluxSubscribeOn.java @@ -0,0 +1,23 @@ +package reactor.core.publisher; + +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; + +import reactor.core.CoreSubscriber; + +/** + * Included simply so the verifier only works up to version 3.3.0.RELEASE + * + * @author dhilpipre + * + * @param + */ + +@Weave +abstract class FluxSubscribeOn { + + public void subscribe(CoreSubscriber actual) { + Weaver.callOriginal(); + } + +} diff --git a/reactor-core-3.1/src/main/java/reactor/core/publisher/Flux_instrumentation.java b/reactor-core-3.1/src/main/java/reactor/core/publisher/Flux_instrumentation.java new file mode 100644 index 0000000..c586b2b --- /dev/null +++ b/reactor-core-3.1/src/main/java/reactor/core/publisher/Flux_instrumentation.java @@ -0,0 +1,18 @@ +package reactor.core.publisher; + +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.ReactorUtils; + +@Weave(originalName="reactor.core.publisher.Flux",type = MatchType.BaseClass) +public abstract class Flux_instrumentation { + + protected static Flux onAssembly(Flux source) { + if(!ReactorUtils.initialized) { + ReactorUtils.initialize(); + } + return Weaver.callOriginal(); + } + +} diff --git a/reactor-core-3.1/src/main/java/reactor/core/publisher/Mono_instrumentation.java b/reactor-core-3.1/src/main/java/reactor/core/publisher/Mono_instrumentation.java new file mode 100644 index 0000000..1460861 --- /dev/null +++ b/reactor-core-3.1/src/main/java/reactor/core/publisher/Mono_instrumentation.java @@ -0,0 +1,17 @@ +package reactor.core.publisher; + +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.ReactorUtils; + +@Weave(originalName="reactor.core.publisher.Mono",type = MatchType.BaseClass) +public abstract class Mono_instrumentation { + + protected static Mono onAssembly(Mono source) { + if(!ReactorUtils.initialized) { + ReactorUtils.initialize(); + } + return Weaver.callOriginal(); + } +} diff --git a/reactor-core-3.1/src/main/java/reactor/core/scheduler/Scheduler.java b/reactor-core-3.1/src/main/java/reactor/core/scheduler/Scheduler.java index 4174254..dccba83 100644 --- a/reactor-core-3.1/src/main/java/reactor/core/scheduler/Scheduler.java +++ b/reactor-core-3.1/src/main/java/reactor/core/scheduler/Scheduler.java @@ -3,11 +3,11 @@ import java.util.concurrent.TimeUnit; import com.newrelic.api.agent.NewRelic; -import com.newrelic.api.agent.Token; import com.newrelic.api.agent.Trace; import com.newrelic.api.agent.weaver.MatchType; import com.newrelic.api.agent.weaver.Weave; import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.NRReactorHeaders; import com.nr.instrumentation.reactor.NRRunnableWrapper; import reactor.core.Disposable; @@ -18,13 +18,12 @@ public abstract class Scheduler { @Trace public Disposable schedule(Runnable task) { if(!(task instanceof NRRunnableWrapper)) { - Token t = NewRelic.getAgent().getTransaction().getToken(); - if(t != null && t.isActive()) { - NRRunnableWrapper wrapper = new NRRunnableWrapper(task, t); + NRReactorHeaders nrHeaders = new NRReactorHeaders(); + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders); + + if(!nrHeaders.isEmpty()) { + NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders); task = wrapper; - } else if(t != null) { - t.expire(); - t = null; } } return Weaver.callOriginal(); @@ -35,13 +34,12 @@ public static class Worker { public Disposable schedule(Runnable task) { if(!(task instanceof NRRunnableWrapper)) { - Token t = NewRelic.getAgent().getTransaction().getToken(); - if(t != null && t.isActive()) { - NRRunnableWrapper wrapper = new NRRunnableWrapper(task, t); + NRReactorHeaders nrHeaders = new NRReactorHeaders(); + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders); + + if(!nrHeaders.isEmpty()) { + NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders); task = wrapper; - } else if(t != null) { - t.expire(); - t = null; } } return Weaver.callOriginal(); @@ -49,13 +47,12 @@ public Disposable schedule(Runnable task) { public Disposable schedule(Runnable task, long delay, TimeUnit unit) { if(!(task instanceof NRRunnableWrapper)) { - Token t = NewRelic.getAgent().getTransaction().getToken(); - if(t != null && t.isActive()) { - NRRunnableWrapper wrapper = new NRRunnableWrapper(task, t); + NRReactorHeaders nrHeaders = new NRReactorHeaders(); + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders); + + if(!nrHeaders.isEmpty()) { + NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders); task = wrapper; - } else if(t != null) { - t.expire(); - t = null; } } return Weaver.callOriginal(); diff --git a/reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/MonoCoreSubscriber.java b/reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/MonoCoreSubscriber.java new file mode 100644 index 0000000..ceddfe3 --- /dev/null +++ b/reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/MonoCoreSubscriber.java @@ -0,0 +1,35 @@ +package com.nr.instrumentation.reactor.test; + +import org.reactivestreams.Subscription; + +import com.newrelic.api.agent.Trace; + +import reactor.core.CoreSubscriber; + +public class MonoCoreSubscriber implements CoreSubscriber { + + @Override + @Trace + public void onNext(String t) { + System.out.println("Received string for onNext: " + t); + } + + @Override + @Trace + public void onError(Throwable t) { + System.out.println("Received error for onError: " + t); + } + + @Override + @Trace + public void onComplete() { + System.out.println("Mono has completed"); + } + + @Override + @Trace + public void onSubscribe(Subscription var1) { + System.out.println("Mono was subscribed to by : " + var1); + } + +} diff --git a/reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/TestApplication.java b/reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/TestApplication.java index 4803f27..448b0cb 100644 --- a/reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/TestApplication.java +++ b/reactor-core-3.1/src/test/java/com/nr/instrumentation/reactor/test/TestApplication.java @@ -1,22 +1,21 @@ package com.nr.instrumentation.reactor.test; import java.util.Collection; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Consumer; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; import com.newrelic.agent.introspec.InstrumentationTestConfig; import com.newrelic.agent.introspec.InstrumentationTestRunner; import com.newrelic.agent.introspec.Introspector; -import com.newrelic.agent.introspec.SpanEvent; -import com.newrelic.agent.introspec.TraceSegment; import com.newrelic.agent.introspec.TracedMetricData; -import com.newrelic.agent.introspec.TransactionTrace; import com.newrelic.api.agent.Trace; +import reactor.core.Disposable; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -25,57 +24,77 @@ public class TestApplication { private static final String txn1 = "OtherTransaction/Custom/com.nr.instrumentation.reactor.test.TestApplication/testMonoSub"; + private static final String txn2 = "OtherTransaction/Custom/com.nr.instrumentation.reactor.NRRunnableWrapper/run"; + private static final String txn3 = "OtherTransaction/Custom/com.nr.instrumentation.reactor.test.TestApplication/testMonoPub"; + @Test public void doMonoSubscribeOnTest() { + /** + * This should result in two transaction, one is the main thread and the other is the result of the subscribe action + */ testMonoSub(); Introspector introspector = InstrumentationTestRunner.getIntrospector(); int finishedTransactionCount = introspector.getFinishedTransactionCount(5000); - System.out.println("Finished transaction count: "+finishedTransactionCount); + Assert.assertEquals("Expected two transactions", finishedTransactionCount, 2L); Collection txnNames = introspector.getTransactionNames(); - for(String tName : txnNames) { - System.out.println("Transaction Name: "+tName); - } + boolean contains = txnNames.contains(txn1) & txnNames.contains(txn2); + Assert.assertTrue(contains); - Map metrics = introspector.getMetricsForTransaction(txn1); + Map metrics = introspector.getMetricsForTransaction(txn2); Set names = metrics.keySet(); - for(String name : names) { - TracedMetricData traced = metrics.get(name); - System.out.println("Traced: name="+traced.getName()+", call count="+traced.getCallCount()+", totalTime="+traced.getTotalTimeInSec()); - } + Assert.assertTrue(names.contains("Java/com.nr.instrumentation.reactor.NRRunnableWrapper/run")); + Assert.assertTrue(names.contains("Custom/com.nr.instrumentation.reactor.test.TestApplication/doSubscribeAction")); + } + + @Test + public void doMonoPublishOnTest() { + /** + * This should result in two transaction, one is the main thread and the other is the result of the publish action + */ + testMonoPub(); + + Introspector introspector = InstrumentationTestRunner.getIntrospector(); + int finishedTransactionCount = introspector.getFinishedTransactionCount(5000); + Assert.assertEquals("Expected two transaction", finishedTransactionCount, 2L); - Collection traces = introspector.getTransactionTracesForTransaction(txn1); - System.out.println("Returned "+traces.size()+" transaction traces"); - for(TransactionTrace trace : traces) { - System.out.println("Trace: start="+trace.getStartTime()+", response time="+trace.getResponseTimeInSec()); - TraceSegment current = trace.getInitialTraceSegment(); - System.out.println("Initial Segment: name="+current.getName()+", method name:"+current.getMethodName()+", class name: "+current.getClassName()+", call count="+current.getCallCount()); - List children = current.getChildren(); - System.out.println("Contains "+children.size()+" children"); - int count = 0; - while(children != null && children.size() > 0) { - count++; - current = children.get(0); - System.out.println("Child Segment "+count+": name="+current.getName()+", method name:"+current.getMethodName()+", class name: "+current.getClassName()+", call count="+current.getCallCount()); - children = current.getChildren(); - System.out.println("Contains "+children.size()+" children"); - } - - Collection spans = introspector.getSpanEvents(); - System.out.println("There are "+spans.size()+" spans"); + Collection txnNames = introspector.getTransactionNames(); + boolean contains = txnNames.contains(txn3) & txnNames.contains(txn2); + Assert.assertTrue(contains); + + Map metrics = introspector.getMetricsForTransaction(txn2); + Set names = metrics.keySet(); + for(String name : names) { + System.out.println("trace: "+ name); } + Assert.assertTrue(names.contains("Java/com.nr.instrumentation.reactor.NRRunnableWrapper/run")); + //Assert.assertTrue(names.contains("Custom/com.nr.instrumentation.reactor.test.TestApplication/doSubscribeAction")); } + @Trace(dispatcher = true) + public void testMonoPub() { + System.out.println("Enter testMonoPub"); + Mono mono = getStringMono().publishOn(Schedulers.single()); + + mono.subscribeWith(new MonoCoreSubscriber()); + String result = mono.block(); + + System.out.println("Exit testMonoPub with result: "+ result); + + } + @Trace(dispatcher = true) public void testMonoSub() { System.out.println("Enter testMonoSub"); - Mono mono = getStringMono(); + ResultConsumer c = new ResultConsumer(); + Mono mono = getStringMono().subscribeOn(Schedulers.single()).doOnSuccess(c); + + mono.subscribe(s -> doSubscribeAction(s)); - mono.subscribeOn(Schedulers.single()).subscribe(s -> System.out.println("Result is "+s)); - System.out.println("Exit testMonoSub"); + System.out.println("Exit testMonoSub wit result: " + c.result); } @@ -91,4 +110,26 @@ public Mono getStringMono() { return "hello"; }); } + + @Trace + public void doSubscribeAction(String s) { + try { + Thread.sleep(100L); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("Result is "+s); + } + + private class ResultConsumer implements Consumer { + + String result; + + @Override + @Trace + public void accept(String t) { + result = t; + } + + } } diff --git a/reactor-core-3.3/build.gradle b/reactor-core-3.3/build.gradle new file mode 100644 index 0000000..17e1468 --- /dev/null +++ b/reactor-core-3.3/build.gradle @@ -0,0 +1,26 @@ + +// Build.gradle generated for instrumentation module reactor-core-3.1 + +apply plugin: 'java' + +dependencies { + implementation 'io.projectreactor:reactor-core:3.3.0.RELEASE' + + // New Relic Java Agent dependencies + implementation 'com.newrelic.agent.java:newrelic-agent:6.4.0' + implementation 'com.newrelic.agent.java:newrelic-api:6.4.0' + implementation fileTree(include: ['*.jar'], dir: '../libs') +} + +jar { + manifest { + attributes 'Implementation-Title': 'com.newrelic.instrumentation.labs.reactor-core-3.3' + attributes 'Implementation-Vendor': 'New Relic Labs' + attributes 'Implementation-Vendor-Id': 'com.newrelic.labs' + attributes 'Implementation-Version': 1.0 + } +} + +verifyInstrumentation { + passes 'io.projectreactor:reactor-core:[3.3.0.RELEASE,)' +} diff --git a/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java new file mode 100644 index 0000000..526549f --- /dev/null +++ b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorHeaders.java @@ -0,0 +1,62 @@ +package com.nr.instrumentation.reactor; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; + +import com.newrelic.api.agent.HeaderType; +import com.newrelic.api.agent.Headers; + +public class NRReactorHeaders implements Headers { + + private HashMap headers = new HashMap(); + + @Override + public HeaderType getHeaderType() { + return HeaderType.MESSAGE; + } + + @Override + public String getHeader(String name) { + return headers.get(name); + } + + @Override + public Collection getHeaders(String name) { + String value = headers.get(name); + List list = new ArrayList(); + if(value != null) { + list.add(value); + } + return list; + } + + @Override + public void setHeader(String name, String value) { + headers.put(name, value); + } + + @Override + public void addHeader(String name, String value) { + headers.put(name, value); + } + + @Override + public Collection getHeaderNames() { + return headers.keySet(); + } + + @Override + public boolean containsHeader(String name) { + return headers.containsKey(name); + } + + public boolean isEmpty() { + return headers.isEmpty(); + } + + public void clear() { + headers.clear(); + } +} diff --git a/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorPremain.java b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorPremain.java new file mode 100644 index 0000000..12db56c --- /dev/null +++ b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRReactorPremain.java @@ -0,0 +1,17 @@ +package com.nr.instrumentation.reactor; + +/** + * Used to initialize ReactorUtils which needs to be started when the application starts + * + * @author dhilpipre + * + */ +public class NRReactorPremain { + + public static void premain(String[] args) { + if(!ReactorUtils.initialized) { + ReactorUtils.initialize(); + } + } + +} diff --git a/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java new file mode 100644 index 0000000..15fb4da --- /dev/null +++ b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRRunnableWrapper.java @@ -0,0 +1,42 @@ +package com.nr.instrumentation.reactor; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.TransportType; + +public class NRRunnableWrapper implements Runnable { + + private Runnable delegate = null; + + private NRReactorHeaders headers; + private static boolean isTransformed = false; + + public NRRunnableWrapper(Runnable r, NRReactorHeaders h) { + delegate = r; + headers = h; + if(!isTransformed) { + isTransformed = true; + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + } + + @Override + @Trace(dispatcher=true) + public void run() { + boolean ignore = true; + if(headers != null) { + if(!headers.isEmpty()) { + NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, headers); + ignore = false; + } + } + if(ignore) { + NewRelic.getAgent().getTransaction().ignore(); + } + if(delegate != null) { + delegate.run(); + } + } + +} diff --git a/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java new file mode 100644 index 0000000..a6d2940 --- /dev/null +++ b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/NRSubscriberWrapper.java @@ -0,0 +1,132 @@ +package com.nr.instrumentation.reactor; + +import org.reactivestreams.Subscription; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.TracedMethod; +import com.newrelic.api.agent.Transaction; + +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; +import reactor.core.Fuseable.QueueSubscription; +import reactor.core.Scannable; + +public class NRSubscriberWrapper implements CoreSubscriber, QueueSubscription { + + private CoreSubscriber actual = null; + + private static boolean isTransformed = false; + + private Subscription subscription = null; + + private NRReactorHeaders headers = null; + + private String name = null; + + public NRSubscriberWrapper(CoreSubscriber sub, Scannable s) { + if(!isTransformed) { + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + isTransformed = true; + } + actual = sub; + name = s.name(); + if(name == null || name.isEmpty()) name = "Scannable"; + headers = new NRReactorHeaders(); + Transaction transaction = NewRelic.getAgent().getTransaction(); + TracedMethod traced = NewRelic.getAgent().getTracedMethod(); + + if (transaction != null && !(traced instanceof com.newrelic.agent.bridge.NoOpTracedMethod)) { + transaction.insertDistributedTraceHeaders(headers); + } + + } + + @Override + public void onNext(T t) { + if(!ReactorUtils.activeTransaction()) { + ReactorDispatcher.startOnNextTransaction(name, actual, t, headers); + } else { + actual.onNext(t); + } + } + + @Override + public void onError(Throwable t) { + if(!ReactorUtils.activeTransaction()) { + ReactorDispatcher.startOnErrorTransaction(name, actual, headers,t); + } else { + ReactorUtils.deActivate(); + actual.onError(t); + } + } + + @Override + public void onComplete() { + if(!ReactorUtils.activeTransaction()) { + ReactorDispatcher.startOnCompleteTransaction(name, actual, headers); + } else { + ReactorUtils.deActivate(); + actual.onComplete(); + } + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + TracedMethod traced = NewRelic.getAgent().getTracedMethod(); + boolean isNoOp = traced.getClass().getSimpleName().toLowerCase().contains("noop"); + if(!ReactorUtils.activeTransaction() || isNoOp) { + ReactorDispatcher.startOnSubscribeTransaction(name, actual, s, headers); + } else { + if(headers == null) { + headers = new NRReactorHeaders(); + } + if(headers.isEmpty()) { + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers); + } + actual.onSubscribe(this); + } + } + + @Override + @Trace + public void request(long n) { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"request"); + subscription.request(n); + } + + @Override + @Trace + public void cancel() { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"cancel"); + subscription.cancel(); + } + + @Override + public T poll() { + return null; + } + + @Override + public int size() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public void clear() { + + } + + @Override + public int requestFusion(int requestedMode) { + return Fuseable.NONE; + } + +} diff --git a/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java new file mode 100644 index 0000000..ebf783d --- /dev/null +++ b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorDispatcher.java @@ -0,0 +1,79 @@ +package com.nr.instrumentation.reactor; + +import org.reactivestreams.Subscription; + +import com.newrelic.agent.bridge.AgentBridge; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.Transaction; +import com.newrelic.api.agent.TransportType; + +import reactor.core.CoreSubscriber; + +public class ReactorDispatcher { + + private static volatile ReactorDispatcher instance = null; + + public static ReactorDispatcher get() { + if(instance == null) { + instance = new ReactorDispatcher(); + } + return instance; + } + + private ReactorDispatcher() { + AgentBridge.instrumentation.retransformUninstrumentedClass(getClass()); + } + + @Trace(dispatcher = true) + public static void startOnNextTransaction(String name,CoreSubscriber sub, T t, NRReactorHeaders headers) { + ReactorUtils.setActive(); + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"onNext"); + Transaction transaction = NewRelic.getAgent().getTransaction(); + if (transaction != null) { + if (headers != null && !headers.isEmpty()) { + transaction.acceptDistributedTraceHeaders(TransportType.Other, headers); + } + } + sub.onNext(t); + } + + @Trace(dispatcher = true) + public static void startOnSubscribeTransaction(String name,CoreSubscriber sub, Subscription subscription, NRReactorHeaders headers) { + ReactorUtils.setActive(); + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"onSubscribe"); + Transaction transaction = NewRelic.getAgent().getTransaction(); + if (transaction != null) { + if (headers != null && !headers.isEmpty()) { + transaction.acceptDistributedTraceHeaders(TransportType.Other, headers); + } + } + sub.onSubscribe(subscription); + } + + + @Trace(dispatcher = true) + public static void startOnCompleteTransaction(String name,CoreSubscriber sub, NRReactorHeaders headers) { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"onComplete"); + Transaction transaction = NewRelic.getAgent().getTransaction(); + if (transaction != null) { + if (headers != null && !headers.isEmpty()) { + transaction.acceptDistributedTraceHeaders(TransportType.Other, headers); + } + } + sub.onComplete(); + } + + @Trace(dispatcher = true) + public static void startOnErrorTransaction(String name,CoreSubscriber sub, NRReactorHeaders headers, Throwable t) { + NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"onError"); + Transaction transaction = NewRelic.getAgent().getTransaction(); + if (transaction != null) { + if (headers != null && !headers.isEmpty()) { + transaction.acceptDistributedTraceHeaders(TransportType.Other, headers); + } + } + sub.onError(t); + } + +} diff --git a/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java new file mode 100644 index 0000000..1b851dc --- /dev/null +++ b/reactor-core-3.3/src/main/java/com/nr/instrumentation/reactor/ReactorUtils.java @@ -0,0 +1,91 @@ +package com.nr.instrumentation.reactor; + +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.logging.Level; + +import org.reactivestreams.Publisher; + +import com.newrelic.agent.transaction.TransactionTimer; +import com.newrelic.api.agent.Logger; +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Transaction; + +import reactor.core.CoreSubscriber; +import reactor.core.Scannable; +import reactor.core.publisher.Hooks; +import reactor.core.publisher.Operators; + +public class ReactorUtils { + + + private static ThreadLocal transactionActive = new ThreadLocal() { + + @Override + protected Boolean initialValue() { + return false; + } + + }; + + public static boolean initialized = false; + + public static final String NRHEADERS = "NEWRELIC_HEADERS"; + + + public static void initialize() { + initialized = true; + //Hooks.onEachOperator("NewRelicWrapper",asOperator()); + Hooks.onLastOperator("NewRelicWrapper",asOperator()); + ReactorDispatcher.get(); + + } + + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private static Function, ? extends Publisher> asOperator() { + + return Operators.lift(new BiFunction, CoreSubscriber>() { + + @Override + public CoreSubscriber apply(Scannable t, CoreSubscriber u) { + return new NRSubscriberWrapper(u,t); + } + }); + + } + + public static boolean timerStarted() { + com.newrelic.agent.Transaction txn = com.newrelic.agent.Transaction.getTransaction(false); + if(txn != null) { + TransactionTimer timer = txn.getTransactionTimer(); + if(timer != null) { + return true; + } + } + return false; + } + + public static boolean activeTransaction() { + Boolean active = transactionActive.get(); + Transaction txn = NewRelic.getAgent().getTransaction(); + // if transaction is NoOp then there is no active transaction + boolean isNoOp = txn.getClass().getName().toLowerCase().contains("noop"); + if(!active && !isNoOp) { + setActive(); + active = true; + } else if(active && isNoOp) { + deActivate(); + active = false; + } + return active; + } + + public static void setActive() { + transactionActive.set(true); + } + + public static void deActivate() { + transactionActive.set(false); + } +} diff --git a/reactor-core-3.3/src/main/java/reactor/core/publisher/Flux_instrumentation.java b/reactor-core-3.3/src/main/java/reactor/core/publisher/Flux_instrumentation.java new file mode 100644 index 0000000..c586b2b --- /dev/null +++ b/reactor-core-3.3/src/main/java/reactor/core/publisher/Flux_instrumentation.java @@ -0,0 +1,18 @@ +package reactor.core.publisher; + +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.ReactorUtils; + +@Weave(originalName="reactor.core.publisher.Flux",type = MatchType.BaseClass) +public abstract class Flux_instrumentation { + + protected static Flux onAssembly(Flux source) { + if(!ReactorUtils.initialized) { + ReactorUtils.initialize(); + } + return Weaver.callOriginal(); + } + +} diff --git a/reactor-core-3.3/src/main/java/reactor/core/publisher/Mono_instrumentation.java b/reactor-core-3.3/src/main/java/reactor/core/publisher/Mono_instrumentation.java new file mode 100644 index 0000000..1460861 --- /dev/null +++ b/reactor-core-3.3/src/main/java/reactor/core/publisher/Mono_instrumentation.java @@ -0,0 +1,17 @@ +package reactor.core.publisher; + +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.ReactorUtils; + +@Weave(originalName="reactor.core.publisher.Mono",type = MatchType.BaseClass) +public abstract class Mono_instrumentation { + + protected static Mono onAssembly(Mono source) { + if(!ReactorUtils.initialized) { + ReactorUtils.initialize(); + } + return Weaver.callOriginal(); + } +} diff --git a/reactor-core-3.3/src/main/java/reactor/core/scheduler/Scheduler.java b/reactor-core-3.3/src/main/java/reactor/core/scheduler/Scheduler.java new file mode 100644 index 0000000..dccba83 --- /dev/null +++ b/reactor-core-3.3/src/main/java/reactor/core/scheduler/Scheduler.java @@ -0,0 +1,62 @@ +package reactor.core.scheduler; + +import java.util.concurrent.TimeUnit; + +import com.newrelic.api.agent.NewRelic; +import com.newrelic.api.agent.Trace; +import com.newrelic.api.agent.weaver.MatchType; +import com.newrelic.api.agent.weaver.Weave; +import com.newrelic.api.agent.weaver.Weaver; +import com.nr.instrumentation.reactor.NRReactorHeaders; +import com.nr.instrumentation.reactor.NRRunnableWrapper; + +import reactor.core.Disposable; + +@Weave(type=MatchType.Interface) +public abstract class Scheduler { + + @Trace + public Disposable schedule(Runnable task) { + if(!(task instanceof NRRunnableWrapper)) { + NRReactorHeaders nrHeaders = new NRReactorHeaders(); + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders); + + if(!nrHeaders.isEmpty()) { + NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders); + task = wrapper; + } + } + return Weaver.callOriginal(); + } + + @Weave(type=MatchType.Interface) + public static class Worker { + + public Disposable schedule(Runnable task) { + if(!(task instanceof NRRunnableWrapper)) { + NRReactorHeaders nrHeaders = new NRReactorHeaders(); + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders); + + if(!nrHeaders.isEmpty()) { + NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders); + task = wrapper; + } + } + return Weaver.callOriginal(); + } + + public Disposable schedule(Runnable task, long delay, TimeUnit unit) { + if(!(task instanceof NRRunnableWrapper)) { + NRReactorHeaders nrHeaders = new NRReactorHeaders(); + NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(nrHeaders); + + if(!nrHeaders.isEmpty()) { + NRRunnableWrapper wrapper = new NRRunnableWrapper(task, nrHeaders); + task = wrapper; + } + } + return Weaver.callOriginal(); + } + } + +} diff --git a/reactor-finder/build.gradle b/reactor-finder/build.gradle index eb61e39..6ebb9b1 100644 --- a/reactor-finder/build.gradle +++ b/reactor-finder/build.gradle @@ -7,17 +7,23 @@ dependencies { implementation 'io.projectreactor:reactor-core:3.2.11.RELEASE' // New Relic Java Agent dependencies - implementation 'com.newrelic.agent.java:newrelic-agent:6.0.0' - implementation 'com.newrelic.agent.java:newrelic-api:6.0.0' + implementation 'com.newrelic.agent.java:newrelic-agent:6.4.0' + implementation 'com.newrelic.agent.java:newrelic-api:6.4.0' implementation fileTree(include: ['*.jar'], dir: '../libs') + + testImplementation 'io.projectreactor:reactor-core:3.2.11.RELEASE' + testImplementation 'junit:junit:4.12' + testImplementation fileTree(include: ['*.jar'], dir: '../test-lib') + } jar { manifest { - attributes 'Implementation-Title': 'com.newrelic.instrumentation.reactor-finder' - attributes 'Implementation-Vendor': 'New Relic' - attributes 'Implementation-Vendor-Id': 'com.newrelic' + attributes 'Implementation-Title': 'com.newrelic.instrumentation.labs.reactor-finder' + attributes 'Implementation-Vendor': 'New Relic Labs' + attributes 'Implementation-Vendor-Id': 'com.newrelic.labs' attributes 'Implementation-Version': 1.0 + attributes 'Agent-Class': 'com.newrelic.agent.instrumentation.pointcuts.frameworks.reactor.ReactorPreMain' } } @@ -27,4 +33,4 @@ verifyInstrumentation { // Example: // passes 'javax.servlet:servlet-api:[2.2,2.5]' // exclude 'javax.servlet:servlet-api:2.4.public_draft' -} \ No newline at end of file +} diff --git a/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassMatcher.java b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassMatcher.java index 960cc9c..e1982eb 100644 --- a/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassMatcher.java +++ b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassMatcher.java @@ -2,11 +2,19 @@ import java.util.Collection; import java.util.Collections; +import java.util.logging.Level; import com.newrelic.agent.bridge.AgentBridge; import com.newrelic.agent.deps.org.objectweb.asm.ClassReader; import com.newrelic.agent.instrumentation.classmatchers.ClassMatcher; +import com.newrelic.api.agent.NewRelic; +/** + * Class matcher that matches any class that doesn't belong to reactor.core or its subpackages + * + * @author dhilpipre + * + */ public class ReactorClassMatcher extends ClassMatcher { @Override @@ -15,11 +23,11 @@ public Collection getClassNames() { } @Override - public boolean isMatch(Class var1) { - if(var1.isAnnotation()) return false; + public boolean isMatch(Class clazz) { + if(clazz.isAnnotation()) return false; - Package classPackage = var1.getPackage(); - boolean b = !classPackage.getName().startsWith("reactor.core"); + Package classPackage = clazz.getPackage(); + boolean b = !classPackage.getName().startsWith("reactor.core") && !classPackage.getName().startsWith("reactor.util"); return b; } @@ -30,7 +38,7 @@ public boolean isMatch(ClassLoader loader, ClassReader cr) { } String className = cr.getClassName(); - boolean b = !className.startsWith("reactor/core"); + boolean b = !className.startsWith("reactor/core") && !className.startsWith("reactor/util"); return b; } diff --git a/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassTransformer.java b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassTransformer.java new file mode 100644 index 0000000..b82f326 --- /dev/null +++ b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorClassTransformer.java @@ -0,0 +1,65 @@ +package com.newrelic.agent.instrumentation.pointcuts.frameworks.reactor; + +import java.lang.instrument.IllegalClassFormatException; +import java.security.ProtectionDomain; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; + +import com.newrelic.agent.deps.org.objectweb.asm.commons.Method; +import com.newrelic.agent.instrumentation.InstrumentationType; +import com.newrelic.agent.instrumentation.classmatchers.ClassAndMethodMatcher; +import com.newrelic.agent.instrumentation.classmatchers.OptimizedClassMatcher.Match; +import com.newrelic.agent.instrumentation.classmatchers.OptimizedClassMatcherBuilder; +import com.newrelic.agent.instrumentation.context.ClassMatchVisitorFactory; +import com.newrelic.agent.instrumentation.context.ContextClassTransformer; +import com.newrelic.agent.instrumentation.context.InstrumentationContext; +import com.newrelic.agent.instrumentation.context.InstrumentationContextManager; +import com.newrelic.agent.instrumentation.methodmatchers.MethodMatcher; +import com.newrelic.agent.instrumentation.tracing.TraceDetailsBuilder; +import com.newrelic.api.agent.NewRelic; + +public class ReactorClassTransformer implements ContextClassTransformer { + + private final InstrumentationContextManager contextManager; + private final Map matchers = new HashMap(); + + public ReactorClassTransformer(InstrumentationContextManager mgr) { + contextManager = mgr; + } + + protected void addMatcher(ClassAndMethodMatcher matcher) { + OptimizedClassMatcherBuilder builder = OptimizedClassMatcherBuilder.newBuilder(); + builder.addClassMethodMatcher(matcher); + ClassMatchVisitorFactory matchVisitor = builder.build(); + matchers.put(matcher.getClass().getSimpleName(), matchVisitor); + contextManager.addContextClassTransformer(matchVisitor, this); + } + + protected void removeMatcher(ClassAndMethodMatcher matcher) { + ClassMatchVisitorFactory matchVisitor = matchers.get(matcher.getClass().getSimpleName()); + if(matchVisitor != null) { + contextManager.removeMatchVisitor(matchVisitor); + } + } + + + @Override + public byte[] transform(ClassLoader loader, String className, Class classBeingRedefined, + ProtectionDomain protectionDomain, byte[] classfileBuffer, InstrumentationContext context, Match match) + throws IllegalClassFormatException { + for (Method method : match.getMethods()) { + for (ClassAndMethodMatcher matcher : match.getClassMatches().keySet()) { + if (matcher.getMethodMatcher().matches(MethodMatcher.UNSPECIFIED_ACCESS, method.getName(), + method.getDescriptor(), match.getMethodAnnotations(method))) { + NewRelic.getAgent().getLogger().log(Level.FINE, "Adding Trace annotation to class {0} and method {1}",className, method.getName()); + context.putTraceAnnotation(method, TraceDetailsBuilder.newBuilder().setTracerFactoryName(ReactorPreMain.TRACER_FACTORY_NAME) + .setInstrumentationSourceName("New Relic Labs").setInstrumentationType(InstrumentationType.TraceAnnotation).setDispatcher(true).build()); + } + + } + } + return null; + } + +} diff --git a/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorFactory.java b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorFactory.java new file mode 100644 index 0000000..c88caf4 --- /dev/null +++ b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorFactory.java @@ -0,0 +1,30 @@ +package com.newrelic.agent.instrumentation.pointcuts.frameworks.reactor; + +import java.util.logging.Level; + +import com.newrelic.agent.Transaction; +import com.newrelic.agent.tracers.AbstractTracerFactory; +import com.newrelic.agent.tracers.ClassMethodSignature; +import com.newrelic.agent.tracers.DefaultTracer; +import com.newrelic.agent.tracers.Tracer; +import com.newrelic.agent.tracers.TracerFlags; +import com.newrelic.agent.tracers.metricname.MetricNameFormat; +import com.newrelic.agent.tracers.metricname.SimpleMetricNameFormat; +import com.newrelic.api.agent.NewRelic; + +public class ReactorFactory extends AbstractTracerFactory { + + @Override + public Tracer doGetTracer(Transaction transaction, ClassMethodSignature sig, Object object, Object[] args) { + NewRelic.getAgent().getLogger().log(Level.FINE, "Call to doGetTracer for sig: {0},{1} and object: {2})", sig.getClassName(),sig.getMethodName(),object); + String classname = sig.getClassName(); + String methodName = sig.getMethodName(); + + int flags = DefaultTracer.DEFAULT_TRACER_FLAGS | TracerFlags.DISPATCHER; + + MetricNameFormat metricName = new SimpleMetricNameFormat("Custom/Reactor/ReactorMethod/"+classname+"/"+methodName); + DefaultTracer tracer = new DefaultTracer(transaction, sig, object, metricName, flags); + return tracer; + } + +} diff --git a/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorPreMain.java b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorPreMain.java new file mode 100644 index 0000000..7b7ac94 --- /dev/null +++ b/reactor-finder/src/main/java/com/newrelic/agent/instrumentation/pointcuts/frameworks/reactor/ReactorPreMain.java @@ -0,0 +1,74 @@ +package com.newrelic.agent.instrumentation.pointcuts.frameworks.reactor; + +import java.lang.instrument.Instrumentation; +import java.util.concurrent.Executors; +import java.util.logging.Level; + +import com.newrelic.agent.InstrumentationProxy; +import com.newrelic.agent.TracerService; +import com.newrelic.agent.core.CoreService; +import com.newrelic.agent.instrumentation.ClassTransformerService; +import com.newrelic.agent.instrumentation.classmatchers.ClassAndMethodMatcher; +import com.newrelic.agent.instrumentation.context.InstrumentationContextManager; +import com.newrelic.agent.service.ServiceFactory; +import com.newrelic.api.agent.NewRelic; + +public class ReactorPreMain { + + protected static final String TRACER_FACTORY_NAME = "ReactorReturning"; + + public static void premain(String s, Instrumentation inst) { + + initialize(); + } + + private static void initialize() { + boolean b = setup(); + if(!b) { + Executors.newSingleThreadExecutor().submit(new SetupProcess()); + } + } + + private static boolean setup() { + TracerService tracerService = ServiceFactory.getTracerService(); + ClassTransformerService classTransformerService = ServiceFactory.getClassTransformerService(); + CoreService coreService = ServiceFactory.getCoreService(); + if(classTransformerService != null && coreService != null && tracerService != null) { + + tracerService.registerTracerFactory(TRACER_FACTORY_NAME, new ReactorFactory()); + + InstrumentationContextManager contextMgr = classTransformerService.getContextManager(); + InstrumentationProxy proxy = coreService.getInstrumentation(); + if(contextMgr != null && proxy != null) { + ReactorClassTransformer transformer = new ReactorClassTransformer(contextMgr); + ClassAndMethodMatcher matcher = new ReactorMatcher(); + transformer.addMatcher(matcher); + NewRelic.getAgent().getLogger().log(Level.FINE, "Reactor Finder transformer started"); + return true; + } + } + return false; + } + + private static class SetupProcess implements Runnable { + + + + @Override + public void run() { + boolean setupComplete = false; + + while(!setupComplete) { + setupComplete = setup(); + if(!setupComplete) { + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + } + } + } + + } + + } +} diff --git a/reactor-finder/src/test/java/com/example/reactor/test/MonoReturning.java b/reactor-finder/src/test/java/com/example/reactor/test/MonoReturning.java new file mode 100644 index 0000000..23ddb05 --- /dev/null +++ b/reactor-finder/src/test/java/com/example/reactor/test/MonoReturning.java @@ -0,0 +1,39 @@ +package com.example.reactor.test; + +import java.util.HashMap; +import java.util.function.Supplier; + +import reactor.core.publisher.Mono; + +public class MonoReturning { + + private static final HashMap accounts; + + static { + accounts = new HashMap<>(); + accounts.put("Doug", "Doug's Savings"); + accounts.put("Mary", "Mary's Checking"); + accounts.put("Tom", "Tom's CD"); + + } + + public Mono findAccountName(String customer) { + Mono mono = Mono.fromSupplier(new AccountSupplier(customer)); + return mono; + } + + private class AccountSupplier implements Supplier { + + private String customer = null; + + public AccountSupplier(String c) { + customer = c; + } + + @Override + public String get() { + return accounts.get(customer); + } + + } +} diff --git a/reactor-finder/src/test/java/com/newrelic/instrumentation/labs/reactor/ReactorTest.java b/reactor-finder/src/test/java/com/newrelic/instrumentation/labs/reactor/ReactorTest.java new file mode 100644 index 0000000..96c1448 --- /dev/null +++ b/reactor-finder/src/test/java/com/newrelic/instrumentation/labs/reactor/ReactorTest.java @@ -0,0 +1,34 @@ +package com.newrelic.instrumentation.labs.reactor; + +import static org.junit.Assert.assertTrue; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.example.reactor.test.MonoReturning; +import com.newrelic.agent.introspec.InstrumentationTestConfig; +import com.newrelic.agent.introspec.InstrumentationTestRunner; +import com.newrelic.agent.introspec.Introspector; + +import reactor.core.publisher.Mono; + +@RunWith(InstrumentationTestRunner.class) +@InstrumentationTestConfig(includePrefixes = {"reactor.core"}) +public class ReactorTest { + + @Test + public void monoReturnTest() { + MonoReturning returningMono = new MonoReturning(); + String name = "Doug"; + Mono mono = returningMono.findAccountName(name); + String acct = mono.block(); + System.out.println("Found account: "+ acct + " for Customer: "+name); + + Introspector introspector = InstrumentationTestRunner.getIntrospector(); + int finishedTransactionCount = introspector.getFinishedTransactionCount(5000); + //assertTrue(finishedTransactionCount == 1); + System.out.println("Finished transactions: "+ finishedTransactionCount); + + } + +} diff --git a/settings.gradle b/settings.gradle index 3308289..2c80acf 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,4 +1,5 @@ rootProject.name = 'java-instrumentation-template' include 'reactor-core-2.x' include 'reactor-core-3.1' +include 'reactor-core-3.3' include 'reactor-finder'