Skip to content

Commit

Permalink
Add EventToSpanEventBridge
Browse files Browse the repository at this point in the history
  • Loading branch information
jack-berg committed Nov 18, 2024
1 parent 0b7233e commit 20f4ddd
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 2 deletions.
30 changes: 29 additions & 1 deletion processors/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
# Processors

This module provides tools to intercept and process signals globally.
## Interceptable exporters

This module provides tools to intercept and process signals globally:

* `InterceptableSpanExporter`
* `InterceptableMetricExporter`
* `InterceptableLogRecordExporter`

## Event to SpanEvent Bridge

`EventToSpanEventBridge` is a `LogRecordProcessor` which records events (i.e. log records with an `event.name` attribute) as span events for the current span if:

* The log record has a valid span context
* `Span.current()` returns a span where `Span.isRecording()` is true

For details of how the event log record is translated to span event, see [EventToSpanEventBridge Javadoc](./src/main/java/io/opentelemetry/contrib/eventbridge/EventToSpanEventBridge.java).

`EventToSpanEventBridge` can be referenced in [declarative configuration](https://opentelemetry.io/docs/languages/java/configuration/#declarative-configuration) as follows:

```yaml
// Configure tracer provider as usual, omitted for brevity
tracer_provider: ...

logger_provider:
processors:
- event_to_span_event_bridge: {}
```
// TODO(jack-berg): remove "{}" after merging / rle https://github.com/open-telemetry/opentelemetry-java/pull/6891/files
## Component owners
Expand Down
8 changes: 8 additions & 0 deletions processors/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,13 @@ java {

dependencies {
api("io.opentelemetry:opentelemetry-sdk")
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure-spi")

// For EventToSpanEventBridge
implementation("io.opentelemetry:opentelemetry-exporter-otlp-common")
implementation("com.fasterxml.jackson.core:jackson-core")

testImplementation("io.opentelemetry:opentelemetry-sdk-testing")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
testImplementation("io.opentelemetry:opentelemetry-sdk-extension-incubator")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.eventbridge;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.exporter.internal.marshal.MarshalerWithSize;
import io.opentelemetry.exporter.internal.otlp.AnyValueMarshaler;
import io.opentelemetry.sdk.logs.LogRecordProcessor;
import io.opentelemetry.sdk.logs.ReadWriteLogRecord;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* A processor that records events (i.e. log records with an {@code event.name} attribute) as span
* events for the current span if:
*
* <ul>
* <li>The log record has a valid span context
* <li>{@link Span#current()} returns a span where {@link Span#isRecording()} is true
* </ul>
*
* <p>The event {@link LogRecordData} is converted to attributes on the span event as follows:
*
* <ul>
* <li>{@code event.name} attribute is mapped to span event name
* <li>{@link LogRecordData#getTimestampEpochNanos()} is mapped to span event timestamp
* <li>{@link LogRecordData#getAttributes()} are mapped to span event attributes, excluding {@code
* event.name}
* <li>{@link LogRecordData#getObservedTimestampEpochNanos()} is mapped to span event attribute
* with key {@code log.record.observed_timestamp}
* <li>{@link LogRecordData#getSeverity()} is mapped to span event attribute with key {@code
* log.record.severity_number}
* <li>{@link LogRecordData#getBodyValue()} is mapped to span event attribute with key {@code
* log.record.body}, as an escaped JSON string following the standard protobuf JSON encoding
* <li>{@link LogRecordData#getTotalAttributeCount()} - {@link
* LogRecordData#getAttributes()}.size() is mapped to span event attribute with key {@code
* log.record.dropped_attributes_count}
* </ul>
*/
public final class EventToSpanEventBridge implements LogRecordProcessor {

private static final Logger LOGGER = Logger.getLogger(EventToSpanEventBridge.class.getName());

private static final AttributeKey<String> EVENT_NAME = AttributeKey.stringKey("event.name");
private static final AttributeKey<Long> LOG_RECORD_OBSERVED_TIME_UNIX_NANO =
AttributeKey.longKey("log.record.observed_time_unix_nano");
private static final AttributeKey<Long> LOG_RECORD_SEVERITY_NUMBER =
AttributeKey.longKey("log.record.severity_number");
private static final AttributeKey<String> LOG_RECORD_BODY =
AttributeKey.stringKey("log.record.body");
private static final AttributeKey<Long> LOG_RECORD_DROPPED_ATTRIBUTES_COUNT =
AttributeKey.longKey("log.record.dropped_attributes_count");

private EventToSpanEventBridge() {}

/** Create an instance. */
public static EventToSpanEventBridge create() {
return new EventToSpanEventBridge();
}

@Override
public void onEmit(Context context, ReadWriteLogRecord logRecord) {
LogRecordData logRecordData = logRecord.toLogRecordData();
String eventName = logRecordData.getAttributes().get(EVENT_NAME);
if (eventName == null) {
return;
}
if (!logRecordData.getSpanContext().isValid()) {
return;
}
Span currentSpan = Span.current();
if (!currentSpan.isRecording()) {
return;
}
currentSpan.addEvent(
eventName,
toSpanEventAttributes(logRecordData),
logRecordData.getTimestampEpochNanos(),
TimeUnit.NANOSECONDS);
}

private static Attributes toSpanEventAttributes(LogRecordData logRecord) {
AttributesBuilder builder =
logRecord.getAttributes().toBuilder().removeIf(key -> key.equals(EVENT_NAME));

builder.put(LOG_RECORD_OBSERVED_TIME_UNIX_NANO, logRecord.getObservedTimestampEpochNanos());

builder.put(LOG_RECORD_SEVERITY_NUMBER, logRecord.getSeverity().getSeverityNumber());

// Add bridging for logRecord.getSeverityText() if EventBuilder adds severity text setter

Value<?> body = logRecord.getBodyValue();
if (body != null) {
MarshalerWithSize marshaler = AnyValueMarshaler.create(body);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try {
marshaler.writeJsonTo(baos);
} catch (IOException e) {
LOGGER.log(Level.WARNING, "Error converting log record body to JSON", e);
}
builder.put(LOG_RECORD_BODY, new String(baos.toByteArray(), StandardCharsets.UTF_8));
}

int droppedAttributesCount =
logRecord.getTotalAttributeCount() - logRecord.getAttributes().size();
if (droppedAttributesCount > 0) {
builder.put(LOG_RECORD_DROPPED_ATTRIBUTES_COUNT, droppedAttributesCount);
}

return builder.build();
}

@Override
public String toString() {
return "EventToSpanEventBridge{}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.eventbridge.internal;

import io.opentelemetry.contrib.eventbridge.EventToSpanEventBridge;
import io.opentelemetry.sdk.autoconfigure.spi.internal.ComponentProvider;
import io.opentelemetry.sdk.autoconfigure.spi.internal.StructuredConfigProperties;
import io.opentelemetry.sdk.logs.LogRecordProcessor;

/**
* Declarative configuration SPI implementation for {@link EventToSpanEventBridge}.
*
* <p>This class is internal and is hence not for public use. Its APIs are unstable and can change
* at any time.
*/
public class EventToSpanEventBridgeComponentProvider
implements ComponentProvider<LogRecordProcessor> {

@Override
public Class<LogRecordProcessor> getType() {
return LogRecordProcessor.class;
}

@Override
public String getName() {
return "event_to_span_event_bridge";
}

@Override
public LogRecordProcessor create(StructuredConfigProperties config) {
return EventToSpanEventBridge.create();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
io.opentelemetry.contrib.eventbridge.internal.EventToSpanEventBridgeComponentProvider
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.contrib.eventbridge;

import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.common.Value;
import io.opentelemetry.api.incubator.events.EventLogger;
import io.opentelemetry.api.logs.Severity;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.internal.SdkEventLoggerProvider;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.testing.time.TestClock;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

class EventToSpanEventBridgeTest {

private final InMemorySpanExporter spanExporter = InMemorySpanExporter.create();
private final SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.setSampler(onlyServerSpans())
.addSpanProcessor(SimpleSpanProcessor.create(spanExporter))
.build();
private final TestClock testClock = TestClock.create();
private final SdkEventLoggerProvider eventLoggerProvider =
SdkEventLoggerProvider.create(
SdkLoggerProvider.builder()
.setClock(testClock)
.addLogRecordProcessor(EventToSpanEventBridge.create())
.build());
private final Tracer tracer = tracerProvider.get("tracer");
private final EventLogger eventLogger = eventLoggerProvider.get("event-logger");

private static Sampler onlyServerSpans() {
return new Sampler() {
@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks) {
return SpanKind.SERVER.equals(spanKind)
? SamplingResult.recordAndSample()
: SamplingResult.drop();
}

@Override
public String getDescription() {
return "description";
}
};
}

@Test
void withRecordingSpan_BridgesEvent() {
testClock.setTime(Instant.ofEpochMilli(1));

Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.SERVER).startSpan();
try (Scope unused = span.makeCurrent()) {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
} finally {
span.end();
}

assertThat(spanExporter.getFinishedSpanItems())
.satisfiesExactly(
spanData ->
assertThat(spanData)
.hasName("span")
.hasEventsSatisfyingExactly(
spanEvent ->
spanEvent
.hasName("my.event-name")
.hasTimestamp(100, TimeUnit.NANOSECONDS)
.hasAttributesSatisfying(
attributes -> {
assertThat(attributes.get(stringKey("color")))
.isEqualTo("red");
assertThat(attributes.get(stringKey("shape")))
.isEqualTo("square");
assertThat(
attributes.get(
longKey("log.record.observed_time_unix_nano")))
.isEqualTo(1000000L);
assertThat(
attributes.get(longKey("log.record.severity_number")))
.isEqualTo(Severity.DEBUG.getSeverityNumber());
assertThat(attributes.get(stringKey("log.record.body")))
.isEqualTo(
"{\"kvlistValue\":{\"values\":[{\"key\":\"number\",\"value\":{\"intValue\":\"1\"}},{\"key\":\"foo\",\"value\":{\"stringValue\":\"bar\"}},{\"key\":\"map\",\"value\":{\"kvlistValue\":{\"values\":[{\"key\":\"key\",\"value\":{\"stringValue\":\"value\"}}]}}}]}}");
})));
}

@Test
void noSpan_doesNotBridgeEvent() {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();

assertThat(spanExporter.getFinishedSpanItems()).isEmpty();
}

@Test
void nonRecordingSpan_doesNotBridgeEvent() {
Span span = tracer.spanBuilder("span").setSpanKind(SpanKind.INTERNAL).startSpan();
try (Scope unused = span.makeCurrent()) {
eventLogger
.builder("my.event-name")
.setTimestamp(100, TimeUnit.NANOSECONDS)
.setSeverity(Severity.DEBUG)
.put("foo", "bar")
.put("number", 1)
.put("map", Value.of(Collections.singletonMap("key", Value.of("value"))))
.setAttributes(Attributes.builder().put("color", "red").build())
.setAttributes(Attributes.builder().put("shape", "square").build())
.emit();
} finally {
span.end();
}

assertThat(spanExporter.getFinishedSpanItems()).isEmpty();
}
}
Loading

0 comments on commit 20f4ddd

Please sign in to comment.