From adc94b312e7339b9b19223dea70c3ea8508911fa Mon Sep 17 00:00:00 2001 From: HeDui Date: Wed, 7 Aug 2024 16:14:38 +0800 Subject: [PATCH] add raft add_node example (#29) Signed-off-by: dierbei --- Cargo.toml | 10 +++- examples/simulate_add_node.rs | 110 ++++++++++++++++++++++++++++++++++ src/server.rs | 33 ++++++---- 3 files changed, 141 insertions(+), 12 deletions(-) create mode 100644 examples/simulate_add_node.rs diff --git a/Cargo.toml b/Cargo.toml index db04673..dfc2413 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,4 +30,12 @@ tempfile = "3.10.1" [[example]] name = "simple_run" -path = "examples/simple_run.rs" \ No newline at end of file +path = "examples/simple_run.rs" + +[[example]] +name = "simulate_node_failure" +path = "examples/simulate_node_failure.rs" + +[[example]] +name = "simulate_add_node" +path = "examples/simulate_add_node.rs" \ No newline at end of file diff --git a/examples/simulate_add_node.rs b/examples/simulate_add_node.rs new file mode 100644 index 0000000..382105a --- /dev/null +++ b/examples/simulate_add_node.rs @@ -0,0 +1,110 @@ +// Organization: SpacewalkHq +// License: MIT License + +use raft_rs::log::get_logger; +use slog::error; +use std::collections::HashMap; +use std::thread; +use tokio::runtime::Runtime; +use tokio::time::Duration; + +use raft_rs::network::{NetworkLayer, TCPManager}; +use raft_rs::server::{Server, ServerConfig}; + +#[tokio::main] +async fn main() { + // Define cluster configuration + let mut cluster_nodes = vec![1, 2, 3, 4, 5]; + let mut id_to_address_mapping = HashMap::new(); + id_to_address_mapping.insert(1, "127.0.0.1:5001".to_string()); + id_to_address_mapping.insert(2, "127.0.0.1:5002".to_string()); + id_to_address_mapping.insert(3, "127.0.0.1:5003".to_string()); + id_to_address_mapping.insert(4, "127.0.0.1:5004".to_string()); + id_to_address_mapping.insert(5, "127.0.0.1:5005".to_string()); + + // Create server configs + let configs: Vec<_> = cluster_nodes + .iter() + .map(|&id| ServerConfig { + election_timeout: Duration::from_millis(1000), + address: "127.0.0.1".to_string(), + port: 5000 + id as u16, + cluster_nodes: cluster_nodes.clone(), + id_to_address_mapping: id_to_address_mapping.clone(), + default_leader: Some(1 as u32), + leadership_preferences: HashMap::new(), + storage_location: Some("logs/".to_string()), + }) + .collect(); + + // Start servers in separate threads + let mut handles = vec![]; + for (i, config) in configs.into_iter().enumerate() { + let id = cluster_nodes[i]; + handles.push(thread::spawn(move || { + let rt = Runtime::new().unwrap(); + let mut server = Server::new(id, config); + rt.block_on(server.start()); + })); + } + + // Simulate adding a new node + // The following defines the basic configuration of the new node + tokio::time::sleep(Duration::from_secs(10)).await; + let new_node_id = 6; + let new_node_ip = "127.0.0.1"; + let new_node_port = 5006; + let new_node_address = format!("{}:{}", new_node_ip, new_node_port); + cluster_nodes.push(new_node_id); + id_to_address_mapping.insert(new_node_id, new_node_address.to_string()); + let new_node_conf = ServerConfig { + election_timeout: Duration::from_millis(1000), + address: new_node_ip.to_string(), + port: new_node_port as u16, + cluster_nodes: cluster_nodes.clone(), + id_to_address_mapping: id_to_address_mapping.clone(), + default_leader: Some(1 as u32), + leadership_preferences: HashMap::new(), + storage_location: Some("logs/".to_string()), + }; + + // Launching a new node + handles.push(thread::spawn(move || { + let rt = Runtime::new().unwrap(); + let mut server = Server::new(6, new_node_conf); + rt.block_on(server.start()); + })); + + // Simulate sending a Raft Join request after a few seconds + // Because we need to wait until the new node has started + tokio::time::sleep(Duration::from_secs(3)).await; + add_node_request(new_node_id, new_node_address, new_node_port).await; + + // Wait for all servers to finish + for handle in handles { + handle.join().unwrap(); + } +} + +async fn add_node_request(new_node_id: u32, addr: String, port: u32) { + let log = get_logger(); + + let server_address = "127.0.0.1"; + let network_manager = TCPManager::new(server_address.to_string(), port.try_into().unwrap()); + + let request_data = vec![ + new_node_id.to_be_bytes().to_vec(), + 0u32.to_be_bytes().to_vec(), + 10u32.to_be_bytes().to_vec(), + addr.as_bytes().to_vec(), + ] + .concat(); + + // Let's assume that 5001 is the port of the leader node. + if let Err(e) = network_manager + .send(server_address, "5001", &request_data) + .await + { + error!(log, "Failed to send client request: {}", e); + } +} diff --git a/src/server.rs b/src/server.rs index 88a63b2..3125a4a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -439,13 +439,8 @@ impl Server { } async fn handle_rpc(&mut self, data: Vec) { - let term = u32::from_be_bytes(data[4..8].try_into().unwrap()); let message_type: u32 = u32::from_be_bytes(data[8..12].try_into().unwrap()); - if term < self.state.current_term && message_type != 3 { - return; - } - let message_type = match message_type { 0 => MessageType::RequestVote, 1 => MessageType::RequestVoteResponse, @@ -501,7 +496,11 @@ impl Server { self.handle_repair_response(&data).await; } MessageType::JoinRequest => { - info!(self.log, "Received join request: {:?}", data); + info!( + self.log, + "Received join request: {:?}", + String::from_utf8_lossy(&data) + ); self.handle_join_request(&data).await; } MessageType::JoinResponse => { @@ -841,7 +840,18 @@ impl Server { let node_id = u32::from_be_bytes(data[0..4].try_into().unwrap()); let term = u32::from_be_bytes(data[4..8].try_into().unwrap()); - let node_ip_address = String::from_utf8(data[8..].to_vec()).unwrap(); + let node_ip_address = String::from_utf8(data[12..].to_vec()).unwrap(); + + info!( + self.log, + "Current cluster nodes: {:?}, want join node: {}", + self.config + .id_to_address_mapping + .values() + .cloned() + .collect::>(), + node_ip_address + ); if self.config.cluster_nodes.contains(&node_id) { error!( @@ -856,10 +866,11 @@ impl Server { return; } + self.peers.push(node_id); self.config.cluster_nodes.push(node_id); self.config .id_to_address_mapping - .insert(node_id, node_ip_address); + .insert(node_id, node_ip_address.clone()); let mut response = [ self.id.to_be_bytes(), @@ -889,9 +900,9 @@ impl Server { let leader_id = u32::from_be_bytes(data[0..4].try_into().unwrap()); let current_term = u32::from_be_bytes(data[4..8].try_into().unwrap()); - let commit_index = u32::from_be_bytes(data[8..12].try_into().unwrap()); - let previous_log_index = u32::from_be_bytes(data[12..16].try_into().unwrap()); - let peers_count = u32::from_be_bytes(data[16..].try_into().unwrap()); + let commit_index = u32::from_be_bytes(data[12..16].try_into().unwrap()); + let previous_log_index = u32::from_be_bytes(data[16..20].try_into().unwrap()); + let peers_count = u32::from_be_bytes(data[20..24].try_into().unwrap()); self.state.current_term = current_term; self.state.commit_index = commit_index;