Skip to content

Commit

Permalink
refac: storage errors
Browse files Browse the repository at this point in the history
  • Loading branch information
dharanad committed Aug 2, 2024
1 parent f98ebbc commit 3342f9d
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ slog = "2.7.0"
slog-term = "2.9.1"
rand = "0.8"
chrono = "0.4"
thiserror = "1.0"

[dev-dependencies]
tempfile = "3.10.1"
Expand Down
61 changes: 61 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
use thiserror::Error;

/// wrapper around library error
pub type Result<T> = std::result::Result<T, Error>;

#[derive(Error, Debug)]
pub enum Error {
#[error("storage error {0}")]
Store(#[from] StorageError),
#[error("network error {0}")]
Network(#[from] NetworkError),
#[error("file error {0}")]
FileError(#[from] FileError),
/// Some other error occurred.
#[error("unknown error {0}")]
Unknown(#[from] Box<dyn std::error::Error + Sync + Send>),
}

#[derive(Error, Debug)]
pub enum NetworkError {
#[error("opening connection failed")]
OpenConnectionError,
#[error("closing connection failed")]
CloseConnectionError,
#[error("listening on port = {0} failed")]
ListenError(u8),
}

#[derive(Error, Debug)]
pub enum StorageError {
#[error("file is empty")]
EmptyFile,
#[error("File is potentially malicious")]
MaliciousFile,
#[error("error data integrity check failed!")]
DataIntegrityError,
#[error("storing log failed")]
StoreError,
#[error("log compaction failed")]
CompactionError,
#[error("log retrieval failed")]
RetrieveError,
}

#[derive(Error, Debug)]
pub enum FileError {
#[error("write all operation failed")]
WriteError,
#[error("flush operation failed")]
FlushError,
#[error("creating file failed")]
CreateError,
#[error("opening file failed")]
OpenError,
#[error("reading file failed")]
ReadError,
#[error("removing file failed")]
RemoveFileError,
#[error("reading file metadata failed")]
MetaDataError
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod log;
pub mod network;
pub mod server;
pub mod storage;
pub mod error;

/// Helper function to parse the IP address delimited by ':' and return tuple of (ip, port)
fn parse_ip_address(addr: &str) -> (&str, &str) {
Expand Down
57 changes: 30 additions & 27 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Organization: SpacewalkHq
// License: MIT License

use std::error::Error;
use std::path::{Path, PathBuf};

use async_trait::async_trait;
Expand All @@ -10,16 +9,20 @@ use sha2::{Digest, Sha256};
use tokio::fs::{self, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::error::{Error, FileError, Result, StorageError};
use crate::error::StorageError::MaliciousFile;

const MAX_FILE_SIZE: u64 = 1_000_000;
const CHECKSUM_LEN: usize = 64;


#[async_trait]
pub trait Storage {
async fn store(&self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn retrieve(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>;
async fn compaction(&self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn delete(&self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn turned_malicious(&self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn store(&self, data: &[u8]) -> Result<()>;
async fn retrieve(&self) -> Result<Vec<u8>>;
async fn compaction(&self) -> Result<()>;
async fn delete(&self) -> Result<()>;
async fn turned_malicious(&self) -> Result<()>;
}

#[derive(Clone)]
Expand All @@ -38,48 +41,48 @@ impl LocalStorage {
}
}

async fn store_async(&self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn store_async(&self, data: &[u8]) -> Result<()> {
let checksum = calculate_checksum(data);
let data_with_checksum = [data, checksum.as_slice()].concat();

let mut file = File::create(&self.path).await?;
file.write_all(&data_with_checksum).await?;
file.flush().await?;
let mut file = File::create(&self.path).await.map_err(|_e| FileError::CreateError)?;
file.write_all(&data_with_checksum).await.map_err(|_x| FileError::WriteError)?;
file.flush().await.map_err(|_x| FileError::FlushError)?;
Ok(())
}

async fn retrieve_async(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let mut file = File::open(&self.path).await?;
async fn retrieve_async(&self) -> Result<Vec<u8>> {
let mut file = File::open(&self.path).await.map_err(|_x| FileError::OpenError)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
file.read_to_end(&mut buffer).await.map_err(|_x| FileError::ReadError)?;

if buffer.is_empty() {
return Err("File is empty".into());
return Err(Error::Store(StorageError::EmptyFile));
}

if buffer.len() < CHECKSUM_LEN {
return Err("File is potentially malicious".into());
return Err(Error::Store(StorageError::MaliciousFile));
}

let data = &buffer[..buffer.len() - 64];
let stored_checksum = retrieve_checksum(&buffer);
let calculated_checksum = calculate_checksum(data);

if stored_checksum != calculated_checksum {
return Err("Data integrity check failed!".into());
return Err(Error::Store(StorageError::DataIntegrityError));
}

Ok(data.to_vec())
}

async fn delete_async(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
fs::remove_file(&self.path).await?;
async fn delete_async(&self) -> Result<()> {
fs::remove_file(&self.path).await.map_err(|_x| FileError::RemoveFileError)?;
Ok(())
}

async fn compaction_async(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn compaction_async(&self) -> Result<()> {
// If file size is greater than 1MB, then compact it
let metadata = fs::metadata(&self.path).await?;
let metadata = fs::metadata(&self.path).await.map_err(|_x| FileError::MetaDataError)?;
println!("file size {}", metadata.len());
if metadata.len() > MAX_FILE_SIZE {
self.delete_async().await?;
Expand All @@ -90,29 +93,29 @@ impl LocalStorage {

#[async_trait]
impl Storage for LocalStorage {
async fn store(&self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn store(&self, data: &[u8]) -> Result<()> {
self.store_async(data).await
}

async fn retrieve(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
async fn retrieve(&self) -> Result<Vec<u8>> {
self.retrieve_async().await
}

async fn compaction(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn compaction(&self) -> Result<()> {
self.compaction_async().await
}

async fn delete(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn delete(&self) -> Result<()> {
self.delete_async().await
}

async fn turned_malicious(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn turned_malicious(&self) -> Result<()> {
// Check if the file is tampered with
self.retrieve().await?;
let metadata = fs::metadata(&self.path).await?;
let metadata = fs::metadata(&self.path).await.map_err(|_x| FileError::MetaDataError)?;

if metadata.len() > MAX_FILE_SIZE {
return Err("File is potentially malicious".into());
return Err(Error::Store(MaliciousFile));
}
Ok(())
}
Expand Down

0 comments on commit 3342f9d

Please sign in to comment.