Skip to content

Commit

Permalink
Merge pull request #4 from Workiva/console-reporter
Browse files Browse the repository at this point in the history
O11Y-993: OTEL Dart: Implement OTLP Console Reporter
  • Loading branch information
rmconsole6-wk authored Aug 19, 2021
2 parents e36f25f + 4926a4d commit 80cdcfc
Show file tree
Hide file tree
Showing 22 changed files with 477 additions and 22 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ dependencies:
import 'package:opentelemetry/api.dart';
import 'package:opentelemetry/sdk.dart' as otel_sdk;
final Tracer tracer = otel_sdk.TracerProvider().getTracer('appName', version: '1.0.0');
final otel_sdk.ConsoleExporter exporter = otel_sdk.ConsoleExporter();
final otel_sdk.TracerProvider provider = otel_sdk.TracerProvider([
otel_sdk.SimpleSpanProcessor(exporter)
]);
final Tracer tracer = provider.getTracer('appName', version: '1.0.0');
doWork() {
Span parent = getSpan(Context.current);
Expand Down
9 changes: 9 additions & 0 deletions lib/sdk.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
export 'src/sdk/trace/exporters/console_exporter.dart'
show
ConsoleExporter;
export 'src/sdk/trace/span_processors/batch_processor.dart'
show
BatchSpanProcessor;
export 'src/sdk/trace/span_processors/simple_processor.dart'
show
SimpleSpanProcessor;
export 'src/sdk/trace/tracer_provider.dart'
show
TracerProvider;
6 changes: 6 additions & 0 deletions lib/src/api/trace/span.dart
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ abstract class Span {
/// Get the time when the span was started.
int get startTime;

/// The parent span id.
String get parentSpanId;

/// The name of the span.
String get name;

/// Sets the status to the [Span].
///
/// If used, this will override the default [Span] status. Default status code
Expand Down
6 changes: 6 additions & 0 deletions lib/src/api/trace/tracer_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,10 @@ abstract class TracerProvider {
/// [name] should be the name of the tracer or instrumentation library.
/// [version] should be the version of the tracer or instrumentation library.
Tracer getTracer(String name, {String version});

/// Flush all registered span processors.
void forceFlush();

/// Stop all registered span processors.
void shutdown();
}
37 changes: 37 additions & 0 deletions lib/src/sdk/trace/exporters/console_exporter.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import '../../../api/trace/span.dart';

import 'span_exporter.dart';

class ConsoleExporter implements SpanExporter {
var _isShutdown = false;

void _printSpans(List<Span> spans) {
for (var i=0; i < spans.length; i++) {
final span = spans[i];
print({
'traceId': span.spanContext.traceId,
'parentId': span.parentSpanId,
'name': span.name,
'id': span.spanContext.spanId,
'timestamp': span.startTime,
'duration': span.endTime - span.startTime,
'status': span.status.code
});
}
}

@override
void export(List<Span> spans) {
if (_isShutdown) {
return;
}

_printSpans(spans);
}

@override
void shutdown() {
_isShutdown = true;
}

}
7 changes: 7 additions & 0 deletions lib/src/sdk/trace/exporters/span_exporter.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import '../../../api/trace/span.dart';

abstract class SpanExporter {
void export(List<Span> spans);

void shutdown();
}
34 changes: 24 additions & 10 deletions lib/src/sdk/trace/span.dart
Original file line number Diff line number Diff line change
@@ -1,21 +1,31 @@
import 'package:opentelemetry/src/api/trace/span.dart' as span_api;
import 'package:opentelemetry/src/api/trace/span_context.dart';
import 'package:opentelemetry/src/api/trace/span_status.dart';
import '../../api/trace/span.dart' as span_api;
import '../../api/trace/span_context.dart';
import '../../api/trace/span_status.dart';
import 'span_processors/span_processor.dart';

/// A representation of a single operation within a trace.
class Span implements span_api.Span {
int _startTime;
int _endTime;
final String _name;
final String _parentSpanId;
final SpanContext _spanContext;
final SpanStatus _status = SpanStatus();
final List<SpanProcessor> _processors;

@override
String name;

/// Construct a [Span].
Span(this._name, this._spanContext, this._parentSpanId) {
Span(
this.name,
this._spanContext,
this._parentSpanId,
this._processors
) {
_startTime = DateTime.now().toUtc().microsecondsSinceEpoch;
print(
'--- $_name START $_startTime ---\ntraceId: ${_spanContext.traceId}\nparent: $_parentSpanId\nspanId: ${_spanContext.spanId}');
for (var i = 0; i < _processors.length; i++) {
_processors[i].onStart();
}
}

@override
Expand All @@ -27,11 +37,15 @@ class Span implements span_api.Span {
@override
int get startTime => _startTime;

@override
String get parentSpanId => _parentSpanId;

@override
void end() {
_endTime = DateTime.now().toUtc().microsecondsSinceEpoch;
print(
'--- $_name END $_endTime ---\ntraceId: ${_spanContext.traceId}\nparent: $_parentSpanId\nspanId: ${_spanContext.spanId}');
_endTime ??= DateTime.now().toUtc().microsecondsSinceEpoch;
for (var i = 0; i < _processors.length; i++) {
_processors[i].onEnd(this);
}
}

@override
Expand Down
104 changes: 104 additions & 0 deletions lib/src/sdk/trace/span_processors/batch_processor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import 'dart:async';
import 'dart:math';

import 'package:logging/logging.dart';

import '../../../api/trace/span.dart';
import '../exporters/span_exporter.dart';
import 'span_processor.dart';

class BatchSpanProcessor implements SpanProcessor {
final _log = Logger('opentelemetry.BatchSpanProcessor');

final SpanExporter _exporter;
bool _isShutdown = false;
final List<Span> _spanBuffer = [];
Timer _timer;

int _maxExportBatchSize = 512;
final int _maxQueueSize = 2048;
int _scheduledDelay = 5000;

BatchSpanProcessor(this._exporter, {int maxExportBatchSize, int scheduledDelay}) {
_maxExportBatchSize = maxExportBatchSize;
_scheduledDelay = scheduledDelay;
}

@override
void forceFlush() {
if (_isShutdown) {
return;
}
while (_spanBuffer.isNotEmpty) {
_flushBatch();
}
}

@override
void onEnd(Span span) {
if (_isShutdown) {
return;
}
_addToBuffer(span);
}

@override
void onStart() {}

@override
void shutdown() {
forceFlush();
_isShutdown = true;
_clearTimer();
_exporter.shutdown();
}

void _addToBuffer(Span span) {
if (_spanBuffer.length >= _maxQueueSize) {
// Buffer is full, drop span.
_log.warning('Max queue size exceeded. Dropping ${_spanBuffer.length} spans.');
return;
}

_spanBuffer.add(span);
_startTimer();
}

void _startTimer() {
if (_timer != null) {
// _timer already defined.
return;
}

_timer = Timer(Duration(milliseconds: _scheduledDelay), () {
_flushBatch();
if (_spanBuffer.isNotEmpty) {
_clearTimer();
_startTimer();
}
});
}

void _clearTimer() {
if (_timer == null) {
// _timer not set.
return;
}

_timer.cancel();
_timer = null;
}

void _flushBatch() {
_clearTimer();
if (_spanBuffer.isEmpty) {
return;
}

final batchSize = min(_spanBuffer.length, _maxExportBatchSize);
final batch = _spanBuffer.sublist(0, batchSize);
_spanBuffer.removeRange(0, batchSize);

_exporter.export(batch);
}
}
32 changes: 32 additions & 0 deletions lib/src/sdk/trace/span_processors/simple_processor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import '../../../api/trace/span.dart';
import '../exporters/span_exporter.dart';
import 'span_processor.dart';

class SimpleSpanProcessor implements SpanProcessor {
final SpanExporter _exporter;
bool _isShutdown = false;

SimpleSpanProcessor(this._exporter);

@override
void forceFlush() {}

@override
void onEnd(Span span) {
if (_isShutdown) {
return;
}

_exporter.export([span]);
}

@override
void onStart() {}

@override
void shutdown() {
forceFlush();
_isShutdown = true;
_exporter.shutdown();
}
}
11 changes: 11 additions & 0 deletions lib/src/sdk/trace/span_processors/span_processor.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import '../../../api/trace/span.dart';

abstract class SpanProcessor {
void onStart();

void onEnd(Span span);

void shutdown();

void forceFlush();
}
6 changes: 5 additions & 1 deletion lib/src/sdk/trace/tracer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import '../../../src/api/trace/tracer.dart' as tracer_api;
import 'id_generator.dart';
import 'span.dart';
import 'span_context.dart';
import 'span_processors/span_processor.dart';

/// An interface for creating [Span]s and propagating context in-process.
class Tracer implements tracer_api.Tracer {
final IdGenerator _idGenerator = IdGenerator();
final List<SpanProcessor> _processors;

Tracer(this._processors);

@override
Span startSpan(String name, {Context context}) {
Expand All @@ -32,6 +36,6 @@ class Tracer implements tracer_api.Tracer {

final spanContext = SpanContext(traceId, spanId, traceState);

return Span(name, spanContext, parentSpanId);
return Span(name, spanContext, parentSpanId, _processors);
}
}
32 changes: 30 additions & 2 deletions lib/src/sdk/trace/tracer_provider.dart
Original file line number Diff line number Diff line change
@@ -1,13 +1,41 @@
import '../../../src/api/trace/tracer_provider.dart' as api;
import '../../api/trace/tracer_provider.dart' as api;
import 'exporters/console_exporter.dart';
import 'span_processors/simple_processor.dart';
import 'span_processors/span_processor.dart';
import 'tracer.dart';

/// A registry for creating named [Tracer]s.
class TracerProvider implements api.TracerProvider {
final Map<String, Tracer> _tracers = {};
List<SpanProcessor> _processors;

TracerProvider({List<SpanProcessor> processors}) {
if (processors == null) {
_processors = [SimpleSpanProcessor(ConsoleExporter())];
} else {
_processors = processors;
}
}

List<SpanProcessor> get spanProcessors => _processors;

@override
Tracer getTracer(String name, {String version = ''}) {
final key = '$name@$version';
return _tracers.putIfAbsent(key, () => Tracer());
return _tracers.putIfAbsent(key, () => Tracer(_processors));
}

@override
void forceFlush() {
for (var i = 0; i < _processors.length; i++) {
_processors[i].forceFlush();
}
}

@override
void shutdown() {
for (var i = 0; i < _processors.length; i++) {
_processors[i].shutdown();
}
}
}
6 changes: 4 additions & 2 deletions pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ version: 0.0.0
environment:
sdk: '>=2.7.0 <3.0.0'

dependencies:
logging: ^0.11.4

dev_dependencies:
coverage: ^1.0.3
mockito: ^5.0.13
test: ^1.17.10
test: ^1.16.5
workiva_analysis_options: ^1.2.0
Loading

0 comments on commit 80cdcfc

Please sign in to comment.