diff --git a/client/src/config.rs b/client/src/config.rs index 7ba81c10f..e83efa559 100644 --- a/client/src/config.rs +++ b/client/src/config.rs @@ -84,7 +84,6 @@ impl From<&RuntimeConfig> for MaintenanceConfig { replication_factor: val.libp2p.kademlia.record_replication_factor.get() as u16, query_timeout: val.libp2p.kademlia.query_timeout, pruning_interval: val.libp2p.kademlia.store_pruning_interval, - telemetry_flush_interval: val.otel.ot_flush_block_interval, automatic_server_mode: val.libp2p.kademlia.automatic_server_mode, total_memory_gb_threshold: val.total_memory_gb_threshold, num_cpus_threshold: val.num_cpus_threshold, diff --git a/client/src/main.rs b/client/src/main.rs index c60e0ae31..8cb2c233e 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -296,23 +296,27 @@ async fn run( // construct Metric Attributes and initialize Metrics let metric_attributes = vec![ - ("version".to_string(), version.to_string()), - ("role".to_string(), "lightnode".to_string()), - ("origin".to_string(), cfg.origin.to_string()), - ("peerID".to_string(), peer_id.to_string()), - ("avail_address".to_string(), identity_cfg.avail_public_key), - ("network".to_string(), Network::name(&cfg.genesis_hash)), - ("client_id".to_string(), client_id.to_string()), - ("execution_id".to_string(), execution_id.to_string()), + ("version", version.to_string()), + ("role", "lightnode".to_string()), + ("origin", cfg.origin.to_string()), + ("peerID", peer_id.to_string()), + ("avail_address", identity_cfg.avail_public_key), + ("network", Network::name(&cfg.genesis_hash)), + ("client_id", client_id.to_string()), + ("execution_id", execution_id.to_string()), ( - "client_alias".to_string(), + "client_alias", cfg.client_alias.clone().unwrap_or("".to_string()), ), ]; - let metrics = - telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone()) - .wrap_err("Unable to initialize OpenTelemetry service")?; + let metrics = telemetry::otlp::initialize( + cfg.project_name.clone(), + &cfg.origin, + cfg.otel.clone(), + metric_attributes, + ) + .wrap_err("Unable to initialize OpenTelemetry service")?; let rpc_host = db .get(RpcNodeKey) @@ -324,7 +328,6 @@ async fn run( cfg.libp2p.kademlia.operation_mode.into(), rpc_host, Multiaddr::empty(), - metric_attributes, ); spawn_in_span(shutdown.with_cancel(async move { @@ -446,52 +449,33 @@ impl BlockStat { struct ClientState { metrics: Metrics, - kad_mode: Mode, - multiaddress: Multiaddr, - rpc_host: String, - metric_attributes: Vec<(String, String)>, active_blocks: HashMap, } impl ClientState { - fn new( - metrics: Metrics, - kad_mode: Mode, - rpc_host: String, - multiaddress: Multiaddr, - metric_attributes: Vec<(String, String)>, - ) -> Self { - ClientState { + fn new(metrics: Metrics, kad_mode: Mode, rpc_host: String, multiaddress: Multiaddr) -> Self { + let mut state = ClientState { metrics, - kad_mode, - multiaddress, - rpc_host, - metric_attributes, active_blocks: Default::default(), - } + }; + state.update_operating_mode(kad_mode); + state.update_rpc_host(rpc_host); + state.update_multiaddress(multiaddress); + state } fn update_multiaddress(&mut self, value: Multiaddr) { - self.multiaddress = value; + self.metrics + .set_attribute("multiaddress", value.to_string()); } fn update_operating_mode(&mut self, value: Mode) { - self.kad_mode = value; + self.metrics + .set_attribute("operating_mode", value.to_string()); } fn update_rpc_host(&mut self, value: String) { - self.rpc_host = value; - } - - fn attributes(&self) -> Vec<(String, String)> { - let mut attrs = 
vec![ - ("operating_mode".to_string(), self.kad_mode.to_string()), - ("multiaddress".to_string(), self.multiaddress.to_string()), - ("rpc_host".to_string(), self.rpc_host.to_string()), - ]; - - attrs.extend(self.metric_attributes.clone()); - attrs + self.metrics.set_attribute("rpc_host", value); } fn get_block_stat(&mut self, block_num: u32) -> Result<&mut BlockStat> { @@ -576,40 +560,41 @@ impl ClientState { mut lc_receiver: UnboundedReceiver, mut rpc_receiver: broadcast::Receiver, ) { - self.metrics.count(MetricCounter::Starts, self.attributes()); + self.metrics.count(MetricCounter::Starts); loop { select! { Some(p2p_event) = p2p_receiver.recv() => { match p2p_event { P2pEvent::Count => { - self.metrics.count(MetricCounter::EventLoopEvent, self.attributes()); + self.metrics.count(MetricCounter::EventLoopEvent); }, P2pEvent::IncomingGetRecord => { - self.metrics.count(MetricCounter::IncomingGetRecord, self.attributes()); + self.metrics.count(MetricCounter::IncomingGetRecord); }, P2pEvent::IncomingPutRecord => { - self.metrics.count(MetricCounter::IncomingPutRecord, self.attributes()); + self.metrics.count(MetricCounter::IncomingPutRecord); }, P2pEvent::KadModeChange(mode) => { + self.update_operating_mode(mode); }, P2pEvent::Ping(rtt) => { self.metrics.record(MetricValue::DHTPingLatency(rtt.as_millis() as f64)); }, P2pEvent::IncomingConnection => { - self.metrics.count(MetricCounter::IncomingConnections, self.attributes()); + self.metrics.count(MetricCounter::IncomingConnections); }, P2pEvent::IncomingConnectionError => { - self.metrics.count(MetricCounter::IncomingConnectionErrors, self.attributes()); + self.metrics.count(MetricCounter::IncomingConnectionErrors); }, P2pEvent::MultiaddressUpdate(address) => { self.update_multiaddress(address); }, P2pEvent::EstablishedConnection => { - self.metrics.count(MetricCounter::EstablishedConnections, self.attributes()); + self.metrics.count(MetricCounter::EstablishedConnections); }, P2pEvent::OutgoingConnectionError => { - self.metrics.count(MetricCounter::OutgoingConnectionErrors, self.attributes()); + self.metrics.count(MetricCounter::OutgoingConnectionErrors); }, P2pEvent::PutRecord { block_num, records } => { self.handle_new_put_record(block_num, records); @@ -634,16 +619,6 @@ impl ClientState { } Some(maintenance_event) = maintenance_receiver.recv() => { match maintenance_event { - MaintenanceEvent::FlushMetrics(block_num) => { - if let Err(error) = self.metrics.flush(self.attributes()) { - error!( - block_num, - "Could not handle Flush Maintenance event properly: {error}" - ); - } else { - info!(block_num, "Flushing metrics finished"); - }; - }, MaintenanceEvent::RecordStats { connected_peers, block_confidence_treshold, @@ -656,7 +631,7 @@ impl ClientState { self.metrics.record(MetricValue::DHTQueryTimeout(query_timeout)); }, MaintenanceEvent::CountUps => { - self.metrics.count(MetricCounter::Up, self.attributes()); + self.metrics.count(MetricCounter::Up); }, } } @@ -666,7 +641,7 @@ impl ClientState { self.metrics.record(MetricValue::BlockProcessingDelay(delay)); }, LcEvent::CountSessionBlocks => { - self.metrics.count(MetricCounter::SessionBlocks,self.attributes()); + self.metrics.count(MetricCounter::SessionBlocks); }, LcEvent::RecordBlockHeight(block_num) => { self.metrics.record(MetricValue::BlockHeight(block_num)); diff --git a/core/src/maintenance.rs b/core/src/maintenance.rs index be04f00b3..cbf30f1da 100644 --- a/core/src/maintenance.rs +++ b/core/src/maintenance.rs @@ -10,7 +10,6 @@ use crate::{ }; pub enum OutputEvent { - 
FlushMetrics(u32),
     RecordStats {
         connected_peers: usize,
         block_confidence_treshold: f64,
@@ -34,11 +33,6 @@ pub async fn process_block(
         }
     }
-    if block_number % maintenance_config.telemetry_flush_interval == 0 {
-        info!(block_number, "Flushing metrics...");
-        event_sender.send(OutputEvent::FlushMetrics(block_number))?;
-    }
-
     p2p_client
         .shrink_kademlia_map()
         .await
diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs
index 36149d482..537ca53f3 100644
--- a/core/src/telemetry/mod.rs
+++ b/core/src/telemetry/mod.rs
@@ -43,12 +43,19 @@ impl MetricName for MetricCounter {
 }
 impl MetricCounter {
-    fn is_buffered(&self) -> bool {
-        !matches!(self, MetricCounter::Starts)
-    }
-
-    fn as_last(&self) -> bool {
-        matches!(self, MetricCounter::Up)
+    pub fn default_values() -> Vec<MetricCounter> {
+        vec![
+            MetricCounter::Starts,
+            MetricCounter::Up,
+            MetricCounter::SessionBlocks,
+            MetricCounter::OutgoingConnectionErrors,
+            MetricCounter::IncomingConnectionErrors,
+            MetricCounter::IncomingConnections,
+            MetricCounter::EstablishedConnections,
+            MetricCounter::IncomingPutRecord,
+            MetricCounter::IncomingGetRecord,
+            MetricCounter::EventLoopEvent,
+        ]
     }
     fn is_allowed(&self, origin: &Origin) -> bool {
@@ -85,6 +92,29 @@ pub enum MetricValue {
     RPCCallDuration(f64),
 }
+impl MetricValue {
+    pub fn default_values() -> Vec<MetricValue> {
+        vec![
+            MetricValue::BlockHeight(0),
+            MetricValue::BlockConfidence(0.0),
+            MetricValue::BlockConfidenceThreshold(0.0),
+            MetricValue::BlockProcessingDelay(0.0),
+            MetricValue::DHTReplicationFactor(0),
+            MetricValue::DHTFetched(0.0),
+            MetricValue::DHTFetchedPercentage(0.0),
+            MetricValue::DHTFetchDuration(0.0),
+            MetricValue::DHTPutDuration(0.0),
+            MetricValue::DHTPutSuccess(0.0),
+            MetricValue::DHTConnectedPeers(0),
+            MetricValue::DHTQueryTimeout(0),
+            MetricValue::DHTPingLatency(0.0),
+            MetricValue::RPCFetched(0.0),
+            MetricValue::RPCFetchDuration(0.0),
+            MetricValue::RPCCallDuration(0.0),
+        ]
+    }
+}
+
 impl MetricName for MetricValue {
     fn name(&self) -> &'static str {
         use MetricValue::*;
diff --git a/core/src/telemetry/otlp.rs b/core/src/telemetry/otlp.rs
index 9d4655f1c..30611ae7d 100644
--- a/core/src/telemetry/otlp.rs
+++ b/core/src/telemetry/otlp.rs
@@ -16,65 +16,50 @@ use opentelemetry_sdk::{
     Resource,
 };
 use serde::{Deserialize, Serialize};
-use std::{collections::HashMap, time::Duration};
+use std::{
+    cmp,
+    collections::HashMap,
+    sync::{Arc, RwLock},
+    time::Duration,
+};
+use tracing::trace;
+
+type U64Gauges = HashMap<&'static str, (u64, Vec<KeyValue>)>;
+type F64Gauges = HashMap<&'static str, (f64, u64, Vec<KeyValue>)>;
+type Attributes = HashMap<&'static str, String>;
 // NOTE: Buffers are less space efficient, as opposed to the solution with in place compute.
 // That can be optimized by using dedicated data structure with proper bounds.
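The two type aliases added above carry the state that accumulates between OTLP export ticks: `U64Gauges` keeps the latest maximum together with an attribute snapshot, and `F64Gauges` keeps a running average plus the sample count needed to update it. A minimal std-only sketch of that aggregation lifecycle (attribute snapshots omitted, stand-in types rather than the crate's own):

```rust
use std::{cmp, collections::HashMap};

// Simplified stand-ins for the aliases above; the attribute snapshot is left out for brevity.
type U64Gauges = HashMap<&'static str, u64>;        // max observed since the last export
type F64Gauges = HashMap<&'static str, (f64, u64)>; // (running average, sample count)

fn record_u64(gauges: &mut U64Gauges, name: &'static str, value: u64) {
    gauges
        .entry(name)
        .and_modify(|max| *max = cmp::max(*max, value))
        .or_insert(value);
}

fn record_f64(gauges: &mut F64Gauges, name: &'static str, value: f64) {
    gauges
        .entry(name)
        .and_modify(|(average, n)| {
            *n += 1;
            // Incremental mean: add the delta divided by the new sample count.
            *average += (value - *average) / *n as f64;
        })
        .or_insert((value, 1));
}

fn main() {
    let mut heights = U64Gauges::new();
    let mut confidences = F64Gauges::new();

    for height in [1_u64, 10, 1] {
        record_u64(&mut heights, "light.block.height", height);
    }
    for confidence in [90.0, 93.0, 99.0] {
        record_f64(&mut confidences, "light.block.confidence", confidence);
    }

    assert_eq!(heights["light.block.height"], 10);
    assert!((confidences["light.block.confidence"].0 - 94.0).abs() < 1e-9);

    // On an export tick the observable-gauge callbacks (see init_gauges further down)
    // read the aggregate and clear the entry, so the next period starts fresh.
    heights.clear();
    confidences.clear();
}
```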
#[derive(Debug)]
 pub struct Metrics {
-    meter: Meter,
-    project_name: ProjectName,
     origin: Origin,
     counters: HashMap<&'static str, Counter<u64>>,
-    metric_buffer: Vec<Record>,
-    counter_buffer: Vec<MetricCounter>,
+    u64_gauges: Arc<RwLock<U64Gauges>>,
+    f64_gauges: Arc<RwLock<F64Gauges>>,
+    attributes: Arc<RwLock<Attributes>>,
 }
 impl Metrics {
-    fn gauge_name(&self, name: &'static str) -> String {
-        format!("{project_name}.{name}", project_name = self.project_name)
+    pub fn set_attribute(&mut self, name: &'static str, value: String) {
+        let mut attributes = self.attributes.write().unwrap();
+        attributes.insert(name, value);
     }
-    fn map_attributes(&self, attributes: Vec<(String, String)>) -> Vec<KeyValue> {
+    fn attributes(&self) -> Vec<KeyValue> {
+        let attributes = self.attributes.read().unwrap();
         attributes
-            .into_iter()
-            .map(|(k, v)| KeyValue::new(k, v))
+            .iter()
+            .map(|(k, v)| KeyValue::new(*k, v.clone()))
             .collect()
     }
-    fn record_u64(&self, name: &'static str, value: u64, attributes: Vec<KeyValue>) -> Result<()> {
-        let gauge_name = self.gauge_name(name);
-        self.meter
-            .u64_observable_gauge(gauge_name)
-            .with_callback(move |observer| {
-                observer.observe(value, &attributes);
-            })
-            .build();
-        Ok(())
-    }
-
-    fn record_f64(&self, name: &'static str, value: f64, attributes: Vec<KeyValue>) -> Result<()> {
-        let gauge_name = self.gauge_name(name);
-        self.meter
-            .f64_observable_gauge(gauge_name)
-            .with_callback(move |observer| {
-                observer.observe(value, &attributes);
-            })
-            .build();
-        Ok(())
-    }
-
     /// Puts counter to the counter buffer if it is allowed.
     /// If counter is not buffered, counter is incremented.
-    pub fn count(&mut self, counter: super::MetricCounter, attributes: Vec<(String, String)>) {
+    pub fn count(&mut self, counter: super::MetricCounter) {
         if !counter.is_allowed(&self.origin) {
             return;
         }
-        if !counter.is_buffered() {
-            self.counters[&counter.name()].add(1, &self.map_attributes(attributes));
-            return;
-        }
-        self.counter_buffer.push(counter);
+        self.counters[&counter.name()].add(1, &self.attributes());
     }
     /// Puts metric to the metric buffer if it is allowed.
@@ -86,32 +71,26 @@ impl Metrics {
             return;
         }
-        self.metric_buffer.push(value.into());
-    }
-
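`Metrics` now keeps its attributes behind `Arc<RwLock<..>>` so the event loop (through `set_attribute`) and the exporter callbacks can share one copy, and every recorded value captures a point-in-time snapshot. A rough std-only illustration of that sharing, with plain `(&str, String)` pairs standing in for OpenTelemetry `KeyValue`s:

```rust
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
    thread,
};

type Attributes = Arc<RwLock<HashMap<&'static str, String>>>;

// Stand-in for `Metrics::attributes()`: a snapshot of the current values under a read lock.
fn snapshot(attributes: &Attributes) -> Vec<(&'static str, String)> {
    let guard = attributes.read().unwrap();
    guard.iter().map(|(k, v)| (*k, v.clone())).collect()
}

fn main() {
    let attributes: Attributes = Default::default();
    attributes
        .write()
        .unwrap()
        .insert("operating_mode", "client".to_string());

    // One clone of the Arc goes to a writer (the event loop updating operating_mode,
    // multiaddress, rpc_host); another would go to the exporter callbacks.
    let writer = {
        let attributes = Arc::clone(&attributes);
        thread::spawn(move || {
            attributes
                .write()
                .unwrap()
                .insert("operating_mode", "server".to_string());
        })
    };
    writer.join().unwrap();

    assert_eq!(
        snapshot(&attributes),
        vec![("operating_mode", "server".to_string())]
    );
}
```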
-    /// Calculates counters and average metrics, and flushes buffers to the collector.
-    pub fn flush(&mut self, attributes: Vec<(String, String)>) -> Result<()> {
-        let metric_attributes = self.map_attributes(attributes);
-        let counters = flatten_counters(&self.counter_buffer);
-        self.counter_buffer.clear();
-
-        let (metrics_u64, metrics_f64) = flatten_metrics(&self.metric_buffer);
-        self.metric_buffer.clear();
-
-        for (counter, value) in counters {
-            self.counters[&counter].add(value, &metric_attributes);
-        }
-
-        // TODO: Aggregate errors instead of early return
-        for (metric, value) in metrics_u64.into_iter() {
-            self.record_u64(metric, value, metric_attributes.clone())?;
-        }
-
-        for (metric, value) in metrics_f64.into_iter() {
-            self.record_f64(metric, value, metric_attributes.clone())?
+        let record: Record = value.into();
+        match record {
+            Record::MaxU64(name, value) => {
+                let mut u64_gauges = self.u64_gauges.write().unwrap();
+                u64_gauges
+                    .entry(name)
+                    .and_modify(|(old, _)| *old = cmp::max(*old, value))
+                    .or_insert((value, self.attributes()));
+            },
+            Record::AvgF64(name, value) => {
+                let mut f64_gauges = self.f64_gauges.write().unwrap();
+                f64_gauges
+                    .entry(name)
+                    .and_modify(|(average, n, _)| {
+                        *n += 1;
+                        *average += (value - *average) / *n as f64;
+                    })
+                    .or_insert((value, 1, self.attributes()));
+            },
         }
-
-        Ok(())
     }
 }
@@ -153,76 +132,72 @@ impl From<MetricValue> for Record {
     }
 }
-/// Counts occurrences of counters in the provided buffer.
-/// Returned value is a `HashMap` where the keys are the counter name,
-/// and values are the counts of those counters.
-fn flatten_counters(buffer: &[MetricCounter]) -> HashMap<&'static str, u64> {
-    let mut result = HashMap::new();
-    for counter in buffer {
-        result
-            .entry(counter.name())
-            .and_modify(|count| {
-                if !counter.as_last() {
-                    *count += 1
-                }
-            })
-            .or_insert(1);
-    }
-    result
+fn init_counters(
+    meter: Meter,
+    origin: &Origin,
+    project_name: ProjectName,
+) -> HashMap<&'static str, Counter<u64>> {
+    MetricCounter::default_values()
+        .iter()
+        .filter(|counter| MetricCounter::is_allowed(counter, origin))
+        .map(|counter| {
+            let otel_counter_name = format!("{project_name}.{name}", name = counter.name());
+            // Keep the `&'static str` as the local buffer map key, but change the OTel counter name
+            (counter.name(), meter.u64_counter(otel_counter_name).build())
+        })
+        .collect()
 }
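Both `init_counters` above and `init_gauges` below walk the new `default_values()` lists purely to learn each variant's name, so every instrument is registered once up front; the placeholder payloads (`0`, `0.0`) are never exported. A small self-contained sketch of that enumeration, using a trimmed-down stand-in enum and gauge names taken from the removed tests:

```rust
#[derive(Clone, Copy)]
enum MetricValue {
    BlockHeight(u32),
    BlockConfidence(f64),
    DHTConnectedPeers(usize),
}

impl MetricValue {
    // Placeholder payloads: only the variant, and therefore the name, matters here.
    fn default_values() -> Vec<MetricValue> {
        vec![
            MetricValue::BlockHeight(0),
            MetricValue::BlockConfidence(0.0),
            MetricValue::DHTConnectedPeers(0),
        ]
    }

    fn name(self) -> &'static str {
        match self {
            MetricValue::BlockHeight(_) => "light.block.height",
            MetricValue::BlockConfidence(_) => "light.block.confidence",
            MetricValue::DHTConnectedPeers(_) => "light.dht.connected_peers",
        }
    }
}

fn main() {
    // One gauge per variant, prefixed with the project name (see ProjectName::gauge_name).
    let project_name = "avail";
    let gauge_names: Vec<String> = MetricValue::default_values()
        .into_iter()
        .map(|value| format!("{project_name}.{}", value.name()))
        .collect();

    assert_eq!(gauge_names[0], "avail.light.block.height");
    assert_eq!(gauge_names.len(), 3);
}
```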
-/// Aggregates buffered metrics into `u64` or `f64` values, depending on the metric.
-/// Returned values are a `HashMap`s where the keys are the metric name,
-/// and values are the aggregations (avg, max, etc.) of those metrics.
-fn flatten_metrics(buffer: &[Record]) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) {
-    let mut u64_maximums: HashMap<&'static str, Vec<u64>> = HashMap::new();
-    let mut f64_averages: HashMap<&'static str, Vec<f64>> = HashMap::new();
+fn init_gauges(
+    meter: Meter,
+    origin: &Origin,
+    project_name: ProjectName,
+) -> (Arc<RwLock<U64Gauges>>, Arc<RwLock<F64Gauges>>) {
+    let u64_gauges: Arc<RwLock<U64Gauges>> = Default::default();
+    let f64_gauges: Arc<RwLock<F64Gauges>> = Default::default();
+
+    for value in MetricValue::default_values() {
+        if !value.is_allowed(origin) {
+            continue;
+        }
-    for value in buffer {
-        match value {
-            Record::MaxU64(name, number) => u64_maximums.entry(name).or_default().push(*number),
-            Record::AvgF64(name, number) => f64_averages.entry(name).or_default().push(*number),
+        match value.into() {
+            Record::MaxU64(name, _) => {
+                let gauge_name = project_name.gauge_name(name);
+                let u64_gauges = u64_gauges.clone();
+
+                meter
+                    .u64_observable_gauge(gauge_name.clone())
+                    .with_callback(move |observer| {
+                        let mut u64_gauges = u64_gauges.write().unwrap();
+                        if let Some((value, attributes)) = u64_gauges.get(name) {
+                            trace!("Observed gauge: {gauge_name}, {value}, {attributes:?}");
+                            observer.observe(*value, &attributes.clone());
+                        };
+                        u64_gauges.remove(name);
+                    })
+                    .build();
+            },
+            Record::AvgF64(name, _) => {
+                let gauge_name = project_name.gauge_name(name);
+                let f64_gauges = f64_gauges.clone();
+
+                meter
+                    .f64_observable_gauge(gauge_name.clone())
+                    .with_callback(move |observer| {
+                        let mut f64_gauges = f64_gauges.write().unwrap();
+                        if let Some((value, _, attributes)) = f64_gauges.get(name) {
+                            trace!("Observed gauge: {gauge_name}, {value}, {attributes:?}");
+                            observer.observe(*value, &attributes.clone());
+                        };
+                        f64_gauges.remove(name);
+                    })
+                    .build();
+            },
         }
     }
-    let u64_metrics = u64_maximums
-        .into_iter()
-        .map(|(name, v)| (name, v.into_iter().max().unwrap_or(0)))
-        .collect();
-
-    let f64_metrics = f64_averages
-        .into_iter()
-        .map(|(name, v)| (name, v.iter().sum::<f64>() / v.len() as f64))
-        .collect();
-
-    (u64_metrics, f64_metrics)
-}
-
-fn init_counters(
-    meter: Meter,
-    origin: &Origin,
-    project_name: ProjectName,
-) -> HashMap<&'static str, Counter<u64>> {
-    [
-        MetricCounter::Starts,
-        MetricCounter::Up,
-        MetricCounter::SessionBlocks,
-        MetricCounter::OutgoingConnectionErrors,
-        MetricCounter::IncomingConnectionErrors,
-        MetricCounter::IncomingConnections,
-        MetricCounter::EstablishedConnections,
-        MetricCounter::IncomingPutRecord,
-        MetricCounter::IncomingGetRecord,
-        MetricCounter::EventLoopEvent,
-    ]
-    .iter()
-    .filter(|counter| MetricCounter::is_allowed(counter, origin))
-    .map(|counter| {
-        let otel_counter_name = format!("{project_name}.{name}", name = counter.name());
-        // Keep the `static str as the local buffer map key, but change the OTel counter name`
-        (counter.name(), meter.u64_counter(otel_counter_name).build())
-    })
-    .collect()
+    (u64_gauges, f64_gauges)
 }
 #[derive(Clone, Debug, Serialize, Deserialize)]
@@ -232,7 +207,6 @@ pub struct OtelConfig {
     pub ot_collector_endpoint: String,
     pub ot_export_period: u64,
     pub ot_export_timeout: u64,
-    pub ot_flush_block_interval: u32,
 }
 impl Default for OtelConfig {
@@ -241,7 +215,6 @@
             ot_collector_endpoint: "http://127.0.0.1:4317".to_string(),
             ot_export_period: 300,
             ot_export_timeout: 10,
-            ot_flush_block_interval: 15,
         }
     }
 }
@@ -250,7 +223,9 @@ pub fn initialize(
     project_name: ProjectName,
     origin: &Origin,
     ot_config: OtelConfig,
+    attributes: Vec<(&'static str, String)>,
 ) -> Result<Metrics> {
+    let attributes: Attributes =
attributes.into_iter().collect(); let exporter = MetricExporter::builder() .with_tonic() .with_endpoint(&ot_config.ot_collector_endpoint) @@ -276,124 +251,12 @@ pub fn initialize( // Initialize counters - they need to persist unlike Gauges that are recreated on every record let counters = init_counters(meter.clone(), origin, project_name.clone()); + let (u64_gauges, f64_gauges) = init_gauges(meter, origin, project_name.clone()); Ok(Metrics { - meter, - project_name, origin: origin.clone(), counters, - metric_buffer: vec![], - counter_buffer: vec![], + u64_gauges, + f64_gauges, + attributes: Arc::new(RwLock::new(attributes)), }) } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_flatten_counters() { - use MetricCounter::*; - // Empty buffer - assert!(flatten_counters(&[] as &[MetricCounter]).is_empty()); - - let one = flatten_counters(&[Starts]); - let mut expected = HashMap::new(); - expected.insert(Starts.name(), 1); - assert_eq!(one, expected); - - let two = flatten_counters(&[Starts, Starts]); - let mut expected = HashMap::new(); - expected.insert(Starts.name(), 2); - assert_eq!(two, expected); - - let buffer = vec![ - Starts, - Up, - SessionBlocks, - IncomingConnectionErrors, - IncomingConnectionErrors, - IncomingConnections, - Up, - Starts, - IncomingGetRecord, - Up, - IncomingPutRecord, - Starts, - ]; - let result = flatten_counters(&buffer); - let mut expected = HashMap::new(); - expected.insert(Starts.name(), 3); - expected.insert(Up.name(), 1); - expected.insert(SessionBlocks.name(), 1); - expected.insert(IncomingConnectionErrors.name(), 2); - expected.insert(IncomingConnections.name(), 1); - expected.insert(IncomingGetRecord.name(), 1); - expected.insert(IncomingPutRecord.name(), 1); - assert_eq!(result, expected); - } - - fn flatten_metrics( - values: Vec, - ) -> (HashMap<&'static str, u64>, HashMap<&'static str, f64>) { - super::flatten_metrics(&values.into_iter().map(Into::into).collect::>()) - } - - #[test] - fn test_flatten_metrics() { - let (m_u64, m_f64) = flatten_metrics(vec![]); - assert!(m_u64.is_empty()); - assert!(m_f64.is_empty()); - - let buffer = vec![MetricValue::BlockConfidence(90.0)]; - let (m_u64, m_f64) = flatten_metrics(buffer); - assert!(m_u64.is_empty()); - assert_eq!(m_f64.len(), 1); - assert_eq!(m_f64.get("light.block.confidence"), Some(&90.0)); - - let buffer = vec![ - MetricValue::BlockConfidence(90.0), - MetricValue::BlockHeight(1), - MetricValue::BlockConfidence(93.0), - ]; - let (m_u64, m_f64) = flatten_metrics(buffer); - assert_eq!(m_u64.len(), 1); - assert_eq!(m_u64.get("light.block.height"), Some(&1)); - assert_eq!(m_f64.len(), 1); - assert_eq!(m_f64.get("light.block.confidence"), Some(&91.5)); - - let buffer = vec![ - MetricValue::BlockConfidence(90.0), - MetricValue::BlockHeight(1), - MetricValue::BlockConfidence(93.0), - MetricValue::BlockConfidence(93.0), - MetricValue::BlockConfidence(99.0), - MetricValue::BlockHeight(10), - MetricValue::BlockHeight(1), - ]; - let (m_u64, m_f64) = flatten_metrics(buffer); - assert_eq!(m_u64.len(), 1); - assert_eq!(m_u64.get("light.block.height"), Some(&10)); - assert_eq!(m_f64.len(), 1); - assert_eq!(m_f64.get("light.block.confidence"), Some(&93.75)); - - let buffer = vec![ - MetricValue::DHTConnectedPeers(90), - MetricValue::DHTFetchDuration(1.0), - MetricValue::DHTPutSuccess(10.0), - MetricValue::BlockConfidence(99.0), - MetricValue::DHTFetchDuration(2.0), - MetricValue::DHTFetchDuration(2.1), - MetricValue::BlockHeight(999), - MetricValue::DHTConnectedPeers(80), - 
MetricValue::BlockConfidence(98.0), - ]; - let (m_u64, m_f64) = flatten_metrics(buffer); - assert_eq!(m_u64.len(), 1); - assert_eq!(m_u64.get("light.block.height"), Some(&999)); - assert_eq!(m_f64.len(), 4); - assert_eq!(m_f64.get("light.dht.put_success"), Some(&10.0)); - assert_eq!(m_f64.get("light.dht.fetch_duration"), Some(&1.7)); - assert_eq!(m_f64.get("light.block.confidence"), Some(&98.5)); - assert_eq!(m_f64.get("light.dht.connected_peers"), Some(&85.0)); - } -} diff --git a/core/src/types.rs b/core/src/types.rs index 13c450293..1eabbdf5d 100644 --- a/core/src/types.rs +++ b/core/src/types.rs @@ -416,7 +416,6 @@ pub struct MaintenanceConfig { pub replication_factor: u16, pub query_timeout: Duration, pub pruning_interval: u32, - pub telemetry_flush_interval: u32, pub automatic_server_mode: bool, pub total_memory_gb_threshold: f64, pub num_cpus_threshold: usize, @@ -639,6 +638,10 @@ impl ProjectName { pub fn new>(name: S) -> Self { ProjectName(name.into().to_case(Case::Snake)) } + + pub fn gauge_name(&self, name: &'static str) -> String { + format!("{self}.{name}") + } } impl Default for ProjectName { diff --git a/crawler/src/main.rs b/crawler/src/main.rs index a2d6fb803..d028a1f9a 100644 --- a/crawler/src/main.rs +++ b/crawler/src/main.rs @@ -28,7 +28,7 @@ use tokio::{ mpsc::{self, UnboundedReceiver}, }, }; -use tracing::{error, info, span, warn, Level}; +use tracing::{info, span, warn, Level}; mod config; @@ -156,7 +156,6 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> let (maintenance_sender, maintenance_receiver) = mpsc::unbounded_channel::(); spawn_in_span(shutdown.with_cancel(maintenance::run( - config.otel.ot_flush_block_interval, block_rx, shutdown.clone(), maintenance_sender, @@ -174,20 +173,21 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> ))); let metric_attributes = vec![ - ("role".to_string(), "crawler".to_string()), - ("origin".to_string(), config.origin.to_string()), - ("version".to_string(), version.to_string()), - ("peerID".to_string(), p2p_peer_id.to_string()), - ("partition_size".to_string(), partition_size), - ("network".to_string(), Network::name(&config.genesis_hash)), - ("client_alias".to_string(), config.client_alias), - ("operating_mode".to_string(), "client".to_string()), + ("role", "crawler".to_string()), + ("origin", config.origin.to_string()), + ("version", version.to_string()), + ("peerID", p2p_peer_id.to_string()), + ("partition_size", partition_size), + ("network", Network::name(&config.genesis_hash)), + ("client_alias", config.client_alias), + ("operating_mode", "client".to_string()), ]; let metrics = otlp::initialize( ProjectName::new("avail".to_string()), &config.origin, config.otel.clone(), + metric_attributes, ) .wrap_err("Unable to initialize OpenTelemetry service")?; @@ -196,7 +196,7 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> .map(|node| node.host) .ok_or_else(|| eyre!("No connected host found"))?; - let mut state = CrawlerState::new(metrics, String::default(), rpc_host, metric_attributes); + let mut state = CrawlerState::new(metrics, String::default(), rpc_host); spawn_in_span(shutdown.with_cancel(async move { state @@ -210,38 +210,22 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> struct CrawlerState { metrics: Metrics, - multiaddress: String, - rpc_host: String, - metric_attributes: Vec<(String, String)>, } impl CrawlerState { - fn new( - metrics: Metrics, - multiaddress: String, - rpc_host: String, - metric_attributes: 
Vec<(String, String)>, - ) -> Self { - CrawlerState { - metrics, - multiaddress, - rpc_host, - metric_attributes, - } + fn new(metrics: Metrics, multiaddress: String, rpc_host: String) -> Self { + let mut state = CrawlerState { metrics }; + state.update_rpc_host(rpc_host); + state.update_multiaddress(multiaddress); + state } fn update_multiaddress(&mut self, value: String) { - self.multiaddress = value; + self.metrics.set_attribute("multiaddress", value); } - fn attributes(&self) -> Vec<(String, String)> { - let mut attrs = vec![ - ("multiaddress".to_string(), self.multiaddress.clone()), - ("rpc_host".to_string(), self.rpc_host.to_string()), - ]; - - attrs.extend(self.metric_attributes.clone()); - attrs + fn update_rpc_host(&mut self, value: String) { + self.metrics.set_attribute("rpc_host", value); } async fn handle_events( @@ -250,32 +234,32 @@ impl CrawlerState { mut maintenance_receiver: UnboundedReceiver, mut crawler_receiver: UnboundedReceiver, ) { - self.metrics.count(MetricCounter::Starts, self.attributes()); + self.metrics.count(MetricCounter::Starts); loop { select! { Some(p2p_event) = p2p_receiver.recv() => { match p2p_event { P2pEvent::Count => { - self.metrics.count(MetricCounter::EventLoopEvent, self.attributes()); + self.metrics.count(MetricCounter::EventLoopEvent); }, P2pEvent::Ping(rtt) => { self.metrics.record(MetricValue::DHTPingLatency(rtt.as_millis() as f64)) ; }, P2pEvent::IncomingConnection => { - self.metrics.count(MetricCounter::IncomingConnections, self.attributes()); + self.metrics.count(MetricCounter::IncomingConnections) }, P2pEvent::IncomingConnectionError => { - self.metrics.count(MetricCounter::IncomingConnectionErrors, self.attributes()); + self.metrics.count(MetricCounter::IncomingConnectionErrors) }, P2pEvent::MultiaddressUpdate(address) => { - self.update_multiaddress(address.to_string()); + self.update_multiaddress(address.to_string()) }, P2pEvent::EstablishedConnection => { - self.metrics.count(MetricCounter::EstablishedConnections, self.attributes()); + self.metrics.count(MetricCounter::EstablishedConnections) }, P2pEvent::OutgoingConnectionError => { - self.metrics.count(MetricCounter::OutgoingConnectionErrors, self.attributes()); + self.metrics.count(MetricCounter::OutgoingConnectionErrors); }, // Crawler doesn't need to handle all P2P events and KAD mode changes _ => {} @@ -283,18 +267,8 @@ impl CrawlerState { } Some(maintenance_event) = maintenance_receiver.recv() => { match maintenance_event { - MaintenanceEvent::FlushMetrics(block_num) => { - if let Err(error) = self.metrics.flush(self.attributes()) { - error!( - block_num, - "Could not handle Flush Maintenance event properly: {error}" - ); - } else { - info!(block_num, "Flushing metrics finished"); - }; - }, MaintenanceEvent::CountUps => { - self.metrics.count(MetricCounter::Up, self.attributes()); + self.metrics.count(MetricCounter::Up); }, } } @@ -326,12 +300,10 @@ mod maintenance { use tracing::{error, info}; pub enum OutputEvent { - FlushMetrics(u32), CountUps, } pub async fn run( - ot_flush_block_interval: u32, mut block_receiver: broadcast::Receiver, shutdown: Controller, event_sender: UnboundedSender, @@ -340,19 +312,7 @@ mod maintenance { loop { match block_receiver.recv().await.map_err(Report::from) { - Ok(block) => { - let block_num = block.block_num; - if block_num % ot_flush_block_interval == 0 { - info!(block_num, "Flushing metrics..."); - if let Err(error) = event_sender.send(OutputEvent::FlushMetrics(block_num)) - { - let error_msg = - format!("Failed to send FlushMetrics event: 
{:#}", error); - error!("{error_msg}"); - _ = shutdown.trigger_shutdown(error_msg); - break; - } - }; + Ok(_) => { if let Err(error) = event_sender.send(OutputEvent::CountUps) { let error_msg = format!("Failed to send CountUps event: {:#}", error); error!("{error_msg}"); diff --git a/fat/src/main.rs b/fat/src/main.rs index abffbf2bf..9d6b8e558 100644 --- a/fat/src/main.rs +++ b/fat/src/main.rs @@ -158,7 +158,6 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> let (maintenance_sender, maintenance_receiver) = mpsc::unbounded_channel::(); spawn_in_span(shutdown.with_cancel(maintenance::run( - config.otel.ot_flush_block_interval, block_rx, shutdown.clone(), maintenance_sender, @@ -196,19 +195,20 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> ))); let metric_attributes = vec![ - ("role".to_string(), "fat".to_string()), - ("origin".to_string(), Origin::FatClient.to_string()), - ("version".to_string(), version.to_string()), - ("peerID".to_string(), p2p_peer_id.to_string()), - ("partition_size".to_string(), partition_size), - ("network".to_string(), Network::name(&config.genesis_hash)), - ("operating_mode".to_string(), "client".to_string()), + ("role", "fat".to_string()), + ("origin", Origin::FatClient.to_string()), + ("version", version.to_string()), + ("peerID", p2p_peer_id.to_string()), + ("partition_size", partition_size), + ("network", Network::name(&config.genesis_hash)), + ("operating_mode", "client".to_string()), ]; let metrics = telemetry::otlp::initialize( ProjectName::new("avail".to_string()), &Origin::FatClient, config.otel.clone(), + metric_attributes, ) .wrap_err("Unable to initialize OpenTelemetry service")?; @@ -217,7 +217,7 @@ async fn run(config: Config, db: DB, shutdown: Controller) -> Result<()> .map(|node| node.host) .ok_or_else(|| eyre!("No connected host found"))?; - let mut state = FatState::new(metrics, String::default(), rpc_host, metric_attributes); + let mut state = FatState::new(metrics, String::default(), rpc_host); spawn_in_span(shutdown.with_cancel(async move { state @@ -280,44 +280,26 @@ impl BlockStat { struct FatState { metrics: Metrics, - multiaddress: String, - rpc_host: String, - metric_attributes: Vec<(String, String)>, active_blocks: HashMap, } impl FatState { - fn new( - metrics: Metrics, - multiaddress: String, - rpc_host: String, - metric_attributes: Vec<(String, String)>, - ) -> Self { - FatState { + fn new(metrics: Metrics, multiaddress: String, rpc_host: String) -> Self { + let mut state = FatState { metrics, - multiaddress, - rpc_host, - metric_attributes, active_blocks: Default::default(), - } + }; + state.update_rpc_host(rpc_host); + state.update_multiaddress(multiaddress); + state } fn update_multiaddress(&mut self, value: String) { - self.multiaddress = value; + self.metrics.set_attribute("multiaddress", value); } fn update_rpc_host(&mut self, value: String) { - self.rpc_host = value; - } - - fn attributes(&self) -> Vec<(String, String)> { - let mut attrs = vec![ - ("multiaddress".to_string(), self.multiaddress.clone()), - ("rpc_host".to_string(), self.rpc_host.to_string()), - ]; - - attrs.extend(self.metric_attributes.clone()); - attrs + self.metrics.set_attribute("rpc_host", value); } fn get_block_stat(&mut self, block_num: u32) -> Result<&mut BlockStat> { @@ -402,38 +384,38 @@ impl FatState { mut fat_receiver: UnboundedReceiver, mut rpc_receiver: broadcast::Receiver, ) { - self.metrics.count(MetricCounter::Starts, self.attributes()); + self.metrics.count(MetricCounter::Starts); loop { 
select! { Some(p2p_event) = p2p_receiver.recv() => { match p2p_event { P2pEvent::Count => { - self.metrics.count(MetricCounter::EventLoopEvent, self.attributes()); + self.metrics.count(MetricCounter::EventLoopEvent) }, P2pEvent::IncomingGetRecord => { - self.metrics.count(MetricCounter::IncomingGetRecord, self.attributes()); + self.metrics.count(MetricCounter::IncomingGetRecord) }, P2pEvent::IncomingPutRecord => { - self.metrics.count(MetricCounter::IncomingPutRecord, self.attributes()); + self.metrics.count(MetricCounter::IncomingPutRecord) }, P2pEvent::Ping(rtt) => { self.metrics.record(MetricValue::DHTPingLatency(rtt.as_millis() as f64)) ; }, P2pEvent::IncomingConnection => { - self.metrics.count(MetricCounter::IncomingConnections, self.attributes()); + self.metrics.count(MetricCounter::IncomingConnections) }, P2pEvent::IncomingConnectionError => { - self.metrics.count(MetricCounter::IncomingConnectionErrors, self.attributes()); + self.metrics.count(MetricCounter::IncomingConnectionErrors) }, P2pEvent::MultiaddressUpdate(address) => { self.update_multiaddress(address.to_string()); }, P2pEvent::EstablishedConnection => { - self.metrics.count(MetricCounter::EstablishedConnections, self.attributes()); + self.metrics.count(MetricCounter::EstablishedConnections) }, P2pEvent::OutgoingConnectionError => { - self.metrics.count(MetricCounter::OutgoingConnectionErrors, self.attributes()); + self.metrics.count(MetricCounter::OutgoingConnectionErrors) }, P2pEvent::PutRecord { block_num, records } => { self.handle_new_put_record(block_num, records); @@ -460,25 +442,15 @@ impl FatState { } Some(maintenance_event) = maintenance_receiver.recv() => { match maintenance_event { - MaintenanceEvent::FlushMetrics(block_num) => { - if let Err(error) = self.metrics.flush(self.attributes()) { - error!( - block_num, - "Could not handle Flush Maintenance event properly: {error}" - ); - } else { - info!(block_num, "Flushing metrics finished"); - }; - }, MaintenanceEvent::CountUps => { - self.metrics.count(MetricCounter::Up, self.attributes()); + self.metrics.count(MetricCounter::Up); }, } } Some(fat_event) = fat_receiver.recv() => { match fat_event { FatEvent::CountSessionBlocks => { - self.metrics.count(MetricCounter::SessionBlocks, self.attributes()); + self.metrics.count(MetricCounter::SessionBlocks); }, FatEvent::RecordBlockHeight(block_num) => { self.metrics.record(MetricValue::BlockHeight(block_num)); @@ -511,12 +483,10 @@ mod maintenance { use tracing::{error, info}; pub enum OutputEvent { - FlushMetrics(u32), CountUps, } pub async fn run( - ot_flush_block_interval: u32, mut block_receiver: broadcast::Receiver, shutdown: Controller, event_sender: UnboundedSender, @@ -525,19 +495,7 @@ mod maintenance { loop { match block_receiver.recv().await.map_err(Report::from) { - Ok(block) => { - let block_num = block.block_num; - if block_num % ot_flush_block_interval == 0 { - info!(block_num, "Flushing metrics..."); - if let Err(error) = event_sender.send(OutputEvent::FlushMetrics(block_num)) - { - let error_msg = - format!("Failed to send FlushMetrics event: {:#}", error); - error!("{error_msg}"); - _ = shutdown.trigger_shutdown(error_msg); - break; - } - }; + Ok(_) => { if let Err(error) = event_sender.send(OutputEvent::CountUps) { let error_msg = format!("Failed to send CountUps event: {:#}", error); error!("{error_msg}");
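With the flush event gone, the crawler and fat-client `maintenance::run` tasks reduce to forwarding one `CountUps` tick per received block and stopping when a channel closes. A self-contained tokio sketch of that loop shape (simplified types; the crate's `Controller`-based shutdown is only hinted at in a comment):

```rust
use tokio::sync::{broadcast, mpsc};

enum OutputEvent {
    CountUps,
}

// Simplified stand-in for the maintenance task: no flush interval, just one
// CountUps per received block, and a clean exit when either channel closes.
async fn run(
    mut block_receiver: broadcast::Receiver<u32>,
    event_sender: mpsc::UnboundedSender<OutputEvent>,
) {
    loop {
        match block_receiver.recv().await {
            Ok(_block_num) => {
                if event_sender.send(OutputEvent::CountUps).is_err() {
                    // In the real code this is where shutdown would be triggered.
                    break;
                }
            },
            Err(_) => break,
        }
    }
}

#[tokio::main]
async fn main() {
    let (block_tx, block_rx) = broadcast::channel(16);
    let (event_tx, mut event_rx) = mpsc::unbounded_channel();
    let task = tokio::spawn(run(block_rx, event_tx));

    block_tx.send(1).unwrap();
    block_tx.send(2).unwrap();
    drop(block_tx); // closing the block stream ends the maintenance loop
    task.await.unwrap();

    let mut ups = 0;
    while event_rx.recv().await.is_some() {
        ups += 1;
    }
    assert_eq!(ups, 2);
}
```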