diff --git a/core/src/telemetry/otlp.rs b/core/src/telemetry/otlp.rs index 9d4655f1c..da3398e25 100644 --- a/core/src/telemetry/otlp.rs +++ b/core/src/telemetry/otlp.rs @@ -16,7 +16,15 @@ use opentelemetry_sdk::{ Resource, }; use serde::{Deserialize, Serialize}; -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + sync::{Arc, RwLock}, + time::Duration, +}; +use tracing::trace; + +type U64Gauges = HashMap<&'static str, (u64, Vec)>; +type F64Gauges = HashMap<&'static str, (f64, Vec)>; // NOTE: Buffers are less space efficient, as opposed to the solution with in place compute. // That can be optimized by using dedicated data structure with proper bounds. @@ -28,6 +36,9 @@ pub struct Metrics { counters: HashMap<&'static str, Counter>, metric_buffer: Vec, counter_buffer: Vec, + u64_gauges: Arc>, + f64_gauges: Arc>, + registred_gauges: HashSet<&'static str>, } impl Metrics { @@ -42,26 +53,34 @@ impl Metrics { .collect() } - fn record_u64(&self, name: &'static str, value: u64, attributes: Vec) -> Result<()> { + fn register_u64_gauge(&self, name: &'static str) { let gauge_name = self.gauge_name(name); + let u64_gauges = self.u64_gauges.clone(); self.meter - .u64_observable_gauge(gauge_name) + .u64_observable_gauge(gauge_name.clone()) .with_callback(move |observer| { - observer.observe(value, &attributes); + let u64_gauges = u64_gauges.read().unwrap(); + if let Some((value, attributes)) = u64_gauges.get(name) { + trace!("Observed gauge: {gauge_name}, {value}, {attributes:?}"); + observer.observe(*value, &attributes.clone()); + }; }) .build(); - Ok(()) } - fn record_f64(&self, name: &'static str, value: f64, attributes: Vec) -> Result<()> { + fn register_f64_gauge(&self, name: &'static str) { let gauge_name = self.gauge_name(name); + let f64_gauges = self.f64_gauges.clone(); self.meter - .f64_observable_gauge(gauge_name) + .f64_observable_gauge(gauge_name.clone()) .with_callback(move |observer| { - observer.observe(value, &attributes); + let f64_gauges = f64_gauges.read().unwrap(); + if let Some((value, attributes)) = f64_gauges.get(name) { + trace!("Observed gauge: {gauge_name}, {value}, {attributes:?}"); + observer.observe(*value, &attributes.clone()); + }; }) .build(); - Ok(()) } /// Puts counter to the counter buffer if it is allowed. @@ -102,13 +121,24 @@ impl Metrics { self.counters[&counter].add(value, &metric_attributes); } - // TODO: Aggregate errors instead of early return + let mut u64_gauges = self.u64_gauges.write().unwrap(); for (metric, value) in metrics_u64.into_iter() { - self.record_u64(metric, value, metric_attributes.clone())?; + if !self.registred_gauges.contains(metric) { + self.register_u64_gauge(metric); + self.registred_gauges.insert(metric); + } + let metric_attributes = metric_attributes.clone(); + u64_gauges.insert(metric, (value, metric_attributes)); } + let mut f64_gauges = self.f64_gauges.write().unwrap(); for (metric, value) in metrics_f64.into_iter() { - self.record_f64(metric, value, metric_attributes.clone())?; + if !self.registred_gauges.contains(metric) { + self.register_f64_gauge(metric); + self.registred_gauges.insert(metric); + } + let metric_attributes = metric_attributes.clone(); + f64_gauges.insert(metric, (value, metric_attributes)); } Ok(()) @@ -283,6 +313,9 @@ pub fn initialize( counters, metric_buffer: vec![], counter_buffer: vec![], + u64_gauges: Default::default(), + f64_gauges: Default::default(), + registred_gauges: Default::default(), }) }