From 9a34c6823694e6d806201c9246856372f6204060 Mon Sep 17 00:00:00 2001 From: Malo Polese Date: Tue, 21 Mar 2023 00:29:43 +0100 Subject: [PATCH] clean code --- proto/src/lib.rs | 48 +- riklet/crates/oci/src/image_manager.rs | 2 +- riklet/src/cli/config.rs | 4 +- riklet/src/core.rs | 77 +-- riklet/src/core.rs.backup | 496 ------------------ riklet/src/iptables/mod.rs | 2 +- riklet/src/iptables/platform_linux.rs | 10 +- riklet/src/iptables/rule.rs | 9 +- riklet/src/main.rs | 29 +- riklet/src/runtime/function_runtime.rs | 50 +- riklet/src/runtime/mod.rs | 59 +-- .../src/runtime/network/function_network.rs | 130 +++++ riklet/src/runtime/network/mod.rs | 172 +----- riklet/src/runtime/network/pod_network.rs | 24 + riklet/src/runtime/pod_runtime.rs | 29 +- riklet/src/structs.rs | 4 +- riklet/src/utils.rs | 20 + 17 files changed, 322 insertions(+), 843 deletions(-) delete mode 100644 riklet/src/core.rs.backup create mode 100644 riklet/src/runtime/network/function_network.rs create mode 100644 riklet/src/runtime/network/pod_network.rs diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 13595923..36a382d3 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -1,8 +1,8 @@ use std::fmt::Display; -use common::{ResourceStatus, WorkloadRequestKind}; +use common::{worker_status::Status, InstanceMetric, ResourceStatus, WorkloadRequestKind}; use serde::{Deserialize, Serialize}; - +use std::ops::Deref; pub mod common { tonic::include_proto!("common"); } @@ -77,9 +77,9 @@ impl From for InstanceStatus { } } -impl Into for InstanceStatus { - fn into(self) -> i32 { - match self { +impl From for i32 { + fn from(value: InstanceStatus) -> Self { + match value { InstanceStatus::Unknown(_) => 0, InstanceStatus::Pending => 1, InstanceStatus::Running => 2, @@ -107,3 +107,41 @@ impl From for InstanceStatus { } pub extern crate protobuf; + +pub enum WorkloadAction { + CREATE, + DELETE, +} + +pub struct WorkerStatus(pub common::WorkerStatus); +impl WorkerStatus { + pub fn new(identifier: String, instance_id: String, status: InstanceStatus) -> Self { + Self(common::WorkerStatus { + identifier, + host_address: None, + status: Some(Status::Instance(InstanceMetric { + instance_id, + status: status.into(), + metrics: "".to_string(), + })), + }) + } +} + +impl Deref for WorkerStatus { + type Target = common::WorkerStatus; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl From for WorkloadAction { + fn from(value: i32) -> Self { + match value { + 0 => WorkloadAction::CREATE, + 1 => WorkloadAction::DELETE, + _ => panic!("Unknown workload action"), + } + } +} diff --git a/riklet/crates/oci/src/image_manager.rs b/riklet/crates/oci/src/image_manager.rs index 9db7b0f6..110c1e92 100644 --- a/riklet/crates/oci/src/image_manager.rs +++ b/riklet/crates/oci/src/image_manager.rs @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize}; use std::path::PathBuf; use tracing::{event, Level}; -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct ImageManagerConfiguration { pub oci_manager: UmociConfiguration, pub image_puller: SkopeoConfiguration, diff --git a/riklet/src/cli/config.rs b/riklet/src/cli/config.rs index 86a81f9d..343e41f4 100644 --- a/riklet/src/cli/config.rs +++ b/riklet/src/cli/config.rs @@ -32,7 +32,7 @@ pub enum ConfigurationError { type Result = std::result::Result; -#[derive(Deserialize, Debug, Serialize, PartialEq, Clone)] +#[derive(Deserialize, Debug, Serialize, PartialEq, Eq, Clone)] pub struct Configuration { pub master_ip: String, pub log_level: String, @@ -68,7 +68,7 @@ impl Configuration { ); let contents = std::fs::read(path).map_err(ConfigurationError::Load)?; - Ok(toml::from_slice(&contents).map_err(ConfigurationError::Parse)?) + toml::from_slice(&contents).map_err(ConfigurationError::Parse) } /// Load the configuration file diff --git a/riklet/src/core.rs b/riklet/src/core.rs index 84a94dc5..3a304b5a 100644 --- a/riklet/src/core.rs +++ b/riklet/src/core.rs @@ -1,26 +1,18 @@ use crate::cli::config::{Configuration, ConfigurationError}; use crate::emitters::metrics_emitter::MetricsEmitter; -use crate::iptables::rule::Rule; -use crate::iptables::{Chain, Iptables, MutateIptables, Table}; -use crate::network::net::{Net, NetworkInterfaceConfig}; -use crate::runtime::{DynamicRuntimeManager, Runtime, RuntimeConfigurator, RuntimeManagerError}; -use crate::structs::{Container, WorkloadDefinition}; +use crate::runtime::{DynamicRuntimeManager, Runtime, RuntimeConfigurator, RuntimeError}; +use crate::structs::WorkloadDefinition; use crate::traits::EventEmitter; use crate::utils::banner; -use proto::common::{InstanceMetric, WorkerRegistration, WorkerStatus}; +use proto::common::WorkerRegistration; use proto::worker::worker_client::WorkerClient; use proto::worker::InstanceScheduling; -use proto::InstanceStatus; +use proto::{InstanceStatus, WorkerStatus, WorkloadAction}; use std::collections::HashMap; -use std::net::Ipv4Addr; -use std::path::{Path, PathBuf}; -use std::ops::Deref; -use std::time::Duration; -use std::{fs, io, thread}; use thiserror::Error; use tonic::{transport::Channel, Request, Streaming}; -use tracing::{debug, event, Level}; +use tracing::{event, Level}; const METRICS_UPDATER_INTERVAL: u64 = 15 * 1000; @@ -39,58 +31,19 @@ pub enum RikletError { ConnectionError(tonic::transport::Error), #[error("Runtime error: {0}")] - RuntimeManagerError(RuntimeManagerError), + RuntimeManagerError(RuntimeError), } type Result = std::result::Result; -enum WorkloadAction { - CREATE, - DELETE, -} - -struct RikletWorkerStatus(WorkerStatus); -impl RikletWorkerStatus { - fn new(identifier: String, instance_id: String, status: InstanceStatus) -> Self { - Self(WorkerStatus { - identifier, - host_address: None, - status: Some(proto::common::worker_status::Status::Instance( - InstanceMetric { - instance_id, - status: status.into(), - metrics: "".to_string(), - }, - )), - }) - } -} - -impl Deref for RikletWorkerStatus { - type Target = WorkerStatus; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl Into for i32 { - fn into(self) -> WorkloadAction { - match self { - 0 => WorkloadAction::CREATE, - 1 => WorkloadAction::DELETE, - _ => panic!("Unknown workload action"), - } - } -} - #[derive(Debug)] pub struct Riklet { config: Configuration, hostname: String, client: WorkerClient, stream: Streaming, + // Can be pod or function runtimes + // The key is the instance id runtimes: HashMap>, - // function_config: FnConfiguration, } impl Riklet { @@ -105,7 +58,7 @@ impl Riklet { println!("workload action: {}", &workload.action); - Ok(match &workload.action.into() { + match &workload.action.into() { WorkloadAction::CREATE => { self.create_workload(workload, dynamic_runtime_manager) .await? @@ -114,7 +67,9 @@ impl Riklet { self.delete_workload(workload, dynamic_runtime_manager) .await? } - }) + }; + + Ok(()) } async fn create_workload( @@ -143,8 +98,6 @@ impl Riklet { ) -> Result<()> { event!(Level::DEBUG, "Destroying workload"); let instance_id: &String = &workload.instance_id; - // Destroy the runtime - // TODO self.runtimes.remove(instance_id); @@ -158,8 +111,7 @@ impl Riklet { async fn send_status(&self, status: InstanceStatus, instance_id: &str) -> Result<()> { event!(Level::DEBUG, "Sending status : {}", status); - let status = - RikletWorkerStatus::new(self.hostname.clone(), instance_id.to_string(), status); + let status = WorkerStatus::new(self.hostname.clone(), instance_id.to_string(), status); MetricsEmitter::emit_event(self.client.clone(), vec![status.0]) .await @@ -201,7 +153,6 @@ impl Riklet { let hostname = gethostname::gethostname().into_string().unwrap(); let config = Configuration::load().map_err(RikletError::ConfigurationError)?; - // let function_config = FnConfiguration::load(); let mut client = WorkerClient::connect(config.master_ip.clone()) .await @@ -214,8 +165,6 @@ impl Riklet { }); let stream = client.register(request).await.unwrap().into_inner(); - // TODO Network - Ok(Self { hostname, client, diff --git a/riklet/src/core.rs.backup b/riklet/src/core.rs.backup deleted file mode 100644 index a0feb176..00000000 --- a/riklet/src/core.rs.backup +++ /dev/null @@ -1,496 +0,0 @@ -use crate::cli::config::Configuration; -use crate::cli::function_config::FnConfiguration; -use crate::emitters::metrics_emitter::MetricsEmitter; -use crate::structs::{Container, WorkloadDefinition}; -use crate::traits::EventEmitter; -use cri::console::ConsoleSocket; -use cri::container::{CreateArgs, DeleteArgs, Runc}; -use curl::easy::Easy; -use firepilot::microvm::{BootSource, Config, Drive, MicroVM, NetworkInterface}; -use firepilot::Firecracker; -use ipnetwork::Ipv4Network; -use lz4::Decoder; -use node_metrics::metrics_manager::MetricsManager; -use oci::image_manager::ImageManager; -use proto::common::{InstanceMetric, WorkerMetric, WorkerRegistration, WorkerStatus}; -use proto::worker::worker_client::WorkerClient; -use proto::worker::InstanceScheduling; -use shared::utils::ip_allocator::IpAllocator; -use std::collections::HashMap; -use std::error::Error; -use std::fs::File; -use std::io::Write; -use std::net::Ipv4Addr; -use std::path::{Path, PathBuf}; -use std::process::Command; -use std::time::Duration; -use std::{fs, io, thread}; -use tonic::{transport::Channel, Request, Streaming}; -use tracing::{event, Level}; - -// const TAP_SCRIPT_DEFAULT_LOCATION: &str = "/app/setup-host-tap.sh"; -const MASK_LONG: &str = "255.255.255.252"; -const DEFAULT_AGENT_PORT: u16 = 8080; - -#[derive(Debug)] -pub struct Riklet { - hostname: String, - client: WorkerClient, - stream: Streaming, - image_manager: ImageManager, - container_runtime: Runc, - workloads: HashMap>, - ip_allocator: IpAllocator, - function_config: FnConfiguration, -} - -impl Riklet { - /// Display a banner - fn banner() { - println!( - r#" - ______ _____ _ __ _ _____ _____ - | ___ \_ _| | / /| | | ___|_ _| - | |_/ / | | | |/ / | | | |__ | | - | / | | | \ | | | __| | | - | |\ \ _| |_| |\ \| |____| |___ | | - \_| \_|\___/\_| \_/\_____/\____/ \_/ - "# - ); - } - - /// Bootstrap a Riklet in order to run properly. - pub async fn bootstrap() -> Result> { - event!(Level::DEBUG, "Riklet bootstraping process started."); - // Get the hostname of the host to register - let hostname = gethostname::gethostname().into_string().unwrap(); - - // Display the banner, just for fun :D - Riklet::banner(); - - // Load the configuration - let config = Configuration::load()?; - - // load the function runtime configuration - let function_config = FnConfiguration::load(); - - // Connect to the master node scheduler - let mut client = WorkerClient::connect(config.master_ip.clone()).await?; - event!(Level::DEBUG, "gRPC WorkerClient connected."); - - event!(Level::DEBUG, "Node's registration to the master"); - let request = Request::new(WorkerRegistration { - hostname: hostname.clone(), - }); - let stream = client.register(request).await?.into_inner(); - - event!(Level::DEBUG, "Container runtime initialization"); - let container_runtime = Runc::new(config.runner.clone())?; - event!(Level::DEBUG, "Image manager initialization"); - let image_manager = ImageManager::new(config.manager.clone())?; - - // Initialize the ip allocator - let network = Ipv4Network::new(Ipv4Addr::new(192, 168, 1, 0), 24).unwrap(); - let ip_allocator = IpAllocator::new(network); - - Ok(Self { - hostname, - container_runtime, - image_manager, - client, - stream, - workloads: HashMap::>::new(), - ip_allocator, - function_config, - }) - } - - /// Handle a workload (eg CREATE, UPDATE, DELETE, READ) - pub async fn handle_workload( - &mut self, - workload: &InstanceScheduling, - ) -> Result<(), Box> { - event!(Level::DEBUG, "Handling workload"); - match &workload.action { - // Create - 0 => { - self.create_workload(workload).await?; - } - // Delete - 1 => { - self.delete_workload(workload).await?; - } - _ => { - event!(Level::ERROR, "Method not allowed") - } - } - - Ok(()) - } - - fn download_image(url: &String, file_path: &String) -> Result<(), Box> { - event!( - Level::DEBUG, - "Downloading image from {} to {}", - url, - file_path - ); - - let mut easy = Easy::new(); - let mut buffer = Vec::new(); - easy.url(&url)?; - easy.follow_location(true)?; - - { - let mut transfer = easy.transfer(); - transfer.write_function(|data| { - buffer.extend_from_slice(data); - Ok(data.len()) - })?; - transfer.perform()?; - } - - let response_code = easy.response_code()?; - if response_code != 200 { - return Err(format!("Response code from registry: {}", response_code).into()); - } - - { - event!(Level::DEBUG, "Writing data to {}", file_path); - let mut file = File::create(&file_path)?; - file.write_all(buffer.as_slice())?; - } - - Ok(()) - } - - fn decompress(source: &Path, destination: &Path) -> Result<(), Box> { - let input_file = File::open(source)?; - let mut decoder = Decoder::new(input_file)?; - let mut output_file = File::create(destination)?; - io::copy(&mut decoder, &mut output_file)?; - Ok(()) - } - - async fn create_workload( - &mut self, - workload: &InstanceScheduling, - ) -> Result<(), Box> { - event!(Level::DEBUG, "Creating workload"); - let workload_definition: WorkloadDefinition = - serde_json::from_str(&workload.definition[..]).unwrap(); - let instance_id: &String = &workload.instance_id; - - if workload_definition.kind == "Function" { - event!(Level::INFO, "Function workload detected"); - - let rootfs_url = workload_definition - .spec - .function - .clone() - .unwrap() - .execution - .rootfs - .to_string(); - - let download_directory = format!("/tmp/{}", &workload_definition.name); - let file_path = format!("{}/rootfs.ext4", &download_directory); - - let file_pathbuf = Path::new(&file_path); - if !file_pathbuf.exists() { - let lz4_path = format!("{}.lz4", &file_path); - fs::create_dir(&download_directory)?; - - Self::download_image(&rootfs_url, &lz4_path).map_err(|e| { - event!(Level::ERROR, "Error while downloading image: {}", e); - fs::remove_dir_all(&download_directory) - .expect("Error while removing directory"); - e - })?; - - Self::decompress(Path::new(&lz4_path), file_pathbuf).map_err(|e| { - event!(Level::ERROR, "Error while decompressing image: {}", e); - fs::remove_dir_all(&download_directory) - .expect("Error while removing directory"); - e - })?; - } - - // Get port to expose function - let exposed_port = workload_definition - .spec - .function - .map(|f| f.exposure.map(|e| e.port)) - .flatten() - .ok_or(()) - .unwrap(); - - // Alocate ip range for tap interface and firecracker micro VM - let subnet = self - .ip_allocator - .allocate_subnet() - .ok_or("No more internal ip available")?; - - let tap_ip = subnet.nth(1).ok_or("Fail get tap ip")?; - let firecracker_ip = subnet.nth(2).ok_or("Fail to get firecracker ip")?; - - let output = Command::new("/bin/sh") - .arg(&self.function_config.script_path) - .arg(&workload_definition.name) - .arg(tap_ip.to_string()) - .output()?; - - if !output.status.success() { - let stderr = String::from_utf8_lossy(&output.stderr); - if !stderr.is_empty() { - event!(Level::ERROR, "stderr: {}", stderr); - } - return Err(stderr.into()); - } - - // Create a new IPTables object - let ipt = iptables::new(false).unwrap(); - - // Port forward microvm on the host - ipt.append( - "nat", - "OUTPUT", - &format!( - "-p tcp --dport {} -d {} -j DNAT --to-destination {}:{}", - exposed_port, self.function_config.ifnet_ip, firecracker_ip, DEFAULT_AGENT_PORT - ), - ) - .unwrap(); - - // Allow NAT on the interface connected to the internet. - ipt.append( - "nat", - "POSTROUTING", - &format!("-o {} -j MASQUERADE", self.function_config.ifnet), - ) - .unwrap(); - - // Add the FORWARD rules to the filter table - ipt.append_unique( - "filter", - "FORWARD", - &"-m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT", - ) - .unwrap(); - ipt.append( - "filter", - "FORWARD", - &format!( - "-i rik-{}-tap -o {} -j ACCEPT", - workload_definition.name, self.function_config.ifnet - ), - ) - .unwrap(); - - let firecracker = Firecracker::new(Some(firepilot::FirecrackerOptions { - command: Some(self.function_config.firecracker_location.clone()), - ..Default::default() - })) - .unwrap(); - - event!(Level::DEBUG, "Creating a new MicroVM"); - let vm = MicroVM::from(Config { - boot_source: BootSource { - kernel_image_path: self.function_config.kernel_location.clone(), - boot_args: Some(format!( - "console=ttyS0 reboot=k nomodules random.trust_cpu=on panic=1 pci=off tsc=reliable i8042.nokbd i8042.noaux ipv6.disable=1 quiet loglevel=0 ip={firecracker_ip}::{tap_ip}:{MASK_LONG}::eth0:off" - )), - initrd_path: None, - }, - drives: vec![Drive { - drive_id: "rootfs".to_string(), - path_on_host: PathBuf::from(&file_path), - is_read_only: false, - is_root_device: true, - }], - network_interfaces: vec![NetworkInterface { - iface_id: "eth0".to_string(), - guest_mac: Some("AA:FC:00:00:00:01".to_string()), - host_dev_name: format!("rik-{}-tap", workload_definition.name), - }], - }); - - event!(Level::DEBUG, "Starting the MicroVM"); - thread::spawn(move || { - firecracker.start(&vm).unwrap(); - }); - - event!( - Level::INFO, - "Function '{}' scheduled and available at {}:{}", - workload_definition.name, - firecracker_ip, - DEFAULT_AGENT_PORT - ) - } else { - event!(Level::INFO, "Container workload detected"); - - let containers = workload_definition.get_containers(instance_id); - - // Inform the scheduler that the workload is creating - self.send_status(5, instance_id).await; - - self.workloads - .insert(instance_id.clone(), containers.clone()); - - for container in containers { - let id = container.id.unwrap(); - - let image = &self.image_manager.pull(&container.image[..]).await?; - - // New console socket for the container - let socket_path = PathBuf::from(format!("/tmp/{}", &id)); - let console_socket = ConsoleSocket::new(&socket_path)?; - - tokio::spawn(async move { - match console_socket - .get_listener() - .as_ref() - .unwrap() - .accept() - .await - { - Ok((stream, _socket_addr)) => { - Box::leak(Box::new(stream)); - } - Err(err) => { - event!(Level::ERROR, "Receive PTY master error : {:?}", err) - } - } - }); - self.container_runtime - .run( - &id[..], - image.bundle.as_ref().unwrap(), - Some(&CreateArgs { - pid_file: None, - console_socket: Some(socket_path), - no_pivot: false, - no_new_keyring: false, - detach: true, - }), - ) - .await?; - - event!(Level::INFO, "Started container {}", id); - } - } - - event!( - Level::INFO, - "Workload '{}' successfully processed.", - &workload.instance_id - ); - - event!( - Level::DEBUG, - "Informing the scheduler that the containers are running" - ); - self.send_status(2, instance_id).await; - - Ok(()) - } - - async fn delete_workload( - &mut self, - workload: &InstanceScheduling, - ) -> Result<(), Box> { - let instance_id = &workload.instance_id; - let containers = self.workloads.get(&instance_id[..]).unwrap(); - - for container in containers { - event!( - Level::INFO, - "Destroying container {}", - &container.id.as_ref().unwrap() - ); - self.container_runtime - .delete( - &container.id.as_ref().unwrap()[..], - Some(&DeleteArgs { force: true }), - ) - .await - .unwrap_or_else(|err| { - event!(Level::ERROR, "Error while destroying container : {:?}", err) - }); - } - - event!( - Level::INFO, - "Workload '{}' successfully destroyed.", - &workload.instance_id - ); - - // Inform the scheduler that the containers are running - self.send_status(4, instance_id).await; - - Ok(()) - } - - async fn send_status(&self, status: i32, instance_id: &str) { - event!(Level::DEBUG, "Sending status : {}", status); - MetricsEmitter::emit_event( - self.client.clone(), - vec![WorkerStatus { - identifier: self.hostname.clone(), - host_address: None, - status: Some(proto::common::worker_status::Status::Instance( - InstanceMetric { - instance_id: instance_id.to_string().clone(), - status, - metrics: "".to_string(), - }, - )), - }], - ) - .await - .unwrap_or_else(|err| event!(Level::ERROR, "Error while sending status : {:?}", err)); - } - - /// Run the metrics updater - fn start_metrics_updater(&self) { - event!(Level::INFO, "Starting metrics updater"); - let client = self.client.clone(); - let hostname = self.hostname.clone(); - - tokio::spawn(async move { - let mut metrics_manager = MetricsManager::new(); - loop { - let node_metric = metrics_manager.fetch(); - MetricsEmitter::emit_event( - client.clone(), - vec![WorkerStatus { - host_address: None, - identifier: hostname.clone(), - status: Some(proto::common::worker_status::Status::Worker(WorkerMetric { - status: 2, - metrics: node_metric.to_json().unwrap(), - })), - }], - ) - .await - .unwrap_or_else(|err| { - event!(Level::ERROR, "Error while sending metrics : {:?}", err) - }); - tokio::time::sleep(Duration::from_millis(15000)).await; - } - }); - } - - /// Wait for workloads - pub async fn accept(&mut self) -> Result<(), Box> { - event!(Level::INFO, "Riklet is running."); - // Start the metrics updater - self.start_metrics_updater(); - - while let Some(workload) = &self.stream.message().await? { - let _ = self.handle_workload(workload).await; - } - Ok(()) - } -} diff --git a/riklet/src/iptables/mod.rs b/riklet/src/iptables/mod.rs index 3a556bee..3d08ab5c 100644 --- a/riklet/src/iptables/mod.rs +++ b/riklet/src/iptables/mod.rs @@ -145,7 +145,7 @@ impl Iptables { iptables::new(false) .map(|iptables| Iptables { inner: iptables, - cleanup: cleanup, + cleanup, rules: vec![], }) .map_err(|e| IptablesError::LoadFailed(e.to_string())) diff --git a/riklet/src/iptables/platform_linux.rs b/riklet/src/iptables/platform_linux.rs index cf96b52a..9b332176 100644 --- a/riklet/src/iptables/platform_linux.rs +++ b/riklet/src/iptables/platform_linux.rs @@ -27,10 +27,7 @@ impl MutateIptables for Iptables { self.inner .append(&rule.table.to_string(), &rule.chain.to_string(), &rule.rule) .map_err(|e| IptablesError::LoadFailed(e.to_string())) - .and_then(|_| { - self.rules.push(rule.clone()); - Ok(()) - }) + .map(|_| self.rules.push(rule.clone())) } /// Tries to delete a rule, in case it does not exist it will throw [IptablesError::AlreadyDeleted] /// ## Example @@ -54,10 +51,7 @@ impl MutateIptables for Iptables { self.inner .delete(&rule.table.to_string(), &rule.chain.to_string(), &rule.rule) .map_err(|e| IptablesError::LoadFailed(e.to_string())) - .and_then(|_| { - self.rules.retain(|r| r != rule); - Ok(()) - }) + .map(|_| self.rules.retain(|r| r != rule)) } /// Tries to determine whether a rule exists or not. If it does return true, else false diff --git a/riklet/src/iptables/rule.rs b/riklet/src/iptables/rule.rs index cf9d4dd5..ac36f7bd 100644 --- a/riklet/src/iptables/rule.rs +++ b/riklet/src/iptables/rule.rs @@ -10,6 +10,7 @@ pub struct Rule { pub rule: String, } +#[allow(dead_code)] impl Rule { pub fn new(chain: Chain, table: Table, rule: String) -> Self { Rule { chain, table, rule } @@ -18,12 +19,6 @@ impl Rule { impl Display for Rule { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{}({}): {}", - self.table.to_string(), - self.chain.to_string(), - self.rule - ) + write!(f, "{}({}): {}", self.table, self.chain, self.rule) } } diff --git a/riklet/src/main.rs b/riklet/src/main.rs index b942a556..c7ae4e04 100644 --- a/riklet/src/main.rs +++ b/riklet/src/main.rs @@ -10,35 +10,10 @@ mod traits; mod utils; use crate::core::Riklet; +use crate::utils::init_logger; use anyhow::Result; -use once_cell::sync::Lazy; -use shared::utils::ip_allocator::IpAllocator; -use std::sync::Mutex; -use tracing::{event, Level}; -use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Registry}; - -// Initialize Singleton for IpAllocator -static IP_ALLOCATOR: Lazy> = Lazy::new(|| { - let ip_allocator = IpAllocator::new().expect("Fail to load IP allocator"); - Mutex::new(ip_allocator) -}); - -pub fn init_logger(log_level: Option) -> Result<()> { - let logger = tracing_subscriber::fmt::layer().json(); - // Try to get the log level from the environment variable `RUST_LOG`. - // If the variable is not defined, then use the cli argument or the default value 'info' if neither is defined - let env_filter = EnvFilter::try_from_default_env() - .or_else(|_| { - let level = log_level.unwrap_or("info".to_string()); - EnvFilter::try_new(level.as_str()) - })? - .add_directive("h2=OFF".parse().unwrap()); // disable all events from the `h2` crate; - - let collector = Registry::default().with(logger).with(env_filter); - tracing::subscriber::set_global_default(collector)?; - Ok(()) -} +use tracing::{event, Level}; #[tokio::main] async fn main() -> Result<()> { diff --git a/riklet/src/runtime/function_runtime.rs b/riklet/src/runtime/function_runtime.rs index 59a5ebac..c3e77046 100644 --- a/riklet/src/runtime/function_runtime.rs +++ b/riklet/src/runtime/function_runtime.rs @@ -1,6 +1,6 @@ use crate::{ cli::{config::Configuration, function_config::FnConfiguration}, - runtime::{network::RuntimeNetwork, RuntimeError, RuntimeManagerError}, + runtime::{network::RuntimeNetwork, RuntimeError}, structs::WorkloadDefinition, }; use async_trait::async_trait; @@ -20,7 +20,7 @@ use std::{ }; use tracing::{event, Level}; -use super::{network::FunctionRuntimeNetwork, Runtime, RuntimeManager}; +use super::{network::function_network::FunctionRuntimeNetwork, Runtime, RuntimeManager}; #[derive(Debug)] struct FunctionRuntime { @@ -32,8 +32,11 @@ struct FunctionRuntime { #[async_trait] impl Runtime for FunctionRuntime { - async fn run(&mut self) -> super::RuntimeResult<()> { - self.network.init().await.map_err(RuntimeError::Network)?; + async fn run(&mut self) -> super::Result<()> { + self.network + .init() + .await + .map_err(RuntimeError::NetworkError)?; event!(Level::INFO, "Function workload detected"); @@ -43,7 +46,7 @@ impl Runtime for FunctionRuntime { command: Some(self.function_config.firecracker_location.clone()), ..Default::default() }) - .map_err(RuntimeError::Firecracker)?; + .map_err(RuntimeError::FirecrackerError)?; event!(Level::DEBUG, "Creating a new MicroVM"); let vm = MicroVM::from(Config { @@ -69,9 +72,11 @@ impl Runtime for FunctionRuntime { }); event!(Level::DEBUG, "Starting the MicroVM"); - thread::spawn(move || -> super::RuntimeResult<()> { + thread::spawn(move || -> super::Result<()> { event!(Level::INFO, "Function started"); - firecracker.start(&vm).map_err(RuntimeError::Firecracker)?; + firecracker + .start(&vm) + .map_err(RuntimeError::FirecrackerError)?; Ok(()) }); @@ -106,9 +111,9 @@ impl FunctionRuntimeManager { let mut easy = Easy::new(); let mut buffer = Vec::new(); - easy.url(&url).map_err(RuntimeManagerError::CurlError)?; + easy.url(url).map_err(RuntimeError::FetchingError)?; easy.follow_location(true) - .map_err(RuntimeManagerError::CurlError)?; + .map_err(RuntimeError::FetchingError)?; { let mut transfer = easy.transfer(); @@ -117,32 +122,30 @@ impl FunctionRuntimeManager { buffer.extend_from_slice(data); Ok(data.len()) }) - .map_err(RuntimeManagerError::CurlError)?; - transfer.perform().map_err(RuntimeManagerError::CurlError)?; + .map_err(RuntimeError::FetchingError)?; + transfer.perform().map_err(RuntimeError::FetchingError)?; } - let response_code = easy - .response_code() - .map_err(RuntimeManagerError::CurlError)?; + let response_code = easy.response_code().map_err(RuntimeError::FetchingError)?; if response_code != 200 { // return Err(format!("Response code from registry: {}", response_code).into()); } { event!(Level::DEBUG, "Writing data to {}", file_path); - let mut file = File::create(&file_path).map_err(RuntimeManagerError::IoError)?; + let mut file = File::create(file_path).map_err(RuntimeError::IoError)?; file.write_all(buffer.as_slice()) - .map_err(RuntimeManagerError::IoError)?; + .map_err(RuntimeError::IoError)?; } Ok(()) } fn decompress(&self, source: &Path, destination: &Path) -> super::Result<()> { - let input_file = File::open(source).map_err(RuntimeManagerError::IoError)?; - let mut decoder = Decoder::new(input_file).map_err(RuntimeManagerError::IoError)?; - let mut output_file = File::create(destination).map_err(RuntimeManagerError::IoError)?; - io::copy(&mut decoder, &mut output_file).map_err(RuntimeManagerError::IoError)?; + let input_file = File::open(source).map_err(RuntimeError::IoError)?; + let mut decoder = Decoder::new(input_file).map_err(RuntimeError::IoError)?; + let mut output_file = File::create(destination).map_err(RuntimeError::IoError)?; + io::copy(&mut decoder, &mut output_file).map_err(RuntimeError::IoError)?; Ok(()) } @@ -155,7 +158,7 @@ impl FunctionRuntimeManager { if !file_pathbuf.exists() { let lz4_path = format!("{}.lz4", &file_path); - fs::create_dir(&download_directory).map_err(RuntimeManagerError::IoError)?; + fs::create_dir(&download_directory).map_err(RuntimeError::IoError)?; self.download_image(&rootfs_url, &lz4_path).map_err(|e| { event!(Level::ERROR, "Error while downloading image: {}", e); @@ -184,13 +187,12 @@ impl RuntimeManager for FunctionRuntimeManager { event!(Level::INFO, "Function workload detected"); let workload_definition: WorkloadDefinition = serde_json::from_str(workload.definition.as_str()) - .map_err(RuntimeManagerError::JsonError)?; + .map_err(RuntimeError::ParsingError)?; Ok(Box::new(FunctionRuntime { function_config: FnConfiguration::load(), file_path: self.create_fs(&workload_definition)?, - network: FunctionRuntimeNetwork::new(&workload) - .map_err(RuntimeManagerError::Network)?, + network: FunctionRuntimeNetwork::new(&workload).map_err(RuntimeError::NetworkError)?, workload_definition, })) } diff --git a/riklet/src/runtime/mod.rs b/riklet/src/runtime/mod.rs index 09d3e96c..a651763f 100644 --- a/riklet/src/runtime/mod.rs +++ b/riklet/src/runtime/mod.rs @@ -14,49 +14,34 @@ use std::fmt::Debug; use thiserror::Error; #[derive(Debug, Error)] -pub enum RuntimeManagerError { - #[error("Runtime error: {0}")] - Runtime(RuntimeError), - +pub enum RuntimeError { #[error("Network error: {0}")] - Network(NetworkError), + NetworkError(NetworkError), - #[error("Curl error: {0}")] - CurlError(curl::Error), + #[error("Fetching error: {0}")] + FetchingError(curl::Error), #[error("IO error: {0}")] IoError(std::io::Error), - #[error("JSON error: {0}")] - JsonError(serde_json::Error), + #[error("Parsing error: {0}")] + ParsingError(serde_json::Error), #[error("OCI error: {0}")] - OCI(oci::Error), + OciError(oci::Error), #[error("CRI error: {0}")] - CRI(cri::Error), -} + CriError(cri::Error), -type Result = std::result::Result; - -#[derive(Debug, Error)] -pub enum RuntimeError { #[error("Firecracker error: {0}")] - Firecracker(FirecrackerError), - - #[error("OCI error: {0}")] - OCI(oci::Error), + FirecrackerError(FirecrackerError), +} - #[error("CRI error: {0}")] - CRI(cri::Error), +type Result = std::result::Result; - #[error("Network error: {0}")] - Network(NetworkError), -} -type RuntimeResult = std::result::Result; #[async_trait] pub trait Runtime: Send + Sync + Debug { - async fn run(&mut self) -> RuntimeResult<()>; + async fn run(&mut self) -> Result<()>; } #[async_trait] @@ -74,7 +59,7 @@ pub trait RuntimeManager: Send + Sync { config: Configuration, ) -> Result> { let mut runtime = self.create_runtime(workload.clone(), config.clone())?; - runtime.run().await.map_err(RuntimeManagerError::Runtime)?; + runtime.run().await?; Ok(runtime) } @@ -85,15 +70,15 @@ pub trait RuntimeManager: Send + Sync { } enum WorkloadKind { - FUNCTION, - POD, + Function, + Pod, } -impl Into for String { - fn into(self) -> WorkloadKind { - match self.as_str() { - "Function" => WorkloadKind::FUNCTION, - "Pod" => WorkloadKind::POD, +impl From for WorkloadKind { + fn from(kind: String) -> Self { + match kind.as_str() { + "Function" => WorkloadKind::Function, + "Pod" => WorkloadKind::Pod, _ => panic!("Unknown workload kind"), } } @@ -104,8 +89,8 @@ pub type DynamicRuntimeManager<'a> = &'a dyn RuntimeManager; impl RuntimeConfigurator { pub fn create(workload_definition: &WorkloadDefinition) -> DynamicRuntimeManager { match workload_definition.kind.clone().into() { - WorkloadKind::FUNCTION => &FunctionRuntimeManager {}, - WorkloadKind::POD => &PodRuntimeManager {}, + WorkloadKind::Function => &FunctionRuntimeManager {}, + WorkloadKind::Pod => &PodRuntimeManager {}, } } } diff --git a/riklet/src/runtime/network/function_network.rs b/riklet/src/runtime/network/function_network.rs new file mode 100644 index 00000000..15918622 --- /dev/null +++ b/riklet/src/runtime/network/function_network.rs @@ -0,0 +1,130 @@ +use async_trait::async_trait; +use proto::worker::InstanceScheduling; +use std::fmt::Debug; +use std::net::Ipv4Addr; +use tracing::debug; + +use crate::network::net::{Net, NetworkInterfaceConfig}; +use crate::{ + cli::function_config::FnConfiguration, + iptables::{rule::Rule, Chain, Iptables, MutateIptables, Table}, + structs::WorkloadDefinition, +}; + +use super::{NetworkError, Result, RuntimeNetwork, IP_ALLOCATOR}; + +#[derive(Debug, Clone)] +pub struct FunctionRuntimeNetwork { + pub mask_long: String, + pub firecracker_ip: Ipv4Addr, + pub tap_ip: Ipv4Addr, + pub function_config: FnConfiguration, + pub default_agent_port: u16, + pub workload_definition: WorkloadDefinition, + pub workload: InstanceScheduling, +} + +impl FunctionRuntimeNetwork { + pub fn new(workload: &InstanceScheduling) -> Result { + let default_agent_port: u16 = 8080; + let mask_long: &str = "255.255.255.252"; + + let workload_definition: WorkloadDefinition = + serde_json::from_str(workload.definition.as_str()) + .map_err(NetworkError::ParsingError)?; + + // Alocate ip range for tap interface and firecracker micro VM + let subnet = IP_ALLOCATOR + .lock() + .unwrap() + .allocate_subnet() + .ok_or_else(|| NetworkError::Error("No more internal ip available".to_string()))?; + + let tap_ip = subnet + .nth(1) + .ok_or_else(|| NetworkError::Error("Fail get tap ip".to_string()))?; + + let firecracker_ip = subnet + .nth(2) + .ok_or_else(|| NetworkError::Error("Fail to get firecracker ip".to_string()))?; + + Ok(FunctionRuntimeNetwork { + mask_long: mask_long.to_string(), + firecracker_ip, + function_config: FnConfiguration::load(), + tap_ip, + default_agent_port, + workload: workload.clone(), + workload_definition, + }) + } +} + +#[async_trait] +impl RuntimeNetwork for FunctionRuntimeNetwork { + async fn init(&self) -> Result<()> { + println!("Function network initialized"); + + // Port forward microvm on the host + let exposed_port = self.workload_definition.get_expected_port(); + + let config = NetworkInterfaceConfig::new( + self.workload.instance_id.clone(), + self.workload_definition.name.clone(), + self.tap_ip, + ) + .map_err(NetworkError::NetworkInterfaceError)?; + + let _tap = Net::new_with_tap(config) + .await + .map_err(NetworkError::NetworkInterfaceError)?; + debug!("Waiting for the microvm to start"); + + // Create a new IPTables object + let mut ipt = Iptables::new(false).map_err(NetworkError::IptablesError)?; + + let rule = Rule { + rule: format!( + "-p tcp --dport {} -d {} -j DNAT --to-destination {}:{}", + exposed_port, + self.function_config.ifnet_ip, + self.firecracker_ip, + self.default_agent_port + ), + chain: Chain::Output, + table: Table::Nat, + }; + ipt.create(&rule).map_err(NetworkError::IptablesError)?; + + // Allow NAT on the interface connected to the internet. + let rule = Rule { + rule: format!("-o {} -j MASQUERADE", self.function_config.ifnet), + chain: Chain::PostRouting, + table: Table::Nat, + }; + ipt.create(&rule).map_err(NetworkError::IptablesError)?; + + // Add the FORWARD rules to the filter table + let rule = Rule { + rule: "-m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT".to_string(), + chain: Chain::Forward, + table: Table::Filter, + }; + ipt.create(&rule).map_err(NetworkError::IptablesError)?; + let rule = Rule { + rule: format!( + "-i rik-{}-tap -o {} -j ACCEPT", + self.workload_definition.name, self.function_config.ifnet + ), + chain: Chain::Forward, + table: Table::Filter, + }; + ipt.create(&rule).map_err(NetworkError::IptablesError)?; + + Ok(()) + } + + async fn destroy(&self) -> Result<()> { + todo!() + } +} diff --git a/riklet/src/runtime/network/mod.rs b/riklet/src/runtime/network/mod.rs index a7dc3a68..5f4bcacb 100644 --- a/riklet/src/runtime/network/mod.rs +++ b/riklet/src/runtime/network/mod.rs @@ -1,172 +1,42 @@ -use std::{net::Ipv4Addr, process::Command}; +pub mod function_network; +pub mod pod_network; use async_trait::async_trait; -use proto::worker::InstanceScheduling; +use once_cell::sync::Lazy; +use shared::utils::ip_allocator::IpAllocator; use std::fmt::Debug; +use std::sync::Mutex; use thiserror::Error; -use tracing::{debug, event, Level}; + +use crate::iptables::IptablesError; +use crate::network::net::NetworkInterfaceError; + +// Initialize Singleton for IpAllocator +static IP_ALLOCATOR: Lazy> = Lazy::new(|| { + let ip_allocator = IpAllocator::new().expect("Fail to load IP allocator"); + Mutex::new(ip_allocator) +}); #[derive(Debug, Error)] pub enum NetworkError { #[error("Network error: {0}")] - CommonNetworkError(String), - - #[error("IO error: {0}")] - IoError(std::io::Error), + Error(String), #[error("Iptables error: {0}")] - Iptables(IptablesError), + IptablesError(IptablesError), + + #[error("Parsing error: {0}")] + ParsingError(serde_json::Error), - #[error("Json error: {0}")] - JsonError(serde_json::Error), + #[error("Network interface error: {0}")] + NetworkInterfaceError(NetworkInterfaceError), } type Result = std::result::Result; -use crate::network::net::{Net, NetworkInterfaceConfig}; -use crate::{ - cli::function_config::FnConfiguration, - iptables::{rule::Rule, Chain, Iptables, IptablesError, MutateIptables, Table}, - structs::WorkloadDefinition, - IP_ALLOCATOR, -}; - #[async_trait] pub trait RuntimeNetwork: Send + Sync + Debug { async fn init(&self) -> Result<()>; async fn destroy(&self) -> Result<()>; } - -#[derive(Debug, Clone)] -pub struct FunctionRuntimeNetwork { - pub mask_long: String, - pub firecracker_ip: Ipv4Addr, - pub tap_ip: Ipv4Addr, - pub function_config: FnConfiguration, - pub default_agent_port: u16, - pub workload_definition: WorkloadDefinition, - pub workload: InstanceScheduling, -} - -impl FunctionRuntimeNetwork { - pub fn new(workload: &InstanceScheduling) -> Result { - let default_agent_port: u16 = 8080; - let mask_long: &str = "255.255.255.252"; - let workload_definition: WorkloadDefinition = - serde_json::from_str(workload.definition.as_str()).map_err(NetworkError::JsonError)?; - - // Alocate ip range for tap interface and firecracker micro VM - let subnet = IP_ALLOCATOR - .lock() - .unwrap() - .allocate_subnet() - .ok_or("No more internal ip available") - .map_err(|e| NetworkError::CommonNetworkError(e.to_string()))?; - - let tap_ip = subnet - .nth(1) - .ok_or("Fail get tap ip") - .map_err(|e| NetworkError::CommonNetworkError(e.to_string()))?; - - let firecracker_ip = subnet - .nth(2) - .ok_or("Fail to get firecracker ip") - .map_err(|e| NetworkError::CommonNetworkError(e.to_string()))?; - - Ok(FunctionRuntimeNetwork { - mask_long: mask_long.to_string(), - firecracker_ip, - function_config: FnConfiguration::load(), - tap_ip, - default_agent_port, - workload: workload.clone(), - workload_definition: workload_definition.clone(), - }) - } -} - -#[async_trait] -impl RuntimeNetwork for FunctionRuntimeNetwork { - async fn init(&self) -> Result<()> { - println!("Function network initialized"); - - let config = NetworkInterfaceConfig::new( - self.workload.instance_id.clone(), - self.workload_definition.name.clone(), - self.tap_ip, - ) - .unwrap(); - let tap = Net::new_with_tap(config).await.unwrap(); // TODO Error; - debug!("Waiting for the microvm to start"); - - // Create a new IPTables object - let mut ipt = Iptables::new(false).map_err(NetworkError::Iptables)?; - - // Port forward microvm on the host - let exposed_port = self.workload_definition.get_expected_port(); - let rule = Rule { - rule: format!( - "-p tcp --dport {} -d {} -j DNAT --to-destination {}:{}", - exposed_port, - self.function_config.ifnet_ip, - self.firecracker_ip, - self.default_agent_port - ), - chain: Chain::Output, - table: Table::Nat, - }; - ipt.create(&rule).map_err(NetworkError::Iptables)?; - - // Allow NAT on the interface connected to the internet. - let rule = Rule { - rule: format!("-o {} -j MASQUERADE", self.function_config.ifnet), - chain: Chain::PostRouting, - table: Table::Nat, - }; - ipt.create(&rule).map_err(NetworkError::Iptables)?; - - // Add the FORWARD rules to the filter table - let rule = Rule { - rule: "-m conntrack --ctstate RELATED,ESTABLISHED -j ACCEPT".to_string(), - chain: Chain::Forward, - table: Table::Filter, - }; - ipt.create(&rule).map_err(NetworkError::Iptables)?; - let rule = Rule { - rule: format!( - "-i rik-{}-tap -o {} -j ACCEPT", - self.workload_definition.name, self.function_config.ifnet - ), - chain: Chain::Forward, - table: Table::Filter, - }; - ipt.create(&rule).map_err(NetworkError::Iptables)?; - - Ok(()) - } - - async fn destroy(&self) -> Result<()> { - todo!() - } -} - -#[derive(Debug)] -pub struct PodRuntimeNetwork {} - -impl PodRuntimeNetwork { - pub fn new() -> Self { - PodRuntimeNetwork {} - } -} - -#[async_trait] -impl RuntimeNetwork for PodRuntimeNetwork { - async fn init(&self) -> Result<()> { - todo!() - } - - async fn destroy(&self) -> Result<()> { - todo!() - } -} diff --git a/riklet/src/runtime/network/pod_network.rs b/riklet/src/runtime/network/pod_network.rs new file mode 100644 index 00000000..622105dc --- /dev/null +++ b/riklet/src/runtime/network/pod_network.rs @@ -0,0 +1,24 @@ +use async_trait::async_trait; +use std::fmt::Debug; + +use super::{Result, RuntimeNetwork}; + +#[derive(Debug)] +pub struct PodRuntimeNetwork {} + +impl PodRuntimeNetwork { + pub fn new() -> Self { + PodRuntimeNetwork {} + } +} + +#[async_trait] +impl RuntimeNetwork for PodRuntimeNetwork { + async fn init(&self) -> Result<()> { + todo!() + } + + async fn destroy(&self) -> Result<()> { + todo!() + } +} diff --git a/riklet/src/runtime/pod_runtime.rs b/riklet/src/runtime/pod_runtime.rs index 41fc9072..04834dff 100644 --- a/riklet/src/runtime/pod_runtime.rs +++ b/riklet/src/runtime/pod_runtime.rs @@ -14,7 +14,7 @@ use proto::worker::InstanceScheduling; use std::path::PathBuf; use tracing::{event, Level}; -use super::{network::PodRuntimeNetwork, Runtime, RuntimeManager, RuntimeManagerError}; +use super::{network::pod_network::PodRuntimeNetwork, Runtime, RuntimeManager}; #[derive(Debug)] struct PodRuntime { @@ -27,19 +27,16 @@ struct PodRuntime { #[async_trait] impl Runtime for PodRuntime { - async fn run(&mut self) -> super::RuntimeResult<()> { - self.network.init().await.map_err(RuntimeError::Network)?; + async fn run(&mut self) -> super::Result<()> { + self.network + .init() + .await + .map_err(RuntimeError::NetworkError)?; event!(Level::INFO, "Container workload detected"); let containers = self.workload_definition.get_containers(&self.instance_id); - // Inform the scheduler that the workload is creating - // self.send_status(5, instance_id).await; - - // self.workloads - // .insert(instance_id.clone(), containers.clone()); - for container in containers { let id = container.id.unwrap(); // TODO Some / None @@ -47,11 +44,12 @@ impl Runtime for PodRuntime { .image_manager .pull(&container.image[..]) .await - .map_err(RuntimeError::OCI)?; + .map_err(RuntimeError::OciError)?; // New console socket for the container let socket_path = PathBuf::from(format!("/tmp/{}", &id)); - let console_socket = ConsoleSocket::new(&socket_path).map_err(RuntimeError::CRI)?; + let console_socket = + ConsoleSocket::new(&socket_path).map_err(RuntimeError::CriError)?; tokio::spawn(async move { match console_socket @@ -93,10 +91,6 @@ impl Runtime for PodRuntime { pub struct PodRuntimeManager {} impl RuntimeManager for PodRuntimeManager { - // fn create_network(&self, workload: InstanceScheduling) -> super::Result> { - // Ok(Box::new(PodNetwork {})) - // } - fn create_runtime( &self, workload: InstanceScheduling, @@ -108,11 +102,10 @@ impl RuntimeManager for PodRuntimeManager { Ok(Box::new(PodRuntime { image_manager: ImageManager::new(config.manager.clone()) - .map_err(RuntimeManagerError::OCI)?, + .map_err(RuntimeError::OciError)?, workload_definition, network: PodRuntimeNetwork::new(), - container_runtime: Runc::new(config.runner.clone()) - .map_err(RuntimeManagerError::CRI)?, + container_runtime: Runc::new(config.runner).map_err(RuntimeError::CriError)?, instance_id, })) } diff --git a/riklet/src/structs.rs b/riklet/src/structs.rs index 6f57cdaf..4b73b4b1 100644 --- a/riklet/src/structs.rs +++ b/riklet/src/structs.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; use shared::utils::get_random_hash; use tracing::{event, Level}; +#[allow(dead_code)] const DEFAULT_FUNCTION_RUNTIME_PORT: u16 = 3000; #[derive(Serialize, Deserialize, Debug, Clone)] @@ -119,8 +120,7 @@ impl WorkloadDefinition { self.spec .function .as_ref() - .map(|f| f.exposure.as_ref().map(|e| e.port)) - .flatten() + .and_then(|f| f.exposure.as_ref().map(|e| e.port)) .unwrap() // TODO unwrap } } diff --git a/riklet/src/utils.rs b/riklet/src/utils.rs index 3d65b340..7b32d93b 100644 --- a/riklet/src/utils.rs +++ b/riklet/src/utils.rs @@ -1,3 +1,6 @@ +use anyhow::Result; +use tracing_subscriber::{prelude::__tracing_subscriber_SubscriberExt, EnvFilter, Registry}; + pub fn banner() { println!( r#" @@ -10,3 +13,20 @@ pub fn banner() { "# ); } + +pub fn init_logger(log_level: Option) -> Result<()> { + let logger = tracing_subscriber::fmt::layer().json(); + // Try to get the log level from the environment variable `RUST_LOG`. + // If the variable is not defined, then use the cli argument or the default value 'info' if neither is defined + let env_filter = EnvFilter::try_from_default_env() + .or_else(|_| { + let level = log_level.unwrap_or_else(|| "info".to_string()); + EnvFilter::try_new(level.as_str()) + })? + .add_directive("h2=OFF".parse().unwrap()); // disable all events from the `h2` crate; + + let collector = Registry::default().with(logger).with(env_filter); + tracing::subscriber::set_global_default(collector)?; + + Ok(()) +}