Skip to content

Commit

Permalink
follow review to the error management
Browse files Browse the repository at this point in the history
  • Loading branch information
Keksoj committed Jul 24, 2023
1 parent 7ce7b86 commit 2ff0385
Show file tree
Hide file tree
Showing 19 changed files with 476 additions and 459 deletions.
4 changes: 2 additions & 2 deletions command/src/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::proto::command::TlsVersion;
#[derive(thiserror::Error, Debug)]
pub enum CertificateError {
#[error("Could not parse PEM certificate from bytes: {0}")]
ParseError(String),
InvalidCertificate(String),
}

#[derive(Debug)]
Expand Down Expand Up @@ -104,7 +104,7 @@ impl<'de> serde::Deserialize<'de> for Fingerprint {

pub fn calculate_fingerprint(certificate: &[u8]) -> Result<Vec<u8>, CertificateError> {
let parsed_certificate = parse(certificate)
.map_err(|parse_error| CertificateError::ParseError(parse_error.to_string()))?;
.map_err(|parse_error| CertificateError::InvalidCertificate(parse_error.to_string()))?;
let fingerprint = Sha256::digest(parsed_certificate.contents())
.iter()
.cloned()
Expand Down
69 changes: 30 additions & 39 deletions command/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,29 @@ use crate::{buffer::growable::Buffer, ready::Ready};
#[derive(thiserror::Error, Debug)]
pub enum ChannelError {
#[error("io read error")]
IoError(std::io::Error),
Read(std::io::Error),
#[error("no byte written on the channel")]
NoByteWritten,
#[error("no byte left to read on the channel")]
NoByteToRead,
#[error("message too large for the capacity of the back fuffer ({0}. Consider increasing the back buffer size")]
MessageTooLarge(usize),
#[error("channel could not write on the back buffer")]
WriteError(std::io::Error),
#[error("channel buffer is full, cannot grow more, ignoring")]
Write(std::io::Error),
#[error("channel buffer is full, cannot grow more")]
BufferFull,
#[error("Timeout is reached: {0:?}")]
TimeoutReached(Duration),
#[error("Could not read anything on the channel")]
NothingRead,
#[error("invalid utf-8 encoding in command message, ignoring: {0}")]
Utf8Error(String),
#[error("invalid char set in command message, ignoring: {0}")]
InvalidCharSet(String),
#[error("Error deserializing message")]
SerdeError(serde_json::error::Error),
Serde(serde_json::error::Error),
#[error("Could not change the blocking status ef the unix stream with file descriptor {fd}: {error}")]
BlockingStatusError { fd: i32, error: String },
#[error("Connection error, continuing")]
ConnectionError,
#[error("Could not connect to socket with path {path}: {error}")]
FromPathError { path: String, error: String },
BlockingStatus { fd: i32, error: String },
#[error("Connection error: {0:?}")]
Connection(Option<std::io::Error>),
}

/// A wrapper around unix socket using the mio crate.
Expand All @@ -69,11 +67,8 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
buffer_size: usize,
max_buffer_size: usize,
) -> Result<Channel<Tx, Rx>, ChannelError> {
let unix_stream =
MioUnixStream::connect(path).map_err(|error| ChannelError::FromPathError {
path: path.to_owned(),
error: error.to_string(),
})?;
let unix_stream = MioUnixStream::connect(path)
.map_err(|io_error| ChannelError::Connection(Some(io_error)))?;
Ok(Channel::new(unix_stream, buffer_size, max_buffer_size))
}

Expand Down Expand Up @@ -113,12 +108,12 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
unsafe {
let fd = self.sock.as_raw_fd();
let stream = StdUnixStream::from_raw_fd(fd);
stream.set_nonblocking(nonblocking).map_err(|error| {
ChannelError::BlockingStatusError {
stream
.set_nonblocking(nonblocking)
.map_err(|error| ChannelError::BlockingStatus {
fd,
error: error.to_string(),
}
})?;
})?;
let _fd = stream.into_raw_fd();
}
self.blocking = !nonblocking;
Expand Down Expand Up @@ -168,7 +163,7 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
/// Handles readability by filling the front buffer with the socket data.
pub fn readable(&mut self) -> Result<usize, ChannelError> {
if !(self.interest & self.readiness).is_readable() {
return Err(ChannelError::ConnectionError);
return Err(ChannelError::Connection(None));
}

let mut count = 0usize;
Expand All @@ -191,10 +186,10 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
self.readiness.remove(Ready::READABLE);
break;
}
_other_error => {
_ => {
self.interest = Ready::EMPTY;
self.readiness = Ready::EMPTY;
return Err(ChannelError::IoError(read_error));
return Err(ChannelError::Read(read_error));
}
},
Ok(bytes_read) => {
Expand All @@ -210,7 +205,7 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
/// Handles writability by writing the content of the back buffer onto the socket
pub fn writable(&mut self) -> Result<usize, ChannelError> {
if !(self.interest & self.readiness).is_writable() {
return Err(ChannelError::ConnectionError);
return Err(ChannelError::Connection(None));
}

let mut count = 0usize;
Expand All @@ -236,10 +231,10 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
self.readiness.remove(Ready::WRITABLE);
break;
}
_other_error => {
_ => {
self.interest = Ready::EMPTY;
self.readiness = Ready::EMPTY;
return Err(ChannelError::IoError(write_error));
return Err(ChannelError::Read(write_error));
}
},
}
Expand Down Expand Up @@ -314,7 +309,7 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
match self
.sock
.read(self.front_buf.space())
.map_err(ChannelError::IoError)?
.map_err(ChannelError::Read)?
{
0 => return Err(ChannelError::NoByteToRead),
bytes_read => self.front_buf.fill(bytes_read),
Expand All @@ -326,9 +321,9 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {

fn read_and_parse_from_front_buffer(&mut self, position: usize) -> Result<Rx, ChannelError> {
let utf8_str = from_utf8(&self.front_buf.data()[..position])
.map_err(|from_error| ChannelError::Utf8Error(from_error.to_string()))?;
.map_err(|from_error| ChannelError::InvalidCharSet(from_error.to_string()))?;

let json_parsed = serde_json::from_str(utf8_str).map_err(ChannelError::SerdeError)?;
let json_parsed = serde_json::from_str(utf8_str).map_err(ChannelError::Serde)?;

self.front_buf.consume(position + 1);
Ok(json_parsed)
Expand Down Expand Up @@ -370,13 +365,11 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
self.back_buf.grow(new_length);
}

self.back_buf
.write(&message)
.map_err(ChannelError::WriteError)?;
self.back_buf.write(&message).map_err(ChannelError::Write)?;

self.back_buf
.write(&b"\0"[..])
.map_err(ChannelError::WriteError)?;
.map_err(ChannelError::Write)?;

self.interest.insert(Ready::WRITABLE);

Expand Down Expand Up @@ -406,13 +399,11 @@ impl<Tx: Debug + Serialize, Rx: Debug + DeserializeOwned> Channel<Tx, Rx> {
self.back_buf.grow(new_len);
}

self.back_buf
.write(&message)
.map_err(ChannelError::WriteError)?;
self.back_buf.write(&message).map_err(ChannelError::Write)?;

self.back_buf
.write(&b"\0"[..])
.map_err(ChannelError::WriteError)?;
.map_err(ChannelError::Write)?;

loop {
let size = self.back_buf.available_data();
Expand Down Expand Up @@ -440,7 +431,7 @@ impl<Tx: Debug + DeserializeOwned + Serialize, Rx: Debug + DeserializeOwned + Se
buffer_size: usize,
max_buffer_size: usize,
) -> Result<(Channel<Tx, Rx>, Channel<Rx, Tx>), ChannelError> {
let (command, proxy) = MioUnixStream::pair().map_err(ChannelError::IoError)?;
let (command, proxy) = MioUnixStream::pair().map_err(ChannelError::Read)?;
let proxy_channel = Channel::new(proxy, buffer_size, max_buffer_size);
let mut command_channel = Channel::new(command, buffer_size, max_buffer_size);
command_channel.blocking()?;
Expand All @@ -452,7 +443,7 @@ impl<Tx: Debug + DeserializeOwned + Serialize, Rx: Debug + DeserializeOwned + Se
buffer_size: usize,
max_buffer_size: usize,
) -> Result<(Channel<Tx, Rx>, Channel<Rx, Tx>), ChannelError> {
let (command, proxy) = MioUnixStream::pair().map_err(ChannelError::IoError)?;
let (command, proxy) = MioUnixStream::pair().map_err(ChannelError::Read)?;
let proxy_channel = Channel::new(proxy, buffer_size, max_buffer_size);
let command_channel = Channel::new(command, buffer_size, max_buffer_size);
Ok((command_channel, proxy_channel))
Expand Down
Loading

0 comments on commit 2ff0385

Please sign in to comment.