Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: migrate scheduler logger to tracing #85

Merged
merged 2 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions riklet/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use tracing_subscriber::field::debug;

use thiserror::Error;
use tonic::{transport::Channel, Request, Streaming};
use tracing::{debug, error, event, Level};
use tracing::{debug, error, event, info, Level};

const METRICS_UPDATER_INTERVAL: u64 = 15 * 1000;

Expand Down Expand Up @@ -148,8 +148,8 @@ impl Riklet {
}

pub async fn run(&mut self) -> Result<()> {
event!(Level::INFO, "Riklet is running.");
self.start_metrics_updater();
info!("Riklet is running");

while let Some(workload) = self
.stream
Expand Down
4 changes: 1 addition & 3 deletions riklet/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ use tracing::error;

#[tokio::main]
async fn main() -> Result<()> {
init_logger(Some("debug".to_string()))?;
init_logger()?;

// run a function to test #[instrument] macro
// test_instrument();
// If the process doesn't have root privileges, exit and display error.
if !nix::unistd::Uid::effective().is_root() {
error!("Riklet must run with root privileges.");
Expand Down
19 changes: 4 additions & 15 deletions riklet/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Result;
use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Registry};

pub fn banner() {
println!(
Expand All @@ -14,19 +13,9 @@ pub fn banner() {
);
}

pub fn init_logger(log_level: Option<String>) -> Result<()> {
let logger = tracing_subscriber::fmt::layer().json();
// Try to get the log level from the environment variable `RUST_LOG`.
// If the variable is not defined, then use the cli argument or the default value 'info' if neither is defined
let env_filter = EnvFilter::try_from_default_env()
.or_else(|_| {
let level = log_level.unwrap_or_else(|| "info".to_string());
EnvFilter::try_new(level.as_str())
})?
.add_directive("h2=OFF".parse().unwrap()); // disable all events from the `h2` crate;

let collector = Registry::default().with(logger).with(env_filter);
tracing::subscriber::set_global_default(collector)?;

/// Initialize the global tracing subscriber with a compact formatter.
///
/// # Errors
///
/// Returns an error if a global default subscriber has already been set
/// (e.g. `init_logger` was called twice).
pub fn init_logger() -> Result<()> {
    // Compact, human-readable event output; `finish()` builds the Subscriber.
    let subscriber = tracing_subscriber::fmt().compact().finish();
    // Propagate the error instead of panicking: this function already
    // returns `Result`, so `.expect(...)` would defeat its error contract.
    tracing::subscriber::set_global_default(subscriber)?;
    Ok(())
}
7 changes: 7 additions & 0 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ clap = "2.33.3"
serde = "1.0.126"
serde_json = "1.0.64"

# Instrumentation
tracing = { workspace = true }
tracing-futures = { workspace = true }
tracing-subscriber = { workspace = true }
tracing-timing = { workspace = true }
once_cell = "1.17.1"

[dependencies.tokio]
version = "1.6.1"
features = ["rt-multi-thread", "macros", "sync", "time"]
Expand Down
6 changes: 0 additions & 6 deletions scheduler/src/config_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,6 @@ impl ConfigParser {
.takes_value(true)
.default_value("0.0.0.0:4996"),
)
.arg(
Arg::with_name("v")
.short("v")
.multiple(true)
.help("Sets the level of verbosity"),
)
.get_matches();

let workers_ip: SocketAddrV4 = matches
Expand Down
2 changes: 1 addition & 1 deletion scheduler/src/grpc/controller.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::grpc::GRPCService;
use log::error;
use proto::common::WorkerStatus;
use proto::controller::controller_server::Controller as ControllerClient;
use proto::controller::WorkloadScheduling;
Expand All @@ -8,6 +7,7 @@ use scheduler::{Event, WorkloadRequest};
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tracing::error;

#[tonic::async_trait]
impl ControllerClient for GRPCService {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
mod controller;
mod worker;

use log::error;
use scheduler::Event;
use scheduler::Send;
use tokio::sync::mpsc::Sender;
use tonic::{Code, Status};
use tracing::error;

#[derive(Debug, Clone)]
pub struct GRPCService {
Expand Down
2 changes: 1 addition & 1 deletion scheduler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use definition::workload::WorkloadDefinition;
use log::{error, info};
use node_metrics::metrics::Metrics;
use proto::common::{InstanceMetric, WorkerMetric, WorkerStatus, WorkloadRequestKind};
use proto::controller::WorkloadScheduling;
Expand All @@ -10,6 +9,7 @@ use std::net::SocketAddr;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tonic::Status;
use tracing::{error, info};

/// Define the structure of message send through the channel between
/// the manager and a worker
Expand Down
10 changes: 6 additions & 4 deletions scheduler/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@ mod state_manager;
use crate::config_parser::ConfigParser;
use crate::grpc::GRPCService;
use crate::state_manager::{StateManager, StateManagerEvent};
use env_logger::Env;
use log::{debug, error, info, warn};

use proto::common::worker_status::Status;
use proto::common::{ResourceStatus, WorkerMetric as WorkerMetricProto, WorkerStatus};
use proto::controller::controller_server::ControllerServer;
use proto::worker::worker_server::WorkerServer;
use scheduler::Event;
use scheduler::{Controller, SchedulerError, Worker, WorkerRegisterChannelType};
use std::default::Default;

use std::net::{SocketAddr, SocketAddrV4};
use std::sync::Arc;
use tracing::{debug, error, info, warn};

use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -296,7 +296,9 @@ impl Manager {
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = ConfigParser::new()?;
env_logger::Builder::from_env(Env::default().default_filter_or(&config.verbosity_level)).init();
let subscriber = tracing_subscriber::fmt().compact().finish();
(Review note: alexandrebrg left a comment on this line; the conversation was marked as resolved.)
tracing::subscriber::set_global_default(subscriber)
.expect("Failed to initiate the logger subscriber");
info!("Starting up...");
let manager = Manager::run(config.workers_endpoint, config.controller_endpoint);
manager.await?;
Expand Down
26 changes: 17 additions & 9 deletions scheduler/src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ mod lib;

use crate::state_manager::lib::int_to_resource_status;
use definition::workload::WorkloadDefinition;
use log::{debug, error, info};
use proto::common::{InstanceMetric, ResourceStatus, WorkerMetric, WorkloadRequestKind};
use proto::worker::InstanceScheduling;
use rand::seq::IteratorRandom;
Expand All @@ -12,6 +11,7 @@ use std::fmt;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Mutex;
use tracing::{debug, error, info};

#[derive(Debug)]
pub enum StateManagerEvent {
Expand Down Expand Up @@ -291,6 +291,10 @@ impl StateManager {
}
}

#[tracing::instrument(
skip(self),
fields(workload_id = %request.workload_id, instance_id = %request.instance_id),
)]
fn action_create_workload(&mut self, request: WorkloadRequest) -> Result<(), SchedulerError> {
let instance = WorkloadInstance::new(
request.instance_id.clone(),
Expand Down Expand Up @@ -320,7 +324,10 @@ impl StateManager {
status: ResourceStatus::Pending,
};

info!("[process_schedule_request] Received scheduling request for {}, with {:#?} replicas", workload.id, workload.definition.replicas);
info!(
"[process_schedule_request] Received scheduling request with {:#?} replicas",
workload.definition.replicas
);

self.state.insert(workload.id.clone(), workload);
}
Expand Down Expand Up @@ -365,14 +372,15 @@ impl StateManager {
Ok(())
}

#[tracing::instrument(
skip(self),
fields(workload_id = %request.workload_id, instance_id = %request.instance_id),
)]
fn action_destroy_instance(&mut self, request: WorkloadRequest) -> Result<(), SchedulerError> {
let workload = self.state.get_mut(&request.workload_id);

if workload.is_none() {
error!(
"Requested workload {} hasn't any instance available",
request.workload_id
);
error!("Requested workload hasn't any instance available");
return Err(SchedulerError::WorkloadNotExisting(request.workload_id));
}

Expand All @@ -385,8 +393,8 @@ impl StateManager {
let def_replicas = &workload.definition.replicas.unwrap_or(1);

info!(
"[process_schedule_request] Received destroy request for {}, with {:#?} replicas",
workload.id, def_replicas
"[process_schedule_request] Received destroy request with {:#?} replicas",
workload.definition.replicas
);

let instance = workload.instances.get_mut(&request.instance_id);
Expand All @@ -405,7 +413,7 @@ impl StateManager {
if workload.replicas > *def_replicas {
self.action_minus_replicas(&request.workload_id, def_replicas)?;
} else {
info!("Workload {} is getting unscheduled", workload.id);
info!("Workload is getting unscheduled");
workload.status = ResourceStatus::Destroying;
// Keep workload replicas a 1 as we are going to 0 it will be deleted automatically
// by the state manager
Expand Down