From 0948e1e3ca47e7de7114d27ccb123faa4c881b19 Mon Sep 17 00:00:00 2001 From: AlexandreBrg Date: Mon, 10 Apr 2023 19:35:42 +0200 Subject: [PATCH 1/2] chore(scheduler): migrate logger to tracing Signed-off-by: AlexandreBrg --- Cargo.lock | 5 +++++ scheduler/Cargo.toml | 7 +++++++ scheduler/src/config_parser.rs | 6 ------ scheduler/src/grpc/controller.rs | 2 +- scheduler/src/grpc/mod.rs | 2 +- scheduler/src/lib.rs | 2 +- scheduler/src/main.rs | 6 ++++-- scheduler/src/state_manager/mod.rs | 26 +++++++++++++++++--------- 8 files changed, 36 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2cfbe2e7..85523d23 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2888,6 +2888,7 @@ dependencies = [ "env_logger", "log", "node_metrics", + "once_cell", "proto", "rand", "serde", @@ -2895,6 +2896,10 @@ dependencies = [ "tokio", "tokio-stream", "tonic", + "tracing", + "tracing-futures", + "tracing-subscriber", + "tracing-timing", ] [[package]] diff --git a/scheduler/Cargo.toml b/scheduler/Cargo.toml index 8805d077..bee008c7 100644 --- a/scheduler/Cargo.toml +++ b/scheduler/Cargo.toml @@ -24,6 +24,13 @@ clap = "2.33.3" serde = "1.0.126" serde_json = "1.0.64" +# Instrumentation +tracing = { workspace = true } +tracing-futures = { workspace = true } +tracing-subscriber = { workspace = true } +tracing-timing = { workspace = true } +once_cell = "1.17.1" + [dependencies.tokio] version = "1.6.1" features = ["rt-multi-thread", "macros", "sync", "time"] diff --git a/scheduler/src/config_parser.rs b/scheduler/src/config_parser.rs index 34ee1d78..40515fba 100644 --- a/scheduler/src/config_parser.rs +++ b/scheduler/src/config_parser.rs @@ -39,12 +39,6 @@ impl ConfigParser { .takes_value(true) .default_value("0.0.0.0:4996"), ) - .arg( - Arg::with_name("v") - .short("v") - .multiple(true) - .help("Sets the level of verbosity"), - ) .get_matches(); let workers_ip: SocketAddrV4 = matches diff --git a/scheduler/src/grpc/controller.rs b/scheduler/src/grpc/controller.rs index 50853d56..27d7e18d 100644 --- a/scheduler/src/grpc/controller.rs +++ b/scheduler/src/grpc/controller.rs @@ -1,5 +1,4 @@ use crate::grpc::GRPCService; -use log::error; use proto::common::WorkerStatus; use proto::controller::controller_server::Controller as ControllerClient; use proto::controller::WorkloadScheduling; @@ -8,6 +7,7 @@ use scheduler::{Event, WorkloadRequest}; use tokio::sync::mpsc::channel; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; +use tracing::error; #[tonic::async_trait] impl ControllerClient for GRPCService { diff --git a/scheduler/src/grpc/mod.rs b/scheduler/src/grpc/mod.rs index eca766f8..ba2b39fe 100644 --- a/scheduler/src/grpc/mod.rs +++ b/scheduler/src/grpc/mod.rs @@ -1,11 +1,11 @@ mod controller; mod worker; -use log::error; use scheduler::Event; use scheduler::Send; use tokio::sync::mpsc::Sender; use tonic::{Code, Status}; +use tracing::error; #[derive(Debug, Clone)] pub struct GRPCService { diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs index f10d15b6..9f81678b 100644 --- a/scheduler/src/lib.rs +++ b/scheduler/src/lib.rs @@ -1,5 +1,4 @@ use definition::workload::WorkloadDefinition; -use log::{error, info}; use node_metrics::metrics::Metrics; use proto::common::{InstanceMetric, WorkerMetric, WorkerStatus, WorkloadRequestKind}; use proto::controller::WorkloadScheduling; @@ -10,6 +9,7 @@ use std::net::SocketAddr; use tokio::sync::mpsc::error::SendError; use tokio::sync::mpsc::Sender; use tonic::Status; +use tracing::{error, info}; /// Define the structure of message send through the channel between /// the manager and a worker diff --git a/scheduler/src/main.rs b/scheduler/src/main.rs index ac309653..fef077fc 100644 --- a/scheduler/src/main.rs +++ b/scheduler/src/main.rs @@ -6,7 +6,6 @@ use crate::config_parser::ConfigParser; use crate::grpc::GRPCService; use crate::state_manager::{StateManager, StateManagerEvent}; use env_logger::Env; -use log::{debug, error, info, warn}; use proto::common::worker_status::Status; use proto::common::{ResourceStatus, WorkerMetric as WorkerMetricProto, WorkerStatus}; use proto::controller::controller_server::ControllerServer; @@ -16,6 +15,7 @@ use scheduler::{Controller, SchedulerError, Worker, WorkerRegisterChannelType}; use std::default::Default; use std::net::{SocketAddr, SocketAddrV4}; use std::sync::Arc; +use tracing::{debug, error, info, warn}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::Mutex; @@ -296,7 +296,9 @@ impl Manager { #[tokio::main] async fn main() -> Result<(), Box> { let config = ConfigParser::new()?; - env_logger::Builder::from_env(Env::default().default_filter_or(&config.verbosity_level)).init(); + let subscriber = tracing_subscriber::fmt().compact().finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Failed to initiate the logger subscriber"); info!("Starting up..."); let manager = Manager::run(config.workers_endpoint, config.controller_endpoint); manager.await?; diff --git a/scheduler/src/state_manager/mod.rs b/scheduler/src/state_manager/mod.rs index 6f16401f..3254d86d 100644 --- a/scheduler/src/state_manager/mod.rs +++ b/scheduler/src/state_manager/mod.rs @@ -2,7 +2,6 @@ mod lib; use crate::state_manager::lib::int_to_resource_status; use definition::workload::WorkloadDefinition; -use log::{debug, error, info}; use proto::common::{InstanceMetric, ResourceStatus, WorkerMetric, WorkloadRequestKind}; use proto::worker::InstanceScheduling; use rand::seq::IteratorRandom; @@ -12,6 +11,7 @@ use std::fmt; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Mutex; +use tracing::{debug, error, info}; #[derive(Debug)] pub enum StateManagerEvent { @@ -291,6 +291,10 @@ impl StateManager { } } + #[tracing::instrument( + skip(self), + fields(workload_id = %request.workload_id, instance_id = %request.instance_id), + )] fn action_create_workload(&mut self, request: WorkloadRequest) -> Result<(), SchedulerError> { let instance = WorkloadInstance::new( request.instance_id.clone(), @@ -320,7 +324,10 @@ impl StateManager { status: ResourceStatus::Pending, }; - info!("[process_schedule_request] Received scheduling request for {}, with {:#?} replicas", workload.id, workload.definition.replicas); + info!( + "[process_schedule_request] Received scheduling request with {:#?} replicas", + workload.definition.replicas + ); self.state.insert(workload.id.clone(), workload); } @@ -365,14 +372,15 @@ impl StateManager { Ok(()) } + #[tracing::instrument( + skip(self), + fields(workload_id = %request.workload_id, instance_id = %request.instance_id), + )] fn action_destroy_instance(&mut self, request: WorkloadRequest) -> Result<(), SchedulerError> { let workload = self.state.get_mut(&request.workload_id); if workload.is_none() { - error!( - "Requested workload {} hasn't any instance available", - request.workload_id - ); + error!("Requested workload hasn't any instance available"); return Err(SchedulerError::WorkloadNotExisting(request.workload_id)); } @@ -385,8 +393,8 @@ impl StateManager { let def_replicas = &workload.definition.replicas.unwrap_or(1); info!( - "[process_schedule_request] Received destroy request for {}, with {:#?} replicas", - workload.id, def_replicas + "[process_schedule_request] Received destroy request with {:#?} replicas", + workload.definition.replicas ); let instance = workload.instances.get_mut(&request.instance_id); @@ -405,7 +413,7 @@ impl StateManager { if workload.replicas > *def_replicas { self.action_minus_replicas(&request.workload_id, def_replicas)?; } else { - info!("Workload {} is getting unscheduled", workload.id); + info!("Workload is getting unscheduled"); workload.status = ResourceStatus::Destroying; // Keep workload replicas a 1 as we are going to 0 it will be deleted automatically // by the state manager From ebf8f7adaed755b100bef481efa021d0b33b308e Mon Sep 17 00:00:00 2001 From: AlexandreBrg Date: Mon, 10 Apr 2023 19:38:58 +0200 Subject: [PATCH 2/2] chore(riklet): use conventional logger methods Signed-off-by: AlexandreBrg --- riklet/src/core.rs | 4 ++-- riklet/src/main.rs | 4 +--- riklet/src/utils.rs | 19 ++++--------------- scheduler/src/main.rs | 4 ++-- 4 files changed, 9 insertions(+), 22 deletions(-) diff --git a/riklet/src/core.rs b/riklet/src/core.rs index 46ec739e..fa0ab475 100644 --- a/riklet/src/core.rs +++ b/riklet/src/core.rs @@ -15,7 +15,7 @@ use tracing_subscriber::field::debug; use thiserror::Error; use tonic::{transport::Channel, Request, Streaming}; -use tracing::{debug, error, event, Level}; +use tracing::{debug, error, event, info, Level}; const METRICS_UPDATER_INTERVAL: u64 = 15 * 1000; @@ -148,8 +148,8 @@ impl Riklet { } pub async fn run(&mut self) -> Result<()> { - event!(Level::INFO, "Riklet is running."); self.start_metrics_updater(); + info!("Riklet is running"); while let Some(workload) = self .stream diff --git a/riklet/src/main.rs b/riklet/src/main.rs index 37c5ad31..35d1a007 100644 --- a/riklet/src/main.rs +++ b/riklet/src/main.rs @@ -17,10 +17,8 @@ use tracing::error; #[tokio::main] async fn main() -> Result<()> { - init_logger(Some("debug".to_string()))?; + init_logger()?; - // run a function to test #[instrument] macro - // test_instrument(); // If the process doesn't have root privileges, exit and display error. if !nix::unistd::Uid::effective().is_root() { error!("Riklet must run with root privileges."); diff --git a/riklet/src/utils.rs b/riklet/src/utils.rs index 7b32d93b..45073378 100644 --- a/riklet/src/utils.rs +++ b/riklet/src/utils.rs @@ -1,5 +1,4 @@ use anyhow::Result; -use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Registry}; pub fn banner() { println!( @@ -14,19 +13,9 @@ pub fn banner() { ); } -pub fn init_logger(log_level: Option) -> Result<()> { - let logger = tracing_subscriber::fmt::layer().json(); - // Try to get the log level from the environment variable `RUST_LOG`. - // If the variable is not defined, then use the cli argument or the default value 'info' if neither is defined - let env_filter = EnvFilter::try_from_default_env() - .or_else(|_| { - let level = log_level.unwrap_or_else(|| "info".to_string()); - EnvFilter::try_new(level.as_str()) - })? - .add_directive("h2=OFF".parse().unwrap()); // disable all events from the `h2` crate; - - let collector = Registry::default().with(logger).with(env_filter); - tracing::subscriber::set_global_default(collector)?; - +pub fn init_logger() -> Result<()> { + let subscriber = tracing_subscriber::fmt().compact().finish(); + tracing::subscriber::set_global_default(subscriber) + .expect("Failed to initiate the logger subscriber"); Ok(()) } diff --git a/scheduler/src/main.rs b/scheduler/src/main.rs index fef077fc..09476b68 100644 --- a/scheduler/src/main.rs +++ b/scheduler/src/main.rs @@ -5,14 +5,14 @@ mod state_manager; use crate::config_parser::ConfigParser; use crate::grpc::GRPCService; use crate::state_manager::{StateManager, StateManagerEvent}; -use env_logger::Env; + use proto::common::worker_status::Status; use proto::common::{ResourceStatus, WorkerMetric as WorkerMetricProto, WorkerStatus}; use proto::controller::controller_server::ControllerServer; use proto::worker::worker_server::WorkerServer; use scheduler::Event; use scheduler::{Controller, SchedulerError, Worker, WorkerRegisterChannelType}; -use std::default::Default; + use std::net::{SocketAddr, SocketAddrV4}; use std::sync::Arc; use tracing::{debug, error, info, warn};