Skip to content

Commit

Permalink
Register telemetry gauges on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jan 14, 2025
1 parent 4df8f58 commit d704074
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 461 deletions.
1 change: 0 additions & 1 deletion client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl From<&RuntimeConfig> for MaintenanceConfig {
replication_factor: val.libp2p.kademlia.record_replication_factor.get() as u16,
query_timeout: val.libp2p.kademlia.query_timeout,
pruning_interval: val.libp2p.kademlia.store_pruning_interval,
telemetry_flush_interval: val.otel.ot_flush_block_interval,
automatic_server_mode: val.libp2p.kademlia.automatic_server_mode,
total_memory_gb_threshold: val.total_memory_gb_threshold,
num_cpus_threshold: val.num_cpus_threshold,
Expand Down
103 changes: 39 additions & 64 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,23 +296,27 @@ async fn run(

// construct Metric Attributes and initialize Metrics
let metric_attributes = vec![
("version".to_string(), version.to_string()),
("role".to_string(), "lightnode".to_string()),
("origin".to_string(), cfg.origin.to_string()),
("peerID".to_string(), peer_id.to_string()),
("avail_address".to_string(), identity_cfg.avail_public_key),
("network".to_string(), Network::name(&cfg.genesis_hash)),
("client_id".to_string(), client_id.to_string()),
("execution_id".to_string(), execution_id.to_string()),
("version", version.to_string()),
("role", "lightnode".to_string()),
("origin", cfg.origin.to_string()),
("peerID", peer_id.to_string()),
("avail_address", identity_cfg.avail_public_key),
("network", Network::name(&cfg.genesis_hash)),
("client_id", client_id.to_string()),
("execution_id", execution_id.to_string()),
(
"client_alias".to_string(),
"client_alias",
cfg.client_alias.clone().unwrap_or("".to_string()),
),
];

let metrics =
telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone())
.wrap_err("Unable to initialize OpenTelemetry service")?;
let metrics = telemetry::otlp::initialize(
cfg.project_name.clone(),
&cfg.origin,
cfg.otel.clone(),
metric_attributes,
)
.wrap_err("Unable to initialize OpenTelemetry service")?;

let rpc_host = db
.get(RpcNodeKey)
Expand All @@ -324,7 +328,6 @@ async fn run(
cfg.libp2p.kademlia.operation_mode.into(),
rpc_host,
Multiaddr::empty(),
metric_attributes,
);

spawn_in_span(shutdown.with_cancel(async move {
Expand Down Expand Up @@ -446,52 +449,33 @@ impl BlockStat {

struct ClientState {
metrics: Metrics,
kad_mode: Mode,
multiaddress: Multiaddr,
rpc_host: String,
metric_attributes: Vec<(String, String)>,
active_blocks: HashMap<u32, BlockStat>,
}

impl ClientState {
fn new(
metrics: Metrics,
kad_mode: Mode,
rpc_host: String,
multiaddress: Multiaddr,
metric_attributes: Vec<(String, String)>,
) -> Self {
ClientState {
fn new(metrics: Metrics, kad_mode: Mode, rpc_host: String, multiaddress: Multiaddr) -> Self {
let mut state = ClientState {
metrics,
kad_mode,
multiaddress,
rpc_host,
metric_attributes,
active_blocks: Default::default(),
}
};
state.update_operating_mode(kad_mode);
state.update_rpc_host(rpc_host);
state.update_multiaddress(multiaddress);
state
}

fn update_multiaddress(&mut self, value: Multiaddr) {
self.multiaddress = value;
self.metrics
.set_attribute("multiaddress", value.to_string());
}

fn update_operating_mode(&mut self, value: Mode) {
self.kad_mode = value;
self.metrics
.set_attribute("operating_mode", value.to_string());
}

fn update_rpc_host(&mut self, value: String) {
self.rpc_host = value;
}

fn attributes(&self) -> Vec<(String, String)> {
let mut attrs = vec![
("operating_mode".to_string(), self.kad_mode.to_string()),
("multiaddress".to_string(), self.multiaddress.to_string()),
("rpc_host".to_string(), self.rpc_host.to_string()),
];

attrs.extend(self.metric_attributes.clone());
attrs
self.metrics.set_attribute("rpc_host", value);
}

fn get_block_stat(&mut self, block_num: u32) -> Result<&mut BlockStat> {
Expand Down Expand Up @@ -576,40 +560,41 @@ impl ClientState {
mut lc_receiver: UnboundedReceiver<LcEvent>,
mut rpc_receiver: broadcast::Receiver<RpcEvent>,
) {
self.metrics.count(MetricCounter::Starts, self.attributes());
self.metrics.count(MetricCounter::Starts);
loop {
select! {
Some(p2p_event) = p2p_receiver.recv() => {
match p2p_event {
P2pEvent::Count => {
self.metrics.count(MetricCounter::EventLoopEvent, self.attributes());
self.metrics.count(MetricCounter::EventLoopEvent);
},
P2pEvent::IncomingGetRecord => {
self.metrics.count(MetricCounter::IncomingGetRecord, self.attributes());
self.metrics.count(MetricCounter::IncomingGetRecord);
},
P2pEvent::IncomingPutRecord => {
self.metrics.count(MetricCounter::IncomingPutRecord, self.attributes());
self.metrics.count(MetricCounter::IncomingPutRecord);
},
P2pEvent::KadModeChange(mode) => {

self.update_operating_mode(mode);
},
P2pEvent::Ping(rtt) => {
self.metrics.record(MetricValue::DHTPingLatency(rtt.as_millis() as f64));
},
P2pEvent::IncomingConnection => {
self.metrics.count(MetricCounter::IncomingConnections, self.attributes());
self.metrics.count(MetricCounter::IncomingConnections);
},
P2pEvent::IncomingConnectionError => {
self.metrics.count(MetricCounter::IncomingConnectionErrors, self.attributes());
self.metrics.count(MetricCounter::IncomingConnectionErrors);
},
P2pEvent::MultiaddressUpdate(address) => {
self.update_multiaddress(address);
},
P2pEvent::EstablishedConnection => {
self.metrics.count(MetricCounter::EstablishedConnections, self.attributes());
self.metrics.count(MetricCounter::EstablishedConnections);
},
P2pEvent::OutgoingConnectionError => {
self.metrics.count(MetricCounter::OutgoingConnectionErrors, self.attributes());
self.metrics.count(MetricCounter::OutgoingConnectionErrors);
},
P2pEvent::PutRecord { block_num, records } => {
self.handle_new_put_record(block_num, records);
Expand All @@ -634,16 +619,6 @@ impl ClientState {
}
Some(maintenance_event) = maintenance_receiver.recv() => {
match maintenance_event {
MaintenanceEvent::FlushMetrics(block_num) => {
if let Err(error) = self.metrics.flush(self.attributes()) {
error!(
block_num,
"Could not handle Flush Maintenance event properly: {error}"
);
} else {
info!(block_num, "Flushing metrics finished");
};
},
MaintenanceEvent::RecordStats {
connected_peers,
block_confidence_treshold,
Expand All @@ -656,7 +631,7 @@ impl ClientState {
self.metrics.record(MetricValue::DHTQueryTimeout(query_timeout));
},
MaintenanceEvent::CountUps => {
self.metrics.count(MetricCounter::Up, self.attributes());
self.metrics.count(MetricCounter::Up);
},
}
}
Expand All @@ -666,7 +641,7 @@ impl ClientState {
self.metrics.record(MetricValue::BlockProcessingDelay(delay));
},
LcEvent::CountSessionBlocks => {
self.metrics.count(MetricCounter::SessionBlocks,self.attributes());
self.metrics.count(MetricCounter::SessionBlocks);
},
LcEvent::RecordBlockHeight(block_num) => {
self.metrics.record(MetricValue::BlockHeight(block_num));
Expand Down
6 changes: 0 additions & 6 deletions core/src/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
};

pub enum OutputEvent {
FlushMetrics(u32),
RecordStats {
connected_peers: usize,
block_confidence_treshold: f64,
Expand All @@ -34,11 +33,6 @@ pub async fn process_block(
}
}

if block_number % maintenance_config.telemetry_flush_interval == 0 {
info!(block_number, "Flushing metrics...");
event_sender.send(OutputEvent::FlushMetrics(block_number))?;
}

p2p_client
.shrink_kademlia_map()
.await
Expand Down
42 changes: 36 additions & 6 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,19 @@ impl MetricName for MetricCounter {
}

impl MetricCounter {
fn is_buffered(&self) -> bool {
!matches!(self, MetricCounter::Starts)
}

fn as_last(&self) -> bool {
matches!(self, MetricCounter::Up)
pub fn default_values() -> Vec<MetricCounter> {
vec![
MetricCounter::Starts,
MetricCounter::Up,
MetricCounter::SessionBlocks,
MetricCounter::OutgoingConnectionErrors,
MetricCounter::IncomingConnectionErrors,
MetricCounter::IncomingConnections,
MetricCounter::EstablishedConnections,
MetricCounter::IncomingPutRecord,
MetricCounter::IncomingGetRecord,
MetricCounter::EventLoopEvent,
]
}

fn is_allowed(&self, origin: &Origin) -> bool {
Expand Down Expand Up @@ -85,6 +92,29 @@ pub enum MetricValue {
RPCCallDuration(f64),
}

impl MetricValue {
pub fn default_values() -> Vec<MetricValue> {
vec![
MetricValue::BlockHeight(0),
MetricValue::BlockConfidence(0.0),
MetricValue::BlockConfidenceThreshold(0.0),
MetricValue::BlockProcessingDelay(0.0),
MetricValue::DHTReplicationFactor(0),
MetricValue::DHTFetched(0.0),
MetricValue::DHTFetchedPercentage(0.0),
MetricValue::DHTFetchDuration(0.0),
MetricValue::DHTPutDuration(0.0),
MetricValue::DHTPutSuccess(0.0),
MetricValue::DHTConnectedPeers(0),
MetricValue::DHTQueryTimeout(0),
MetricValue::DHTPingLatency(0.0),
MetricValue::RPCFetched(0.0),
MetricValue::RPCFetchDuration(0.0),
MetricValue::RPCCallDuration(0.0),
]
}
}

impl MetricName for MetricValue {
fn name(&self) -> &'static str {
use MetricValue::*;
Expand Down
Loading

0 comments on commit d704074

Please sign in to comment.