Skip to content

Commit

Permalink
metrics: in memory exporter (#3)
Browse files Browse the repository at this point in the history
* Add test for in memory exporter
* refactor: introduce interface to export metrics

We adopt a pattern based on @fieldParentPtr to 
support multiple ways of exporting metrics built on
a common exportBatch method.

Signed-off-by: inge4pres <[email protected]>
  • Loading branch information
inge4pres authored Oct 15, 2024
1 parent 0c87d60 commit 9b835e6
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 27 deletions.
103 changes: 92 additions & 11 deletions src/metrics/exporter.zig
Original file line number Diff line number Diff line change
@@ -1,29 +1,110 @@
const std = @import("std");

const protobuf = @import("protobuf");
const ManagedString = protobuf.ManagedString;
const pbmetrics = @import("../opentelemetry/proto/metrics/v1.pb.zig");
const pbcommon = @import("../opentelemetry/proto/common/v1.pb.zig");

const reader = @import("reader.zig");
const MetricExporter = reader.MetricExporter;
const MetricReadError = reader.MetricReadError;

/// ExporterIface is the type representing the interface for exporting metrics.
/// Implementations can be achieved by any type by having a member field of type
/// ExporterIface and a member function exporttBatch with the same signature.
pub const ExporterIface = struct {
exportFn: *const fn (*ExporterIface, pbmetrics.MetricsData) MetricReadError!void,

pub fn exportBatch(self: *ExporterIface, data: pbmetrics.MetricsData) MetricReadError!void {
return self.exportFn(self, data);
}
};

pub const ImMemoryExporter = struct {
const Self = @This();
allocator: std.mem.Allocator,
data: std.ArrayList(pbmetrics.ResourceMetrics) = undefined,
// Implement the interface via @fieldParentPtr
exporter: ExporterIface,

var data: std.ArrayList(pbmetrics.ResourceMetrics) = std.ArrayList(pbmetrics.ResourceMetrics).init(std.heap.page_allocator);

pub fn GetMetricExporter() MetricExporter {
return MetricExporter{
.exporter = Self.exportBatch,
pub fn init(allocator: std.mem.Allocator) Self {
return Self{
.allocator = allocator,
.data = std.ArrayList(pbmetrics.ResourceMetrics).init(allocator),
.exporter = ExporterIface{
.exportFn = exportBatch,
},
};
}
pub fn deinit(self: *Self) void {
self.data.deinit();
}

fn exportBatch(iface: *ExporterIface, metrics: pbmetrics.MetricsData) MetricReadError!void {
const self: *Self = @fieldParentPtr("exporter", iface);

fn exportBatch(metrics: pbmetrics.MetricsData) MetricReadError!void {
Self.data.clearRetainingCapacity();
Self.data.appendSlice(metrics.resource_metrics.items) catch |e| {
std.debug.print("Failed to export metrics in memory: {}\n", .{e});
self.data.clearRetainingCapacity();
self.data.appendSlice(metrics.resource_metrics.items) catch |e| {
std.debug.print("error exporting to memory, allocation error: {?}", .{e});
return MetricReadError.ExportFailed;
};
return;
}

pub fn Data() []pbmetrics.ResourceMetrics {
return Self.data.items;
pub fn fetch(self: Self) []pbmetrics.ResourceMetrics {
return self.data.items;
}
};

test "in memory exporter stores data" {
var inMemExporter = ImMemoryExporter.init(std.testing.allocator);
defer inMemExporter.deinit();

const exporter = MetricExporter.new(&inMemExporter.exporter);

const howMany: usize = 2;
const dp = try std.testing.allocator.alloc(pbmetrics.NumberDataPoint, howMany);
dp[0] = pbmetrics.NumberDataPoint{
.attributes = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator),
.exemplars = std.ArrayList(pbmetrics.Exemplar).init(std.testing.allocator),
.value = .{ .as_int = @as(i64, 1) },
};
dp[1] = pbmetrics.NumberDataPoint{
.attributes = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator),
.exemplars = std.ArrayList(pbmetrics.Exemplar).init(std.testing.allocator),
.value = .{ .as_int = @as(i64, 2) },
};

const metric = pbmetrics.Metric{
.metadata = std.ArrayList(pbcommon.KeyValue).init(std.testing.allocator),
.name = ManagedString.managed("test_metric"),
.unit = ManagedString.managed("count"),
.data = .{ .sum = pbmetrics.Sum{
.data_points = std.ArrayList(pbmetrics.NumberDataPoint).fromOwnedSlice(std.testing.allocator, dp),
.aggregation_temporality = .AGGREGATION_TEMPORALITY_CUMULATIVE,
} },
};

var sm = pbmetrics.ScopeMetrics{
.metrics = std.ArrayList(pbmetrics.Metric).init(std.testing.allocator),
};
try sm.metrics.append(metric);

var resource = pbmetrics.ResourceMetrics{
.scope_metrics = std.ArrayList(pbmetrics.ScopeMetrics).init(std.testing.allocator),
};
try resource.scope_metrics.append(sm);

var metricsData = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
};
defer metricsData.deinit();
try metricsData.resource_metrics.append(resource);

const result = exporter.exportBatch(metricsData);
std.debug.assert(result == .Success);
const data = inMemExporter.fetch();

std.debug.assert(data.len == 1);
std.debug.assert(data[0].scope_metrics.items.len == 1);
}
42 changes: 26 additions & 16 deletions src/metrics/reader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub const MetricReader = struct {
// Exporter is the destination of the metrics data.
// FIXME
// the default metric exporter should be the PeriodicExporter
exporter: MetricExporter = MetricExporter.new(noopExporter),
exporter: MetricExporter = undefined,

const Self = @This();

Expand Down Expand Up @@ -212,9 +212,11 @@ test "metric reader shutdown prevents collect() to execute" {
test "metric reader collects data from meter provider" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();

var noop = Exporter{ .exportFn = noopExporter };
var reader = MetricReader{
.allocator = std.testing.allocator,
.exporter = MetricExporter.new(noopExporter),
.exporter = MetricExporter.new(&noop),
};
defer reader.shutdown();

Expand Down Expand Up @@ -252,9 +254,13 @@ const InMemoryExporter = @import("exporter.zig").ImMemoryExporter;
test "metric reader custom temporality" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();

var inMem = InMemoryExporter.init(std.testing.allocator);
defer inMem.deinit();

var reader = MetricReader{
.allocator = std.testing.allocator,
.exporter = InMemoryExporter.GetMetricExporter(),
.exporter = MetricExporter.new(&inMem.exporter),
.temporality = deltaTemporality,
};
defer reader.shutdown();
Expand All @@ -268,7 +274,7 @@ test "metric reader custom temporality" {

try reader.collect();

const data = InMemoryExporter.Data();
const data = inMem.fetch();
std.debug.assert(data.len == 1);
}

Expand All @@ -277,16 +283,16 @@ pub const ExportResult = enum {
Failure,
};

pub const ExportFn = fn (pbmetrics.MetricsData) MetricReadError!void;
const Exporter = @import("exporter.zig").ExporterIface;

pub const MetricExporter = struct {
const Self = @This();
exporter: *const ExportFn,
exporter: *Exporter,
hasShutDown: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),

var exportCompleted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false);

pub fn new(exporter: *const ExportFn) Self {
pub fn new(exporter: *Exporter) Self {
return Self{
.exporter = exporter,
};
Expand All @@ -303,7 +309,7 @@ pub const MetricExporter = struct {
defer exportCompleted.store(true, .release);

// Call the exporter function to process metrics data.
self.exporter(metrics) catch |e| {
self.exporter.exportBatch(metrics) catch |e| {
std.debug.print("MetricExporter exportBatch failed: {?}\n", .{e});
return ExportResult.Failure;
};
Expand All @@ -327,26 +333,26 @@ pub const MetricExporter = struct {
};

// test harness to build a noop exporter.
fn noopExporter(_: pbmetrics.MetricsData) MetricReadError!void {
fn noopExporter(_: *Exporter, _: pbmetrics.MetricsData) MetricReadError!void {
return;
}
// mocked metric exporter to assert metrics data are read once exported.
fn mockExporter(metrics: pbmetrics.MetricsData) MetricReadError!void {
fn mockExporter(_: *Exporter, metrics: pbmetrics.MetricsData) MetricReadError!void {
if (metrics.resource_metrics.items.len != 1) {
return MetricReadError.ExportFailed;
} // only one resource metrics is expected in this mock
}

// test harness to build an exporter that times out.
fn waiterExporter(_: pbmetrics.MetricsData) MetricReadError!void {
fn waiterExporter(_: *Exporter, _: pbmetrics.MetricsData) MetricReadError!void {
// Sleep for 1 second to simulate a slow exporter.
std.time.sleep(std.time.ns_per_ms * 1000);
return;
}

test "build no-op metric exporter" {
const exporter: *const ExportFn = noopExporter;
var me = MetricExporter.new(exporter);
var noop = Exporter{ .exportFn = noopExporter };
var me = MetricExporter.new(&noop);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
Expand All @@ -359,7 +365,9 @@ test "build no-op metric exporter" {
test "exported metrics by calling metric reader" {
var mp = try MeterProvider.init(std.testing.allocator);
defer mp.shutdown();
const me = MetricExporter.new(mockExporter);

var mock = Exporter{ .exportFn = mockExporter };
const me = MetricExporter.new(&mock);

var reader = MetricReader{ .allocator = std.testing.allocator, .exporter = me };
defer reader.shutdown();
Expand All @@ -376,7 +384,8 @@ test "exported metrics by calling metric reader" {
}

test "metric exporter force flush succeeds" {
var me = MetricExporter.new(noopExporter);
var noop = Exporter{ .exportFn = noopExporter };
var me = MetricExporter.new(&noop);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
Expand All @@ -393,7 +402,8 @@ fn backgroundRunner(me: *MetricExporter, metrics: pbmetrics.MetricsData) !void {
}

test "metric exporter force flush fails" {
var me = MetricExporter.new(waiterExporter);
var wait = Exporter{ .exportFn = waiterExporter };
var me = MetricExporter.new(&wait);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
Expand Down

0 comments on commit 9b835e6

Please sign in to comment.