Skip to content

Commit

Permalink
refactor-metrics (#76)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaisbaccour authored Sep 9, 2024
1 parent aa09173 commit c001813
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 95 deletions.
18 changes: 4 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lgn-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ lazy-static-include = { version = "3.2.1" }
lgn-auth = { path = "../lgn-auth" }
lgn-messages = { path = "../lgn-messages" }
lgn-provers = { path = "../lgn-provers"}
metrics = { version = "0.22.0" }
metrics = { version = "0.23.0" }
metrics-exporter-prometheus = { version = "0.15.0" }
mimalloc = { version = "0.1.39", default-features = false }
rand = { version = "0.8.5", default-features = false, features = [
Expand Down
28 changes: 15 additions & 13 deletions lgn-worker/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use ::metrics::counter;
use anyhow::*;
use backtrace::Backtrace;
use clap::Parser;
use jwt::{Claims, RegisteredClaims};
use metrics::counter;
use mimalloc::MiMalloc;
use std::fmt::Debug;
use std::net::TcpStream;
Expand All @@ -20,7 +20,6 @@ use crate::checksum::{fetch_checksum_file, verify_directory_checksums};
use crate::config::Config;
use crate::manager::v1::register_v1_provers;
use crate::manager::ProversManager;
use crate::metrics::Metrics;
use ethers::signers::Wallet;
use lgn_auth::jwt::JWTAuth;
use lgn_messages::types::{DownstreamPayload, ReplyType, TaskType, ToProverType, UpstreamPayload};
Expand All @@ -31,7 +30,6 @@ use tungstenite::stream::MaybeTlsStream;
mod checksum;
mod config;
mod manager;
mod metrics;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand Down Expand Up @@ -121,7 +119,6 @@ fn main() -> anyhow::Result<()> {

fn run(config: &Config) -> Result<()> {
info!("Version: {}", env!("CARGO_PKG_VERSION"));
let metrics = Metrics::new();
let lagrange_wallet = match (
&config.avs.lagr_keystore,
&config.avs.lagr_pwd,
Expand Down Expand Up @@ -176,7 +173,7 @@ fn run(config: &Config) -> Result<()> {

// Connect to the server
let (mut ws_socket, _) = connect(connection_request)?;
metrics.increment_gateway_connection_count();
counter!("zkmr_worker_gateway_connection_count").increment(1);
info!("Connected to the gateway");

info!("Authenticating");
Expand Down Expand Up @@ -219,21 +216,20 @@ fn run(config: &Config) -> Result<()> {
fetch_checksum_file(checksum_url, expected_checksums_file)?;
}

let mut provers_manager = ProversManager::<TaskType, ReplyType>::new(&metrics);
let mut provers_manager = ProversManager::<TaskType, ReplyType>::new();
register_v1_provers(config, &mut provers_manager);

if !config.public_params.skip_checksum {
verify_directory_checksums(&config.public_params.dir, expected_checksums_file)
.context("Failed to verify checksums")?;
}

start_work(&metrics, &mut ws_socket, &mut provers_manager)?;
start_work(&mut ws_socket, &mut provers_manager)?;

Ok(())
}

fn start_work<T, R>(
metrics: &Metrics,
ws_socket: &mut WebSocket<MaybeTlsStream<TcpStream>>,
provers_manager: &mut ProversManager<T, R>,
) -> Result<()>
Expand All @@ -255,7 +251,9 @@ where
match msg {
Message::Text(content) => {
trace!("Received message: {:?}", content);
metrics.increment_websocket_messages_received("text");

counter!("zkmr_worker_websocket_messages_received_total", "message_type" => "text")
.increment(1);

match serde_json::from_str::<DownstreamPayload<T>>(&content)? {
DownstreamPayload::Todo { envelope } => {
Expand All @@ -269,11 +267,12 @@ where
ws_socket.send(Message::Text(serde_json::to_string(
&UpstreamPayload::Done(reply),
)?))?;
metrics.increment_websocket_messages_sent("text");
counter!("zkmr_worker_websocket_messages_sent_total", "message_type" => "text")
.increment(1);
}
Err(e) => {
error!("Error processing task: {:?}", e);
metrics.increment_error_count("proof processing");
counter!("zkmr_worker_error_count", "error_type" => "proof processing").increment(1);
}
}
}
Expand All @@ -282,15 +281,18 @@ where
}
Message::Ping(_) => {
debug!("Received ping or close message");
metrics.increment_websocket_messages_received("ping");

counter!("zkmr_worker_websocket_messages_received_total", "message_type" => "ping")
.increment(1);
}
Message::Close(_) => {
info!("Received close message");
return Ok(());
}
_ => {
error!("unexpected frame: {msg}");
metrics.increment_error_count("unexpected frame");
counter!("zkmr_worker_error_count", "error_type" => "unexpected frame")
.increment(1);
}
}
}
Expand Down
29 changes: 13 additions & 16 deletions lgn-worker/src/manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
pub(crate) mod v0;
pub(crate) mod v1;

use crate::metrics::Metrics;
use anyhow::bail;
use lgn_messages::types::{MessageEnvelope, MessageReplyEnvelope, ProverType, ToProverType};
use lgn_provers::provers::LgnProver;
use metrics::{counter, histogram};
use std::collections::HashMap;
use tracing::debug;

/// Manages provers for different proving task types
pub(crate) struct ProversManager<'a, T: 'a, R>
pub(crate) struct ProversManager<T, R>
where
T: ToProverType,
{
provers: HashMap<ProverType, Box<dyn LgnProver<T, R>>>,
metrics: &'a Metrics,
}

impl<'a, T: 'a, R> ProversManager<'a, T, R>
impl<'a, T: 'a, R> ProversManager<T, R>
where
T: ToProverType,
{
pub(crate) fn new(metrics: &'a Metrics) -> Self {
pub(crate) fn new() -> Self {
Self {
provers: HashMap::default(),
metrics,
}
}

Expand All @@ -50,8 +48,8 @@ where
) -> anyhow::Result<MessageReplyEnvelope<R>> {
let prover_type: ProverType = envelope.inner.to_prover_type();

self.metrics
.increment_tasks_received(prover_type.to_string().as_str());
counter!("zkmr_worker_tasks_received_total", "task_type" => prover_type.to_string())
.increment(1);

match self.provers.get_mut(&prover_type) {
Some(prover) => {
Expand All @@ -61,18 +59,17 @@ where

let result = prover.run(envelope)?;

self.metrics
.increment_tasks_processed(prover_type.to_string().as_str());
self.metrics.observe_task_processing_duration(
prover_type.to_string().as_str(),
start_time.elapsed().as_secs_f64(),
);
counter!("zkmr_worker_tasks_processed_total", "task_type" => prover_type.to_string())
.increment(1);
histogram!("zkmr_worker_task_processing_duration_seconds", "task_type" => prover_type.to_string())
.record(start_time.elapsed().as_secs_f64());

Ok(result)
}
None => {
self.metrics
.increment_tasks_failed(prover_type.to_string().as_str());
counter!("zkmr_worker_tasks_failed_total", "task_type" => prover_type.to_string())
.increment(1);

bail!("No prover found for task type: {:?}", prover_type);
}
}
Expand Down
51 changes: 0 additions & 51 deletions lgn-worker/src/metrics/mod.rs

This file was deleted.

0 comments on commit c001813

Please sign in to comment.