Skip to content

Commit

Permalink
Register telemetry callbacks only once
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jan 13, 2025
1 parent 4df8f58 commit 0708c3c
Showing 1 changed file with 45 additions and 12 deletions.
57 changes: 45 additions & 12 deletions core/src/telemetry/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@ use opentelemetry_sdk::{
Resource,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, time::Duration};
use std::{
collections::{HashMap, HashSet},
sync::{Arc, RwLock},
time::Duration,
};
use tracing::trace;

type U64Gauges = HashMap<&'static str, (u64, Vec<KeyValue>)>;
type F64Gauges = HashMap<&'static str, (f64, Vec<KeyValue>)>;

// NOTE: Buffers are less space efficient, as opposed to the solution with in place compute.
// That can be optimized by using dedicated data structure with proper bounds.
Expand All @@ -28,6 +36,9 @@ pub struct Metrics {
counters: HashMap<&'static str, Counter<u64>>,
metric_buffer: Vec<Record>,
counter_buffer: Vec<MetricCounter>,
u64_gauges: Arc<RwLock<U64Gauges>>,
f64_gauges: Arc<RwLock<F64Gauges>>,
registred_gauges: HashSet<&'static str>,
}

impl Metrics {
Expand All @@ -42,26 +53,34 @@ impl Metrics {
.collect()
}

fn record_u64(&self, name: &'static str, value: u64, attributes: Vec<KeyValue>) -> Result<()> {
fn register_u64_gauge(&self, name: &'static str) {
let gauge_name = self.gauge_name(name);
let u64_gauges = self.u64_gauges.clone();
self.meter
.u64_observable_gauge(gauge_name)
.u64_observable_gauge(gauge_name.clone())
.with_callback(move |observer| {
observer.observe(value, &attributes);
let u64_gauges = u64_gauges.read().unwrap();
if let Some((value, attributes)) = u64_gauges.get(name) {
trace!("Observed gauge: {gauge_name}, {value}, {attributes:?}");
observer.observe(*value, &attributes.clone());
};
})
.build();
Ok(())
}

fn record_f64(&self, name: &'static str, value: f64, attributes: Vec<KeyValue>) -> Result<()> {
fn register_f64_gauge(&self, name: &'static str) {
let gauge_name = self.gauge_name(name);
let f64_gauges = self.f64_gauges.clone();
self.meter
.f64_observable_gauge(gauge_name)
.f64_observable_gauge(gauge_name.clone())
.with_callback(move |observer| {
observer.observe(value, &attributes);
let f64_gauges = f64_gauges.read().unwrap();
if let Some((value, attributes)) = f64_gauges.get(name) {
trace!("Observed gauge: {gauge_name}, {value}, {attributes:?}");
observer.observe(*value, &attributes.clone());
};
})
.build();
Ok(())
}

/// Puts counter to the counter buffer if it is allowed.
Expand Down Expand Up @@ -102,13 +121,24 @@ impl Metrics {
self.counters[&counter].add(value, &metric_attributes);
}

// TODO: Aggregate errors instead of early return
let mut u64_gauges = self.u64_gauges.write().unwrap();
for (metric, value) in metrics_u64.into_iter() {
self.record_u64(metric, value, metric_attributes.clone())?;
if !self.registred_gauges.contains(metric) {
self.register_u64_gauge(metric);
self.registred_gauges.insert(metric);
}
let metric_attributes = metric_attributes.clone();
u64_gauges.insert(metric, (value, metric_attributes));
}

let mut f64_gauges = self.f64_gauges.write().unwrap();
for (metric, value) in metrics_f64.into_iter() {
self.record_f64(metric, value, metric_attributes.clone())?;
if !self.registred_gauges.contains(metric) {
self.register_f64_gauge(metric);
self.registred_gauges.insert(metric);
}
let metric_attributes = metric_attributes.clone();
f64_gauges.insert(metric, (value, metric_attributes));
}

Ok(())
Expand Down Expand Up @@ -283,6 +313,9 @@ pub fn initialize(
counters,
metric_buffer: vec![],
counter_buffer: vec![],
u64_gauges: Default::default(),
f64_gauges: Default::default(),
registred_gauges: Default::default(),
})
}

Expand Down

0 comments on commit 0708c3c

Please sign in to comment.