Skip to content

Commit

Permalink
Remove instances of unwraps
Browse files Browse the repository at this point in the history
  • Loading branch information
rishitc committed Sep 12, 2024
1 parent 8afd4e7 commit e7c45a0
Showing 1 changed file with 95 additions and 26 deletions.
121 changes: 95 additions & 26 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// License : MIT License

use crate::cluster::{ClusterConfig, NodeMeta};
use crate::error::Error;
use crate::log::get_logger;
use crate::network::{NetworkLayer, TCPManager};
use crate::state_mechine::{self, StateMachine};
Expand Down Expand Up @@ -236,7 +237,14 @@ impl Server {
return;
}

let log_byte = self.storage.retrieve().await.unwrap();
let log_byte = match self.storage.retrieve().await {
Ok(data) => data,
Err(e) => {
error!(self.log, "Follower failed to read from storage: {}", e);
return;
}
};

let log_entry_size = std::mem::size_of::<LogEntry>();

// Data integrity check failed
Expand All @@ -258,10 +266,9 @@ impl Server {
2u32.to_be_bytes(),
]
.concat();
self.network_manager
.broadcast(&data, &addresses)
.await
.unwrap();
if let Err(e) = self.network_manager.broadcast(&data, &addresses).await {
error!(self.log, "Follower failed to broadcast message: {}", e)
}
return;
}

Expand All @@ -275,10 +282,20 @@ impl Server {
}
bytes_data = bytes_data[0..log_entry_size].to_vec();

let log_entry = self.deserialize_log_entries(&bytes_data);
let res = self.deserialize_log_entries(&bytes_data);
let Ok(log_entry) = res else {
error!(
self.log,
"Failed to deserialize log entry: {}",
res.err().unwrap()
);
return;
};

if log_entry.term > self.state.current_term {
self.state.current_term = log_entry.term;
}

state_machine
.lock()
.await
Expand Down Expand Up @@ -473,8 +490,10 @@ impl Server {
let append_batch = self.prepare_append_batch(self.id, self.state.current_term, self.state.previous_log_index, self.state.commit_index, self.write_buffer.clone());

for entry in self.write_buffer.clone() {
let data = bincode::serialize(&entry).unwrap();
self.persist_to_disk(self.id, &data).await;
match bincode::serialize(&entry) {
Ok(data) => self.persist_to_disk(self.id, &data).await,
Err(e) => error!(self.log, "Failed to serialize entry: {}", e),
}
}

let addresses: Vec<SocketAddr> = self.peers_address();
Expand All @@ -491,8 +510,10 @@ impl Server {
}

async fn receive_rpc(&mut self) {
let data = self.network_manager.receive().await.unwrap();
self.handle_rpc(data).await;
match self.network_manager.receive().await {
Ok(data) => self.handle_rpc(data).await,
Err(e) => error!(self.log, "Failed to receive rpc: {}", e),
};
}

fn prepare_append_batch(
Expand Down Expand Up @@ -671,11 +692,11 @@ impl Server {
]
.concat();

let voteresponse = self
let voter_response = self
.network_manager
.send(&candidate_address.unwrap(), &data)
.await;
if let Err(e) = voteresponse {
if let Err(e) = voter_response {
error!(self.log, "Failed to send vote response: {}", e);
}
}
Expand Down Expand Up @@ -759,7 +780,13 @@ impl Server {
};

// serialize log entry and append to log
let data = bincode::serialize(&log_entry).unwrap();
let data = match bincode::serialize(&log_entry) {
Ok(data) => data,
Err(e) => {
error!(self.log, "Failed to serialize the log: {}", e);
return;
}
};

let _ = self.persist_to_disk(id, &data).await;

Expand All @@ -773,12 +800,21 @@ impl Server {
]
.concat();

let leader_address = self.cluster_config.address(id).unwrap();
let leader_address = self.cluster_config.address(id);
if leader_address.is_none() {
// no dynamic membership changes
info!(self.log, "Leader address not found");
return;
}
info!(
self.log,
"Sending append entries response to leader: {}", id
);
if let Err(e) = self.network_manager.send(&leader_address, &response).await {
if let Err(e) = self
.network_manager
.send(&leader_address.unwrap(), &response)
.await
{
info!(self.log, "Failed to send append entries response: {}", e);
}
}
Expand Down Expand Up @@ -872,7 +908,9 @@ impl Server {
self.state.state = RaftState::Follower;
self.state.current_term = term;
}
} else if self.id != self.config.default_leader.unwrap() {
} else if self.config.default_leader.is_some()
&& self.id != self.config.default_leader.unwrap()
{
self.state.state = RaftState::Follower;
self.state.current_term = term;
} else {
Expand All @@ -894,7 +932,13 @@ impl Server {

let peer_id = u32::from_be_bytes(data[0..4].try_into().unwrap());

let log_byte = self.storage.retrieve().await.unwrap();
let log_byte = match self.storage.retrieve().await {
Ok(data) => data,
Err(e) => {
error!(self.log, "Failed to read log from storage: {}", e);
return;
}
};
let log_entry_size = std::mem::size_of::<LogEntry>();

// Data integrity check failed
Expand Down Expand Up @@ -933,8 +977,17 @@ impl Server {
response = [response.clone(), entry.to_be_bytes().to_vec()].concat();
}

let peer_address = self.cluster_config.address(peer_id).unwrap();
if let Err(e) = self.network_manager.send(&peer_address, &response).await {
let peer_address = self.cluster_config.address(peer_id);
if peer_address.is_none() {
// no dynamic membership changes
info!(self.log, "Peer address not found");
return;
}
if let Err(e) = self
.network_manager
.send(&peer_address.unwrap(), &response)
.await
{
error!(self.log, "Failed to send repair response: {}", e);
}
}
Expand Down Expand Up @@ -1007,7 +1060,11 @@ impl Server {
response.extend_from_slice(&self.state.previous_log_index.to_be_bytes());
response.extend_from_slice(&self.peer_count().to_be_bytes());

let peer_address = self.cluster_config.address(node_id).unwrap();
let Some(peer_address) = self.cluster_config.address(node_id) else {
// no dynamic membership changes
info!(self.log, "Peer address not found");
return;
};
if let Err(e) = self.network_manager.send(&peer_address, &response).await {
error!(self.log, "Failed to send join response: {}", e);
}
Expand Down Expand Up @@ -1071,11 +1128,16 @@ impl Server {
8u32.to_be_bytes(),
]
.concat();
let leader_address = self.cluster_config.address(leader_id).unwrap();
let leader_address = self.cluster_config.address(leader_id);
if leader_address.is_none() {
// no dynamic membership changes
info!(self.log, "Leader address not found");
return;
}

if let Err(e) = self
.network_manager
.send(&leader_address, &request_data)
.send(&leader_address.unwrap(), &request_data)
.await
{
error!(self.log, "Failed to send repair request: {}", e);
Expand Down Expand Up @@ -1151,7 +1213,15 @@ impl Server {
// let mut state_machine_lock = state_machine.lock().await;
if self.state.state == RaftState::Follower {
// deserialize log entries and append to log
let log_entry = self.deserialize_log_entries(data);
let res = self.deserialize_log_entries(data);
let Ok(log_entry) = res else {
error!(
self.log,
"Failed to deserialize log entry: {}",
res.err().unwrap()
);
return;
};
state_machine
.lock()
.await
Expand All @@ -1175,9 +1245,8 @@ impl Server {
);
}

fn deserialize_log_entries(&self, data: &[u8]) -> LogEntry {
// convert data to logEntry using bincode
bincode::deserialize(data).unwrap()
fn deserialize_log_entries(&self, data: &[u8]) -> Result<LogEntry, Error> {
bincode::deserialize(data).map_err(Error::BincodeError)
}

fn is_quorum(&self, votes: u32) -> bool {
Expand Down

0 comments on commit e7c45a0

Please sign in to comment.