Skip to content

Commit

Permalink
Remove instances of unwraps
Browse files Browse the repository at this point in the history
  • Loading branch information
rishitc committed Sep 10, 2024
1 parent 8afd4e7 commit 3a7ce80
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 28 deletions.
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub enum Error {
// To handle all std lib io error
#[error("File error {0}")]
Io(#[from] std::io::Error),
// To handle deserialisation error
#[error("Bincode error {0}")]
Bincode(#[from] bincode::Error),
/// Some other error occurred.
#[error("unknown error {0}")]
Unknown(#[from] Box<dyn std::error::Error + Sync + Send>),
Expand Down
117 changes: 89 additions & 28 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// License : MIT License

use crate::cluster::{ClusterConfig, NodeMeta};
use crate::error::Error;
use crate::log::get_logger;
use crate::network::{NetworkLayer, TCPManager};
use crate::state_mechine::{self, StateMachine};
Expand Down Expand Up @@ -236,7 +237,14 @@ impl Server {
return;
}

let log_byte = self.storage.retrieve().await.unwrap();
let log_byte = match self.storage.retrieve().await {
Ok(data) => data,
Err(e) => {
error!(self.log, "Follower failed to read from storage: {}", e);
return;
}
};

let log_entry_size = std::mem::size_of::<LogEntry>();

// Data integrity check failed
Expand All @@ -258,10 +266,9 @@ impl Server {
2u32.to_be_bytes(),
]
.concat();
self.network_manager
.broadcast(&data, &addresses)
.await
.unwrap();
if let Err(e) = self.network_manager.broadcast(&data, &addresses).await {
error!(self.log, "Follower failed to broadcast message: {}", e)
}
return;
}

Expand All @@ -275,9 +282,14 @@ impl Server {
}
bytes_data = bytes_data[0..log_entry_size].to_vec();

let log_entry = self.deserialize_log_entries(&bytes_data);
if log_entry.term > self.state.current_term {
self.state.current_term = log_entry.term;
match self.deserialize_log_entries(&bytes_data) {
Ok(log_entry) => {
if log_entry.term > self.state.current_term {
self.state.current_term = log_entry.term;
}
self.state.log.push_front(log_entry);
}
Err(e) => error!(self.log, "Failed to deserialize log entry: {}", e),
}
state_machine
.lock()
Expand Down Expand Up @@ -473,8 +485,10 @@ impl Server {
let append_batch = self.prepare_append_batch(self.id, self.state.current_term, self.state.previous_log_index, self.state.commit_index, self.write_buffer.clone());

for entry in self.write_buffer.clone() {
let data = bincode::serialize(&entry).unwrap();
self.persist_to_disk(self.id, &data).await;
match bincode::serialize(&entry) {
Ok(data) => self.persist_to_disk(self.id, &data).await,
Err(e) => error!(self.log, "Failed to serialize entry: {}", e),
}
}

let addresses: Vec<SocketAddr> = self.peers_address();
Expand All @@ -491,8 +505,10 @@ impl Server {
}

async fn receive_rpc(&mut self) {
let data = self.network_manager.receive().await.unwrap();
self.handle_rpc(data).await;
match self.network_manager.receive().await {
Ok(data) => self.handle_rpc(data).await,
Err(e) => error!(self.log, "Failed to receive rpc: {}", e),
};
}

fn prepare_append_batch(
Expand Down Expand Up @@ -671,11 +687,11 @@ impl Server {
]
.concat();

let voteresponse = self
let voter_response = self
.network_manager
.send(&candidate_address.unwrap(), &data)
.await;
if let Err(e) = voteresponse {
if let Err(e) = voter_response {
error!(self.log, "Failed to send vote response: {}", e);
}
}
Expand Down Expand Up @@ -759,7 +775,13 @@ impl Server {
};

// serialize log entry and append to log
let data = bincode::serialize(&log_entry).unwrap();
let data = match bincode::serialize(&log_entry) {
Ok(data) => data,
Err(e) => {
error!(self.log, "Failed to serialize the log: {}", e);
return;
}
};

let _ = self.persist_to_disk(id, &data).await;

Expand All @@ -773,12 +795,21 @@ impl Server {
]
.concat();

let leader_address = self.cluster_config.address(id).unwrap();
let leader_address = self.cluster_config.address(id);
if leader_address.is_none() {
// no dynamic membership changes
info!(self.log, "Leader address not found");
return;
}
info!(
self.log,
"Sending append entries response to leader: {}", id
);
if let Err(e) = self.network_manager.send(&leader_address, &response).await {
if let Err(e) = self
.network_manager
.send(&leader_address.unwrap(), &response)
.await
{
info!(self.log, "Failed to send append entries response: {}", e);
}
}
Expand Down Expand Up @@ -872,7 +903,9 @@ impl Server {
self.state.state = RaftState::Follower;
self.state.current_term = term;
}
} else if self.id != self.config.default_leader.unwrap() {
} else if self.config.default_leader.is_some()
&& self.id != self.config.default_leader.unwrap()
{
self.state.state = RaftState::Follower;
self.state.current_term = term;
} else {
Expand All @@ -894,7 +927,13 @@ impl Server {

let peer_id = u32::from_be_bytes(data[0..4].try_into().unwrap());

let log_byte = self.storage.retrieve().await.unwrap();
let log_byte = match self.storage.retrieve().await {
Ok(data) => data,
Err(e) => {
error!(self.log, "Failed to read log from storage: {}", e);
return;
}
};
let log_entry_size = std::mem::size_of::<LogEntry>();

// Data integrity check failed
Expand Down Expand Up @@ -933,8 +972,17 @@ impl Server {
response = [response.clone(), entry.to_be_bytes().to_vec()].concat();
}

let peer_address = self.cluster_config.address(peer_id).unwrap();
if let Err(e) = self.network_manager.send(&peer_address, &response).await {
let peer_address = self.cluster_config.address(peer_id);
if peer_address.is_none() {
// no dynamic membership changes
info!(self.log, "Peer address not found");
return;
}
if let Err(e) = self
.network_manager
.send(&peer_address.unwrap(), &response)
.await
{
error!(self.log, "Failed to send repair response: {}", e);
}
}
Expand Down Expand Up @@ -1007,8 +1055,17 @@ impl Server {
response.extend_from_slice(&self.state.previous_log_index.to_be_bytes());
response.extend_from_slice(&self.peer_count().to_be_bytes());

let peer_address = self.cluster_config.address(node_id).unwrap();
if let Err(e) = self.network_manager.send(&peer_address, &response).await {
let peer_address = self.cluster_config.address(node_id);
if peer_address.is_none() {
// no dynamic membership changes
info!(self.log, "Peer address not found");
return;
}
if let Err(e) = self
.network_manager
.send(&peer_address.unwrap(), &response)
.await
{
error!(self.log, "Failed to send join response: {}", e);
}

Expand Down Expand Up @@ -1071,11 +1128,16 @@ impl Server {
8u32.to_be_bytes(),
]
.concat();
let leader_address = self.cluster_config.address(leader_id).unwrap();
let leader_address = self.cluster_config.address(leader_id);
if leader_address.is_none() {
// no dynamic membership changes
info!(self.log, "Leader address not found");
return;
}

if let Err(e) = self
.network_manager
.send(&leader_address, &request_data)
.send(&leader_address.unwrap(), &request_data)
.await
{
error!(self.log, "Failed to send repair request: {}", e);
Expand Down Expand Up @@ -1175,9 +1237,8 @@ impl Server {
);
}

fn deserialize_log_entries(&self, data: &[u8]) -> LogEntry {
// convert data to logEntry using bincode
bincode::deserialize(data).unwrap()
fn deserialize_log_entries(&self, data: &[u8]) -> Result<LogEntry, Error> {
bincode::deserialize(data).map_err(Error::Bincode)
}

fn is_quorum(&self, votes: u32) -> bool {
Expand Down

0 comments on commit 3a7ce80

Please sign in to comment.