Skip to content

Commit

Permalink
clean code
Browse files Browse the repository at this point in the history
  • Loading branch information
MaloPolese committed Mar 20, 2023
1 parent 60d7f95 commit b7264dc
Show file tree
Hide file tree
Showing 17 changed files with 322 additions and 843 deletions.
48 changes: 43 additions & 5 deletions proto/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::fmt::Display;

use common::{ResourceStatus, WorkloadRequestKind};
use common::{worker_status::Status, InstanceMetric, ResourceStatus, WorkloadRequestKind};
use serde::{Deserialize, Serialize};

use std::ops::Deref;
pub mod common {
tonic::include_proto!("common");
}
Expand Down Expand Up @@ -77,9 +77,9 @@ impl From<ResourceStatus> for InstanceStatus {
}
}

impl Into<i32> for InstanceStatus {
fn into(self) -> i32 {
match self {
impl From<InstanceStatus> for i32 {
fn from(value: InstanceStatus) -> Self {
match value {
InstanceStatus::Unknown(_) => 0,
InstanceStatus::Pending => 1,
InstanceStatus::Running => 2,
Expand Down Expand Up @@ -107,3 +107,41 @@ impl From<i32> for InstanceStatus {
}

pub extern crate protobuf;

pub enum WorkloadAction {
CREATE,
DELETE,
}

pub struct WorkerStatus(pub common::WorkerStatus);
impl WorkerStatus {
pub fn new(identifier: String, instance_id: String, status: InstanceStatus) -> Self {
Self(common::WorkerStatus {
identifier,
host_address: None,
status: Some(Status::Instance(InstanceMetric {
instance_id,
status: status.into(),
metrics: "".to_string(),
})),
})
}
}

impl Deref for WorkerStatus {
type Target = common::WorkerStatus;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<i32> for WorkloadAction {
fn from(value: i32) -> Self {
match value {
0 => WorkloadAction::CREATE,
1 => WorkloadAction::DELETE,
_ => panic!("Unknown workload action"),
}
}
}
2 changes: 1 addition & 1 deletion riklet/crates/oci/src/image_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use tracing::{event, Level};

#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct ImageManagerConfiguration {
pub oci_manager: UmociConfiguration,
pub image_puller: SkopeoConfiguration,
Expand Down
4 changes: 2 additions & 2 deletions riklet/src/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub enum ConfigurationError {

type Result<T> = std::result::Result<T, ConfigurationError>;

#[derive(Deserialize, Debug, Serialize, PartialEq, Clone)]
#[derive(Deserialize, Debug, Serialize, PartialEq, Eq, Clone)]
pub struct Configuration {
pub master_ip: String,
pub log_level: String,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl Configuration {
);
let contents = std::fs::read(path).map_err(ConfigurationError::Load)?;

Ok(toml::from_slice(&contents).map_err(ConfigurationError::Parse)?)
toml::from_slice(&contents).map_err(ConfigurationError::Parse)
}

/// Load the configuration file
Expand Down
77 changes: 13 additions & 64 deletions riklet/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
use crate::cli::config::{Configuration, ConfigurationError};
use crate::emitters::metrics_emitter::MetricsEmitter;
use crate::iptables::rule::Rule;
use crate::iptables::{Chain, Iptables, MutateIptables, Table};
use crate::network::net::{Net, NetworkInterfaceConfig};
use crate::runtime::{DynamicRuntimeManager, Runtime, RuntimeConfigurator, RuntimeManagerError};
use crate::structs::{Container, WorkloadDefinition};
use crate::runtime::{DynamicRuntimeManager, Runtime, RuntimeConfigurator, RuntimeError};
use crate::structs::WorkloadDefinition;
use crate::traits::EventEmitter;
use crate::utils::banner;
use proto::common::{InstanceMetric, WorkerRegistration, WorkerStatus};
use proto::common::WorkerRegistration;
use proto::worker::worker_client::WorkerClient;
use proto::worker::InstanceScheduling;
use proto::InstanceStatus;
use proto::{InstanceStatus, WorkerStatus, WorkloadAction};
use std::collections::HashMap;
use std::net::Ipv4Addr;
use std::path::{Path, PathBuf};

use std::ops::Deref;
use std::time::Duration;
use std::{fs, io, thread};
use thiserror::Error;
use tonic::{transport::Channel, Request, Streaming};
use tracing::{debug, event, Level};
use tracing::{event, Level};

const METRICS_UPDATER_INTERVAL: u64 = 15 * 1000;

Expand All @@ -39,58 +31,19 @@ pub enum RikletError {
ConnectionError(tonic::transport::Error),

#[error("Runtime error: {0}")]
RuntimeManagerError(RuntimeManagerError),
RuntimeManagerError(RuntimeError),
}
type Result<T> = std::result::Result<T, RikletError>;

enum WorkloadAction {
CREATE,
DELETE,
}

struct RikletWorkerStatus(WorkerStatus);
impl RikletWorkerStatus {
fn new(identifier: String, instance_id: String, status: InstanceStatus) -> Self {
Self(WorkerStatus {
identifier,
host_address: None,
status: Some(proto::common::worker_status::Status::Instance(
InstanceMetric {
instance_id,
status: status.into(),
metrics: "".to_string(),
},
)),
})
}
}

impl Deref for RikletWorkerStatus {
type Target = WorkerStatus;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl Into<WorkloadAction> for i32 {
fn into(self) -> WorkloadAction {
match self {
0 => WorkloadAction::CREATE,
1 => WorkloadAction::DELETE,
_ => panic!("Unknown workload action"),
}
}
}

#[derive(Debug)]
pub struct Riklet {
config: Configuration,
hostname: String,
client: WorkerClient<Channel>,
stream: Streaming<InstanceScheduling>,
// Can be pod or function runtimes
// The key is the instance id
runtimes: HashMap<String, Box<dyn Runtime>>,
// function_config: FnConfiguration,
}

impl Riklet {
Expand All @@ -105,7 +58,7 @@ impl Riklet {

println!("workload action: {}", &workload.action);

Ok(match &workload.action.into() {
match &workload.action.into() {
WorkloadAction::CREATE => {
self.create_workload(workload, dynamic_runtime_manager)
.await?
Expand All @@ -114,7 +67,9 @@ impl Riklet {
self.delete_workload(workload, dynamic_runtime_manager)
.await?
}
})
};

Ok(())
}

async fn create_workload(
Expand Down Expand Up @@ -143,8 +98,6 @@ impl Riklet {
) -> Result<()> {
event!(Level::DEBUG, "Destroying workload");
let instance_id: &String = &workload.instance_id;
// Destroy the runtime
// TODO

self.runtimes.remove(instance_id);

Expand All @@ -158,8 +111,7 @@ impl Riklet {
async fn send_status(&self, status: InstanceStatus, instance_id: &str) -> Result<()> {
event!(Level::DEBUG, "Sending status : {}", status);

let status =
RikletWorkerStatus::new(self.hostname.clone(), instance_id.to_string(), status);
let status = WorkerStatus::new(self.hostname.clone(), instance_id.to_string(), status);

MetricsEmitter::emit_event(self.client.clone(), vec![status.0])
.await
Expand Down Expand Up @@ -201,7 +153,6 @@ impl Riklet {
let hostname = gethostname::gethostname().into_string().unwrap();

let config = Configuration::load().map_err(RikletError::ConfigurationError)?;
// let function_config = FnConfiguration::load();

let mut client = WorkerClient::connect(config.master_ip.clone())
.await
Expand All @@ -214,8 +165,6 @@ impl Riklet {
});
let stream = client.register(request).await.unwrap().into_inner();

// TODO Network

Ok(Self {
hostname,
client,
Expand Down
Loading

0 comments on commit b7264dc

Please sign in to comment.