Skip to content

Commit

Permalink
Merge pull request #6 from Workiva/collector-exporter
Browse files Browse the repository at this point in the history
O11Y-992: OTEL Dart: Implement OTLP HTTP Reporter
  • Loading branch information
rmconsole7-wk authored Aug 25, 2021
2 parents 80cdcfc + d56bd58 commit 587cb9c
Show file tree
Hide file tree
Showing 29 changed files with 312 additions and 83 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.dart_tool
.packages
pubspec.lock
pubspec.lock
lib/src/sdk/trace/exporters/opentelemetry
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "lib/src/sdk/trace/exporters/opentelemetry-proto"]
path = lib/src/sdk/trace/exporters/opentelemetry-proto
url = [email protected]:open-telemetry/opentelemetry-proto.git
21 changes: 14 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
deps:
@pub get
init:
git submodule update --init
pub get
pub global activate protoc_plugin
cd lib/src/sdk/trace/exporters && \
protoc --proto_path opentelemetry-proto \
--dart_out . \
opentelemetry-proto/opentelemetry/proto/common/v1/common.proto \
opentelemetry-proto/opentelemetry/proto/collector/trace/v1/trace_service.proto \
opentelemetry-proto/opentelemetry/proto/trace/v1/trace.proto \
opentelemetry-proto/opentelemetry/proto/resource/v1/resource.proto

analyze-lib:
analyze:
@dart analyze ./lib

analyze-test:
@dart analyze ./test

test: analyze-lib
test:
@dart test ./test --chain-stack-traces

.PHONY: deps analyze-lib analyze-test test
.PHONY: init analyze test
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,8 @@ main() async {
span.end();
}
```
```

## Development

In order to generate protobuf definitions, you must have [protoc](https://github.com/protocolbuffers/protobuf/releases) installed and available in your path.
3 changes: 3 additions & 0 deletions analysis_options.yaml
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
include: package:workiva_analysis_options/v1.recommended.yaml
analyzer:
exclude:
- src/sdk/trace/exporters/opentelemetry
3 changes: 3 additions & 0 deletions lib/sdk.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
export 'src/sdk/trace/exporters/collector_exporter.dart'
show
CollectorExporter;
export 'src/sdk/trace/exporters/console_exporter.dart'
show
ConsoleExporter;
Expand Down
4 changes: 2 additions & 2 deletions lib/src/api/trace/id_generator.dart
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
/// Generator capable of creating OTel compliant IDs.
abstract class IdGenerator {
/// Generate an ID for a Span.
String generateSpanId();
List<int> generateSpanId();

/// Generate an ID for a trace.
String generateTraceId();
List<int> generateTraceId();
}
14 changes: 9 additions & 5 deletions lib/src/api/trace/span.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'package:opentelemetry/src/api/trace/span_status.dart';

import 'package:fixnum/fixnum.dart';
import 'span_context.dart';
import 'span_status.dart';
import 'tracer.dart';

/// A representation of a single operation within a trace.
///
Expand All @@ -17,13 +18,13 @@ abstract class Span {
SpanContext get spanContext;

/// Get the time when the span was closed, or null if still open.
int get endTime;
Int64 get endTime;

/// Get the time when the span was started.
int get startTime;
Int64 get startTime;

/// The parent span id.
String get parentSpanId;
List<int> get parentSpanId;

/// The name of the span.
String get name;
Expand All @@ -40,6 +41,9 @@ abstract class Span {
/// Retrieve the status of the [Span].
SpanStatus get status;

/// Tracer responsible for creating the [Span].
Tracer get tracer;

/// Marks the end of this span's execution.
void end();
}
4 changes: 2 additions & 2 deletions lib/src/api/trace/span_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import 'trace_state.dart';
/// Representation of the context of the context of an individual span.
abstract class SpanContext {
/// Get the ID of the span.
String get spanId;
List<int> get spanId;

/// Get the ID of the trace the span is a part of.
String get traceId;
List<int> get traceId;

/// Get the state of the entire trace.
TraceState get traceState;
Expand Down
3 changes: 3 additions & 0 deletions lib/src/api/trace/tracer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,7 @@ abstract class Tracer {
/// Starts a new [Span] without setting it as the current span in this
/// tracer's context.
Span startSpan(String name, {Context context});

/// The Tracer's name.
String get name;
}
82 changes: 82 additions & 0 deletions lib/src/sdk/trace/exporters/collector_exporter.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import 'package:http/http.dart' as http;

import '../../../api/trace/span.dart';
import '../../../api/trace/span_status.dart';

import 'opentelemetry/proto/collector/trace/v1/trace_service.pb.dart';
import 'opentelemetry/proto/common/v1/common.pb.dart';
import 'opentelemetry/proto/resource/v1/resource.pb.dart';
import 'opentelemetry/proto/trace/v1/trace.pb.dart' as pb;
import 'span_exporter.dart';

class CollectorExporter implements SpanExporter {
Uri uri;
http.Client client;
var _isShutdown = false;

CollectorExporter(this.uri, {http.Client httpClient}) {
client = httpClient ?? http.Client();
}

@override
void export(List<Span> spans) {
if (_isShutdown) {
return;
}

if (spans.isEmpty) {
return;
}

final pbSpans = <pb.Span>[];
for (var i = 0; i < spans.length; i++) {
pbSpans.add(_spanToProtobuf(spans[i]));
}

final body = ExportTraceServiceRequest(resourceSpans: [
pb.ResourceSpans(
resource: Resource(attributes: [
KeyValue(
key: 'service.name',
value: AnyValue(stringValue: spans[0].tracer.name))
]),
instrumentationLibrarySpans: [
pb.InstrumentationLibrarySpans(spans: pbSpans)
])
]);

client.post(uri,
body: body.writeToBuffer(),
headers: {'Content-Type': 'application/x-protobuf'});
}

pb.Span _spanToProtobuf(Span span) {
pb.Status_StatusCode statusCode;

switch (span.status.code) {
case StatusCode.UNSET:
statusCode = pb.Status_StatusCode.STATUS_CODE_UNSET;
break;
case StatusCode.ERROR:
statusCode = pb.Status_StatusCode.STATUS_CODE_ERROR;
break;
case StatusCode.OK:
statusCode = pb.Status_StatusCode.STATUS_CODE_OK;
break;
}

return pb.Span(
traceId: span.spanContext.traceId,
spanId: span.spanContext.spanId,
parentSpanId: span.parentSpanId,
name: span.name,
startTimeUnixNano: span.startTime * 1000,
endTimeUnixNano: span.endTime * 1000,
status: pb.Status(code: statusCode, message: span.status.description));
}

@override
void shutdown() {
_isShutdown = true;
}
}
12 changes: 9 additions & 3 deletions lib/src/sdk/trace/exporters/console_exporter.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@ class ConsoleExporter implements SpanExporter {
for (var i=0; i < spans.length; i++) {
final span = spans[i];
print({
'traceId': span.spanContext.traceId,
'parentId': span.parentSpanId,
'traceId': span.spanContext.traceId
.map((x) => x.toRadixString(16).padLeft(2, '0'))
.join(),
'parentId': span.parentSpanId
.map((x) => x.toRadixString(16).padLeft(2, '0'))
.join(),
'name': span.name,
'id': span.spanContext.spanId,
'id': span.spanContext.spanId
.map((x) => x.toRadixString(16).padLeft(2, '0'))
.join(),
'timestamp': span.startTime,
'duration': span.endTime - span.startTime,
'status': span.status.code
Expand Down
1 change: 1 addition & 0 deletions lib/src/sdk/trace/exporters/opentelemetry-proto
Submodule opentelemetry-proto added at 867249
12 changes: 6 additions & 6 deletions lib/src/sdk/trace/id_generator.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ import '../../../src/api/trace/id_generator.dart' as api;
class IdGenerator implements api.IdGenerator {
static final Random _random = Random.secure();

static String _generateId(int byteLength) {
final buffer = StringBuffer();
static List<int> _generateId(int byteLength) {
final buffer = [];
for (var i = 0; i < byteLength; i++) {
buffer.write(_random.nextInt(256).toRadixString(16).padLeft(2, '0'));
buffer.add(_random.nextInt(256));
}
return buffer.toString();
return buffer.cast<int>();
}

@override
String generateSpanId() => _generateId(8);
List<int> generateSpanId() => _generateId(8);

@override
String generateTraceId() => _generateId(16);
List<int> generateTraceId() => _generateId(16);
}
31 changes: 17 additions & 14 deletions lib/src/sdk/trace/span.dart
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
import 'package:fixnum/fixnum.dart';

import '../../api/trace/span.dart' as span_api;
import '../../api/trace/span_context.dart';
import '../../api/trace/span_status.dart';
import '../../api/trace/tracer.dart';
import 'span_processors/span_processor.dart';

/// A representation of a single operation within a trace.
class Span implements span_api.Span {
int _startTime;
int _endTime;
final String _parentSpanId;
Int64 _startTime;
Int64 _endTime;
final List<int> _parentSpanId;
final SpanContext _spanContext;
final SpanStatus _status = SpanStatus();
final List<SpanProcessor> _processors;
final Tracer _tracer;

@override
String name;

/// Construct a [Span].
Span(
this.name,
this._spanContext,
this._parentSpanId,
this._processors
) {
_startTime = DateTime.now().toUtc().microsecondsSinceEpoch;
Span(this.name, this._spanContext, this._parentSpanId, this._processors,
this._tracer) {
_startTime = Int64(DateTime.now().toUtc().microsecondsSinceEpoch);
for (var i = 0; i < _processors.length; i++) {
_processors[i].onStart();
}
Expand All @@ -32,17 +32,17 @@ class Span implements span_api.Span {
SpanContext get spanContext => _spanContext;

@override
int get endTime => _endTime;
Int64 get endTime => _endTime;

@override
int get startTime => _startTime;
Int64 get startTime => _startTime;

@override
String get parentSpanId => _parentSpanId;
List<int> get parentSpanId => _parentSpanId;

@override
void end() {
_endTime ??= DateTime.now().toUtc().microsecondsSinceEpoch;
_endTime ??= Int64(DateTime.now().toUtc().microsecondsSinceEpoch);
for (var i = 0; i < _processors.length; i++) {
_processors[i].onEnd(this);
}
Expand All @@ -66,4 +66,7 @@ class Span implements span_api.Span {

@override
SpanStatus get status => _status;

@override
Tracer get tracer => _tracer;
}
8 changes: 4 additions & 4 deletions lib/src/sdk/trace/span_context.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,15 @@ import '../../../src/api/trace/trace_state.dart';

/// Representation of the context of the context of an individual span.
class SpanContext implements span_context_api.SpanContext {
final String _spanId;
final String _traceId;
final List<int> _spanId;
final List<int> _traceId;
final TraceState _traceState;

@override
String get spanId => _spanId;
List<int> get spanId => _spanId;

@override
String get traceId => _traceId;
List<int> get traceId => _traceId;

@override
TraceState get traceState => _traceState;
Expand Down
8 changes: 4 additions & 4 deletions lib/src/sdk/trace/span_processors/batch_processor.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ class BatchSpanProcessor implements SpanProcessor {
final List<Span> _spanBuffer = [];
Timer _timer;

int _maxExportBatchSize = 512;
int _maxExportBatchSize;
final int _maxQueueSize = 2048;
int _scheduledDelay = 5000;
int _scheduledDelay;

BatchSpanProcessor(this._exporter, {int maxExportBatchSize, int scheduledDelay}) {
_maxExportBatchSize = maxExportBatchSize;
_scheduledDelay = scheduledDelay;
_maxExportBatchSize = maxExportBatchSize ?? 512;
_scheduledDelay = scheduledDelay ?? 5000;
}

@override
Expand Down
12 changes: 8 additions & 4 deletions lib/src/sdk/trace/tracer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,21 @@ import 'span_processors/span_processor.dart';

/// An interface for creating [Span]s and propagating context in-process.
class Tracer implements tracer_api.Tracer {
final String _name;
final IdGenerator _idGenerator = IdGenerator();
final List<SpanProcessor> _processors;

Tracer(this._processors);
Tracer(this._name, this._processors);

@override
String get name => _name;

@override
Span startSpan(String name, {Context context}) {
context ??= Context.current;

String parentSpanId;
String traceId;
List<int> parentSpanId;
List<int> traceId;
TraceState traceState;

final spanId = _idGenerator.generateSpanId();
Expand All @@ -36,6 +40,6 @@ class Tracer implements tracer_api.Tracer {

final spanContext = SpanContext(traceId, spanId, traceState);

return Span(name, spanContext, parentSpanId, _processors);
return Span(name, spanContext, parentSpanId, _processors, this);
}
}
Loading

0 comments on commit 587cb9c

Please sign in to comment.