diff --git a/README.md b/README.md
index 141bfa0..0a82404 100644
--- a/README.md
+++ b/README.md
@@ -21,23 +21,22 @@ It is asynchronous(built on tokio runtime) and supports zero-copy. It does not a
 - [x] Leadership preference
 - [x] Log compaction
 - [x] Tigerbeetle style replica repair
+- [x] Dynamic cluster membership changes support
 
 ## To-Do
 
 - [ ] Production-ready
 - [ ] Test replica repair thoroughly
+- [ ] Test for dynamic cluster membership changes
 - [ ] io_uring support
-- [ ] CI integration improvements
 - [ ] Complete batch write implementation
 - [ ] Improve Log compaction
 - [ ] Improve error handling
 - [ ] RDMA support
 - [ ] Add more comprehensive tests
 - [ ] Enhance documentation
-- [ ] Implement dynamic cluster membership changes
 - [ ] Deterministic Simulation Testing
 - [ ] Benchmarking
-
 ## How to Run the Project
 1. Ensure you have Rust installed. If not, follow the instructions [here](https://www.rust-lang.org/tools/install).
 2. Clone the repository:
diff --git a/src/server.rs b/src/server.rs
index e3853dc..12bf61f 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -30,6 +30,9 @@ enum MesageType {
     ClientResponse,
     RepairRequest,
     RepairResponse,
+    // dynamic membership changes
+    JoinRequest,
+    JoinResponse,
 }
 
 #[derive(Debug)]
@@ -439,6 +442,8 @@ impl Server {
             7 => MesageType::ClientResponse,
             8 => MesageType::RepairRequest,
             9 => MesageType::RepairResponse,
+            10 => MesageType::JoinRequest,
+            11 => MesageType::JoinResponse,
             _ => return,
         };
 
@@ -475,12 +480,17 @@ impl Server {
                 }
             }
             MesageType::RepairRequest => {
-                // TODO: get implementation from user based on the application
-                info!(self.log, "Received repair request: {:?}", data);
+                self.handle_repair_request(&data).await;
             }
             MesageType::RepairResponse => {
-                // TODO: get implementation from user based on the application
-                info!(self.log, "Received repair response: {:?}", data);
+                self.handle_repair_response(&data).await;
+            }
+            MesageType::JoinRequest => {
+                info!(self.log, "Received join request: {:?}", data);
+                self.handle_join_request(&data).await;
+            }
+            MesageType::JoinResponse => {
+                self.handle_join_response(&data).await;
             }
         }
     }
@@ -716,8 +726,12 @@ impl Server {
         // Noop
     }
 
-    #[allow(dead_code)]
     async fn handle_repair_request(&mut self, data: &[u8]) {
+        // Bail out only when this node is neither a follower nor a leader.
+        if self.state.state != RaftState::Follower && self.state.state != RaftState::Leader {
+            return;
+        }
+
         let peer_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
 
         let log_byte = self.storage.retrieve().await;
@@ -736,7 +750,6 @@ impl Server {
             repair_data.extend_from_slice(entry);
         }
 
-        // send log entries to peer
         let mut response = [
             self.id.to_be_bytes(),
             self.state.current_term.to_be_bytes(),
@@ -760,9 +773,13 @@ impl Server {
         }
     }
 
-    #[allow(dead_code)]
     async fn handle_repair_response(&mut self, data: &[u8]) {
+        if self.state.state != RaftState::Leader {
+            return;
+        }
+
         if self.storage.turned_malicious().await.is_err() {
+            self.state.state = RaftState::Follower;
             return;
         }
 
@@ -777,6 +794,111 @@ impl Server {
         }
     }
 
+    /// Handles a join request; only acted upon while this node is the leader.
+    ///
+    /// Reads the node id from `data[0..4]`, the term from `data[4..8]` and the
+    /// node address (UTF-8) from `data[8..]`. A request whose node id is already
+    /// in `cluster_nodes`, or whose term is not 0, is logged and ignored.
+    /// Otherwise the node is recorded in the config and a join response
+    /// (message type 11) is sent to it.
+    async fn handle_join_request(&mut self, data: &[u8]) {
+        if self.state.state != RaftState::Leader {
+            return;
+        }
+
+        let node_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
+        let term = u32::from_be_bytes(data[4..8].try_into().unwrap());
+        let node_ip_address = String::from_utf8(data[8..].to_vec()).unwrap();
+
+        if self.config.cluster_nodes.contains(&node_id) {
+            error!(
+                self.log,
+                "Node already exists in the cluster, Ignoring join request."
+            );
+            return;
+        }
+
+        if term != 0 {
+            error!(self.log, "Invalid term for join request, term should be 0.");
+            return;
+        }
+
+        self.config.cluster_nodes.push(node_id);
+        self.config
+            .id_to_address_mapping
+            .insert(node_id, node_ip_address);
+
+        let mut response = [
+            self.id.to_be_bytes(),
+            self.state.current_term.to_be_bytes(),
+            11u32.to_be_bytes(),
+        ]
+        .concat();
+        response.extend_from_slice(&self.state.commit_index.to_be_bytes());
+        response.extend_from_slice(&self.state.previous_log_index.to_be_bytes());
+        // Encode the peer count as u32: `usize` would emit 8 bytes on 64-bit
+        // targets, while `handle_join_response` decodes a 4-byte count.
+        response.extend_from_slice(&(self.peers.len() as u32).to_be_bytes());
+
+        let peer_address = self.config.id_to_address_mapping.get(&node_id).unwrap();
+        let (peer_ip, peer_port) = parse_ip_address(peer_address);
+        if let Err(e) = self
+            .network_manager
+            .send(peer_ip, peer_port, &response)
+            .await
+        {
+            error!(self.log, "Failed to send join response: {}", e);
+        }
+    }
+
+    /// Handles a join response; only acted upon while this node is a follower.
+    ///
+    /// Adopts the leader's term, commit index and previous log index, rebuilds
+    /// `peers` as the ids `0..peers_count`, then sends the leader a repair
+    /// request (message type 8).
+    async fn handle_join_response(&mut self, data: &[u8]) {
+        if self.state.state != RaftState::Follower {
+            return;
+        }
+
+        let leader_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
+        let current_term = u32::from_be_bytes(data[4..8].try_into().unwrap());
+        let commit_index = u32::from_be_bytes(data[8..12].try_into().unwrap());
+        let previous_log_index = u32::from_be_bytes(data[12..16].try_into().unwrap());
+        // Bounded slice: an open-ended `data[16..]` panics unless exactly 4 bytes remain.
+        let peers_count = u32::from_be_bytes(data[16..20].try_into().unwrap());
+
+        self.state.current_term = current_term;
+        self.state.commit_index = commit_index;
+        self.state.previous_log_index = previous_log_index;
+
+        self.peers.clear();
+        for i in 0..peers_count {
+            self.peers.push(i);
+        }
+
+        let request_data = [
+            self.id.to_be_bytes(),
+            self.state.current_term.to_be_bytes(),
+            8u32.to_be_bytes(),
+        ]
+        .concat();
+        let leader_address = self.config.id_to_address_mapping.get(&leader_id).unwrap();
+        let (leader_ip, leader_port) = parse_ip_address(leader_address);
+        if let Err(e) = self
+            .network_manager
+            .send(leader_ip, leader_port, &request_data)
+            .await
+        {
+            error!(self.log, "Failed to send repair request: {}", e);
+        }
+
+        info!(
+            self.log,
+            "Joined the cluster with leader: {}, own id: {}", leader_id, self.id
+        );
+    }
+
     async fn persist_to_disk(&mut self, id: u32, data: &[u8]) {
         info!(
             self.log,