Skip to content

Commit

Permalink
Migrate ServerError to error_set
Browse files Browse the repository at this point in the history
  • Loading branch information
bakytgerey_ashirbeko committed Oct 23, 2024
1 parent 214f0ca commit 67ab8f9
Show file tree
Hide file tree
Showing 18 changed files with 379 additions and 277 deletions.
53 changes: 53 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions integration/tests/archiver/disk.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::archiver::DiskArchiverSetup;
use server::archiver::Archiver;
use server::server_error::ServerError;
use server::streaming::utils::file;
use server::{archiver::Archiver, server_error::ServerArchiverError};
use std::path::Path;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Expand Down Expand Up @@ -89,7 +89,10 @@ async fn should_fail_when_file_to_archive_does_not_exist() {

assert!(result.is_err());
let error = result.err().unwrap();
assert!(matches!(error, ServerError::FileToArchiveNotFound(_)));
assert!(matches!(
error,
ServerArchiverError::FileToArchiveNotFound { .. }
));
}

async fn create_file(path: &str, content: &str) {
Expand Down
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ clap = { version = "4.5.17", features = ["derive"] }
console-subscriber = { version = "0.4.0", optional = true }
dashmap = "6.0.1"
derive_more = "1.0.0"
error_set = { version = "0.6.4" }
figlet-rs = "0.1.5"
figment = { version = "0.10.18", features = ["json", "toml", "env"] }
flume = "0.11.0"
Expand Down
12 changes: 7 additions & 5 deletions server/src/archiver/disk.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::archiver::Archiver;
use crate::configs::server::DiskArchiverConfig;
use crate::server_error::ServerError;
use crate::server_error::ServerArchiverError;
use async_trait::async_trait;
use std::path::Path;
use tokio::fs;
Expand All @@ -19,7 +19,7 @@ impl DiskArchiver {

#[async_trait]
impl Archiver for DiskArchiver {
async fn init(&self) -> Result<(), ServerError> {
async fn init(&self) -> Result<(), ServerArchiverError> {
if !Path::new(&self.config.path).exists() {
info!("Creating disk archiver directory: {}", self.config.path);
fs::create_dir_all(&self.config.path).await?;
Expand All @@ -31,7 +31,7 @@ impl Archiver for DiskArchiver {
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ServerError> {
) -> Result<bool, ServerArchiverError> {
debug!("Checking if file: {file} is archived on disk.");
let base_directory = base_directory.as_deref().unwrap_or_default();
let path = Path::new(&self.config.path).join(base_directory).join(file);
Expand All @@ -44,13 +44,15 @@ impl Archiver for DiskArchiver {
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ServerError> {
) -> Result<(), ServerArchiverError> {
debug!("Archiving files on disk: {:?}", files);
for file in files {
debug!("Archiving file: {file}");
let source = Path::new(file);
if !source.exists() {
return Err(ServerError::FileToArchiveNotFound(file.to_string()));
return Err(ServerArchiverError::FileToArchiveNotFound {
file_path: file.to_string(),
});
}

let base_directory = base_directory.as_deref().unwrap_or_default();
Expand Down
8 changes: 4 additions & 4 deletions server/src/archiver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod disk;
pub mod s3;

use crate::server_error::ServerError;
use crate::server_error::ServerArchiverError;
use async_trait::async_trait;
use derive_more::Display;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -31,17 +31,17 @@ impl FromStr for ArchiverKind {

#[async_trait]
pub trait Archiver: Sync + Send {
async fn init(&self) -> Result<(), ServerError>;
async fn init(&self) -> Result<(), ServerArchiverError>;
async fn is_archived(
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ServerError>;
) -> Result<bool, ServerArchiverError>;
async fn archive(
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ServerError>;
) -> Result<(), ServerArchiverError>;
}

impl Debug for dyn Archiver {
Expand Down
32 changes: 18 additions & 14 deletions server/src/archiver/s3.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::archiver::Archiver;
use crate::configs::server::S3ArchiverConfig;
use crate::server_error::ServerError;
use crate::server_error::ServerArchiverError;
use crate::streaming::utils::file;
use async_trait::async_trait;
use s3::creds::Credentials;
Expand All @@ -16,15 +16,15 @@ pub struct S3Archiver {
}

impl S3Archiver {
pub fn new(config: S3ArchiverConfig) -> Result<Self, ServerError> {
pub fn new(config: S3ArchiverConfig) -> Result<Self, ServerArchiverError> {
let credentials = Credentials::new(
Some(&config.key_id),
Some(&config.key_secret),
None,
None,
None,
)
.map_err(|_| ServerError::InvalidS3Credentials)?;
.map_err(|_| ServerArchiverError::InvalidS3Credentials)?;

let bucket = Bucket::new(
&config.bucket,
Expand All @@ -42,14 +42,14 @@ impl S3Archiver {
},
credentials,
)
.map_err(|_| ServerError::CannotInitializeS3Archiver)?;
.map_err(|_| ServerArchiverError::CannotInitializeS3Archiver)?;
Ok(Self {
bucket,
tmp_upload_dir: config.tmp_upload_dir,
})
}

async fn copy_file_to_tmp(&self, path: &str) -> Result<String, ServerError> {
async fn copy_file_to_tmp(&self, path: &str) -> Result<String, ServerArchiverError> {
debug!(
"Copying file: {path} to temporary S3 upload directory: {}",
self.tmp_upload_dir
Expand All @@ -68,11 +68,11 @@ impl S3Archiver {

#[async_trait]
impl Archiver for S3Archiver {
async fn init(&self) -> Result<(), ServerError> {
async fn init(&self) -> Result<(), ServerArchiverError> {
let response = self.bucket.list("/".to_string(), None).await;
if let Err(error) = response {
error!("Cannot initialize S3 archiver: {error}");
return Err(ServerError::CannotInitializeS3Archiver);
return Err(ServerArchiverError::CannotInitializeS3Archiver);
}

if Path::new(&self.tmp_upload_dir).exists() {
Expand All @@ -94,7 +94,7 @@ impl Archiver for S3Archiver {
&self,
file: &str,
base_directory: Option<String>,
) -> Result<bool, ServerError> {
) -> Result<bool, ServerArchiverError> {
debug!("Checking if file: {file} is archived on S3.");
let base_directory = base_directory.as_deref().unwrap_or_default();
let destination = Path::new(&base_directory).join(file);
Expand All @@ -119,10 +119,12 @@ impl Archiver for S3Archiver {
&self,
files: &[&str],
base_directory: Option<String>,
) -> Result<(), ServerError> {
) -> Result<(), ServerArchiverError> {
for path in files {
if !Path::new(path).exists() {
return Err(ServerError::FileToArchiveNotFound(path.to_string()));
return Err(ServerArchiverError::FileToArchiveNotFound {
file_path: path.to_string(),
});
}

let source = self.copy_file_to_tmp(path).await?;
Expand All @@ -137,8 +139,9 @@ impl Archiver for S3Archiver {
.await;
if let Err(error) = response {
error!("Cannot archive file: {path} on S3: {}", error);
fs::remove_file(&source).await?;
return Err(ServerError::CannotArchiveFile(path.to_string()));
return Err(ServerArchiverError::CannotArchiveFile {
file_path: path.to_string(),
});
}

let response = response.unwrap();
Expand All @@ -150,8 +153,9 @@ impl Archiver for S3Archiver {
}

error!("Cannot archive file: {path} on S3, received an invalid status code: {status}.");
fs::remove_file(&source).await?;
return Err(ServerError::CannotArchiveFile(path.to_string()));
return Err(ServerArchiverError::CannotArchiveFile {
file_path: path.to_string(),
});
}
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::compat::message_conversion::binary_schema::BinarySchema;
use crate::compat::message_conversion::schema_sampler::BinarySchemaSampler;
use crate::compat::message_conversion::snapshots::message_snapshot::MessageSnapshot;
use crate::server_error::ServerError;
use crate::server_error::ServerCompatError;
use crate::streaming::utils::file;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
Expand All @@ -27,7 +27,7 @@ unsafe impl Sync for MessageSampler {}

#[async_trait]
impl BinarySchemaSampler for MessageSampler {
async fn try_sample(&self) -> Result<BinarySchema, ServerError> {
async fn try_sample(&self) -> Result<BinarySchema, ServerCompatError> {
let mut index_file = file::open(&self.index_path).await?;
let mut log_file = file::open(&self.log_path).await?;
let log_file_size = log_file.metadata().await?.len();
Expand All @@ -46,7 +46,7 @@ impl BinarySchemaSampler for MessageSampler {

let message = MessageSnapshot::try_from(buffer.freeze())?;
if message.offset != self.segment_start_offset {
return Err(ServerError::InvalidMessageOffsetFormatConversion);
return Err(ServerCompatError::InvalidMessageOffsetFormatConversion);
}
Ok(BinarySchema::RetainedMessageSchema)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::compat::message_conversion::binary_schema::BinarySchema;
use crate::compat::message_conversion::schema_sampler::BinarySchemaSampler;
use crate::compat::message_conversion::snapshots::retained_batch_snapshot::RetainedMessageBatchSnapshot;
use crate::server_error::ServerError;
use crate::server_error::ServerCompatError;
use crate::streaming::utils::file;
use async_trait::async_trait;
use bytes::{BufMut, Bytes};
Expand Down Expand Up @@ -32,7 +32,7 @@ unsafe impl Sync for RetainedMessageBatchSampler {}

#[async_trait]
impl BinarySchemaSampler for RetainedMessageBatchSampler {
async fn try_sample(&self) -> Result<BinarySchema, ServerError> {
async fn try_sample(&self) -> Result<BinarySchema, ServerCompatError> {
let mut index_file = file::open(&self.index_path).await?;
let mut log_file = file::open(&self.log_path).await?;
let log_file_size = log_file.metadata().await?.len();
Expand All @@ -56,7 +56,7 @@ impl BinarySchemaSampler for RetainedMessageBatchSampler {
}
let batch = RetainedMessageBatchSnapshot::try_from(Bytes::from(buffer))?;
if batch.base_offset != self.segment_start_offset {
return Err(ServerError::InvalidBatchBaseOffsetFormatConversion);
return Err(ServerCompatError::InvalidBatchBaseOffsetFormatConversion);
}
Ok(BinarySchema::RetainedMessageBatchSchema)
}
Expand Down
4 changes: 2 additions & 2 deletions server/src/compat/message_conversion/schema_sampler.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::compat::message_conversion::binary_schema::BinarySchema;
use crate::server_error::ServerError;
use crate::server_error::ServerCompatError;
use async_trait::async_trait;

#[async_trait]
pub trait BinarySchemaSampler: Send + Sync {
async fn try_sample(&self) -> Result<BinarySchema, ServerError>;
async fn try_sample(&self) -> Result<BinarySchema, ServerCompatError>;
}
Loading

0 comments on commit 67ab8f9

Please sign in to comment.