Skip to content

Commit

Permalink
Register telemetry gauges on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Jan 16, 2025
1 parent 8c9d78c commit 58b578b
Show file tree
Hide file tree
Showing 17 changed files with 289 additions and 519 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions client/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Changelog

## 1.12.5

- Update `avail-light-core` to 1.2.0
- Remove `ot_flush_block_interval` from configuration

## [1.12.4](https://github.com/availproject/avail-light/releases/tag/avail-light-client-v1.12.4) - 2024-12-20

- Update `avail-light-core` to 1.1.0
Expand Down
2 changes: 1 addition & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "avail-light-client"
version = "1.12.4"
version = "1.12.5"
build = "../build.rs"
edition = "2021"
description = "Avail network p2p Light Client"
Expand Down
1 change: 0 additions & 1 deletion client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ impl From<&RuntimeConfig> for MaintenanceConfig {
replication_factor: val.libp2p.kademlia.record_replication_factor.get() as u16,
query_timeout: val.libp2p.kademlia.query_timeout,
pruning_interval: val.libp2p.kademlia.store_pruning_interval,
telemetry_flush_interval: val.otel.ot_flush_block_interval,
automatic_server_mode: val.libp2p.kademlia.automatic_server_mode,
total_memory_gb_threshold: val.total_memory_gb_threshold,
num_cpus_threshold: val.num_cpus_threshold,
Expand Down
132 changes: 45 additions & 87 deletions client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use avail_light_core::{
shutdown::Controller,
sync_client::SyncClient,
sync_finality::SyncFinality,
telemetry::{self, otlp::Metrics, MetricCounter, MetricValue},
telemetry::{
self, otlp::Metrics, MetricCounter, MetricValue, ATTRIBUTE_MULTIADDRESS,
ATTRIBUTE_OPERATING_MODE, ATTRIBUTE_RPC_HOST,
},
types::{
load_or_init_suri, Delay, IdentityConfig, MaintenanceConfig, PeerAddress, SecretKey, Uuid,
},
Expand Down Expand Up @@ -294,38 +297,40 @@ async fn run(
lc_sender,
)));

let rpc_host = db
.get(RpcNodeKey)
.map(|node| node.host)
.ok_or_else(|| eyre!("No connected host found"))?;
let operating_mode: Mode = cfg.libp2p.kademlia.operation_mode.into();

// construct Metric Attributes and initialize Metrics
let metric_attributes = vec![
("version".to_string(), version.to_string()),
("role".to_string(), "lightnode".to_string()),
("origin".to_string(), cfg.origin.to_string()),
("peerID".to_string(), peer_id.to_string()),
("avail_address".to_string(), identity_cfg.avail_public_key),
("network".to_string(), Network::name(&cfg.genesis_hash)),
("client_id".to_string(), client_id.to_string()),
("execution_id".to_string(), execution_id.to_string()),
("version", version.to_string()),
("role", "lightnode".to_string()),
("origin", cfg.origin.to_string()),
("peerID", peer_id.to_string()),
("avail_address", identity_cfg.avail_public_key),
("network", Network::name(&cfg.genesis_hash)),
("client_id", client_id.to_string()),
("execution_id", execution_id.to_string()),
(
"client_alias".to_string(),
"client_alias",
cfg.client_alias.clone().unwrap_or("".to_string()),
),
(ATTRIBUTE_MULTIADDRESS, Multiaddr::empty().to_string()),
(ATTRIBUTE_OPERATING_MODE, operating_mode.to_string()),
(ATTRIBUTE_RPC_HOST, rpc_host),
];

let metrics =
telemetry::otlp::initialize(cfg.project_name.clone(), &cfg.origin, cfg.otel.clone())
.wrap_err("Unable to initialize OpenTelemetry service")?;

let rpc_host = db
.get(RpcNodeKey)
.map(|node| node.host)
.ok_or_else(|| eyre!("No connected host found"))?;

let mut state = ClientState::new(
metrics,
cfg.libp2p.kademlia.operation_mode.into(),
rpc_host,
Multiaddr::empty(),
let metrics = telemetry::otlp::initialize(
cfg.project_name.clone(),
&cfg.origin,
cfg.otel.clone(),
metric_attributes,
);
)
.wrap_err("Unable to initialize OpenTelemetry service")?;

let mut state = ClientState::new(metrics);

spawn_in_span(shutdown.with_cancel(async move {
state
Expand Down Expand Up @@ -446,54 +451,17 @@ impl BlockStat {

struct ClientState {
metrics: Metrics,
kad_mode: Mode,
multiaddress: Multiaddr,
rpc_host: String,
metric_attributes: Vec<(String, String)>,
active_blocks: HashMap<u32, BlockStat>,
}

impl ClientState {
fn new(
metrics: Metrics,
kad_mode: Mode,
rpc_host: String,
multiaddress: Multiaddr,
metric_attributes: Vec<(String, String)>,
) -> Self {
fn new(metrics: Metrics) -> Self {
ClientState {
metrics,
kad_mode,
multiaddress,
rpc_host,
metric_attributes,
active_blocks: Default::default(),
}
}

fn update_multiaddress(&mut self, value: Multiaddr) {
self.multiaddress = value;
}

fn update_operating_mode(&mut self, value: Mode) {
self.kad_mode = value;
}

fn update_rpc_host(&mut self, value: String) {
self.rpc_host = value;
}

fn attributes(&self) -> Vec<(String, String)> {
let mut attrs = vec![
("operating_mode".to_string(), self.kad_mode.to_string()),
("multiaddress".to_string(), self.multiaddress.to_string()),
("rpc_host".to_string(), self.rpc_host.to_string()),
];

attrs.extend(self.metric_attributes.clone());
attrs
}

fn get_block_stat(&mut self, block_num: u32) -> Result<&mut BlockStat> {
self.active_blocks
.get_mut(&block_num)
Expand Down Expand Up @@ -576,40 +544,40 @@ impl ClientState {
mut lc_receiver: UnboundedReceiver<LcEvent>,
mut rpc_receiver: broadcast::Receiver<RpcEvent>,
) {
self.metrics.count(MetricCounter::Starts, self.attributes());
self.metrics.count(MetricCounter::Starts);
loop {
select! {
Some(p2p_event) = p2p_receiver.recv() => {
match p2p_event {
P2pEvent::Count => {
self.metrics.count(MetricCounter::EventLoopEvent, self.attributes());
self.metrics.count(MetricCounter::EventLoopEvent);
},
P2pEvent::IncomingGetRecord => {
self.metrics.count(MetricCounter::IncomingGetRecord, self.attributes());
self.metrics.count(MetricCounter::IncomingGetRecord);
},
P2pEvent::IncomingPutRecord => {
self.metrics.count(MetricCounter::IncomingPutRecord, self.attributes());
self.metrics.count(MetricCounter::IncomingPutRecord);
},
P2pEvent::KadModeChange(mode) => {
self.update_operating_mode(mode);
},
self.metrics.set_attribute(ATTRIBUTE_OPERATING_MODE, mode.to_string());
}
P2pEvent::Ping(rtt) => {
self.metrics.record(MetricValue::DHTPingLatency(rtt.as_millis() as f64));
},
P2pEvent::IncomingConnection => {
self.metrics.count(MetricCounter::IncomingConnections, self.attributes());
self.metrics.count(MetricCounter::IncomingConnections);
},
P2pEvent::IncomingConnectionError => {
self.metrics.count(MetricCounter::IncomingConnectionErrors, self.attributes());
self.metrics.count(MetricCounter::IncomingConnectionErrors);
},
P2pEvent::MultiaddressUpdate(address) => {
self.update_multiaddress(address);
self.metrics.set_attribute(ATTRIBUTE_MULTIADDRESS, address.to_string());
},
P2pEvent::EstablishedConnection => {
self.metrics.count(MetricCounter::EstablishedConnections, self.attributes());
self.metrics.count(MetricCounter::EstablishedConnections);
},
P2pEvent::OutgoingConnectionError => {
self.metrics.count(MetricCounter::OutgoingConnectionErrors, self.attributes());
self.metrics.count(MetricCounter::OutgoingConnectionErrors);
},
P2pEvent::PutRecord { block_num, records } => {
self.handle_new_put_record(block_num, records);
Expand All @@ -634,16 +602,6 @@ impl ClientState {
}
Some(maintenance_event) = maintenance_receiver.recv() => {
match maintenance_event {
MaintenanceEvent::FlushMetrics(block_num) => {
if let Err(error) = self.metrics.flush(self.attributes()) {
error!(
block_num,
"Could not handle Flush Maintenance event properly: {error}"
);
} else {
info!(block_num, "Flushing metrics finished");
};
},
MaintenanceEvent::RecordStats {
connected_peers,
block_confidence_treshold,
Expand All @@ -656,7 +614,7 @@ impl ClientState {
self.metrics.record(MetricValue::DHTQueryTimeout(query_timeout));
},
MaintenanceEvent::CountUps => {
self.metrics.count(MetricCounter::Up, self.attributes());
self.metrics.count(MetricCounter::Up);
},
}
}
Expand All @@ -666,7 +624,7 @@ impl ClientState {
self.metrics.record(MetricValue::BlockProcessingDelay(delay));
},
LcEvent::CountSessionBlocks => {
self.metrics.count(MetricCounter::SessionBlocks,self.attributes());
self.metrics.count(MetricCounter::SessionBlocks);
},
LcEvent::RecordBlockHeight(block_num) => {
self.metrics.record(MetricValue::BlockHeight(block_num));
Expand All @@ -692,7 +650,7 @@ impl ClientState {

Ok(rpc_event) = rpc_receiver.recv() => {
if let RpcEvent::ConnectedHost(host) = rpc_event {
self.update_rpc_host(host);
self.metrics.set_attribute(ATTRIBUTE_RPC_HOST, host);
}
}
// break the loop if all channels are closed
Expand Down
4 changes: 4 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 1.2.0

- Fix issue with multiple telemetry gauge callbacks

## [1.1.0](https://github.com/availproject/avail-light/tree/avail-light-core-v1.1.0) - 2024-12-20

- Temporarily remove WebRTC support to reduce memory usage
Expand Down
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "avail-light-core"
version = "1.1.0"
version = "1.2.0"
edition = "2021"
description = "Avail Light core driving library"

Expand Down
6 changes: 0 additions & 6 deletions core/src/maintenance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
};

pub enum OutputEvent {
FlushMetrics(u32),
RecordStats {
connected_peers: usize,
block_confidence_treshold: f64,
Expand All @@ -34,11 +33,6 @@ pub async fn process_block(
}
}

if block_number % maintenance_config.telemetry_flush_interval == 0 {
info!(block_number, "Flushing metrics...");
event_sender.send(OutputEvent::FlushMetrics(block_number))?;
}

p2p_client
.shrink_kademlia_map()
.await
Expand Down
46 changes: 40 additions & 6 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ pub mod otlp;

use crate::types::Origin;

/// Key of the metric attribute holding the node's multiaddress.
/// The client seeds it with an empty multiaddress and overwrites it on multiaddress updates.
pub const ATTRIBUTE_MULTIADDRESS: &str = "multiaddress";
/// Key of the metric attribute holding the Kademlia operating mode.
/// The client overwrites it whenever the mode changes.
pub const ATTRIBUTE_OPERATING_MODE: &str = "operating_mode";
/// Key of the metric attribute holding the currently connected RPC host.
/// The client overwrites it when a new host is reported as connected.
pub const ATTRIBUTE_RPC_HOST: &str = "rpc_host";

pub trait Value: Send + Clone {
fn is_allowed(&self, origin: &Origin) -> bool;
}
Expand Down Expand Up @@ -43,12 +47,19 @@ impl MetricName for MetricCounter {
}

impl MetricCounter {
fn is_buffered(&self) -> bool {
!matches!(self, MetricCounter::Starts)
}

fn as_last(&self) -> bool {
matches!(self, MetricCounter::Up)
pub fn default_values() -> Vec<MetricCounter> {
vec![
MetricCounter::Starts,
MetricCounter::Up,
MetricCounter::SessionBlocks,
MetricCounter::OutgoingConnectionErrors,
MetricCounter::IncomingConnectionErrors,
MetricCounter::IncomingConnections,
MetricCounter::EstablishedConnections,
MetricCounter::IncomingPutRecord,
MetricCounter::IncomingGetRecord,
MetricCounter::EventLoopEvent,
]
}

fn is_allowed(&self, origin: &Origin) -> bool {
Expand Down Expand Up @@ -85,6 +96,29 @@ pub enum MetricValue {
RPCCallDuration(f64),
}

impl MetricValue {
pub fn default_values() -> Vec<MetricValue> {
vec![
MetricValue::BlockHeight(0),
MetricValue::BlockConfidence(0.0),
MetricValue::BlockConfidenceThreshold(0.0),
MetricValue::BlockProcessingDelay(0.0),
MetricValue::DHTReplicationFactor(0),
MetricValue::DHTFetched(0.0),
MetricValue::DHTFetchedPercentage(0.0),
MetricValue::DHTFetchDuration(0.0),
MetricValue::DHTPutDuration(0.0),
MetricValue::DHTPutSuccess(0.0),
MetricValue::DHTConnectedPeers(0),
MetricValue::DHTQueryTimeout(0),
MetricValue::DHTPingLatency(0.0),
MetricValue::RPCFetched(0.0),
MetricValue::RPCFetchDuration(0.0),
MetricValue::RPCCallDuration(0.0),
]
}
}

impl MetricName for MetricValue {
fn name(&self) -> &'static str {
use MetricValue::*;
Expand Down
Loading

0 comments on commit 58b578b

Please sign in to comment.