Skip to content

Commit

Permalink
metric exporter: forceFlush
Browse files Browse the repository at this point in the history
Signed-off-by: inge4pres <[email protected]>
  • Loading branch information
inge4pres committed Oct 4, 2024
1 parent 8f1deaf commit bf49a43
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions src/metrics/reader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ const view = @import("view.zig");
pub const MetricReadError = error{
CollectFailedOnMissingMeterProvider,
ExportFailed,
ForceFlushTimedOut,
};

/// MetricReader reads metrics' data from a MeterProvider.
Expand Down Expand Up @@ -76,6 +77,7 @@ pub const MetricReader = struct {
// No meter provider to collect from.
return MetricReadError.CollectFailedOnMissingMeterProvider;
}

switch (self.exporter.exportBatch(metricsData)) {
ExportResult.Success => return,
ExportResult.Failure => return MetricReadError.ExportFailed,
Expand Down Expand Up @@ -198,19 +200,37 @@ pub const MetricExporter = struct {
const Self = @This();
exporter: *const ExportFn,

var exportCompleted: std.atomic.Value(bool) = std.atomic.Value(bool).init(false);

pub fn new(exporter: *const ExportFn) Self {
return Self{
.exporter = exporter,
};
}

pub fn exportBatch(self: Self, metrics: pbmetrics.MetricsData) ExportResult {
// Acquire the lock to ensure that forceFlush is waiting for export to complete.
_ = exportCompleted.load(.acquire);
defer exportCompleted.store(true, .release);

// Call the exporter function to process metrics data.
self.exporter(metrics) catch |e| {
std.debug.print("MetricExporter exportBatch failed: {?}\n", .{e});
return ExportResult.Failure;
};
return ExportResult.Success;
}
// Ensure that all the data is flushed to the destination.
pub fn forceFlush(_: Self, timeout_ms: u64) !void {
const start = std.time.milliTimestamp(); // Miliseconds
const timeout: i64 = @intCast(timeout_ms);
while (std.time.milliTimestamp() < start + timeout) {
if (exportCompleted.load(.acquire)) {
return;
} else std.time.sleep(std.time.ns_per_ms);
}
return MetricReadError.ForceFlushTimedOut;
}
};

// test harness to build a noop exporter.
Expand All @@ -224,6 +244,13 @@ fn mockExporter(metrics: pbmetrics.MetricsData) MetricReadError!void {
} // only one resource metrics is expected in this mock
}

// test harness to build an exporter that times out.
fn waiterExporter(_: pbmetrics.MetricsData) MetricReadError!void {
// Sleep for 1 second to simulate a slow exporter.
std.time.sleep(std.time.ns_per_ms * 1000);
return;
}

test "build no-op metric exporter" {
const exporter: *const ExportFn = noopExporter;
var me = MetricExporter.new(exporter);
Expand Down Expand Up @@ -254,3 +281,38 @@ test "exported metrics by calling metric reader" {

try reader.collect();
}

test "metric exporter force flush succeeds" {
var me = MetricExporter.new(noopExporter);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
};
defer metrics.deinit();
const result = me.exportBatch(metrics);
try std.testing.expectEqual(ExportResult.Success, result);

try me.forceFlush(1000);
}

fn backgroundRunner(me: *MetricExporter, metrics: pbmetrics.MetricsData) !void {
_ = me.exportBatch(metrics);
}

test "metric exporter force flush fails" {
var me = MetricExporter.new(waiterExporter);

const metrics = pbmetrics.MetricsData{
.resource_metrics = std.ArrayList(pbmetrics.ResourceMetrics).init(std.testing.allocator),
};
defer metrics.deinit();
var bg = try std.Thread.spawn(
.{},
backgroundRunner,
.{ &me, metrics },
);
bg.detach();

const e = me.forceFlush(0);
try std.testing.expectError(MetricReadError.ForceFlushTimedOut, e);
}

0 comments on commit bf49a43

Please sign in to comment.