diff --git a/README.md b/README.md index 43f64b8c..f9bbdcaf 100644 --- a/README.md +++ b/README.md @@ -22,7 +22,11 @@ dependencies: import 'package:opentelemetry/api.dart'; import 'package:opentelemetry/sdk.dart' as otel_sdk; -final Tracer tracer = otel_sdk.TracerProvider().getTracer('appName', version: '1.0.0'); +final otel_sdk.ConsoleExporter exporter = otel_sdk.ConsoleExporter(); +final otel_sdk.TracerProvider provider = otel_sdk.TracerProvider([ + otel_sdk.SimpleSpanProcessor(exporter) +]); +final Tracer tracer = provider.getTracer('appName', version: '1.0.0'); doWork() { Span parent = getSpan(Context.current); diff --git a/lib/sdk.dart b/lib/sdk.dart index 1b134895..abc4044b 100644 --- a/lib/sdk.dart +++ b/lib/sdk.dart @@ -1,3 +1,12 @@ +export 'src/sdk/trace/exporters/console_exporter.dart' + show + ConsoleExporter; +export 'src/sdk/trace/span_processors/batch_processor.dart' + show + BatchSpanProcessor; +export 'src/sdk/trace/span_processors/simple_processor.dart' + show + SimpleSpanProcessor; export 'src/sdk/trace/tracer_provider.dart' show TracerProvider; diff --git a/lib/src/api/trace/span.dart b/lib/src/api/trace/span.dart index dcbdc8cf..d526ae03 100644 --- a/lib/src/api/trace/span.dart +++ b/lib/src/api/trace/span.dart @@ -22,6 +22,12 @@ abstract class Span { /// Get the time when the span was started. int get startTime; + /// The parent span id. + String get parentSpanId; + + /// The name of the span. + String get name; + /// Sets the status to the [Span]. /// /// If used, this will override the default [Span] status. Default status code diff --git a/lib/src/api/trace/tracer_provider.dart b/lib/src/api/trace/tracer_provider.dart index 6ab4304c..824c33b6 100644 --- a/lib/src/api/trace/tracer_provider.dart +++ b/lib/src/api/trace/tracer_provider.dart @@ -8,4 +8,10 @@ abstract class TracerProvider { /// [name] should be the name of the tracer or instrumentation library. /// [version] should be the version of the tracer or instrumentation library. Tracer getTracer(String name, {String version}); + + /// Flush all registered span processors. + void forceFlush(); + + /// Stop all registered span processors. + void shutdown(); } diff --git a/lib/src/sdk/trace/exporters/console_exporter.dart b/lib/src/sdk/trace/exporters/console_exporter.dart new file mode 100644 index 00000000..5ba63f13 --- /dev/null +++ b/lib/src/sdk/trace/exporters/console_exporter.dart @@ -0,0 +1,37 @@ +import '../../../api/trace/span.dart'; + +import 'span_exporter.dart'; + +class ConsoleExporter implements SpanExporter { + var _isShutdown = false; + + void _printSpans(List spans) { + for (var i=0; i < spans.length; i++) { + final span = spans[i]; + print({ + 'traceId': span.spanContext.traceId, + 'parentId': span.parentSpanId, + 'name': span.name, + 'id': span.spanContext.spanId, + 'timestamp': span.startTime, + 'duration': span.endTime - span.startTime, + 'status': span.status.code + }); + } + } + + @override + void export(List spans) { + if (_isShutdown) { + return; + } + + _printSpans(spans); + } + + @override + void shutdown() { + _isShutdown = true; + } + +} diff --git a/lib/src/sdk/trace/exporters/span_exporter.dart b/lib/src/sdk/trace/exporters/span_exporter.dart new file mode 100644 index 00000000..6f127e05 --- /dev/null +++ b/lib/src/sdk/trace/exporters/span_exporter.dart @@ -0,0 +1,7 @@ +import '../../../api/trace/span.dart'; + +abstract class SpanExporter { + void export(List spans); + + void shutdown(); +} diff --git a/lib/src/sdk/trace/span.dart b/lib/src/sdk/trace/span.dart index 0f18654d..28963723 100644 --- a/lib/src/sdk/trace/span.dart +++ b/lib/src/sdk/trace/span.dart @@ -1,21 +1,31 @@ -import 'package:opentelemetry/src/api/trace/span.dart' as span_api; -import 'package:opentelemetry/src/api/trace/span_context.dart'; -import 'package:opentelemetry/src/api/trace/span_status.dart'; +import '../../api/trace/span.dart' as span_api; +import '../../api/trace/span_context.dart'; +import '../../api/trace/span_status.dart'; +import 'span_processors/span_processor.dart'; /// A representation of a single operation within a trace. class Span implements span_api.Span { int _startTime; int _endTime; - final String _name; final String _parentSpanId; final SpanContext _spanContext; final SpanStatus _status = SpanStatus(); + final List _processors; + + @override + String name; /// Construct a [Span]. - Span(this._name, this._spanContext, this._parentSpanId) { + Span( + this.name, + this._spanContext, + this._parentSpanId, + this._processors + ) { _startTime = DateTime.now().toUtc().microsecondsSinceEpoch; - print( - '--- $_name START $_startTime ---\ntraceId: ${_spanContext.traceId}\nparent: $_parentSpanId\nspanId: ${_spanContext.spanId}'); + for (var i = 0; i < _processors.length; i++) { + _processors[i].onStart(); + } } @override @@ -27,11 +37,15 @@ class Span implements span_api.Span { @override int get startTime => _startTime; + @override + String get parentSpanId => _parentSpanId; + @override void end() { - _endTime = DateTime.now().toUtc().microsecondsSinceEpoch; - print( - '--- $_name END $_endTime ---\ntraceId: ${_spanContext.traceId}\nparent: $_parentSpanId\nspanId: ${_spanContext.spanId}'); + _endTime ??= DateTime.now().toUtc().microsecondsSinceEpoch; + for (var i = 0; i < _processors.length; i++) { + _processors[i].onEnd(this); + } } @override diff --git a/lib/src/sdk/trace/span_processors/batch_processor.dart b/lib/src/sdk/trace/span_processors/batch_processor.dart new file mode 100644 index 00000000..31ad998c --- /dev/null +++ b/lib/src/sdk/trace/span_processors/batch_processor.dart @@ -0,0 +1,104 @@ +import 'dart:async'; +import 'dart:math'; + +import 'package:logging/logging.dart'; + +import '../../../api/trace/span.dart'; +import '../exporters/span_exporter.dart'; +import 'span_processor.dart'; + +class BatchSpanProcessor implements SpanProcessor { + final _log = Logger('opentelemetry.BatchSpanProcessor'); + + final SpanExporter _exporter; + bool _isShutdown = false; + final List _spanBuffer = []; + Timer _timer; + + int _maxExportBatchSize = 512; + final int _maxQueueSize = 2048; + int _scheduledDelay = 5000; + + BatchSpanProcessor(this._exporter, {int maxExportBatchSize, int scheduledDelay}) { + _maxExportBatchSize = maxExportBatchSize; + _scheduledDelay = scheduledDelay; + } + + @override + void forceFlush() { + if (_isShutdown) { + return; + } + while (_spanBuffer.isNotEmpty) { + _flushBatch(); + } + } + + @override + void onEnd(Span span) { + if (_isShutdown) { + return; + } + _addToBuffer(span); + } + + @override + void onStart() {} + + @override + void shutdown() { + forceFlush(); + _isShutdown = true; + _clearTimer(); + _exporter.shutdown(); + } + + void _addToBuffer(Span span) { + if (_spanBuffer.length >= _maxQueueSize) { + // Buffer is full, drop span. + _log.warning('Max queue size exceeded. Dropping ${_spanBuffer.length} spans.'); + return; + } + + _spanBuffer.add(span); + _startTimer(); + } + + void _startTimer() { + if (_timer != null) { + // _timer already defined. + return; + } + + _timer = Timer(Duration(milliseconds: _scheduledDelay), () { + _flushBatch(); + if (_spanBuffer.isNotEmpty) { + _clearTimer(); + _startTimer(); + } + }); + } + + void _clearTimer() { + if (_timer == null) { + // _timer not set. + return; + } + + _timer.cancel(); + _timer = null; + } + + void _flushBatch() { + _clearTimer(); + if (_spanBuffer.isEmpty) { + return; + } + + final batchSize = min(_spanBuffer.length, _maxExportBatchSize); + final batch = _spanBuffer.sublist(0, batchSize); + _spanBuffer.removeRange(0, batchSize); + + _exporter.export(batch); + } +} diff --git a/lib/src/sdk/trace/span_processors/simple_processor.dart b/lib/src/sdk/trace/span_processors/simple_processor.dart new file mode 100644 index 00000000..1df9b9d4 --- /dev/null +++ b/lib/src/sdk/trace/span_processors/simple_processor.dart @@ -0,0 +1,32 @@ +import '../../../api/trace/span.dart'; +import '../exporters/span_exporter.dart'; +import 'span_processor.dart'; + +class SimpleSpanProcessor implements SpanProcessor { + final SpanExporter _exporter; + bool _isShutdown = false; + + SimpleSpanProcessor(this._exporter); + + @override + void forceFlush() {} + + @override + void onEnd(Span span) { + if (_isShutdown) { + return; + } + + _exporter.export([span]); + } + + @override + void onStart() {} + + @override + void shutdown() { + forceFlush(); + _isShutdown = true; + _exporter.shutdown(); + } +} diff --git a/lib/src/sdk/trace/span_processors/span_processor.dart b/lib/src/sdk/trace/span_processors/span_processor.dart new file mode 100644 index 00000000..b69a8efb --- /dev/null +++ b/lib/src/sdk/trace/span_processors/span_processor.dart @@ -0,0 +1,11 @@ +import '../../../api/trace/span.dart'; + +abstract class SpanProcessor { + void onStart(); + + void onEnd(Span span); + + void shutdown(); + + void forceFlush(); +} diff --git a/lib/src/sdk/trace/tracer.dart b/lib/src/sdk/trace/tracer.dart index 67f1b645..14bd18da 100644 --- a/lib/src/sdk/trace/tracer.dart +++ b/lib/src/sdk/trace/tracer.dart @@ -5,10 +5,14 @@ import '../../../src/api/trace/tracer.dart' as tracer_api; import 'id_generator.dart'; import 'span.dart'; import 'span_context.dart'; +import 'span_processors/span_processor.dart'; /// An interface for creating [Span]s and propagating context in-process. class Tracer implements tracer_api.Tracer { final IdGenerator _idGenerator = IdGenerator(); + final List _processors; + + Tracer(this._processors); @override Span startSpan(String name, {Context context}) { @@ -32,6 +36,6 @@ class Tracer implements tracer_api.Tracer { final spanContext = SpanContext(traceId, spanId, traceState); - return Span(name, spanContext, parentSpanId); + return Span(name, spanContext, parentSpanId, _processors); } } diff --git a/lib/src/sdk/trace/tracer_provider.dart b/lib/src/sdk/trace/tracer_provider.dart index 85461824..86d771f7 100644 --- a/lib/src/sdk/trace/tracer_provider.dart +++ b/lib/src/sdk/trace/tracer_provider.dart @@ -1,13 +1,41 @@ -import '../../../src/api/trace/tracer_provider.dart' as api; +import '../../api/trace/tracer_provider.dart' as api; +import 'exporters/console_exporter.dart'; +import 'span_processors/simple_processor.dart'; +import 'span_processors/span_processor.dart'; import 'tracer.dart'; /// A registry for creating named [Tracer]s. class TracerProvider implements api.TracerProvider { final Map _tracers = {}; + List _processors; + + TracerProvider({List processors}) { + if (processors == null) { + _processors = [SimpleSpanProcessor(ConsoleExporter())]; + } else { + _processors = processors; + } + } + + List get spanProcessors => _processors; @override Tracer getTracer(String name, {String version = ''}) { final key = '$name@$version'; - return _tracers.putIfAbsent(key, () => Tracer()); + return _tracers.putIfAbsent(key, () => Tracer(_processors)); + } + + @override + void forceFlush() { + for (var i = 0; i < _processors.length; i++) { + _processors[i].forceFlush(); + } + } + + @override + void shutdown() { + for (var i = 0; i < _processors.length; i++) { + _processors[i].shutdown(); + } } } diff --git a/pubspec.yaml b/pubspec.yaml index c03f282d..fe4a5341 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -3,8 +3,10 @@ version: 0.0.0 environment: sdk: '>=2.7.0 <3.0.0' +dependencies: + logging: ^0.11.4 + dev_dependencies: - coverage: ^1.0.3 mockito: ^5.0.13 - test: ^1.17.10 + test: ^1.16.5 workiva_analysis_options: ^1.2.0 diff --git a/test/integration/sdk/span_test.dart b/test/integration/sdk/span_test.dart index 752100df..3b83f785 100644 --- a/test/integration/sdk/span_test.dart +++ b/test/integration/sdk/span_test.dart @@ -1,26 +1,43 @@ import 'package:opentelemetry/src/api/trace/span_status.dart'; +import 'package:mockito/mockito.dart'; import 'package:opentelemetry/src/sdk/trace/span.dart'; import 'package:opentelemetry/src/sdk/trace/span_context.dart'; import 'package:opentelemetry/src/sdk/trace/trace_state.dart'; import 'package:test/test.dart'; +import '../../unit/mocks.dart'; + void main() { test('span set and end time', () { - final span = - Span('foo', SpanContext('trace123', '789', TraceState()), 'span456'); + final mockProcessor1 = MockSpanProcessor(); + final mockProcessor2 = MockSpanProcessor(); + final span = Span('foo', SpanContext('trace123', '789', TraceState()), 'span456', [ + mockProcessor1, + mockProcessor2 + ]); expect(span.startTime, isA()); expect(span.endTime, isNull); + expect(span.parentSpanId, 'span456'); + expect(span.name, 'foo'); + + verify(mockProcessor1.onStart()).called(1); + verify(mockProcessor2.onStart()).called(1); + verifyNever(mockProcessor1.onEnd(span)); + verifyNever(mockProcessor2.onEnd(span)); span.end(); expect(span.startTime, isA()); expect(span.endTime, isA()); expect(span.endTime, greaterThan(span.startTime)); + + verify(mockProcessor1.onEnd(span)).called(1); + verify(mockProcessor2.onEnd(span)).called(1); }); test('span status', () { final span = - Span('foo', SpanContext('trace123', '789', TraceState()), 'span456'); + Span('foo', SpanContext('trace123', '789', TraceState()), 'span456', []); // Verify span status' defaults. expect(span.status.code, equals(StatusCode.UNSET)); diff --git a/test/integration/sdk/tracer_test.dart b/test/integration/sdk/tracer_test.dart index d8b10f9b..9973d464 100644 --- a/test/integration/sdk/tracer_test.dart +++ b/test/integration/sdk/tracer_test.dart @@ -5,7 +5,7 @@ import 'package:test/test.dart'; void main() { test('startSpan new trace', () { - final tracer = Tracer(); + final tracer = Tracer([]); final span = tracer.startSpan('foo'); @@ -16,7 +16,7 @@ void main() { }); test('startSpan child span', () { - final tracer = Tracer(); + final tracer = Tracer([]); final parentSpan = tracer.startSpan('foo'); final context = setSpan(Context.current, parentSpan); diff --git a/test/unit/api/context_utils_test.dart b/test/unit/api/context_utils_test.dart index 589c72ac..555c3c49 100644 --- a/test/unit/api/context_utils_test.dart +++ b/test/unit/api/context_utils_test.dart @@ -10,7 +10,7 @@ import '../mocks.dart'; void main() { final testSpanContext = SpanContext('123', '789', TraceState()); - final testSpan = Span('foo', testSpanContext, '456'); + final testSpan = Span('foo', testSpanContext, '456', []); group('getSpan', () { test('returns Span when exists', () { diff --git a/test/unit/mocks.dart b/test/unit/mocks.dart index b7c9e7af..11f4b2ae 100644 --- a/test/unit/mocks.dart +++ b/test/unit/mocks.dart @@ -1,4 +1,10 @@ import 'package:mockito/mockito.dart'; import 'package:opentelemetry/src/api/context/context.dart'; +import 'package:opentelemetry/src/api/trace/span.dart'; +import 'package:opentelemetry/src/sdk/trace/exporters/span_exporter.dart'; +import 'package:opentelemetry/src/sdk/trace/span_processors/span_processor.dart'; class MockContext extends Mock implements Context {} +class MockSpan extends Mock implements Span {} +class MockSpanExporter extends Mock implements SpanExporter {} +class MockSpanProcessor extends Mock implements SpanProcessor {} diff --git a/test/unit/sdk/exporters/console_exporter_test.dart b/test/unit/sdk/exporters/console_exporter_test.dart new file mode 100644 index 00000000..2ce46614 --- /dev/null +++ b/test/unit/sdk/exporters/console_exporter_test.dart @@ -0,0 +1,46 @@ +import 'dart:async'; + +import 'package:opentelemetry/src/sdk/trace/exporters/console_exporter.dart'; +import 'package:opentelemetry/src/sdk/trace/span.dart'; +import 'package:opentelemetry/src/sdk/trace/span_context.dart'; +import 'package:opentelemetry/src/sdk/trace/trace_state.dart'; +import 'package:test/test.dart'; + +List printLogs = []; + +dynamic overridePrint(Function() testFn) => () { + final spec = ZoneSpecification( + print: (_, __, ___, msg) { + // Add to log instead of printing to stdout + printLogs.add(msg); + } + ); + return Zone.current.fork(specification: spec).run(testFn); +}; + +void main() { + tearDown(() { + printLogs = []; + }); + + test('prints', overridePrint(() { + final span = Span('foo', SpanContext('trace123', 'span789', TraceState()), 'span456', []) + ..end(); + + ConsoleExporter().export([span]); + + final expected = RegExp(r'^{traceId: trace123, parentId: span456, name: foo, id: span789, timestamp: \d+, duration: \d+, status: StatusCode.UNSET}$'); + expect(printLogs.length, 1); + expect(expected.hasMatch(printLogs[0]), true); + })); + + test('does not print after shutdown', overridePrint(() { + final span = Span('foo', SpanContext('trace123', 'span789', TraceState()), 'span456', []); + + ConsoleExporter() + ..shutdown() + ..export([span]); + + expect(printLogs.length, 0); + })); +} diff --git a/test/unit/sdk/span_processors/batch_processor_test.dart b/test/unit/sdk/span_processors/batch_processor_test.dart new file mode 100644 index 00000000..73920dda --- /dev/null +++ b/test/unit/sdk/span_processors/batch_processor_test.dart @@ -0,0 +1,43 @@ +import 'package:mockito/mockito.dart'; +import 'package:opentelemetry/src/api/trace/span.dart'; +import 'package:opentelemetry/src/sdk/trace/exporters/span_exporter.dart'; +import 'package:opentelemetry/src/sdk/trace/span_processors/batch_processor.dart'; +import 'package:test/test.dart'; +import '../../mocks.dart'; + +void main() { + BatchSpanProcessor processor; + SpanExporter mockExporter; + Span mockSpan1, mockSpan2, mockSpan3; + + setUp(() { + mockSpan1 = MockSpan(); + mockSpan2 = MockSpan(); + mockSpan3 = MockSpan(); + + mockExporter = MockSpanExporter(); + processor = BatchSpanProcessor(mockExporter, maxExportBatchSize: 2, scheduledDelay: 100); + }); + + tearDown(() { + processor.shutdown(); + reset(mockExporter); + }); + + test('forceFlush', () { + processor + ..onEnd(mockSpan1) + ..onEnd(mockSpan2) + ..onEnd(mockSpan3) + ..forceFlush(); + + verify(mockExporter.export([mockSpan1, mockSpan2])).called(1); + verify(mockExporter.export([mockSpan3])).called(1); + }); + + test('shutdown shuts exporter down', () { + processor.shutdown(); + + verify(mockExporter.shutdown()).called(1); + }); +} diff --git a/test/unit/sdk/span_processors/simple_processor_test.dart b/test/unit/sdk/span_processors/simple_processor_test.dart new file mode 100644 index 00000000..693dda6d --- /dev/null +++ b/test/unit/sdk/span_processors/simple_processor_test.dart @@ -0,0 +1,33 @@ +import 'package:mockito/mockito.dart'; +import 'package:opentelemetry/src/api/trace/span.dart'; +import 'package:opentelemetry/src/sdk/trace/exporters/span_exporter.dart'; +import 'package:opentelemetry/src/sdk/trace/span_processors/simple_processor.dart'; +import 'package:test/test.dart'; +import '../../mocks.dart'; + +void main() { + SpanExporter exporter; + SimpleSpanProcessor processor; + Span span; + + setUp(() { + exporter = MockSpanExporter(); + processor = SimpleSpanProcessor(exporter); + span = MockSpan(); + }); + + test('executes export', () { + processor.onEnd(span); + + verify(exporter.export([span])).called(1); + }); + + test('does not export if shutdown', () { + processor + ..shutdown() + ..onEnd(span); + + verify(exporter.shutdown()).called(1); + verifyNever(exporter.export([span])); + }); +} diff --git a/test/unit/sdk/span_test.dart b/test/unit/sdk/span_test.dart new file mode 100644 index 00000000..b2dd62c8 --- /dev/null +++ b/test/unit/sdk/span_test.dart @@ -0,0 +1,14 @@ +import 'package:opentelemetry/src/sdk/trace/span.dart'; +import 'package:opentelemetry/src/sdk/trace/span_context.dart'; +import 'package:opentelemetry/src/sdk/trace/trace_state.dart'; +import 'package:test/test.dart'; + +void main() { + test('span change name', () { + final span = Span('foo', SpanContext('trace123', '789', TraceState()), 'span456', []); + expect(span.name, 'foo'); + + span.name = 'bar'; + expect(span.name, 'bar'); + }); +} diff --git a/test/unit/sdk/trace_provider_test.dart b/test/unit/sdk/trace_provider_test.dart index 3ea19670..6feb64d9 100644 --- a/test/unit/sdk/trace_provider_test.dart +++ b/test/unit/sdk/trace_provider_test.dart @@ -1,6 +1,10 @@ +import 'package:mockito/mockito.dart'; +import 'package:opentelemetry/src/sdk/trace/span_processors/span_processor.dart'; import 'package:opentelemetry/src/sdk/trace/tracer_provider.dart'; import 'package:test/test.dart'; +import '../mocks.dart'; + void main() { test('getTracer stores tracers by name', () { final provider = TracerProvider(); @@ -13,5 +17,33 @@ void main() { isNot(fooWithVersionTracer), same(provider.getTracer('foo')) ])); + + expect(provider.spanProcessors, isA>()); + }); + + test('tracerProvider custom span processors', () { + final mockProcessor1 = MockSpanProcessor(); + final mockProcessor2 = MockSpanProcessor(); + final provider = TracerProvider(processors: [mockProcessor1, mockProcessor2]); + + expect(provider.spanProcessors, [mockProcessor1, mockProcessor2]); + }); + + test('tracerProvider force flushes all processors', () { + final mockProcessor1 = MockSpanProcessor(); + final mockProcessor2 = MockSpanProcessor(); + TracerProvider(processors: [mockProcessor1, mockProcessor2]).forceFlush(); + + verify(mockProcessor1.forceFlush()).called(1); + verify(mockProcessor2.forceFlush()).called(1); + }); + + test('tracerProvider shuts down all processors', () { + final mockProcessor1 = MockSpanProcessor(); + final mockProcessor2 = MockSpanProcessor(); + TracerProvider(processors: [mockProcessor1, mockProcessor2]).shutdown(); + + verify(mockProcessor1.shutdown()).called(1); + verify(mockProcessor2.shutdown()).called(1); }); }