Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove instances of unwraps #38

Merged
merged 1 commit into from
Sep 13, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 95 additions & 26 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// License : MIT License

use crate::cluster::{ClusterConfig, NodeMeta};
use crate::error::Error;
use crate::log::get_logger;
use crate::network::{NetworkLayer, TCPManager};
use crate::state_mechine::{self, StateMachine};
Expand Down Expand Up @@ -236,7 +237,14 @@ impl Server {
return;
}

let log_byte = self.storage.retrieve().await.unwrap();
let log_byte = match self.storage.retrieve().await {
Ok(data) => data,
Err(e) => {
error!(self.log, "Follower failed to read from storage: {}", e);
return;
}
};

let log_entry_size = std::mem::size_of::<LogEntry>();

// Data integrity check failed
Expand All @@ -258,10 +266,9 @@ impl Server {
2u32.to_be_bytes(),
]
.concat();
self.network_manager
.broadcast(&data, &addresses)
.await
.unwrap();
if let Err(e) = self.network_manager.broadcast(&data, &addresses).await {
error!(self.log, "Follower failed to broadcast message: {}", e)
}
return;
}

Expand All @@ -275,10 +282,20 @@ impl Server {
}
bytes_data = bytes_data[0..log_entry_size].to_vec();

let log_entry = self.deserialize_log_entries(&bytes_data);
let res = self.deserialize_log_entries(&bytes_data);
let Ok(log_entry) = res else {
error!(
self.log,
"Failed to deserialize log entry: {}",
res.err().unwrap()
);
return;
};

if log_entry.term > self.state.current_term {
self.state.current_term = log_entry.term;
}

state_machine
.lock()
.await
Expand Down Expand Up @@ -473,8 +490,10 @@ impl Server {
let append_batch = self.prepare_append_batch(self.id, self.state.current_term, self.state.previous_log_index, self.state.commit_index, self.write_buffer.clone());

for entry in self.write_buffer.clone() {
let data = bincode::serialize(&entry).unwrap();
self.persist_to_disk(self.id, &data).await;
match bincode::serialize(&entry) {
Ok(data) => self.persist_to_disk(self.id, &data).await,
Err(e) => error!(self.log, "Failed to serialize entry: {}", e),
}
}

let addresses: Vec<SocketAddr> = self.peers_address();
Expand All @@ -491,8 +510,10 @@ impl Server {
}

async fn receive_rpc(&mut self) {
let data = self.network_manager.receive().await.unwrap();
self.handle_rpc(data).await;
match self.network_manager.receive().await {
Ok(data) => self.handle_rpc(data).await,
Err(e) => error!(self.log, "Failed to receive rpc: {}", e),
};
}

fn prepare_append_batch(
Expand Down Expand Up @@ -671,11 +692,11 @@ impl Server {
]
.concat();

let voteresponse = self
let voter_response = self
.network_manager
.send(&candidate_address.unwrap(), &data)
.await;
if let Err(e) = voteresponse {
if let Err(e) = voter_response {
error!(self.log, "Failed to send vote response: {}", e);
}
}
Expand Down Expand Up @@ -759,7 +780,13 @@ impl Server {
};

// serialize log entry and append to log
let data = bincode::serialize(&log_entry).unwrap();
let data = match bincode::serialize(&log_entry) {
Ok(data) => data,
Err(e) => {
error!(self.log, "Failed to serialize the log: {}", e);
return;
}
};

let _ = self.persist_to_disk(id, &data).await;

Expand All @@ -773,12 +800,21 @@ impl Server {
]
.concat();

let leader_address = self.cluster_config.address(id).unwrap();
let leader_address = self.cluster_config.address(id);
if leader_address.is_none() {
// no dynamic membership changes
info!(self.log, "Leader address not found");
return;
}
info!(
self.log,
"Sending append entries response to leader: {}", id
);
if let Err(e) = self.network_manager.send(&leader_address, &response).await {
if let Err(e) = self
.network_manager
.send(&leader_address.unwrap(), &response)
.await
{
info!(self.log, "Failed to send append entries response: {}", e);
}
}
Expand Down Expand Up @@ -872,7 +908,9 @@ impl Server {
self.state.state = RaftState::Follower;
self.state.current_term = term;
}
} else if self.id != self.config.default_leader.unwrap() {
} else if self.config.default_leader.is_some()
&& self.id != self.config.default_leader.unwrap()
{
self.state.state = RaftState::Follower;
self.state.current_term = term;
} else {
Expand All @@ -894,7 +932,13 @@ impl Server {

let peer_id = u32::from_be_bytes(data[0..4].try_into().unwrap());

let log_byte = self.storage.retrieve().await.unwrap();
let log_byte = match self.storage.retrieve().await {
Ok(data) => data,
Err(e) => {
error!(self.log, "Failed to read log from storage: {}", e);
return;
}
};
let log_entry_size = std::mem::size_of::<LogEntry>();

// Data integrity check failed
Expand Down Expand Up @@ -933,8 +977,17 @@ impl Server {
response = [response.clone(), entry.to_be_bytes().to_vec()].concat();
}

let peer_address = self.cluster_config.address(peer_id).unwrap();
if let Err(e) = self.network_manager.send(&peer_address, &response).await {
let peer_address = self.cluster_config.address(peer_id);
if peer_address.is_none() {
// no dynamic membership changes
info!(self.log, "Peer address not found");
return;
}
if let Err(e) = self
.network_manager
.send(&peer_address.unwrap(), &response)
.await
{
error!(self.log, "Failed to send repair response: {}", e);
}
}
Expand Down Expand Up @@ -1007,7 +1060,11 @@ impl Server {
response.extend_from_slice(&self.state.previous_log_index.to_be_bytes());
response.extend_from_slice(&self.peer_count().to_be_bytes());

let peer_address = self.cluster_config.address(node_id).unwrap();
let Some(peer_address) = self.cluster_config.address(node_id) else {
// no dynamic membership changes
info!(self.log, "Peer address not found");
return;
};
if let Err(e) = self.network_manager.send(&peer_address, &response).await {
error!(self.log, "Failed to send join response: {}", e);
}
Expand Down Expand Up @@ -1071,11 +1128,16 @@ impl Server {
8u32.to_be_bytes(),
]
.concat();
let leader_address = self.cluster_config.address(leader_id).unwrap();
let leader_address = self.cluster_config.address(leader_id);
if leader_address.is_none() {
// no dynamic membership changes
info!(self.log, "Leader address not found");
return;
}

if let Err(e) = self
.network_manager
.send(&leader_address, &request_data)
.send(&leader_address.unwrap(), &request_data)
.await
{
error!(self.log, "Failed to send repair request: {}", e);
Expand Down Expand Up @@ -1151,7 +1213,15 @@ impl Server {
// let mut state_machine_lock = state_machine.lock().await;
if self.state.state == RaftState::Follower {
// deserialize log entries and append to log
let log_entry = self.deserialize_log_entries(data);
let res = self.deserialize_log_entries(data);
let Ok(log_entry) = res else {
error!(
self.log,
"Failed to deserialize log entry: {}",
res.err().unwrap()
);
return;
};
state_machine
.lock()
.await
Expand All @@ -1175,9 +1245,8 @@ impl Server {
);
}

fn deserialize_log_entries(&self, data: &[u8]) -> LogEntry {
// convert data to logEntry using bincode
bincode::deserialize(data).unwrap()
fn deserialize_log_entries(&self, data: &[u8]) -> Result<LogEntry, Error> {
bincode::deserialize(data).map_err(Error::BincodeError)
}

fn is_quorum(&self, votes: u32) -> bool {
Expand Down
Loading