Skip to content

Commit

Permalink
Merge pull request #7 from newrelic-experimental/useDT-Update
Browse files Browse the repository at this point in the history
reinstate DT
  • Loading branch information
dhilpipre authored Sep 1, 2023
2 parents e4f3247 + e82c627 commit 3067a1a
Show file tree
Hide file tree
Showing 33 changed files with 1,418 additions and 79 deletions.
26 changes: 26 additions & 0 deletions reactor-core-2.x/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

// Build.gradle generated for instrumentation module reactor-core-2.x

apply plugin: 'java'

dependencies {
implementation 'io.projectreactor:reactor-core:2.0.1.RELEASE'

// New Relic Java Agent dependencies
implementation 'com.newrelic.agent.java:newrelic-agent:6.0.0'
implementation 'com.newrelic.agent.java:newrelic-api:6.0.0'
implementation fileTree(include: ['*.jar'], dir: '../libs')
}

jar {
manifest {
attributes 'Implementation-Title': 'com.newrelic.instrumentation.labs.reactor-core-2.x'
attributes 'Implementation-Vendor': 'New Relic Labs'
attributes 'Implementation-Vendor-Id': 'com.newrelic.labs'
attributes 'Implementation-Version': 1.0
}
}

verifyInstrumentation {
passes 'io.projectreactor:reactor-core:[2.0.1.RELEASE]'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.nr.instrumentation.reactor;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.Trace;

public class NRRunnableWrapper implements Runnable {

private Runnable delegate = null;

private Token token = null;
private static boolean isTransformed = false;

public NRRunnableWrapper(Runnable r, Token t) {
delegate = r;
token = t;
if(!isTransformed) {
isTransformed = true;
AgentBridge.instrumentation.retransformUninstrumentedClass(getClass());
}
}

@Override
@Trace(async=true)
public void run() {
if(token != null) {
token.linkAndExpire();
token = null;
}
if(delegate != null) {
delegate.run();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package reactor.core.dispatch;

import com.newrelic.api.agent.weaver.MatchType;
import com.newrelic.api.agent.weaver.Weave;

@Weave(type=MatchType.BaseClass)
public abstract class AbstractLifecycleDispatcher {

}
12 changes: 6 additions & 6 deletions reactor-core-3.1/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
apply plugin: 'java'

dependencies {
implementation 'io.projectreactor:reactor-core:3.2.11.RELEASE'
implementation 'io.projectreactor:reactor-core:3.1.0.RELEASE'

// New Relic Java Agent dependencies
implementation 'com.newrelic.agent.java:newrelic-agent:6.4.0'
Expand All @@ -19,13 +19,13 @@ dependencies {

jar {
manifest {
attributes 'Implementation-Title': 'com.newrelic.instrumentation.reactor-core-3.1'
attributes 'Implementation-Vendor': 'New Relic'
attributes 'Implementation-Vendor-Id': 'com.newrelic'
attributes 'Implementation-Title': 'com.newrelic.instrumentation.labs.reactor-core-3.1'
attributes 'Implementation-Vendor': 'New Relic Labs'
attributes 'Implementation-Vendor-Id': 'com.newrelic.labs'
attributes 'Implementation-Version': 1.0
}
}

verifyInstrumentation {
passes 'io.projectreactor:reactor-core:[3.1.0.RELEASE,)'
}
passes 'io.projectreactor:reactor-core:[3.1.0.RELEASE,3.3.0.RELEASE)'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.nr.instrumentation.reactor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;

import com.newrelic.api.agent.HeaderType;
import com.newrelic.api.agent.Headers;

public class NRReactorHeaders implements Headers {

private HashMap<String, String> headers = new HashMap<String, String>();

@Override
public HeaderType getHeaderType() {
return HeaderType.MESSAGE;
}

@Override
public String getHeader(String name) {
return headers.get(name);
}

@Override
public Collection<String> getHeaders(String name) {
String value = headers.get(name);
List<String> list = new ArrayList<String>();
if(value != null) {
list.add(value);
}
return list;
}

@Override
public void setHeader(String name, String value) {
headers.put(name, value);
}

@Override
public void addHeader(String name, String value) {
headers.put(name, value);
}

@Override
public Collection<String> getHeaderNames() {
return headers.keySet();
}

@Override
public boolean containsHeader(String name) {
return headers.containsKey(name);
}

public boolean isEmpty() {
return headers.isEmpty();
}

public void clear() {
headers.clear();
}
}
Original file line number Diff line number Diff line change
@@ -1,31 +1,38 @@
package com.nr.instrumentation.reactor;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.Token;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.TransportType;

public class NRRunnableWrapper implements Runnable {

private Runnable delegate = null;

private Token token = null;
private NRReactorHeaders headers;
private static boolean isTransformed = false;

public NRRunnableWrapper(Runnable r, Token t) {
public NRRunnableWrapper(Runnable r, NRReactorHeaders h) {
delegate = r;
token = t;
headers = h;
if(!isTransformed) {
isTransformed = true;
AgentBridge.instrumentation.retransformUninstrumentedClass(getClass());
}
}

@Override
@Trace(async=true)
@Trace(dispatcher=true)
public void run() {
if(token != null) {
token.linkAndExpire();
token = null;
boolean ignore = true;
if(headers != null) {
if(!headers.isEmpty()) {
NewRelic.getAgent().getTransaction().acceptDistributedTraceHeaders(TransportType.Other, headers);
ignore = false;
}
}
if(ignore) {
NewRelic.getAgent().getTransaction().ignore();
}
if(delegate != null) {
delegate.run();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.nr.instrumentation.reactor;

import java.util.logging.Level;

import org.reactivestreams.Subscription;

import com.newrelic.agent.bridge.AgentBridge;
import com.newrelic.api.agent.NewRelic;
import com.newrelic.api.agent.Trace;
import com.newrelic.api.agent.Transaction;

import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Fuseable.QueueSubscription;
import reactor.core.Scannable;

public class NRSubscriberWrapper<T> implements CoreSubscriber<T>, QueueSubscription<T> {

private CoreSubscriber<T> actual = null;

private static boolean isTransformed = false;

private Subscription subscription = null;

private NRReactorHeaders headers = null;

private String name = null;

public NRSubscriberWrapper(CoreSubscriber<T> sub, Scannable s) {
if(!isTransformed) {
AgentBridge.instrumentation.retransformUninstrumentedClass(getClass());
isTransformed = true;
}
actual = sub;
name = s.name();
if(name == null || name.isEmpty()) name = "Scannable";
headers = new NRReactorHeaders();
Transaction transaction = NewRelic.getAgent().getTransaction();
NewRelic.getAgent().getLogger().log(Level.FINE, "In NRSubscriberWrapper.<init>, transaction is {0}", transaction);
if (transaction != null) {
transaction.insertDistributedTraceHeaders(headers);
}

}

@Override
public void onNext(T t) {
if(!ReactorUtils.activeTransaction()) {
ReactorDispatcher.startOnNextTransaction(name, actual, t, headers);
} else {
actual.onNext(t);
}
}

@Override
public void onError(Throwable t) {
if(!ReactorUtils.activeTransaction()) {
ReactorDispatcher.startOnErrorTransaction(name, actual, headers,t);
} else {
ReactorUtils.deActivate();
actual.onError(t);
}
}

@Override
public void onComplete() {
if(!ReactorUtils.activeTransaction()) {
ReactorDispatcher.startOnCompleteTransaction(name, actual, headers);
} else {
ReactorUtils.deActivate();
actual.onComplete();
}
}

@Override
public void onSubscribe(Subscription s) {
if(headers == null) {
headers = new NRReactorHeaders();
}
if(headers.isEmpty()) {
NewRelic.getAgent().getTransaction().insertDistributedTraceHeaders(headers);
}
subscription = s;
actual.onSubscribe(this);
}

@Override
@Trace
public void request(long n) {
NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"request");
subscription.request(n);
}

@Override
@Trace
public void cancel() {
NewRelic.getAgent().getTracedMethod().setMetricName("Custom","Reactor",name,"cancel");
subscription.cancel();
}

@Override
public T poll() {
return null;
}

@Override
public int size() {
return 0;
}

@Override
public boolean isEmpty() {
return true;
}

@Override
public void clear() {

}

@Override
public int requestFusion(int requestedMode) {
return Fuseable.NONE;
}

}
Loading

0 comments on commit 3067a1a

Please sign in to comment.