diff --git a/Cargo.lock b/Cargo.lock index 8e974345..9d780019 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -501,6 +501,7 @@ dependencies = [ name = "controller" version = "1.0.0" dependencies = [ + "anyhow", "async-trait", "backoff", "chrono", @@ -517,6 +518,7 @@ dependencies = [ "rusqlite", "serde", "serde_json", + "thiserror", "tiny_http", "tokio", "tokio-stream", diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 143a0fa0..48532e75 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -34,6 +34,8 @@ rstest = "0.16.0" uuid = { version = "1.3.1", features = ["serde", "v4"] } backoff = { version = "0.4.0", features = ["tokio"]} rand = "0.8.4" +thiserror = "1.0.40" +anyhow = "1.0.71" # Instrumentation tracing = { workspace = true } diff --git a/controller/src/api/external/mod.rs b/controller/src/api/external/mod.rs index 8e526af5..03c39da1 100644 --- a/controller/src/api/external/mod.rs +++ b/controller/src/api/external/mod.rs @@ -11,6 +11,8 @@ use tiny_http::{Request, Server as TinyServer}; use tracing::{event, Level}; +use super::RikError; + pub struct Server { internal_sender: Sender, } @@ -20,11 +22,11 @@ impl Server { Server { internal_sender } } - pub fn run(&self, db: Arc) { - self.run_server(db); + pub fn run(&self, db: Arc) -> Result<(), RikError> { + self.run_server(db) } - fn run_server(&self, db: Arc) { + fn run_server(&self, db: Arc) -> Result<(), RikError> { let host = String::from("0.0.0.0"); dotenv().ok(); let port: usize = match std::env::var("PORT") { @@ -41,28 +43,38 @@ impl Server { let db = db.clone(); let internal_sender = self.internal_sender.clone(); - let guard = thread::spawn(move || loop { - let router = routes::Router::new(); - let connection = db.open().unwrap(); + let guard = thread::spawn(move || -> Result<(), RikError> { + loop { + let router = routes::Router::new(); + let connection = db.open().map_err(RikError::DatabaseError)?; - let mut req: Request = server.recv().unwrap(); + let mut req: Request = server.recv().unwrap(); - if let Some(res) = router.handle(&mut req, &connection, &internal_sender) { - req.respond(res).unwrap(); - continue; + if let Some(res) = router.handle(&mut req, &connection, &internal_sender) { + req.respond(res).unwrap(); + continue; + } + event!( + Level::INFO, + "Route {} ({}) could not be found", + req.url(), + req.method() + ); + req.respond(tiny_http::Response::empty(tiny_http::StatusCode::from(404))) + .unwrap(); } - event!( - Level::INFO, - "Route {} ({}) could not be found", - req.url(), - req.method() - ); - req.respond(tiny_http::Response::empty(tiny_http::StatusCode::from(404))) - .unwrap(); }); guards.push(guard); } + + for guard in guards { + guard + .join() + .expect("Couldn't join on the associated thread")? + } + event!(Level::INFO, "Server running on http://{}:{}", host, port); + Ok(()) } } diff --git a/controller/src/api/external/routes/instance.rs b/controller/src/api/external/routes/instance.rs index 746f6e23..ddd6b0c9 100644 --- a/controller/src/api/external/routes/instance.rs +++ b/controller/src/api/external/routes/instance.rs @@ -1,12 +1,10 @@ use definition::workload::WorkloadDefinition; use route_recognizer; use rusqlite::Connection; -use std::io; -use std::str::FromStr; use std::sync::mpsc::Sender; use tracing::{event, Level}; -use crate::api; +use crate::api::external::routes::ContentType; use crate::api::external::services::element::elements_set_right_name; use crate::api::external::services::instance::send_create_instance; use crate::api::types::element::OnlyId; @@ -14,19 +12,23 @@ use crate::api::types::instance::InstanceDefinition; use crate::api::{ApiChannel, Crud}; use crate::core::instance::Instance; use crate::database::RikRepository; +use tiny_http::Header; + +use super::HttpResult; pub fn get( _: &mut tiny_http::Request, _: &route_recognizer::Params, connection: &Connection, _: &Sender, -) -> Result>>, api::RikError> { +) -> HttpResult { if let Ok(mut instances) = RikRepository::find_all(connection, "/instance") { instances = elements_set_right_name(instances.clone()); - let instances_json = serde_json::to_string(&instances).unwrap(); + let instances_json = serde_json::to_string(&instances)?; + event!(Level::INFO, "instances.get, instances found"); Ok(tiny_http::Response::from_string(instances_json) - .with_header(tiny_http::Header::from_str("Content-Type: application/json").unwrap()) + .with_header::
(ContentType::JSON.into()) .with_status_code(tiny_http::StatusCode::from(200))) } else { Ok(tiny_http::Response::from_string("Cannot find instances") @@ -39,9 +41,9 @@ pub fn create( _: &route_recognizer::Params, connection: &Connection, internal_sender: &Sender, -) -> Result>>, api::RikError> { +) -> HttpResult { let mut content = String::new(); - req.as_reader().read_to_string(&mut content).unwrap(); + req.as_reader().read_to_string(&mut content)?; let mut instance: InstanceDefinition = serde_json::from_str(&content)?; @@ -99,8 +101,8 @@ pub fn create( } Ok( - tiny_http::Response::from_string(serde_json::to_string(&instance_names).unwrap()) - .with_header(tiny_http::Header::from_str("Content-Type: application/json").unwrap()) + tiny_http::Response::from_string(serde_json::to_string(&instance_names)?) + .with_header::
(ContentType::JSON.into()) .with_status_code(tiny_http::StatusCode::from(201)), ) } @@ -110,14 +112,13 @@ pub fn delete( _: &route_recognizer::Params, connection: &Connection, internal_sender: &Sender, -) -> Result>>, api::RikError> { +) -> HttpResult { let mut content = String::new(); - req.as_reader().read_to_string(&mut content).unwrap(); + req.as_reader().read_to_string(&mut content)?; let OnlyId { id: delete_id } = serde_json::from_str(&content)?; if let Ok(instance) = RikRepository::find_one(connection, &delete_id, "/instance") { - let instance_def: InstanceDefinition = - serde_json::from_value(instance.value.clone()).unwrap(); + let instance_def: InstanceDefinition = serde_json::from_value(instance.value.clone())?; let workload_def_rs = RikRepository::find_one(connection, &instance_def.workload_id, "/workload"); @@ -134,16 +135,13 @@ pub fn delete( )) .with_status_code(tiny_http::StatusCode::from(404))); } - let workload_def: WorkloadDefinition = - serde_json::from_value(workload_def_rs.unwrap().value).unwrap(); - internal_sender - .send(ApiChannel { - action: Crud::Delete, - workload_id: Some(instance_def.workload_id), - workload_definition: Some(workload_def), - instance_id: Some(delete_id), - }) - .unwrap(); + let workload_def: WorkloadDefinition = serde_json::from_value(workload_def_rs?.value)?; + internal_sender.send(ApiChannel { + action: Crud::Delete, + workload_id: Some(instance_def.workload_id), + workload_definition: Some(workload_def), + instance_id: Some(delete_id), + })?; event!( Level::INFO, diff --git a/controller/src/api/external/routes/mod.rs b/controller/src/api/external/routes/mod.rs index 9d27ca5b..0a1c7eeb 100644 --- a/controller/src/api/external/routes/mod.rs +++ b/controller/src/api/external/routes/mod.rs @@ -1,11 +1,12 @@ use route_recognizer; use rusqlite::Connection; use std::io; +use std::str::FromStr; use std::sync::mpsc::Sender; use tiny_http::Method; +use tiny_http::Response; use tracing::{event, Level}; -use crate::api; use crate::api::ApiChannel; mod instance; @@ -17,7 +18,23 @@ type Handler = fn( &route_recognizer::Params, &Connection, &Sender, -) -> Result>>, api::RikError>; +) -> Result>>, anyhow::Error>; + +type HttpResult>> = Result, anyhow::Error>; + +pub enum ContentType { + JSON, +} + +impl Into for ContentType { + fn into(self) -> tiny_http::Header { + match self { + ContentType::JSON => { + tiny_http::Header::from_str("Content-Type: application/json").unwrap() + } + } + } +} pub struct Router { routes: Vec<(tiny_http::Method, route_recognizer::Router)>, diff --git a/controller/src/api/external/routes/tenant.rs b/controller/src/api/external/routes/tenant.rs index 8467b0b7..183b0985 100644 --- a/controller/src/api/external/routes/tenant.rs +++ b/controller/src/api/external/routes/tenant.rs @@ -1,11 +1,11 @@ use route_recognizer; use rusqlite::Connection; -use std::io; -use std::str::FromStr; use std::sync::mpsc::Sender; +use tiny_http::Header; use tracing::{event, Level}; -use crate::api; +use super::HttpResult; +use crate::api::external::routes::ContentType; use crate::api::external::services::element::elements_set_right_name; use crate::api::types::element::OnlyId; use crate::api::types::tenant::Tenant; @@ -17,13 +17,13 @@ pub fn get( _: &route_recognizer::Params, connection: &Connection, _: &Sender, -) -> Result>>, api::RikError> { +) -> HttpResult { if let Ok(mut tenants) = RikRepository::find_all(connection, "/tenant") { tenants = elements_set_right_name(tenants.clone()); - let tenants_json = serde_json::to_string(&tenants).unwrap(); + let tenants_json = serde_json::to_string(&tenants)?; event!(Level::INFO, "tenants.get, tenants found"); Ok(tiny_http::Response::from_string(tenants_json) - .with_header(tiny_http::Header::from_str("Content-Type: application/json").unwrap()) + .with_header::
(ContentType::JSON.into()) .with_status_code(tiny_http::StatusCode::from(200))) } else { Ok(tiny_http::Response::from_string("Cannot find tenant") @@ -36,15 +36,15 @@ pub fn create( _: &route_recognizer::Params, connection: &Connection, _: &Sender, -) -> Result>>, api::RikError> { +) -> HttpResult { let mut content = String::new(); - req.as_reader().read_to_string(&mut content).unwrap(); + req.as_reader().read_to_string(&mut content)?; let tenant: Tenant = serde_json::from_str(&content)?; if RikRepository::insert(connection, &tenant.name, &tenant.value).is_ok() { event!(Level::INFO, "Create tenant"); Ok(tiny_http::Response::from_string(content) - .with_header(tiny_http::Header::from_str("Content-Type: application/json").unwrap()) + .with_header::
(ContentType::JSON.into()) .with_status_code(tiny_http::StatusCode::from(200))) } else { event!(Level::ERROR, "Cannot create tenant"); @@ -58,13 +58,13 @@ pub fn delete( _: &route_recognizer::Params, connection: &Connection, _: &Sender, -) -> Result>>, api::RikError> { +) -> HttpResult { let mut content = String::new(); - req.as_reader().read_to_string(&mut content).unwrap(); + req.as_reader().read_to_string(&mut content)?; let OnlyId { id: delete_id } = serde_json::from_str(&content)?; if let Ok(tenant) = RikRepository::find_one(connection, &delete_id, "/tenant") { - RikRepository::delete(connection, &tenant.id).unwrap(); + RikRepository::delete(connection, &tenant.id)?; event!(Level::INFO, "Delete tenant"); Ok(tiny_http::Response::from_string("").with_status_code(tiny_http::StatusCode::from(204))) } else { diff --git a/controller/src/api/external/routes/workload.rs b/controller/src/api/external/routes/workload.rs index 6e56ae51..aad47a40 100644 --- a/controller/src/api/external/routes/workload.rs +++ b/controller/src/api/external/routes/workload.rs @@ -1,4 +1,5 @@ -use crate::api; +use super::HttpResult; +use crate::api::external::routes::ContentType; use crate::api::external::services::element::elements_set_right_name; use crate::api::types::element::OnlyId; use crate::api::{ApiChannel, Crud}; @@ -8,14 +9,10 @@ use definition::workload::WorkloadDefinition; use route_recognizer; use rusqlite::Connection; use serde_json::json; -use std::io; -use std::str::FromStr; use std::sync::mpsc::Sender; -use tiny_http::Response; +use tiny_http::Header; use tracing::{event, Level}; -type HttpResult>> = Result, api::RikError>; - pub fn get( _: &mut tiny_http::Request, _: &route_recognizer::Params, @@ -24,11 +21,11 @@ pub fn get( ) -> HttpResult { if let Ok(mut workloads) = RikRepository::find_all(connection, "/workload") { workloads = elements_set_right_name(workloads.clone()); - let workloads_json = serde_json::to_string(&workloads).unwrap(); + let workloads_json = serde_json::to_string(&workloads)?; event!(Level::INFO, "workloads.get, workloads found"); Ok(tiny_http::Response::from_string(workloads_json) - .with_header(tiny_http::Header::from_str("Content-Type: application/json").unwrap()) + .with_header::
(ContentType::JSON.into()) .with_status_code(tiny_http::StatusCode::from(200))) } else { Ok(tiny_http::Response::from_string("Cannot find workloads") @@ -65,7 +62,7 @@ pub fn get_instances( let instances_json = json!({ "instances": instances }).to_string(); return Ok(tiny_http::Response::from_string(instances_json) - .with_header(tiny_http::Header::from_str("Content-Type: application/json").unwrap()) + .with_header::
(ContentType::JSON.into()) .with_status_code(tiny_http::StatusCode::from(200))); } @@ -101,19 +98,17 @@ pub fn create( .with_status_code(tiny_http::StatusCode::from(404))); } - if let Ok(inserted_id) = RikRepository::insert( - connection, - &name, - &serde_json::to_string(&workload).unwrap(), - ) { + if let Ok(inserted_id) = + RikRepository::insert(connection, &name, &serde_json::to_string(&workload)?) + { let workload_id: OnlyId = OnlyId { id: inserted_id }; event!( Level::INFO, "workload.create, workload successfully created" ); Ok( - tiny_http::Response::from_string(serde_json::to_string(&workload_id).unwrap()) - .with_header(tiny_http::Header::from_str("Content-Type: application/json").unwrap()) + tiny_http::Response::from_string(serde_json::to_string(&workload_id)?) + .with_header::
(ContentType::JSON.into()) .with_status_code(tiny_http::StatusCode::from(200)), ) } else { diff --git a/controller/src/api/mod.rs b/controller/src/api/mod.rs index 16fb5d2e..61799ccb 100644 --- a/controller/src/api/mod.rs +++ b/controller/src/api/mod.rs @@ -3,6 +3,9 @@ pub mod types; use definition::workload::WorkloadDefinition; use std::fmt::{Debug, Display, Formatter, Result}; +use thiserror::Error; + +use crate::database::DatabaseError; #[derive(Debug)] pub enum Crud { @@ -20,45 +23,16 @@ impl From for Crud { } } -#[derive(Debug)] +#[derive(Debug, Error)] pub enum RikError { - IoError(std::io::Error), - HttpRequestError(serde_json::Error), - InternalCommunicationError(String), - InvalidName(String), -} -impl Display for RikError { - fn fmt(&self, f: &mut Formatter) -> Result { - match *self { - RikError::IoError(ref e) => write!(f, "{}", e), - RikError::HttpRequestError(ref e) => write!(f, "{}", e), - RikError::InternalCommunicationError(ref e) => write!(f, "{}", e), - RikError::InvalidName(ref e) => write!(f, "{}", e), - } - } -} + #[error("Database error {0}")] + DatabaseError(DatabaseError), -impl std::error::Error for RikError { - fn cause(&self) -> Option<&dyn std::error::Error> { - match *self { - RikError::IoError(ref e) => Some(e), - RikError::HttpRequestError(ref e) => Some(e), - // TODO: Implement other errors - _ => None, - } - } -} - -impl From for RikError { - fn from(e: std::io::Error) -> RikError { - RikError::IoError(e) - } -} + #[error("Internal communication error: {0}")] + InternalCommunicationError(String), -impl From for RikError { - fn from(e: serde_json::Error) -> RikError { - RikError::HttpRequestError(e) - } + #[error("Invalid name: {0}")] + InvalidName(String), } pub struct ApiChannel { diff --git a/controller/src/database/mod.rs b/controller/src/database/mod.rs index 09a78970..aae8771c 100644 --- a/controller/src/database/mod.rs +++ b/controller/src/database/mod.rs @@ -1,10 +1,22 @@ use crate::api::types::element::Element; use dotenv::dotenv; -use rusqlite::{params, Connection, Result}; +use rusqlite::{params, Connection}; use std::sync::Arc; +use thiserror::Error; use uuid::Uuid; +#[derive(Debug, Error)] +pub enum DatabaseError { + #[error("Sql Error: {0}")] + SqlError(rusqlite::Error), + + #[error("Io error: {0}")] + IoError(std::io::Error), +} + +type Result = std::result::Result; + #[allow(dead_code)] pub struct RikDataBase { name: String, @@ -19,15 +31,17 @@ impl RikDataBase { pub fn init_tables(&self) -> Result<()> { let connection = self.open()?; // only work with sqlite for now - connection.execute_batch( - "CREATE TABLE IF NOT EXISTS cluster ( + connection + .execute_batch( + "CREATE TABLE IF NOT EXISTS cluster ( id TEXT PRIMARY KEY, name TEXT NOT NULL, value BLOB NOT NULL ); CREATE INDEX IF NOT EXISTS cluster_name_index ON cluster (name); CREATE INDEX IF NOT EXISTS cluster_name_id_index ON cluster (name,id);", - )?; + ) + .map_err(DatabaseError::SqlError)?; Ok(()) } @@ -37,10 +51,10 @@ impl RikDataBase { dotenv().ok(); let file_path = std::env::var("DATABASE_LOCATION").unwrap_or("/var/lib/rik/data/".to_string()); - std::fs::create_dir_all(&file_path).unwrap(); + std::fs::create_dir_all(&file_path).map_err(DatabaseError::IoError)?; let database_path = format!("{}{}.db", file_path, self.name); - Connection::open(database_path) + Connection::open(database_path).map_err(DatabaseError::SqlError) } } @@ -58,34 +72,36 @@ impl RikRepository { } pub fn delete(connection: &Connection, id: &String) -> Result<()> { - connection.execute("DELETE FROM cluster WHERE id = (?1)", params![id])?; + connection + .execute("DELETE FROM cluster WHERE id = (?1)", params![id]) + .map_err(DatabaseError::SqlError)?; Ok(()) } pub fn find_one(connection: &Connection, id: &String, element_type: &str) -> Result { - let mut stmt = connection.prepare(&format!( - "SELECT id, name, value FROM cluster WHERE id = '{}' AND name LIKE '{}%'", - id, element_type - ))?; - match stmt.query_row([], |row| { + let mut stmt = connection + .prepare(&format!( + "SELECT id, name, value FROM cluster WHERE id = '{}' AND name LIKE '{}%'", + id, element_type + )) + .map_err(DatabaseError::SqlError)?; + stmt.query_row([], |row| { Ok(Element::new(row.get(0)?, row.get(1)?, row.get(2)?)) - }) { - Ok(element) => Ok(element), - Err(err) => Err(err), - } + }) + .map_err(DatabaseError::SqlError) } pub fn check_duplicate_name(connection: &Connection, name: &str) -> Result { - let mut stmt = connection.prepare(&format!( - "SELECT id, name, value FROM cluster WHERE name LIKE '{}%'", - name - ))?; - match stmt.query_row([], |row| { + let mut stmt = connection + .prepare(&format!( + "SELECT id, name, value FROM cluster WHERE name LIKE '{}%'", + name + )) + .map_err(DatabaseError::SqlError)?; + stmt.query_row([], |row| { Ok(Element::new(row.get(0)?, row.get(1)?, row.get(2)?)) - }) { - Ok(element) => Ok(element), - Err(err) => Err(err), - } + }) + .map_err(DatabaseError::SqlError) } // TODO: add pagination @@ -104,16 +120,18 @@ impl RikRepository { let mut elements: Vec = Vec::new(); for element in elements_iter { - elements.push(element?); + elements.push(element.map_err(DatabaseError::SqlError)?); } Ok(elements) } pub fn update(connection: &Connection, id: &String, value: &String) -> Result<()> { - connection.execute( - "UPDATE cluster SET value=(?1) WHERE id = (?2)", - params![value, id], - )?; + connection + .execute( + "UPDATE cluster SET value=(?1) WHERE id = (?2)", + params![value, id], + ) + .map_err(DatabaseError::SqlError)?; Ok(()) } @@ -133,7 +151,7 @@ impl RikRepository { "INSERT INTO cluster (id, name, value) VALUES (?1, ?2, ?3)", params![id, name, value], ) - .unwrap(); + .map_err(DatabaseError::SqlError)?; Ok(id.to_string()) } } @@ -141,8 +159,10 @@ impl RikRepository { #[cfg(test)] mod test { - use crate::database::{RikDataBase, RikRepository}; - use crate::tests::fixtures::db_connection; + use crate::{ + database::{RikDataBase, RikRepository}, + tests::fixtures::db_connection, + }; use rstest::rstest; use uuid::Uuid; diff --git a/controller/src/main.rs b/controller/src/main.rs index 8ba0b626..074d10fc 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -6,9 +6,9 @@ mod tests; use std::sync::mpsc::channel; use std::thread; -use crate::database::RikDataBase; +use crate::{api::RikError, database::RikDataBase}; use api::{external, ApiChannel}; -use tracing::{event, metadata::LevelFilter, Level}; +use tracing::{error, event, metadata::LevelFilter, Level}; use tracing_subscriber::{ fmt, prelude::__tracing_subscriber_SubscriberExt, util::SubscriberInitExt, EnvFilter, }; @@ -32,7 +32,9 @@ async fn main() { logger_setup(); event!(Level::INFO, "Starting Rik"); let db = RikDataBase::new(String::from("rik")); - db.init_tables().unwrap(); + if let Err(e) = db.init_tables() { + error!("Error while table initialization {}", e) + } let (legacy_sender, legacy_receiver) = channel::(); @@ -42,18 +44,26 @@ async fn main() { let external_api = external::Server::new(legacy_sender); let mut threads = Vec::new(); - threads.push(thread::spawn(move || { + threads.push(thread::spawn(move || -> Result<(), RikError> { let future = async move { internal_api.listen_notification(legacy_receiver).await }; Builder::new_multi_thread() .enable_all() .build() .unwrap() - .block_on(future) + .block_on(future); + Ok(()) })); - threads.push(thread::spawn(move || external_api.run(db))); + threads.push(thread::spawn(move || -> Result<(), RikError> { + external_api.run(db) + })); for thread in threads { - thread.join().unwrap(); + if let Err(e) = thread + .join() + .expect("Couldn't join on the associated thread") + { + error!("An error occured {}", e) + } } }