Skip to content

Commit

Permalink
Checkpoint Otel conversion.
Browse files Browse the repository at this point in the history
 - Rely on TracingHelper to ensure Otel isn't inadvertently bootstrapped at start.

Signed-off-by: Ryan Lubke <[email protected]>
  • Loading branch information
rlubke committed Jan 21, 2025
1 parent 6aae82f commit 7b01e85
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,16 @@
import com.oracle.bedrock.runtime.options.DisplayName;

import com.oracle.coherence.common.base.Logger;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.Cluster;
import com.tangosol.net.Coherence;
import com.tangosol.net.Member;

import com.tangosol.util.ResourceRegistry;
import com.tangosol.internal.tracing.TracingHelper;
import com.tangosol.internal.tracing.Span;

import io.opentelemetry.api.trace.Span;
import com.tangosol.util.ResourceRegistry;

import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
Expand Down Expand Up @@ -129,7 +131,8 @@ public Response createMember(@PathParam("serverCount") int serverCount) {
SystemProperty.of(Launcher.SECONDARY_CLUSTER_PROPERTY,
System.getProperty(Launcher.SECONDARY_CLUSTER_PROPERTY)),
JvmOptions.include(newArguments.toArray(new String[0])));
Span span = Span.current();

Span span = TracingHelper.getActiveSpan();
Utilities.spanLog(span, "Starting new member");

// wait for the new cache server to join the cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,10 @@

import java.net.URI;

import io.opentelemetry.api.GlobalOpenTelemetry;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.api.trace.Tracer;

import io.opentelemetry.context.Scope;
import com.tangosol.internal.tracing.Scope;
import com.tangosol.internal.tracing.Span;
import com.tangosol.internal.tracing.Tracer;
import com.tangosol.internal.tracing.TracingHelper;

import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
Expand Down Expand Up @@ -68,31 +64,36 @@ public class TracingFilter

@Override
public void filter(ContainerRequestContext context) {
Tracer tracer = GlobalOpenTelemetry.getTracer("coherence.demo");
Span span = tracer.spanBuilder(getOperationName())
.setSpanKind(SpanKind.SERVER)
.setAttribute("component", JAXRS)
.setAttribute("http.method", context.getMethod())
.setAttribute("http.url", getURL(context)).startSpan();

store(context, SPAN_KEY, span);
store(context, SCOPE_KEY, span.makeCurrent());
if (TracingHelper.isEnabled()) {
Tracer tracer = TracingHelper.getTracer();
Span span = tracer.spanBuilder(getOperationName())
.withMetadata("span.kind", "server")
.withMetadata("component", JAXRS)
.withMetadata("http.method", context.getMethod())
.withMetadata("http.url", getURL(context)).startSpan();

store(context, SPAN_KEY, span);
store(context, SCOPE_KEY, tracer.withSpan(span));
}
}

@Override
public void filter(ContainerRequestContext requestContext,
ContainerResponseContext responseContext) {
Span span = load(requestContext, SPAN_KEY);
Scope scope = load(requestContext, SCOPE_KEY);
Span span = load(requestContext, SPAN_KEY);

if (responseContext.getStatusInfo().getFamily() == Response.Status.Family.SERVER_ERROR) {
span.setStatus(StatusCode.ERROR);
}
if (span != null) {
Scope scope = load(requestContext, SCOPE_KEY);

if (responseContext.getStatusInfo().getFamily() == Response.Status.Family.SERVER_ERROR) {
span.setMetadata("error", true);
}

span.setAttribute("http.status_code", responseContext.getStatus());
span.setMetadata("http.status_code", responseContext.getStatus());

span.end();
scope.close();
span.end();
scope.close();
}
}

/**
Expand Down
159 changes: 108 additions & 51 deletions src/main/java/com/oracle/coherence/demo/application/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
import com.oracle.coherence.demo.model.Price;
import com.oracle.coherence.demo.model.Trade;

import com.tangosol.internal.tracing.Span;
import com.tangosol.internal.tracing.Scope;
import com.tangosol.internal.tracing.Tracer;
import com.tangosol.internal.tracing.TracingHelper;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.Coherence;
import com.tangosol.net.NamedCache;
Expand All @@ -31,12 +36,6 @@

import com.tangosol.util.Processors;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;

import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -245,24 +244,34 @@ public static int getCoherenceVersionAsInt()
* Add indexes to the caches to improve query performance.
*/
public static void addIndexes() {
NamedCache<String, Trade> tradesCache = getTradesCache();
Tracer tracer = GlobalOpenTelemetry.getTracer("coherence.demo");
Span span = tracer.spanBuilder("Utilities.AddIndexes")
.setSpanKind(SpanKind.SERVER)
.setAttribute("Component", "demo")
.startSpan();
NamedCache<String, Trade> tradesCache = getTradesCache();
Span span = null;
Scope scope = null;

if (TracingHelper.isEnabled()) {
Tracer tracer = TracingHelper.getTracer();
span = tracer.spanBuilder("Utilities.AddIndexes")
.withMetadata("span.kind", "server")
.withMetadata("Component", "demo")
.startSpan();
scope = tracer.withSpan(span);
}

Logger.out("Adding Indexes...");
try (Scope ignored = span.makeCurrent()) {
try {
tradesCache.addIndex(Trade::getSymbol, true, null);
spanLog(span, "Created trade symbol index");
tradesCache.addIndex(Trade::getPurchaseValue, false, null);
spanLog(span, "Created trade purchase value index");
tradesCache.addIndex(Trade::getQuantity, false, null);
spanLog(span, "Created trade amount index");
}
finally {
span.end();
} finally {
if (scope != null) {
scope.close();
}
if (span != null) {
span.end();
}
}
Logger.out(" Done");
}
Expand All @@ -273,14 +282,20 @@ public static void addIndexes() {
*/
public static void removeIndexes() {
NamedCache<String, Trade> tradesCache = getTradesCache();
Tracer tracer = GlobalOpenTelemetry.getTracer("coherence.demo");
Span span = tracer.spanBuilder("Utilities.RemoveIndexes")
.setSpanKind(SpanKind.SERVER)
.setAttribute("Component", "demo")
.startSpan();
Span span = null;
Scope scope = null;

if (TracingHelper.isEnabled()) {
Tracer tracer = TracingHelper.getTracer();
span = tracer.spanBuilder("Utilities.RemoveIndexes")
.withMetadata("span.kind", "server")
.withMetadata("Component", "demo")
.startSpan();
scope = tracer.withSpan(span);
}

Logger.out("Removing Indexes...");
try (Scope ignored = span.makeCurrent()) {
try {
tradesCache.removeIndex(Trade::getSymbol);
spanLog(span, "Removed trade symbol index");
tradesCache.removeIndex(Trade::getPurchaseValue);
Expand All @@ -289,7 +304,12 @@ public static void removeIndexes() {
spanLog(span, "Removed trade amount index");
}
finally {
span.end();
if (scope != null) {
scope.close();
}
if (span != null) {
span.end();
}
}

Logger.out(" Done");
Expand All @@ -302,21 +322,32 @@ public static void removeIndexes() {
*/
public static void populatePrices() {
NamedCache<String, Price> pricesCaches = getPricesCache();
Tracer tracer = GlobalOpenTelemetry.getTracer("coherence.demo");
Span span = tracer.spanBuilder("Utilities.PopulatePrices")
.setSpanKind(SpanKind.SERVER)
.setAttribute("Component", "demo")
.setAttribute("symbol.count", SYMBOLS.length)
.startSpan();

try (Scope ignored = span.makeCurrent()) {
Span span = null;
Scope scope = null;

if (TracingHelper.isEnabled()) {
Tracer tracer = TracingHelper.getTracer();
span = tracer.spanBuilder("Utilities.PopulatePrices")
.withMetadata("span.kind", "server")
.withMetadata("Component", "demo")
.withMetadata("symbol.count", SYMBOLS.length)
.startSpan();
scope = tracer.withSpan(span);
}

try {
for (String symbol : SYMBOLS) {
Price price = new Price(symbol, INITIAL_PRICE);
pricesCaches.put(price.getSymbol(), price);
}
}
finally {
span.end();
if (scope != null) {
scope.close();
}
if (span != null) {
span.end();
}
}
}

Expand Down Expand Up @@ -366,16 +397,22 @@ public static void createPositions(String symbolToInsert, int count)

NamedCache<String, Trade> tradesCache = getTradesCache();
NamedCache<String, Price> priceCache = getPricesCache();
Tracer tracer = GlobalOpenTelemetry.getTracer("coherence.demo");
Span span = tracer.spanBuilder("Utilities.CreatePositions")
.setSpanKind(SpanKind.SERVER)
.setAttribute("Component", "demo")
.setAttribute("symbol.count", SYMBOLS.length)
.startSpan();
Span span = null;
Scope scope = null;

if (TracingHelper.isEnabled()) {
Tracer tracer = TracingHelper.getTracer();
span = tracer.spanBuilder("Utilities.CreatePositions")
.withMetadata("span.kind", "server")
.withMetadata("Component", "demo")
.withMetadata("symbol.count", SYMBOLS.length)
.startSpan();
scope = tracer.withSpan(span);
}

boolean singleSymbol = symbolToInsert != null;

try (Scope ignored = span.makeCurrent())
try
{
Map<String, Price> localPrices = new HashMap<>(priceCache.getAll(priceCache.keySet()));
HashMap<String, Trade> trades = new HashMap<>();
Expand Down Expand Up @@ -410,7 +447,14 @@ public static void createPositions(String symbolToInsert, int count)
}
finally
{
span.end();
if (scope != null)
{
scope.close();
}
if (span != null)
{
span.end();
}
}

Logger.out(String.format("Creation Complete! (Cache contains %d positions) ", tradesCache.size()));
Expand All @@ -427,27 +471,40 @@ public static void updatePrices()

// choose random symbol to modify
String symbol = SYMBOLS[random.nextInt(SYMBOLS.length)];
Tracer tracer = GlobalOpenTelemetry.getTracer("coherence.demo");
Span span = tracer.spanBuilder("Utilities.UpdatePrices")
.setSpanKind(SpanKind.SERVER)
.setAttribute("Component", "demo")
.setAttribute("update.symbol", symbol)
.startSpan();

try (Scope ignored = span.makeCurrent())
Span span = null;
Scope scope = null;

if (TracingHelper.isEnabled()) {
Tracer tracer = TracingHelper.getTracer();
span = tracer.spanBuilder("Utilities.UpdatePrices")
.withMetadata("span.kind", "server")
.withMetadata("Component", "demo")
.withMetadata("update.symbol", symbol)
.startSpan();
scope = tracer.withSpan(span);
}

try
{
// invoke using static method to ensure all arguments are captured
priceCache.invoke(symbol, updateStockPrice(random.nextFloat()));
}
finally
{
span.end();
if (scope != null)
{
scope.close();
}
if (span != null)
{
span.end();
}
}
}


/**
* Invokes {@link Span#addEvent(String)} if {@code span} is not {@code null}.
* Invokes {@link Span#log(String)} if {@code span} is not {@code null}.
*
* @param span the target {@link Span}
* @param message the message to log
Expand All @@ -456,7 +513,7 @@ public static void spanLog(Span span, String message)
{
if (span != null)
{
span.addEvent(message);
span.log(message);
}
}

Expand Down

0 comments on commit 7b01e85

Please sign in to comment.