From c001813c06a69946ea29d13cb91b0b1c8e510c47 Mon Sep 17 00:00:00 2001 From: Kais baccour <31540265+kaisbaccour@users.noreply.github.com> Date: Mon, 9 Sep 2024 15:28:26 +0200 Subject: [PATCH] refactor-metrics (#76) --- Cargo.lock | 18 +++---------- lgn-worker/Cargo.toml | 2 +- lgn-worker/src/main.rs | 28 ++++++++++--------- lgn-worker/src/manager/mod.rs | 29 +++++++++----------- lgn-worker/src/metrics/mod.rs | 51 ----------------------------------- 5 files changed, 33 insertions(+), 95 deletions(-) delete mode 100644 lgn-worker/src/metrics/mod.rs diff --git a/Cargo.lock b/Cargo.lock index fc6aa90..967dbe5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3851,7 +3851,7 @@ dependencies = [ "groth16_framework 0.1.0 (git+https://github.com/Lagrange-Labs/Euclid-database.git)", "hex", "lgn-messages", - "metrics 0.23.0", + "metrics", "mimalloc", "mp2_common", "mp2_v1", @@ -3893,7 +3893,7 @@ dependencies = [ "lgn-auth", "lgn-messages", "lgn-provers", - "metrics 0.22.3", + "metrics", "metrics-exporter-prometheus", "mimalloc", "rand", @@ -4043,16 +4043,6 @@ version = "2.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" -[[package]] -name = "metrics" -version = "0.22.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2be3cbd384d4e955b231c895ce10685e3d8260c5ccffae898c96c723b0772835" -dependencies = [ - "ahash", - "portable-atomic", -] - [[package]] name = "metrics" version = "0.23.0" @@ -4076,7 +4066,7 @@ dependencies = [ "hyper-util", "indexmap 2.5.0", "ipnet", - "metrics 0.23.0", + "metrics", "metrics-util", "quanta", "thiserror", @@ -4093,7 +4083,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.14.5", - "metrics 0.23.0", + "metrics", "num_cpus", "quanta", "sketches-ddsketch", diff --git a/lgn-worker/Cargo.toml b/lgn-worker/Cargo.toml index b7c2ed6..03c9f7a 100644 --- a/lgn-worker/Cargo.toml +++ b/lgn-worker/Cargo.toml @@ -43,7 +43,7 @@ lazy-static-include = { version = "3.2.1" } lgn-auth = { path = "../lgn-auth" } lgn-messages = { path = "../lgn-messages" } lgn-provers = { path = "../lgn-provers"} -metrics = { version = "0.22.0" } +metrics = { version = "0.23.0" } metrics-exporter-prometheus = { version = "0.15.0" } mimalloc = { version = "0.1.39", default-features = false } rand = { version = "0.8.5", default-features = false, features = [ diff --git a/lgn-worker/src/main.rs b/lgn-worker/src/main.rs index 122c8d5..f64316b 100644 --- a/lgn-worker/src/main.rs +++ b/lgn-worker/src/main.rs @@ -1,8 +1,8 @@ -use ::metrics::counter; use anyhow::*; use backtrace::Backtrace; use clap::Parser; use jwt::{Claims, RegisteredClaims}; +use metrics::counter; use mimalloc::MiMalloc; use std::fmt::Debug; use std::net::TcpStream; @@ -20,7 +20,6 @@ use crate::checksum::{fetch_checksum_file, verify_directory_checksums}; use crate::config::Config; use crate::manager::v1::register_v1_provers; use crate::manager::ProversManager; -use crate::metrics::Metrics; use ethers::signers::Wallet; use lgn_auth::jwt::JWTAuth; use lgn_messages::types::{DownstreamPayload, ReplyType, TaskType, ToProverType, UpstreamPayload}; @@ -31,7 +30,6 @@ use tungstenite::stream::MaybeTlsStream; mod checksum; mod config; mod manager; -mod metrics; #[global_allocator] static GLOBAL: MiMalloc = MiMalloc; @@ -121,7 +119,6 @@ fn main() -> anyhow::Result<()> { fn run(config: &Config) -> Result<()> { info!("Version: {}", env!("CARGO_PKG_VERSION")); - let metrics = Metrics::new(); let lagrange_wallet = match ( &config.avs.lagr_keystore, &config.avs.lagr_pwd, @@ -176,7 +173,7 @@ fn run(config: &Config) -> Result<()> { // Connect to the server let (mut ws_socket, _) = connect(connection_request)?; - metrics.increment_gateway_connection_count(); + counter!("zkmr_worker_gateway_connection_count").increment(1); info!("Connected to the gateway"); info!("Authenticating"); @@ -219,7 +216,7 @@ fn run(config: &Config) -> Result<()> { fetch_checksum_file(checksum_url, expected_checksums_file)?; } - let mut provers_manager = ProversManager::::new(&metrics); + let mut provers_manager = ProversManager::::new(); register_v1_provers(config, &mut provers_manager); if !config.public_params.skip_checksum { @@ -227,13 +224,12 @@ fn run(config: &Config) -> Result<()> { .context("Failed to verify checksums")?; } - start_work(&metrics, &mut ws_socket, &mut provers_manager)?; + start_work(&mut ws_socket, &mut provers_manager)?; Ok(()) } fn start_work( - metrics: &Metrics, ws_socket: &mut WebSocket>, provers_manager: &mut ProversManager, ) -> Result<()> @@ -255,7 +251,9 @@ where match msg { Message::Text(content) => { trace!("Received message: {:?}", content); - metrics.increment_websocket_messages_received("text"); + + counter!("zkmr_worker_websocket_messages_received_total", "message_type" => "text") + .increment(1); match serde_json::from_str::>(&content)? { DownstreamPayload::Todo { envelope } => { @@ -269,11 +267,12 @@ where ws_socket.send(Message::Text(serde_json::to_string( &UpstreamPayload::Done(reply), )?))?; - metrics.increment_websocket_messages_sent("text"); + counter!("zkmr_worker_websocket_messages_sent_total", "message_type" => "text") + .increment(1); } Err(e) => { error!("Error processing task: {:?}", e); - metrics.increment_error_count("proof processing"); + counter!("zkmr_worker_error_count", "error_type" => "proof processing").increment(1); } } } @@ -282,7 +281,9 @@ where } Message::Ping(_) => { debug!("Received ping or close message"); - metrics.increment_websocket_messages_received("ping"); + + counter!("zkmr_worker_websocket_messages_received_total", "message_type" => "ping") + .increment(1); } Message::Close(_) => { info!("Received close message"); @@ -290,7 +291,8 @@ where } _ => { error!("unexpected frame: {msg}"); - metrics.increment_error_count("unexpected frame"); + counter!("zkmr_worker_error_count", "error_type" => "unexpected frame") + .increment(1); } } } diff --git a/lgn-worker/src/manager/mod.rs b/lgn-worker/src/manager/mod.rs index ca455c7..f183895 100644 --- a/lgn-worker/src/manager/mod.rs +++ b/lgn-worker/src/manager/mod.rs @@ -1,30 +1,28 @@ pub(crate) mod v0; pub(crate) mod v1; -use crate::metrics::Metrics; use anyhow::bail; use lgn_messages::types::{MessageEnvelope, MessageReplyEnvelope, ProverType, ToProverType}; use lgn_provers::provers::LgnProver; +use metrics::{counter, histogram}; use std::collections::HashMap; use tracing::debug; /// Manages provers for different proving task types -pub(crate) struct ProversManager<'a, T: 'a, R> +pub(crate) struct ProversManager where T: ToProverType, { provers: HashMap>>, - metrics: &'a Metrics, } -impl<'a, T: 'a, R> ProversManager<'a, T, R> +impl<'a, T: 'a, R> ProversManager where T: ToProverType, { - pub(crate) fn new(metrics: &'a Metrics) -> Self { + pub(crate) fn new() -> Self { Self { provers: HashMap::default(), - metrics, } } @@ -50,8 +48,8 @@ where ) -> anyhow::Result> { let prover_type: ProverType = envelope.inner.to_prover_type(); - self.metrics - .increment_tasks_received(prover_type.to_string().as_str()); + counter!("zkmr_worker_tasks_received_total", "task_type" => prover_type.to_string()) + .increment(1); match self.provers.get_mut(&prover_type) { Some(prover) => { @@ -61,18 +59,17 @@ where let result = prover.run(envelope)?; - self.metrics - .increment_tasks_processed(prover_type.to_string().as_str()); - self.metrics.observe_task_processing_duration( - prover_type.to_string().as_str(), - start_time.elapsed().as_secs_f64(), - ); + counter!("zkmr_worker_tasks_processed_total", "task_type" => prover_type.to_string()) + .increment(1); + histogram!("zkmr_worker_task_processing_duration_seconds", "task_type" => prover_type.to_string()) + .record(start_time.elapsed().as_secs_f64()); Ok(result) } None => { - self.metrics - .increment_tasks_failed(prover_type.to_string().as_str()); + counter!("zkmr_worker_tasks_failed_total", "task_type" => prover_type.to_string()) + .increment(1); + bail!("No prover found for task type: {:?}", prover_type); } } diff --git a/lgn-worker/src/metrics/mod.rs b/lgn-worker/src/metrics/mod.rs deleted file mode 100644 index ef102dd..0000000 --- a/lgn-worker/src/metrics/mod.rs +++ /dev/null @@ -1,51 +0,0 @@ -use metrics::{counter, gauge, histogram, SharedString}; - -pub struct Metrics {} - -impl Metrics { - pub fn new() -> Self { - Self {} - } - - pub fn increment_tasks_received(&self, task_type: &str) { - let task_type = SharedString::from(String::from(task_type)); - counter!("zkmr_worker_tasks_received_total", "task_type" => task_type).increment(1); - } - - pub fn increment_tasks_processed(&self, task_type: &str) { - let task_type = SharedString::from(String::from(task_type)); - counter!("zkmr_worker_tasks_processed_total", "task_type" => task_type).increment(1); - } - - pub fn increment_tasks_failed(&self, task_type: &str) { - let task_type = SharedString::from(String::from(task_type)); - counter!("zkmr_worker_tasks_failed_total", "task_type" => task_type).increment(1); - } - - pub fn observe_task_processing_duration(&self, task_type: &str, duration: f64) { - let task_type = SharedString::from(String::from(task_type)); - histogram!("zkmr_worker_task_processing_duration_seconds", "task_type" => task_type) - .record(duration); - } - - pub fn increment_websocket_messages_received(&self, message_type: &str) { - let message_type = SharedString::from(String::from(message_type)); - counter!("zkmr_worker_websocket_messages_received_total", "message_type" => message_type) - .increment(1); - } - - pub fn increment_websocket_messages_sent(&self, message_type: &str) { - let message_type = SharedString::from(String::from(message_type)); - counter!("zkmr_worker_websocket_messages_sent_total", "message_type" => message_type) - .increment(1); - } - - pub fn increment_error_count(&self, error_type: &str) { - let error_type = SharedString::from(String::from(error_type)); - counter!("zkmr_worker_error_count", "error_type" => error_type).increment(1); - } - - pub fn increment_gateway_connection_count(&self) { - gauge!("zkmr_worker_gateway_connection_count").increment(1); - } -}