From 67ab8f98d7040d1d9fb57c2e800d9e1fe1bfc6fd Mon Sep 17 00:00:00 2001
From: bakytgerey_ashirbeko
Date: Wed, 23 Oct 2024 21:27:47 +0600
Subject: [PATCH] Migrate ServerError to error_set

---
 Cargo.lock                                 |  53 +++++
 integration/tests/archiver/disk.rs         |   7 +-
 server/Cargo.toml                          |   1 +
 server/src/archiver/disk.rs                |  12 +-
 server/src/archiver/mod.rs                 |   8 +-
 server/src/archiver/s3.rs                  |  32 +--
 .../samplers/message_sampler.rs            |   6 +-
 .../samplers/retained_batch_sampler.rs     |   6 +-
 .../message_conversion/schema_sampler.rs   |   4 +-
 .../snapshots/message_snapshot.rs          |  76 +++----
 .../snapshots/retained_batch_snapshot.rs   |  54 ++---
 server/src/configs/config_provider.rs      |  32 +--
 server/src/configs/server.rs               |   6 +-
 server/src/configs/validators.rs           | 196 +++++++++---------
 server/src/log/logger.rs                   |  12 +-
 server/src/quic/listener.rs                |   6 +-
 server/src/server_error.rs                 | 133 +++++++-----
 server/src/tcp/connection_handler.rs       |  12 +-
 18 files changed, 379 insertions(+), 277 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index aec6d1e16..7dcd8beae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1317,6 +1317,15 @@ version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"

+[[package]]
+name = "document-features"
+version = "0.2.10"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cb6969eaabd2421f8a2775cfd2471a2b634372b4a25d41e3bd647b79912850a0"
+dependencies = [
+ "litrs",
+]
+
 [[package]]
 name = "dtoa"
 version = "1.0.9"
@@ -1329,6 +1338,15 @@ version = "1.0.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"

+[[package]]
+name = "dyn-fmt"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c992f591dfce792a9bc2d1880ab67ffd4acc04551f8e551ca3b6233efb322f00"
+dependencies = [
+ "document-features",
+]
+
 [[package]]
 name = "either"
 version = "1.13.0"
@@ -1383,6 +1401,28 @@ dependencies = [
  "windows-sys 0.52.0",
 ]

+[[package]]
+name = "error_set"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8a8a70e1c5e3557e22af5af1e78f546303c9953638e60aee2c547322076cfabf"
+dependencies = [
+ "error_set_impl",
+]
+
+[[package]]
+name = "error_set_impl"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "33f8c9888cc6d3349076683c776fc48d3b0b8685aa2ca05107cfa1df72445157"
+dependencies = [
+ "dyn-fmt",
+ "indices",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.77",
+]
+
 [[package]]
 name = "event-listener"
 version = "5.3.1"
@@ -2127,6 +2167,12 @@ dependencies = [
  "serde",
 ]

+[[package]]
+name = "indices"
+version = "0.3.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0e125c680c3871dc195cfd3ff94a877a78f31d5e9c1a3a62cce82689ece14bf7"
+
 [[package]]
 name = "inlinable_string"
 version = "0.1.15"
@@ -2352,6 +2398,12 @@ version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89"

+[[package]]
+name = "litrs"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"
+
 [[package]]
 name = "lock_api"
 version = "0.4.12"
@@ -4015,6 +4067,7 @@ dependencies = [
  "console-subscriber",
  "dashmap",
  "derive_more",
+ "error_set",
  "figlet-rs",
  "figment",
  "flume",
diff --git a/integration/tests/archiver/disk.rs b/integration/tests/archiver/disk.rs
index 2b9412854..4d40c1a77 100644
--- a/integration/tests/archiver/disk.rs
+++ b/integration/tests/archiver/disk.rs
@@ -1,7 +1,7 @@
 use crate::archiver::DiskArchiverSetup;
-use server::archiver::Archiver;
 use server::server_error::ServerError;
 use server::streaming::utils::file;
+use server::{archiver::Archiver, server_error::ServerArchiverError};
 use std::path::Path;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};

@@ -89,7 +89,10 @@ async fn should_fail_when_file_to_archive_does_not_exist() {

     assert!(result.is_err());
     let error = result.err().unwrap();
-    assert!(matches!(error, ServerError::FileToArchiveNotFound(_)));
+    assert!(matches!(
+        error,
+        ServerArchiverError::FileToArchiveNotFound { .. }
+    ));
 }

 async fn create_file(path: &str, content: &str) {
diff --git a/server/Cargo.toml b/server/Cargo.toml
index b14db14c8..6eb81d096 100644
--- a/server/Cargo.toml
+++ b/server/Cargo.toml
@@ -25,6 +25,7 @@ clap = { version = "4.5.17", features = ["derive"] }
 console-subscriber = { version = "0.4.0", optional = true }
 dashmap = "6.0.1"
 derive_more = "1.0.0"
+error_set = { version = "0.6.4" }
 figlet-rs = "0.1.5"
 figment = { version = "0.10.18", features = ["json", "toml", "env"] }
 flume = "0.11.0"
diff --git a/server/src/archiver/disk.rs b/server/src/archiver/disk.rs
index e9e2e9333..988310ade 100644
--- a/server/src/archiver/disk.rs
+++ b/server/src/archiver/disk.rs
@@ -1,6 +1,6 @@
 use crate::archiver::Archiver;
 use crate::configs::server::DiskArchiverConfig;
-use crate::server_error::ServerError;
+use crate::server_error::ServerArchiverError;
 use async_trait::async_trait;
 use std::path::Path;
 use tokio::fs;
@@ -19,7 +19,7 @@ impl DiskArchiver {

 #[async_trait]
 impl Archiver for DiskArchiver {
-    async fn init(&self) -> Result<(), ServerError> {
+    async fn init(&self) -> Result<(), ServerArchiverError> {
         if !Path::new(&self.config.path).exists() {
             info!("Creating disk archiver directory: {}", self.config.path);
             fs::create_dir_all(&self.config.path).await?;
@@ -31,7 +31,7 @@ impl Archiver for DiskArchiver {
         &self,
         file: &str,
         base_directory: Option<String>,
-    ) -> Result<bool, ServerError> {
+    ) -> Result<bool, ServerArchiverError> {
         debug!("Checking if file: {file} is archived on disk.");
         let base_directory = base_directory.as_deref().unwrap_or_default();
         let path = Path::new(&self.config.path).join(base_directory).join(file);
@@ -44,13 +44,15 @@ impl Archiver for DiskArchiver {
         &self,
         files: &[&str],
         base_directory: Option<String>,
-    ) -> Result<(), ServerError> {
+    ) -> Result<(), ServerArchiverError> {
         debug!("Archiving files on disk: {:?}", files);
         for file in files {
             debug!("Archiving file: {file}");
             let source = Path::new(file);
             if !source.exists() {
-                return Err(ServerError::FileToArchiveNotFound(file.to_string()));
+                return Err(ServerArchiverError::FileToArchiveNotFound {
+                    file_path: file.to_string(),
+                });
             }

             let base_directory = base_directory.as_deref().unwrap_or_default();
diff --git a/server/src/archiver/mod.rs b/server/src/archiver/mod.rs
index 96dba7de2..f5d95ae7b 100644
--- a/server/src/archiver/mod.rs
+++ b/server/src/archiver/mod.rs
@@ -1,7 +1,7 @@
 pub mod disk;
 pub mod s3;

-use crate::server_error::ServerError;
+use crate::server_error::ServerArchiverError;
 use async_trait::async_trait;
 use derive_more::Display;
 use serde::{Deserialize, Serialize};
@@ -31,17 +31,17 @@ impl FromStr for ArchiverKind {

 #[async_trait]
 pub trait Archiver: Sync + Send {
-    async fn init(&self) -> Result<(), ServerError>;
+    async fn init(&self) -> Result<(), ServerArchiverError>;
     async fn is_archived(
         &self,
         file: &str,
         base_directory: Option<String>,
-    ) -> Result<bool, ServerError>;
+    ) -> Result<bool, ServerArchiverError>;
     async fn archive(
         &self,
         files: &[&str],
         base_directory: Option<String>,
-    ) -> Result<(), ServerError>;
+    ) -> Result<(), ServerArchiverError>;
 }

 impl Debug for dyn Archiver {
diff --git a/server/src/archiver/s3.rs b/server/src/archiver/s3.rs
index 8c5c6f569..0d3998c8f 100644
--- a/server/src/archiver/s3.rs
+++ b/server/src/archiver/s3.rs
@@ -1,6 +1,6 @@
 use crate::archiver::Archiver;
 use crate::configs::server::S3ArchiverConfig;
-use crate::server_error::ServerError;
+use crate::server_error::ServerArchiverError;
 use crate::streaming::utils::file;
 use async_trait::async_trait;
 use s3::creds::Credentials;
@@ -16,7 +16,7 @@ pub struct S3Archiver {
 }

 impl S3Archiver {
-    pub fn new(config: S3ArchiverConfig) -> Result<Self, ServerError> {
+    pub fn new(config: S3ArchiverConfig) -> Result<Self, ServerArchiverError> {
         let credentials = Credentials::new(
             Some(&config.key_id),
             Some(&config.key_secret),
@@ -24,7 +24,7 @@ impl S3Archiver {
             None,
             None,
         )
-        .map_err(|_| ServerError::InvalidS3Credentials)?;
+        .map_err(|_| ServerArchiverError::InvalidS3Credentials)?;

         let bucket = Bucket::new(
             &config.bucket,
@@ -42,14 +42,14 @@ impl S3Archiver {
             },
             credentials,
         )
-        .map_err(|_| ServerError::CannotInitializeS3Archiver)?;
+        .map_err(|_| ServerArchiverError::CannotInitializeS3Archiver)?;
         Ok(Self {
             bucket,
             tmp_upload_dir: config.tmp_upload_dir,
         })
     }

-    async fn copy_file_to_tmp(&self, path: &str) -> Result<String, ServerError> {
+    async fn copy_file_to_tmp(&self, path: &str) -> Result<String, ServerArchiverError> {
         debug!(
             "Copying file: {path} to temporary S3 upload directory: {}",
             self.tmp_upload_dir
@@ -68,11 +68,11 @@ impl S3Archiver {

 #[async_trait]
 impl Archiver for S3Archiver {
-    async fn init(&self) -> Result<(), ServerError> {
+    async fn init(&self) -> Result<(), ServerArchiverError> {
         let response = self.bucket.list("/".to_string(), None).await;
         if let Err(error) = response {
             error!("Cannot initialize S3 archiver: {error}");
-            return Err(ServerError::CannotInitializeS3Archiver);
+            return Err(ServerArchiverError::CannotInitializeS3Archiver);
         }

         if Path::new(&self.tmp_upload_dir).exists() {
@@ -94,7 +94,7 @@ impl Archiver for S3Archiver {
         &self,
         file: &str,
         base_directory: Option<String>,
-    ) -> Result<bool, ServerError> {
+    ) -> Result<bool, ServerArchiverError> {
         debug!("Checking if file: {file} is archived on S3.");
         let base_directory = base_directory.as_deref().unwrap_or_default();
         let destination = Path::new(&base_directory).join(file);
@@ -119,10 +119,12 @@ impl Archiver for S3Archiver {
         &self,
         files: &[&str],
         base_directory: Option<String>,
-    ) -> Result<(), ServerError> {
+    ) -> Result<(), ServerArchiverError> {
         for path in files {
             if !Path::new(path).exists() {
-                return Err(ServerError::FileToArchiveNotFound(path.to_string()));
+                return Err(ServerArchiverError::FileToArchiveNotFound {
+                    file_path: path.to_string(),
+                });
             }

             let source = self.copy_file_to_tmp(path).await?;
@@ -137,8 +139,9 @@ impl Archiver for S3Archiver {
                 .await;
             if let Err(error) = response {
                 error!("Cannot archive file: {path} on S3: {}", error);
-                fs::remove_file(&source).await?;
-                return Err(ServerError::CannotArchiveFile(path.to_string()));
+                return Err(ServerArchiverError::CannotArchiveFile {
+                    file_path: path.to_string(),
+                });
             }

             let response = response.unwrap();
@@ -150,8 +153,9 @@ impl Archiver for S3Archiver {
             }

             error!("Cannot archive file: {path} on S3, received an invalid status code: {status}.");
-            fs::remove_file(&source).await?;
-            return Err(ServerError::CannotArchiveFile(path.to_string()));
+            return Err(ServerArchiverError::CannotArchiveFile {
+                file_path: path.to_string(),
+            });
         }
         Ok(())
     }
diff --git a/server/src/compat/message_conversion/samplers/message_sampler.rs b/server/src/compat/message_conversion/samplers/message_sampler.rs
index 4e6e98afe..4b3cd6dfa 100644
--- a/server/src/compat/message_conversion/samplers/message_sampler.rs
+++ b/server/src/compat/message_conversion/samplers/message_sampler.rs
@@ -1,7 +1,7 @@
 use crate::compat::message_conversion::binary_schema::BinarySchema;
 use crate::compat::message_conversion::schema_sampler::BinarySchemaSampler;
 use crate::compat::message_conversion::snapshots::message_snapshot::MessageSnapshot;
-use crate::server_error::ServerError;
+use crate::server_error::ServerCompatError;
 use crate::streaming::utils::file;
 use async_trait::async_trait;
 use bytes::{BufMut, BytesMut};
@@ -27,7 +27,7 @@ unsafe impl Sync for MessageSampler {}

 #[async_trait]
 impl BinarySchemaSampler for MessageSampler {
-    async fn try_sample(&self) -> Result<BinarySchema, ServerError> {
+    async fn try_sample(&self) -> Result<BinarySchema, ServerCompatError> {
         let mut index_file = file::open(&self.index_path).await?;
         let mut log_file = file::open(&self.log_path).await?;
         let log_file_size = log_file.metadata().await?.len();
@@ -46,7 +46,7 @@ impl BinarySchemaSampler for MessageSampler {

         let message = MessageSnapshot::try_from(buffer.freeze())?;
         if message.offset != self.segment_start_offset {
-            return Err(ServerError::InvalidMessageOffsetFormatConversion);
+            return Err(ServerCompatError::InvalidMessageOffsetFormatConversion);
         }
         Ok(BinarySchema::RetainedMessageSchema)
     }
diff --git a/server/src/compat/message_conversion/samplers/retained_batch_sampler.rs b/server/src/compat/message_conversion/samplers/retained_batch_sampler.rs
index a05fb191c..e50e46799 100644
--- a/server/src/compat/message_conversion/samplers/retained_batch_sampler.rs
+++ b/server/src/compat/message_conversion/samplers/retained_batch_sampler.rs
@@ -1,7 +1,7 @@
 use crate::compat::message_conversion::binary_schema::BinarySchema;
 use crate::compat::message_conversion::schema_sampler::BinarySchemaSampler;
 use crate::compat::message_conversion::snapshots::retained_batch_snapshot::RetainedMessageBatchSnapshot;
-use crate::server_error::ServerError;
+use crate::server_error::ServerCompatError;
 use crate::streaming::utils::file;
 use async_trait::async_trait;
 use bytes::{BufMut, Bytes};
@@ -32,7 +32,7 @@ unsafe impl Sync for RetainedMessageBatchSampler {}

 #[async_trait]
 impl BinarySchemaSampler for RetainedMessageBatchSampler {
-    async fn try_sample(&self) -> Result<BinarySchema, ServerError> {
+    async fn try_sample(&self) -> Result<BinarySchema, ServerCompatError> {
         let mut index_file = file::open(&self.index_path).await?;
         let mut log_file = file::open(&self.log_path).await?;
         let log_file_size = log_file.metadata().await?.len();
@@ -56,7 +56,7 @@ impl BinarySchemaSampler for RetainedMessageBatchSampler {
         }
         let batch = RetainedMessageBatchSnapshot::try_from(Bytes::from(buffer))?;
         if batch.base_offset != self.segment_start_offset {
-            return Err(ServerError::InvalidBatchBaseOffsetFormatConversion);
+            return Err(ServerCompatError::InvalidBatchBaseOffsetFormatConversion);
         }
         Ok(BinarySchema::RetainedMessageBatchSchema)
     }
diff --git a/server/src/compat/message_conversion/schema_sampler.rs b/server/src/compat/message_conversion/schema_sampler.rs
index 6c1f86f14..dad456adb 100644
--- a/server/src/compat/message_conversion/schema_sampler.rs
+++ b/server/src/compat/message_conversion/schema_sampler.rs
@@ -1,8 +1,8 @@
 use crate::compat::message_conversion::binary_schema::BinarySchema;
-use crate::server_error::ServerError;
+use crate::server_error::ServerCompatError;
 use async_trait::async_trait;

 #[async_trait]
 pub trait BinarySchemaSampler: Send + Sync {
-    async fn try_sample(&self) -> Result<BinarySchema, ServerError>;
+    async fn try_sample(&self) -> Result<BinarySchema, ServerCompatError>;
 }
diff --git a/server/src/compat/message_conversion/snapshots/message_snapshot.rs b/server/src/compat/message_conversion/snapshots/message_snapshot.rs
index dd425389b..3bea1be5d 100644
--- a/server/src/compat/message_conversion/snapshots/message_snapshot.rs
+++ b/server/src/compat/message_conversion/snapshots/message_snapshot.rs
@@ -1,5 +1,5 @@
 use crate::compat::message_conversion::message_converter::Extendable;
-use crate::server_error::ServerError;
+use crate::server_error::ServerCompatError;
 use crate::streaming::sizeable::Sizeable;
 use bytes::{BufMut, Bytes, BytesMut};
 use iggy::bytes_serializable::BytesSerializable;
@@ -76,62 +76,62 @@ impl Sizeable for MessageSnapshot {
 }

 impl TryFrom<Bytes> for MessageSnapshot {
-    type Error = ServerError;
+    type Error = ServerCompatError;

     fn try_from(value: Bytes) -> Result<Self, Self::Error> {
         let offset = u64::from_le_bytes(
             value
                 .get(0..8)
-                .ok_or_else(|| {
-                    ServerError::InvalidMessageFieldFormatConversionSampling(
-                        "Invalid offset bytes for message snapshot".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                        reason: "Invalid offset bytes for message snapshot".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let state = MessageState::from_code(*value.get(8).ok_or_else(|| {
-            ServerError::InvalidMessageFieldFormatConversionSampling(
-                "Invalid state for message snapshot".to_owned(),
-            )
+            ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                reason: "Invalid state for message snapshot".to_owned(),
+            }
         })?)?;
         let timestamp = u64::from_le_bytes(
             value
                 .get(9..17)
-                .ok_or_else(|| {
-                    ServerError::InvalidMessageFieldFormatConversionSampling(
-                        "Invalid timestamp bytes for message snapshot".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                        reason: "Invalid timestamp bytes for message snapshot".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let id = u128::from_le_bytes(
             value
                 .get(17..33)
-                .ok_or_else(|| {
-                    ServerError::InvalidMessageFieldFormatConversionSampling(
-                        "Invalid id bytes for message snapshot".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                        reason: "Invalid id bytes for message snapshot".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let checksum = u32::from_le_bytes(
             value
                 .get(33..37)
-                .ok_or_else(|| {
-                    ServerError::InvalidMessageFieldFormatConversionSampling(
-                        "Invalid checksum bytes for message snapshot".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                        reason: "Invalid checksum bytes for message snapshot".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let headers_length = u32::from_le_bytes(
             value
                 .get(37..41)
-                .ok_or_else(|| {
-                    ServerError::InvalidMessageFieldFormatConversionSampling(
-                        "Invalid headers_length bytes for message snapshot".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                        reason: "Invalid headers_length bytes for message snapshot".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let headers = match headers_length {
@@ -140,9 +140,9 @@ impl TryFrom<Bytes> for MessageSnapshot {
                 let headers_payload = &value[41..(41 + headers_length as usize)];
                 let headers = HashMap::from_bytes(Bytes::copy_from_slice(headers_payload))
                     .map_err(|_| {
-                        ServerError::InvalidMessageFieldFormatConversionSampling(
-                            "Invalid headers bytes for message snapshot".to_owned(),
-                        )
+                        ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                            reason: "Invalid headers bytes for message snapshot".to_owned(),
+                        }
                     })?;
                 Some(headers)
             }
@@ -152,11 +152,11 @@ impl TryFrom<Bytes> for MessageSnapshot {
         let payload_length = u32::from_le_bytes(
             value
                 .get(position..(position + 4))
-                .ok_or_else(|| {
-                    ServerError::InvalidMessageFieldFormatConversionSampling(
-                        "Invalid payload bytes for message snapshot".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::InvalidMessageFieldFormatConversionSampling {
+                        reason: "Invalid payload bytes for message snapshot".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let payload =
diff --git a/server/src/compat/message_conversion/snapshots/retained_batch_snapshot.rs b/server/src/compat/message_conversion/snapshots/retained_batch_snapshot.rs
index f73d98638..62124a29f 100644
--- a/server/src/compat/message_conversion/snapshots/retained_batch_snapshot.rs
+++ b/server/src/compat/message_conversion/snapshots/retained_batch_snapshot.rs
@@ -1,6 +1,6 @@
 use super::message_snapshot::MessageSnapshot;
 use crate::compat::message_conversion::message_converter::Extendable;
-use crate::server_error::ServerError;
+use crate::server_error::ServerCompatError;
 use crate::streaming::sizeable::Sizeable;
 use bytes::{BufMut, Bytes, BytesMut};
 use iggy::error::IggyError;
@@ -77,56 +77,56 @@ impl Extendable for RetainedMessageBatchSnapshot {
 }

 impl TryFrom<Bytes> for RetainedMessageBatchSnapshot {
-    type Error = ServerError;
+    type Error = ServerCompatError;
     fn try_from(value: Bytes) -> Result<Self, Self::Error> {
         let base_offset = u64::from_le_bytes(
             value
                 .get(0..8)
-                .ok_or_else(|| {
-                    ServerError::CannotReadMessageBatchFormatConversion(
-                        "Failed to read batch base offset".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::CannotReadMessageBatchFormatConversion {
+                        reason: "Failed to read batch base offset".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let length = u32::from_le_bytes(
             value
                 .get(8..12)
-                .ok_or_else(|| {
-                    ServerError::CannotReadMessageBatchFormatConversion(
-                        "Failed to read batch length".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::CannotReadMessageBatchFormatConversion {
+                        reason: "Failed to read batch length".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let last_offset_delta = u32::from_le_bytes(
             value
                 .get(12..16)
-                .ok_or_else(|| {
-                    ServerError::CannotReadMessageBatchFormatConversion(
-                        "Failed to read batch last_offset_delta".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::CannotReadMessageBatchFormatConversion {
+                        reason: "Failed to read batch last_offset_delta".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let max_timestamp = u64::from_le_bytes(
             value
                 .get(16..24)
-                .ok_or_else(|| {
-                    ServerError::CannotReadMessageBatchFormatConversion(
-                        "Failed to read batch max_timestamp".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::CannotReadMessageBatchFormatConversion {
+                        reason: "Failed to read batch max_timestamp".to_owned(),
+                    },
+                )?
                 .try_into()?,
         );
         let bytes = Bytes::from(
             value
                 .get(24..length as usize)
-                .ok_or_else(|| {
-                    ServerError::CannotReadMessageBatchFormatConversion(
-                        "Failed to read batch payload".to_owned(),
-                    )
-                })?
+                .ok_or_else(
+                    || ServerCompatError::CannotReadMessageBatchFormatConversion {
+                        reason: "Failed to read batch payload".to_owned(),
+                    },
+                )?
                 .to_owned(),
         );
         Ok(RetainedMessageBatchSnapshot {
diff --git a/server/src/configs/config_provider.rs b/server/src/configs/config_provider.rs
index 6e28ee134..802f147cb 100644
--- a/server/src/configs/config_provider.rs
+++ b/server/src/configs/config_provider.rs
@@ -1,5 +1,5 @@
 use crate::configs::server::ServerConfig;
-use crate::server_error::ServerError;
+use crate::server_error::ServerConfigError;
 use crate::IGGY_ROOT_PASSWORD_ENV;
 use async_trait::async_trait;
 use figment::{
@@ -24,7 +24,7 @@ const SECRET_KEYS: [&str; 6] = [

 #[async_trait]
 pub trait ConfigProvider {
-    async fn load_config(&self) -> Result<ServerConfig, ServerError>;
+    async fn load_config(&self) -> Result<ServerConfig, ServerConfigError>;
 }

 #[derive(Debug)]
@@ -212,16 +212,16 @@ impl Provider for CustomEnvProvider {
     }
 }

-pub fn resolve(config_provider_type: &str) -> Result<Box<dyn ConfigProvider>, ServerError> {
+pub fn resolve(config_provider_type: &str) -> Result<Box<dyn ConfigProvider>, ServerConfigError> {
     match config_provider_type {
         DEFAULT_CONFIG_PROVIDER => {
             let path =
                 env::var("IGGY_CONFIG_PATH").unwrap_or_else(|_| DEFAULT_CONFIG_PATH.to_string());
             Ok(Box::new(FileConfigProvider::new(path)))
         }
-        _ => Err(ServerError::InvalidConfigurationProvider(
-            config_provider_type.to_string(),
-        )),
+        _ => Err(ServerConfigError::InvalidConfigurationProvider {
+            provider_type: config_provider_type.to_string(),
+        }),
     }
 }

@@ -254,14 +254,13 @@ fn file_exists<P: AsRef<Path>>(path: P) -> bool {

 #[async_trait]
 impl ConfigProvider for FileConfigProvider {
-    async fn load_config(&self) -> Result<ServerConfig, ServerError> {
+    async fn load_config(&self) -> Result<ServerConfig, ServerConfigError> {
         println!("Loading config from path: '{}'...", self.path);

         if !file_exists(&self.path) {
-            return Err(ServerError::CannotLoadConfiguration(format!(
-                "Cannot find configuration file at path: '{}'.",
-                self.path,
-            )));
+            return Err(ServerConfigError::CannotLoadConfiguration {
+                reason: format!("Cannot find configuration file at path: '{}'.", self.path,),
+            });
         }

         let config_builder = Figment::new();
@@ -270,7 +269,9 @@ impl ConfigProvider for FileConfigProvider {
             "json" => config_builder.merge(Json::file(&self.path)),
             "toml" => config_builder.merge(Toml::file(&self.path)),
             e => {
-                return Err(ServerError::CannotLoadConfiguration(format!("Cannot load configuration: invalid file extension: {e}, only .json and .toml are supported.")));
+                return Err(ServerConfigError::CannotLoadConfiguration {
+                    reason: format!("Cannot load configuration: invalid file extension: {e}, only .json and .toml are supported.")}
+                );
             }
         };

@@ -284,10 +285,9 @@ impl ConfigProvider for FileConfigProvider {
                 println!("Using Config: {config}");
                 Ok(config)
             }
-            Err(figment_error) => Err(ServerError::CannotLoadConfiguration(format!(
-                "Failed to load configuration: {}",
-                figment_error
-            ))),
+            Err(figment_error) => Err(ServerConfigError::CannotLoadConfiguration {
+                reason: format!("Failed to load configuration: {figment_error}"),
+            }),
         }
     }
 }
diff --git a/server/src/configs/server.rs b/server/src/configs/server.rs
index 0fe24d5da..a2890753b 100644
--- a/server/src/configs/server.rs
+++ b/server/src/configs/server.rs
@@ -4,7 +4,7 @@ use crate::configs::http::HttpConfig;
 use crate::configs::quic::QuicConfig;
 use crate::configs::system::SystemConfig;
 use crate::configs::tcp::TcpConfig;
-use crate::server_error::ServerError;
+use crate::server_error::ServerConfigError;
 use derive_more::Display;
 use iggy::utils::duration::IggyDuration;
 use iggy::validatable::Validatable;
@@ -148,7 +148,9 @@ impl FromStr for TelemetryTransport {
 }

 impl ServerConfig {
-    pub async fn load(config_provider: &dyn ConfigProvider) -> Result<ServerConfig, ServerError> {
+    pub async fn load(
+        config_provider: &dyn ConfigProvider,
+    ) -> Result<ServerConfig, ServerConfigError> {
         let server_config = config_provider.load_config().await?;
         server_config.validate()?;
         Ok(server_config)
diff --git a/server/src/configs/validators.rs b/server/src/configs/validators.rs
index 3f0a1d19f..3b1387b68 100644
--- a/server/src/configs/validators.rs
+++ b/server/src/configs/validators.rs
@@ -8,7 +8,7 @@ use super::system::CompressionConfig;
 use crate::archiver::ArchiverKind;
 use crate::configs::server::{PersonalAccessTokenConfig, ServerConfig};
 use crate::configs::system::{CacheConfig, SegmentConfig};
-use crate::server_error::ServerError;
+use crate::server_error::ServerConfigError;
 use crate::streaming::segments::segment;
 use iggy::compression::compression_algorithm::CompressionAlgorithm;
 use iggy::utils::byte_size::IggyByteSize;
@@ -18,8 +18,8 @@ use iggy::validatable::Validatable;
 use sysinfo::{Pid, ProcessesToUpdate, System};
 use tracing::{info, warn};

-impl Validatable<ServerError> for ServerConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for ServerConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         self.data_maintenance.validate()?;
         self.personal_access_token.validate()?;
         self.system.segment.validate()?;
@@ -30,39 +30,39 @@ impl Validatable<ServerError> for ServerConfig {
         let topic_size = match self.system.topic.max_size {
             MaxTopicSize::Custom(size) => Ok(size.as_bytes_u64()),
             MaxTopicSize::Unlimited => Ok(u64::MAX),
-            MaxTopicSize::ServerDefault => Err(ServerError::InvalidConfiguration(
-                "Max topic size cannot be set to server default.".into(),
-            )),
+            MaxTopicSize::ServerDefault => Err(ServerConfigError::InvalidConfiguration {
+                reason: "Max topic size cannot be set to server default.".into(),
+            }),
         }?;

         if let IggyExpiry::ServerDefault = self.system.segment.message_expiry {
-            return Err(ServerError::InvalidConfiguration(
-                "Message expiry cannot be set to server default.".into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: "Message expiry cannot be set to server default.".into(),
+            });
         }

         if self.http.enabled {
             if let IggyExpiry::ServerDefault = self.http.jwt.access_token_expiry {
-                return Err(ServerError::InvalidConfiguration(
-                    "Access token expiry cannot be set to server default.".into(),
-                ));
+                return Err(ServerConfigError::InvalidConfiguration {
+                    reason: "Access token expiry cannot be set to server default.".into(),
+                });
             }
         }

         if topic_size < self.system.segment.size.as_bytes_u64() {
-            return Err(ServerError::InvalidConfiguration(format!(
-                "Max topic size cannot be lower than segment size. Max topic size: {}, segment size: {}.",
-                topic_size,
-                self.system.segment.size
-            )));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: format!("Max topic size cannot be lower than segment size. Max topic size: {}, segment size: {}.",
+                    topic_size, self.system.segment.size,
+                )
+            });
         }

         Ok(())
     }
 }

-impl Validatable<ServerError> for CompressionConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for CompressionConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         let compression_alg = &self.default_algorithm;
         if *compression_alg != CompressionAlgorithm::None {
             // TODO(numinex): Change this message once server side compression is fully developed.
@@ -76,36 +76,36 @@ impl Validatable<ServerError> for CompressionConfig {
     }
 }

-impl Validatable<ServerError> for TelemetryConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for TelemetryConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         if !self.enabled {
             return Ok(());
         }

         if self.service_name.trim().is_empty() {
-            return Err(ServerError::InvalidConfiguration(
-                "Telemetry service name cannot be empty.".into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: "Telemetry service name cannot be empty.".into(),
+            });
         }

         if self.logs.endpoint.is_empty() {
-            return Err(ServerError::InvalidConfiguration(
-                "Telemetry logs endpoint cannot be empty.".into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: "Telemetry logs endpoint cannot be empty.".into(),
+            });
         }

         if self.traces.endpoint.is_empty() {
-            return Err(ServerError::InvalidConfiguration(
-                "Telemetry traces endpoint cannot be empty.".into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: "Telemetry traces endpoint cannot be empty.".into(),
+            });
         }

         Ok(())
     }
 }

-impl Validatable<ServerError> for CacheConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for CacheConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         let limit_bytes = self.size.clone().into();
         let mut sys = System::new_all();
         sys.refresh_all();
@@ -122,10 +122,12 @@ impl Validatable<ServerError> for CacheConfig {
         let pretty_free_memory = IggyByteSize::from(free_memory).as_human_string();

         if limit_bytes > total_memory {
-            return Err(ServerError::CacheConfigValidationFailure(format!(
-                "Requested cache size exceeds 100% of total memory. Requested: {} ({:.2}% of total memory: {}).",
-                pretty_cache_limit, cache_percentage, pretty_total_memory
-            )));
+            return Err(ServerConfigError::CacheConfigValidationFailure {
+                reason: format!(
+                    "Requested cache size exceeds 100% of total memory. Requested: {} ({:.2}% of total memory: {}).",
+                    pretty_cache_limit, cache_percentage, pretty_total_memory
+                )
+            });
         }

         if limit_bytes > (total_memory as f64 * 0.75) as u64 {
@@ -148,33 +150,36 @@ impl Validatable<ServerError> for CacheConfig {
     }
 }

-impl Validatable<ServerError> for SegmentConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for SegmentConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         if self.size.as_bytes_u64() as u32 > segment::MAX_SIZE_BYTES {
-            return Err(ServerError::InvalidConfiguration(format!(
-                "Segment size cannot be greater than: {} bytes.",
-                segment::MAX_SIZE_BYTES
-            )));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: format!(
+                    "Segment size cannot be greater than: {} bytes.",
+                    segment::MAX_SIZE_BYTES
+                ),
+            });
         }

         Ok(())
     }
 }

-impl Validatable<ServerError> for MessageSaverConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for MessageSaverConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         if self.enabled && self.interval.is_zero() {
-            return Err(ServerError::InvalidConfiguration(
-                "Message saver interval size cannot be zero, it must be greater than 0.".into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: "Message saver interval size cannot be zero, it must be greater than 0."
+                    .into(),
+            });
         }

         Ok(())
     }
 }

-impl Validatable<ServerError> for DataMaintenanceConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for DataMaintenanceConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         self.archiver.validate()?;
         self.messages.validate()?;
         self.state.validate()?;
@@ -182,8 +187,8 @@ impl Validatable<ServerError> for DataMaintenanceConfig {
     }
 }

-impl Validatable<ServerError> for ArchiverConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for ArchiverConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         if !self.enabled {
             return Ok(());
         }
@@ -191,57 +196,57 @@ impl Validatable<ServerError> for ArchiverConfig {
         return match self.kind {
             ArchiverKind::Disk => {
                 if self.disk.is_none() {
-                    return Err(ServerError::InvalidConfiguration(
-                        "Disk archiver configuration is missing.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "Disk archiver configuration is missing.".into(),
+                    });
                 }

                 let disk = self.disk.as_ref().unwrap();
                 if disk.path.is_empty() {
-                    return Err(ServerError::InvalidConfiguration(
-                        "Disk archiver path cannot be empty.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "Disk archiver path cannot be empty.".into(),
+                    });
                 }
                 Ok(())
             }
             ArchiverKind::S3 => {
                 if self.s3.is_none() {
-                    return Err(ServerError::InvalidConfiguration(
-                        "S3 archiver configuration is missing.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "S3 archiver configuration is missing.".into(),
+                    });
                 }

                 let s3 = self.s3.as_ref().unwrap();
                 if s3.key_id.is_empty() {
-                    return Err(ServerError::InvalidConfiguration(
-                        "S3 archiver key id cannot be empty.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "S3 archiver key id cannot be empty.".into(),
+                    });
                 }

                 if s3.key_secret.is_empty() {
-                    return Err(ServerError::InvalidConfiguration(
-                        "S3 archiver key secret cannot be empty.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "S3 archiver key secret cannot be empty.".into(),
+                    });
                 }

                 if s3.endpoint.is_none() && s3.region.is_none() {
-                    return Err(ServerError::InvalidConfiguration(
-                        "S3 archiver endpoint or region must be set.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "S3 archiver endpoint or region must be set.".into(),
+                    });
                 }

                 if s3.endpoint.as_deref().unwrap_or_default().is_empty()
                     && s3.region.as_deref().unwrap_or_default().is_empty()
                 {
-                    return Err(ServerError::InvalidConfiguration(
-                        "S3 archiver region or endpoint cannot be empty.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "S3 archiver region or endpoint cannot be empty.".into(),
+                    });
                 }

                 if s3.bucket.is_empty() {
-                    return Err(ServerError::InvalidConfiguration(
-                        "S3 archiver bucket cannot be empty.".into(),
-                    ));
+                    return Err(ServerConfigError::InvalidConfiguration {
+                        reason: "S3 archiver bucket cannot be empty.".into(),
+                    });
                 }
                 Ok(())
             }
@@ -249,44 +254,47 @@ impl Validatable<ServerError> for ArchiverConfig {
     }
 }

-impl Validatable<ServerError> for MessagesMaintenanceConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for MessagesMaintenanceConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         if self.archiver_enabled && self.interval.is_zero() {
-            return Err(ServerError::InvalidConfiguration(
-                "Message maintenance interval size cannot be zero, it must be greater than 0."
-                    .into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason:
+                    "Message maintenance interval size cannot be zero, it must be greater than 0."
+                        .into(),
+            });
         }

         Ok(())
     }
 }

-impl Validatable<ServerError> for StateMaintenanceConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for StateMaintenanceConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         if self.archiver_enabled && self.interval.is_zero() {
-            return Err(ServerError::InvalidConfiguration(
-                "State maintenance interval size cannot be zero, it must be greater than 0.".into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason:
+                    "State maintenance interval size cannot be zero, it must be greater than 0."
+                        .into(),
+            });
         }

         Ok(())
     }
 }

-impl Validatable<ServerError> for PersonalAccessTokenConfig {
-    fn validate(&self) -> Result<(), ServerError> {
+impl Validatable<ServerConfigError> for PersonalAccessTokenConfig {
+    fn validate(&self) -> Result<(), ServerConfigError> {
         if self.max_tokens_per_user == 0 {
-            return Err(ServerError::InvalidConfiguration(
-                "Max tokens per user cannot be zero, it must be greater than 0.".into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: "Max tokens per user cannot be zero, it must be greater than 0.".into(),
+            });
         }

         if self.cleaner.enabled && self.cleaner.interval.is_zero() {
-            return Err(ServerError::InvalidConfiguration(
-                "Personal access token cleaner interval cannot be zero, it must be greater than 0."
-                    .into(),
-            ));
+            return Err(ServerConfigError::InvalidConfiguration {
+                reason: "Personal access token cleaner interval cannot be zero, it must be greater than 0."
+                    .into(),
+            });
         }

         Ok(())
diff --git a/server/src/log/logger.rs b/server/src/log/logger.rs
index b9605d415..c08666785 100644
--- a/server/src/log/logger.rs
+++ b/server/src/log/logger.rs
@@ -1,6 +1,6 @@
 use crate::configs::server::{TelemetryConfig, TelemetryTransport};
 use crate::configs::system::LoggingConfig;
-use crate::server_error::ServerError;
+use crate::server_error::ServerLogError;
 use opentelemetry::logs::LoggerProvider as _;
 use opentelemetry::trace::TracerProvider as _;
 use opentelemetry::{global, KeyValue};
@@ -254,7 +254,7 @@ impl Logging {
         &mut self,
         base_directory: String,
         config: &LoggingConfig,
-    ) -> Result<(), ServerError> {
+    ) -> Result<(), ServerLogError> {
         // Write to stdout and file at the same time.
         // Use the non_blocking appender to avoid blocking the threads.
         // Use the rolling appender to avoid having a huge log file.
@@ -266,13 +266,13 @@ impl Logging {

         self.filtering_stdout_reload_handle
             .as_ref()
-            .ok_or(ServerError::FilterReloadFailure)?
+            .ok_or(ServerLogError::FilterReloadFailure)?
             .modify(|layer| *layer = filtering_level.boxed())
             .expect("Failed to modify stdout filtering layer");

         self.filtering_file_reload_handle
             .as_ref()
-            .ok_or(ServerError::FilterReloadFailure)?
+            .ok_or(ServerLogError::FilterReloadFailure)?
             .modify(|layer| *layer = filtering_level.boxed())
             .expect("Failed to modify file filtering layer");

@@ -286,7 +286,7 @@ impl Logging {

         self.stdout_reload_handle
             .as_ref()
-            .ok_or(ServerError::StdoutReloadFailure)?
+            .ok_or(ServerLogError::StdoutReloadFailure)?
             .modify(|layer| *layer = stdout_layer)
             .expect("Failed to modify stdout layer");

@@ -312,7 +312,7 @@ impl Logging {
         self.file_guard = Some(file_guard);
         self.file_reload_handle
             .as_ref()
-            .ok_or(ServerError::FileReloadFailure)?
+            .ok_or(ServerLogError::FileReloadFailure)?
             .modify(|layer| *layer = file_layer)
             .expect("Failed to modify file layer");
         let level = filtering_level.to_string();
diff --git a/server/src/quic/listener.rs b/server/src/quic/listener.rs
index a0dfa060b..9ded6127a 100644
--- a/server/src/quic/listener.rs
+++ b/server/src/quic/listener.rs
@@ -1,7 +1,7 @@
 use crate::binary::command;
 use crate::command::ServerCommand;
 use crate::quic::quic_sender::QuicSender;
-use crate::server_error::ServerError;
+use crate::server_error::ServerConnectionError;
 use crate::streaming::clients::client_manager::Transport;
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::SharedSystem;
@@ -48,7 +48,7 @@ pub fn start(endpoint: Endpoint, system: SharedSystem) {
 async fn handle_connection(
     incoming_connection: quinn::Connecting,
     system: SharedSystem,
-) -> Result<(), ServerError> {
+) -> Result<(), ServerConnectionError> {
     let connection = incoming_connection.await?;
     let address = connection.remote_address();
     info!("Client has connected: {address}");
@@ -79,7 +79,7 @@ async fn accept_stream(
     connection: &Connection,
     system: &SharedSystem,
     client_id: u32,
-) -> Result<Option<(SendStream, RecvStream)>, ServerError> {
+) -> Result<Option<(SendStream, RecvStream)>, ServerConnectionError> {
     match connection.accept_bi().await {
         Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
             info!("Connection closed");
diff --git a/server/src/server_error.rs b/server/src/server_error.rs
index e1336eeec..d08935921 100644
--- a/server/src/server_error.rs
+++ b/server/src/server_error.rs
@@ -1,56 +1,85 @@
+use error_set::error_set;
 use quinn::{ConnectionError, ReadToEndError, WriteError};
 use std::array::TryFromSliceError;
-use thiserror::Error;
 use tokio::io;

-#[derive(Debug, Error)]
-pub enum ServerError {
-    #[error("IO error")]
-    IoError(#[from] io::Error),
-    #[error("Connection error")]
-    ConnectionError(#[from] ConnectionError),
-    #[error("Invalid configuration provider: {0}")]
-    InvalidConfigurationProvider(String),
-    #[error("Cannot load configuration: {0}")]
-    CannotLoadConfiguration(String),
-    #[error("Invalid configuration: {0}")]
-    InvalidConfiguration(String),
-    #[error("SDK error")]
-    SdkError(#[from] iggy::error::IggyError),
-    #[error("Write error")]
-    WriteError(#[from] WriteError),
-    #[error("Read to end error")]
-    ReadToEndError(#[from] ReadToEndError),
-    #[error("Try from slice error")]
-    TryFromSliceError(#[from] TryFromSliceError),
-    #[error("Logging filter reload failure")]
-    FilterReloadFailure,
-    #[error("Logging stdout reload failure")]
-    StdoutReloadFailure,
-    #[error("Logging file reload failure")]
-    FileReloadFailure,
-    #[error("Cache config validation failure: {0}")]
-    CacheConfigValidationFailure(String),
-    #[error("Command length error: {0}")]
-    CommandLengthError(String),
-    #[error("Cannot read message, when performing format conversion, {0}")]
-    InvalidMessageFieldFormatConversionSampling(String),
-    #[error("Invalid message offset, when performing format conversion")]
-    InvalidMessageOffsetFormatConversion,
-    #[error("Invalid batch base offset, when performing format conversion")]
-    InvalidBatchBaseOffsetFormatConversion,
-    #[error("Cannot read message batch, when performing format conversion, {0}")]
-    CannotReadMessageBatchFormatConversion(String),
-    #[error("Cannot remove old segment files")]
-    CannotRemoveOldSegmentFiles,
-    #[error("Cannot persist new segment files")]
-    CannotPersistNewSegmentFiles,
-    #[error("Cannot archive file: {0}")]
-    CannotArchiveFile(String),
-    #[error("Cannot initialize S3 archiver")]
-    CannotInitializeS3Archiver,
-    #[error("Invalid S3 credentials")]
-    InvalidS3Credentials,
-    #[error("File to archive not found: {0}")]
-    FileToArchiveNotFound(String),
-}
+error_set!(
+    ServerError = ServerConfigError || ServerArchiverError || ServerConnectionError || ServerLogError || ServerCompatError;
+
+    ServerIoError = {
+        #[display("IO error")]
+        IoError(io::Error),
+
+        #[display("Write error")]
+        WriteError(WriteError),
+
+        #[display("Read to end error")]
+        ReadToEndError(ReadToEndError)
+    };
+
+    ServerConfigError = {
+        #[display("Invalid configuration provider: {}", provider_type)]
+        InvalidConfigurationProvider { provider_type: String },
+
+        #[display("Cannot load configuration: {}", reason)]
+        CannotLoadConfiguration { reason: String },
+
+        #[display("Invalid configuration: {}", reason)]
+        InvalidConfiguration { reason: String },
+
+        #[display("Cache config validation failure: {}", reason)]
+        CacheConfigValidationFailure { reason: String },
+    };
+
+    ServerArchiverError = {
+        #[display("File to archive not found: {}", file_path)]
+        FileToArchiveNotFound { file_path: String },
+
+        #[display("Cannot initialize S3 archiver")]
+        CannotInitializeS3Archiver,
+
+        #[display("Invalid S3 credentials")]
+        InvalidS3Credentials,
+
+        #[display("Cannot archive file: {}", file_path)]
+        CannotArchiveFile { file_path: String },
+    } || ServerIoError;
+
+    ServerConnectionError = {
+        #[display("Connection error")]
+        ConnectionError(ConnectionError),
+    } || ServerIoError || ServerCommonError;
+
+    ServerLogError = {
+        #[display("Logging filter reload failure")]
+        FilterReloadFailure,
+
+        #[display("Logging stdout reload failure")]
+        StdoutReloadFailure,
+
+        #[display("Logging file reload failure")]
+        FileReloadFailure,
+    };
+
+    ServerCompatError = {
+        #[display("Invalid message offset, when performing format conversion")]
+        InvalidMessageOffsetFormatConversion,
+
+        #[display("Invalid batch base offset, when performing format conversion")]
+        InvalidBatchBaseOffsetFormatConversion,
+
+        #[display("Cannot read message, when performing format conversion, {}", reason)]
+        InvalidMessageFieldFormatConversionSampling { reason: String },
+
+        #[display("Cannot read message batch, when performing format conversion, {}", reason)]
+        CannotReadMessageBatchFormatConversion { reason: String },
+    } || ServerIoError || ServerCommonError;
+
+    ServerCommonError = {
+        #[display("Try from slice error")]
+        TryFromSliceError(TryFromSliceError),
+
+        #[display("SDK error")]
+        SdkError(iggy::error::IggyError),
+    };
+);
diff --git a/server/src/tcp/connection_handler.rs b/server/src/tcp/connection_handler.rs
index 32dc70fc6..984ac8fbb 100644
--- a/server/src/tcp/connection_handler.rs
+++ b/server/src/tcp/connection_handler.rs
@@ -1,7 +1,7 @@
 use crate::binary::command;
 use crate::binary::sender::Sender;
 use crate::command::ServerCommand;
-use crate::server_error::ServerError;
+use crate::server_error::ServerConnectionError;
 use crate::streaming::session::Session;
 use crate::streaming::systems::system::SharedSystem;
 use bytes::{BufMut, BytesMut};
@@ -18,14 +18,14 @@ pub(crate) async fn handle_connection(
     session: Arc<Session>,
     sender: &mut dyn Sender,
     system: SharedSystem,
-) -> Result<(), ServerError> {
+) -> Result<(), ServerConnectionError> {
     let mut initial_buffer = [0u8; INITIAL_BYTES_LENGTH];
     loop {
         let read_length = match sender.read(&mut initial_buffer).await {
             Ok(read_length) => read_length,
             Err(error) => {
                 if error.as_code() == IggyError::ConnectionClosed.as_code() {
-                    return Err(ServerError::from(error));
+                    return Err(ServerConnectionError::from(error));
                 } else {
                     sender.send_error_response(error).await?;
                     continue;
@@ -65,9 +65,9 @@ pub(crate) async fn handle_connection(
     }
 }

-pub(crate) fn handle_error(error: ServerError) {
+pub(crate) fn handle_error(error: ServerConnectionError) {
     match error {
-        ServerError::IoError(error) => match error.kind() {
+        ServerConnectionError::IoError(error) => match error.kind() {
             ErrorKind::UnexpectedEof => {
                 info!("Connection has been closed.");
             }
@@ -84,7 +84,7 @@ pub(crate) fn handle_error(error: ServerConnectionError) {
                 error!("Connection has failed: {error}");
             }
         },
-        ServerError::SdkError(sdk_error) => match sdk_error {
+        ServerConnectionError::SdkError(sdk_error) => match sdk_error {
             IggyError::ConnectionClosed => {
                 debug!("Client closed connection.");
            }
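
Note on the pattern used in server_error.rs above: this is an illustrative sketch, not part of the patch. It assumes the error_set 0.6 semantics the diff relies on: every named set becomes its own enum, `||` merges one set's variants into another, and the macro generates `From` conversions from a narrower set toward any set that contains all of its variants, which is what lets call sites keep using `?` while functions now return the narrower types (ServerConfigError, ServerArchiverError, and so on). The names AppError, ConfigError, StorageError, load_config, and run below are hypothetical, chosen only to mirror the structure of the patch.

    use error_set::error_set;

    error_set!(
        // Aggregate set: the union of the two component sets below,
        // mirroring how ServerError is composed in the patch.
        AppError = ConfigError || StorageError;

        ConfigError = {
            #[display("Invalid configuration: {}", reason)]
            InvalidConfiguration { reason: String },
        };

        StorageError = {
            #[display("IO error")]
            IoError(std::io::Error),
        };
    );

    // Returns the narrow set, like load_config returning ServerConfigError.
    fn load_config(path: &str) -> Result<(), ConfigError> {
        Err(ConfigError::InvalidConfiguration {
            reason: format!("Cannot find configuration file at path: '{path}'."),
        })
    }

    fn run() -> Result<(), AppError> {
        // `?` widens ConfigError into AppError via the generated From impl.
        load_config("server.toml")?;
        Ok(())
    }

    fn main() {
        if let Err(error) = run() {
            // Display output comes from the #[display(...)] attributes.
            eprintln!("Server failed: {error}");
        }
    }

Because the variants are struct-like rather than tuple-like, matching uses brace syntax, e.g. `matches!(error, ConfigError::InvalidConfiguration { .. })`; this is why the integration test in this patch switched from `FileToArchiveNotFound(_)` to `FileToArchiveNotFound { .. }`.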