From 9b835e6588f8bc63a9cfc3bcb13192d3fa3fd28a Mon Sep 17 00:00:00 2001 From: Francesco Gualazzi Date: Tue, 15 Oct 2024 12:57:53 +0200 Subject: [PATCH] metrics: in memory exporter (#3) * Add test for in memory exporter * refactor: introduce interface to export metrics We adopt a pattern based on @fieldParentPtr to support multiple ways of exporting metrics built on a common exportBatch method. Signed-off-by: inge4pres --- src/metrics/exporter.zig | 103 ++++++++++++++++++++++++++++++++++----- src/metrics/reader.zig | 42 ++++++++++------ 2 files changed, 118 insertions(+), 27 deletions(-) diff --git a/src/metrics/exporter.zig b/src/metrics/exporter.zig index bcf03a7..bb397ad 100644 --- a/src/metrics/exporter.zig +++ b/src/metrics/exporter.zig @@ -1,29 +1,110 @@ const std = @import("std"); + +const protobuf = @import("protobuf"); +const ManagedString = protobuf.ManagedString; const pbmetrics = @import("../opentelemetry/proto/metrics/v1.pb.zig"); +const pbcommon = @import("../opentelemetry/proto/common/v1.pb.zig"); + const reader = @import("reader.zig"); const MetricExporter = reader.MetricExporter; const MetricReadError = reader.MetricReadError; +/// ExporterIface is the type representing the interface for exporting metrics. +/// Implementations can be achieved by any type by having a member field of type +/// ExporterIface and a member function exporttBatch with the same signature. +pub const ExporterIface = struct { + exportFn: *const fn (*ExporterIface, pbmetrics.MetricsData) MetricReadError!void, + + pub fn exportBatch(self: *ExporterIface, data: pbmetrics.MetricsData) MetricReadError!void { + return self.exportFn(self, data); + } +}; + pub const ImMemoryExporter = struct { const Self = @This(); + allocator: std.mem.Allocator, + data: std.ArrayList(pbmetrics.ResourceMetrics) = undefined, + // Implement the interface via @fieldParentPtr + exporter: ExporterIface, - var data: std.ArrayList(pbmetrics.ResourceMetrics) = std.ArrayList(pbmetrics.ResourceMetrics).init(std.heap.page_allocator); - - pub fn GetMetricExporter() MetricExporter { - return MetricExporter{ - .exporter = Self.exportBatch, + pub fn init(allocator: std.mem.Allocator) Self { + return Self{ + .allocator = allocator, + .data = std.ArrayList(pbmetrics.ResourceMetrics).init(allocator), + .exporter = ExporterIface{ + .exportFn = exportBatch, + }, }; } + pub fn deinit(self: *Self) void { + self.data.deinit(); + } + + fn exportBatch(iface: *ExporterIface, metrics: pbmetrics.MetricsData) MetricReadError!void { + const self: *Self = @fieldParentPtr("exporter", iface); - fn exportBatch(metrics: pbmetrics.MetricsData) MetricReadError!void { - Self.data.clearRetainingCapacity(); - Self.data.appendSlice(metrics.resource_metrics.items) catch |e| { - std.debug.print("Failed to export metrics in memory: {}\n", .{e}); + self.data.clearRetainingCapacity(); + self.data.appendSlice(metrics.resource_metrics.items) catch |e| { + std.debug.print("error exporting to memory, allocation error: {?}", .{e}); return MetricReadError.ExportFailed; }; + return; } - pub fn Data() []pbmetrics.ResourceMetrics { - return Self.data.items; + pub fn fetch(self: Self) []pbmetrics.ResourceMetrics { + return self.data.items; } }; + +test "in memory exporter stores data" { + var inMemExporter = ImMemoryExporter.init(std.testing.allocator); + defer inMemExporter.deinit(); + + const exporter = MetricExporter.new(&inMemExporter.exporter); + + const howMany: usize = 2; + const dp = try std.testing.allocator.alloc(pbmetrics.NumberDataPoint, howMany); + dp[0] = pbmetrics.NumberDataPoint{ + .attributes = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator), + .exemplars = std.ArrayList(pbmetrics.Exemplar).init(std.testing.allocator), + .value = .{ .as_int = @as(i64, 1) }, + }; + dp[1] = pbmetrics.NumberDataPoint{ + .attributes = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator), + .exemplars = std.ArrayList(pbmetrics.Exemplar).init(std.testing.allocator), + .value = .{ .as_int = @as(i64, 2) }, + }; + + const metric = pbmetrics.Metric{ + .metadata = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator), + .name = ManagedString.managed("test_metric"), + .unit = ManagedString.managed("count"), + .data = .{ .sum = pbmetrics.Sum{ + .data_points = std.ArrayList(pbmetrics.NumberDataPoint).fromOwnedSlice(std.testing.allocator, dp), + .aggregation_temporality = .AGGREGATION_TEMPORALITY_CUMULATIVE, + } }, + }; + + var sm = pbmetrics.ScopeMetrics{ + .metrics = std.ArrayList(pbmetrics.Metric).init(std.testing.allocator), + }; + try sm.metrics.append(metric); + + var resource = pbmetrics.ResourceMetrics{ + .scope_metrics = std.ArrayList(pbmetrics.ScopeMetrics).init(std.testing.allocator), + }; + try resource.scope_metrics.append(sm); + + var metricsData = pbmetrics.MetricsData{ + .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator), + }; + defer metricsData.deinit(); + try metricsData.resource_metrics.append(resource); + + const result = exporter.exportBatch(metricsData); + std.debug.assert(result == .Success); + const data = inMemExporter.fetch(); + + std.debug.assert(data.len == 1); + std.debug.assert(data[0].scope_metrics.items.len == 1); +} diff --git a/src/metrics/reader.zig b/src/metrics/reader.zig index b6f993f..8974fa2 100644 --- a/src/metrics/reader.zig +++ b/src/metrics/reader.zig @@ -34,7 +34,7 @@ pub const MetricReader = struct { // Exporter is the destination of the metrics data. // FIXME // the default metric exporter should be the PeriodicExporter - exporter: MetricExporter = MetricExporter.new(noopExporter), + exporter: MetricExporter = undefined, const Self = @This(); @@ -212,9 +212,11 @@ test "metric reader shutdown prevents collect() to execute" { test "metric reader collects data from meter provider" { var mp = try MeterProvider.init(std.testing.allocator); defer mp.shutdown(); + + var noop = Exporter{ .exportFn = noopExporter }; var reader = MetricReader{ .allocator = std.testing.allocator, - .exporter = MetricExporter.new(noopExporter), + .exporter = MetricExporter.new(&noop), }; defer reader.shutdown(); @@ -252,9 +254,13 @@ const InMemoryExporter = @import("exporter.zig").ImMemoryExporter; test "metric reader custom temporality" { var mp = try MeterProvider.init(std.testing.allocator); defer mp.shutdown(); + + var inMem = InMemoryExporter.init(std.testing.allocator); + defer inMem.deinit(); + var reader = MetricReader{ .allocator = std.testing.allocator, - .exporter = InMemoryExporter.GetMetricExporter(), + .exporter = MetricExporter.new(&inMem.exporter), .temporality = deltaTemporality, }; defer reader.shutdown(); @@ -268,7 +274,7 @@ test "metric reader custom temporality" { try reader.collect(); - const data = InMemoryExporter.Data(); + const data = inMem.fetch(); std.debug.assert(data.len == 1); } @@ -277,16 +283,16 @@ pub const ExportResult = enum { Failure, }; -pub const ExportFn = fn (pbmetrics.MetricsData) MetricReadError!void; +const Exporter = @import("exporter.zig").ExporterIface; pub const MetricExporter = struct { const Self = @This(); - exporter: *const ExportFn, + exporter: *Exporter, hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), var exportCompleted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false); - pub fn new(exporter: *const ExportFn) Self { + pub fn new(exporter: *Exporter) Self { return Self{ .exporter = exporter, }; @@ -303,7 +309,7 @@ pub const MetricExporter = struct { defer exportCompleted.store(true, .release); // Call the exporter function to process metrics data. - self.exporter(metrics) catch |e| { + self.exporter.exportBatch(metrics) catch |e| { std.debug.print("MetricExporter exportBatch failed: {?}\n", .{e}); return ExportResult.Failure; }; @@ -327,26 +333,26 @@ pub const MetricExporter = struct { }; // test harness to build a noop exporter. -fn noopExporter(_: pbmetrics.MetricsData) MetricReadError!void { +fn noopExporter(_: *Exporter, _: pbmetrics.MetricsData) MetricReadError!void { return; } // mocked metric exporter to assert metrics data are read once exported. -fn mockExporter(metrics: pbmetrics.MetricsData) MetricReadError!void { +fn mockExporter(_: *Exporter, metrics: pbmetrics.MetricsData) MetricReadError!void { if (metrics.resource_metrics.items.len != 1) { return MetricReadError.ExportFailed; } // only one resource metrics is expected in this mock } // test harness to build an exporter that times out. -fn waiterExporter(_: pbmetrics.MetricsData) MetricReadError!void { +fn waiterExporter(_: *Exporter, _: pbmetrics.MetricsData) MetricReadError!void { // Sleep for 1 second to simulate a slow exporter. std.time.sleep(std.time.ns_per_ms * 1000); return; } test "build no-op metric exporter" { - const exporter: *const ExportFn = noopExporter; - var me = MetricExporter.new(exporter); + var noop = Exporter{ .exportFn = noopExporter }; + var me = MetricExporter.new(&noop); const metrics = pbmetrics.MetricsData{ .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator), @@ -359,7 +365,9 @@ test "build no-op metric exporter" { test "exported metrics by calling metric reader" { var mp = try MeterProvider.init(std.testing.allocator); defer mp.shutdown(); - const me = MetricExporter.new(mockExporter); + + var mock = Exporter{ .exportFn = mockExporter }; + const me = MetricExporter.new(&mock); var reader = MetricReader{ .allocator = std.testing.allocator, .exporter = me }; defer reader.shutdown(); @@ -376,7 +384,8 @@ test "exported metrics by calling metric reader" { } test "metric exporter force flush succeeds" { - var me = MetricExporter.new(noopExporter); + var noop = Exporter{ .exportFn = noopExporter }; + var me = MetricExporter.new(&noop); const metrics = pbmetrics.MetricsData{ .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator), @@ -393,7 +402,8 @@ fn backgroundRunner(me: *MetricExporter, metrics: pbmetrics.MetricsData) !void { } test "metric exporter force flush fails" { - var me = MetricExporter.new(waiterExporter); + var wait = Exporter{ .exportFn = waiterExporter }; + var me = MetricExporter.new(&wait); const metrics = pbmetrics.MetricsData{ .resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),