diff --git a/src/error.rs b/src/error.rs index b90334c..fb2fa73 100644 --- a/src/error.rs +++ b/src/error.rs @@ -18,6 +18,9 @@ pub enum Error { // To handle all std lib io error #[error("File error {0}")] Io(#[from] std::io::Error), + // To handle deserialisation error + #[error("Bincode error {0}")] + Bincode(#[from] bincode::Error), /// Some other error occurred. #[error("unknown error {0}")] Unknown(#[from] Box), diff --git a/src/server.rs b/src/server.rs index e372a14..4be8ea3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,6 +2,7 @@ // License : MIT License use crate::cluster::{ClusterConfig, NodeMeta}; +use crate::error::Error; use crate::log::get_logger; use crate::network::{NetworkLayer, TCPManager}; use crate::state_mechine::{self, StateMachine}; @@ -236,7 +237,14 @@ impl Server { return; } - let log_byte = self.storage.retrieve().await.unwrap(); + let log_byte = match self.storage.retrieve().await { + Ok(data) => data, + Err(e) => { + error!(self.log, "Follower failed to read from storage: {}", e); + return; + } + }; + let log_entry_size = std::mem::size_of::(); // Data integrity check failed @@ -258,10 +266,9 @@ impl Server { 2u32.to_be_bytes(), ] .concat(); - self.network_manager - .broadcast(&data, &addresses) - .await - .unwrap(); + if let Err(e) = self.network_manager.broadcast(&data, &addresses).await { + error!(self.log, "Follower failed to broadcast message: {}", e) + } return; } @@ -275,9 +282,14 @@ impl Server { } bytes_data = bytes_data[0..log_entry_size].to_vec(); - let log_entry = self.deserialize_log_entries(&bytes_data); - if log_entry.term > self.state.current_term { - self.state.current_term = log_entry.term; + match self.deserialize_log_entries(&bytes_data) { + Ok(log_entry) => { + if log_entry.term > self.state.current_term { + self.state.current_term = log_entry.term; + } + self.state.log.push_front(log_entry); + } + Err(e) => error!(self.log, "Failed to deserialize log entry: {}", e), } state_machine .lock() @@ -473,8 +485,10 @@ impl Server { let append_batch = self.prepare_append_batch(self.id, self.state.current_term, self.state.previous_log_index, self.state.commit_index, self.write_buffer.clone()); for entry in self.write_buffer.clone() { - let data = bincode::serialize(&entry).unwrap(); - self.persist_to_disk(self.id, &data).await; + match bincode::serialize(&entry) { + Ok(data) => self.persist_to_disk(self.id, &data).await, + Err(e) => error!(self.log, "Failed to serialize entry: {}", e), + } } let addresses: Vec = self.peers_address(); @@ -491,8 +505,10 @@ impl Server { } async fn receive_rpc(&mut self) { - let data = self.network_manager.receive().await.unwrap(); - self.handle_rpc(data).await; + match self.network_manager.receive().await { + Ok(data) => self.handle_rpc(data).await, + Err(e) => error!(self.log, "Failed to receive rpc: {}", e), + }; } fn prepare_append_batch( @@ -671,11 +687,11 @@ impl Server { ] .concat(); - let voteresponse = self + let voter_response = self .network_manager .send(&candidate_address.unwrap(), &data) .await; - if let Err(e) = voteresponse { + if let Err(e) = voter_response { error!(self.log, "Failed to send vote response: {}", e); } } @@ -759,7 +775,13 @@ impl Server { }; // serialize log entry and append to log - let data = bincode::serialize(&log_entry).unwrap(); + let data = match bincode::serialize(&log_entry) { + Ok(data) => data, + Err(e) => { + error!(self.log, "Failed to serialize the log: {}", e); + return; + } + }; let _ = self.persist_to_disk(id, &data).await; @@ -773,12 +795,21 @@ impl Server { ] .concat(); - let leader_address = self.cluster_config.address(id).unwrap(); + let leader_address = self.cluster_config.address(id); + if leader_address.is_none() { + // no dynamic membership changes + info!(self.log, "Leader address not found"); + return; + } info!( self.log, "Sending append entries response to leader: {}", id ); - if let Err(e) = self.network_manager.send(&leader_address, &response).await { + if let Err(e) = self + .network_manager + .send(&leader_address.unwrap(), &response) + .await + { info!(self.log, "Failed to send append entries response: {}", e); } } @@ -872,7 +903,9 @@ impl Server { self.state.state = RaftState::Follower; self.state.current_term = term; } - } else if self.id != self.config.default_leader.unwrap() { + } else if self.config.default_leader.is_some() + && self.id != self.config.default_leader.unwrap() + { self.state.state = RaftState::Follower; self.state.current_term = term; } else { @@ -894,7 +927,13 @@ impl Server { let peer_id = u32::from_be_bytes(data[0..4].try_into().unwrap()); - let log_byte = self.storage.retrieve().await.unwrap(); + let log_byte = match self.storage.retrieve().await { + Ok(data) => data, + Err(e) => { + error!(self.log, "Failed to read log from storage: {}", e); + return; + } + }; let log_entry_size = std::mem::size_of::(); // Data integrity check failed @@ -933,8 +972,17 @@ impl Server { response = [response.clone(), entry.to_be_bytes().to_vec()].concat(); } - let peer_address = self.cluster_config.address(peer_id).unwrap(); - if let Err(e) = self.network_manager.send(&peer_address, &response).await { + let peer_address = self.cluster_config.address(peer_id); + if peer_address.is_none() { + // no dynamic membership changes + info!(self.log, "Peer address not found"); + return; + } + if let Err(e) = self + .network_manager + .send(&peer_address.unwrap(), &response) + .await + { error!(self.log, "Failed to send repair response: {}", e); } } @@ -1007,8 +1055,17 @@ impl Server { response.extend_from_slice(&self.state.previous_log_index.to_be_bytes()); response.extend_from_slice(&self.peer_count().to_be_bytes()); - let peer_address = self.cluster_config.address(node_id).unwrap(); - if let Err(e) = self.network_manager.send(&peer_address, &response).await { + let peer_address = self.cluster_config.address(node_id); + if peer_address.is_none() { + // no dynamic membership changes + info!(self.log, "Peer address not found"); + return; + } + if let Err(e) = self + .network_manager + .send(&peer_address.unwrap(), &response) + .await + { error!(self.log, "Failed to send join response: {}", e); } @@ -1071,11 +1128,16 @@ impl Server { 8u32.to_be_bytes(), ] .concat(); - let leader_address = self.cluster_config.address(leader_id).unwrap(); + let leader_address = self.cluster_config.address(leader_id); + if leader_address.is_none() { + // no dynamic membership changes + info!(self.log, "Leader address not found"); + return; + } if let Err(e) = self .network_manager - .send(&leader_address, &request_data) + .send(&leader_address.unwrap(), &request_data) .await { error!(self.log, "Failed to send repair request: {}", e); @@ -1175,9 +1237,8 @@ impl Server { ); } - fn deserialize_log_entries(&self, data: &[u8]) -> LogEntry { - // convert data to logEntry using bincode - bincode::deserialize(data).unwrap() + fn deserialize_log_entries(&self, data: &[u8]) -> Result { + bincode::deserialize(data).map_err(Error::Bincode) } fn is_quorum(&self, votes: u32) -> bool {