Skip to content

Commit

Permalink
add raft add_node example (#29)
Browse files Browse the repository at this point in the history
Signed-off-by: dierbei <[email protected]>
  • Loading branch information
dierbei authored Aug 7, 2024
1 parent e6b96b0 commit adc94b3
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 12 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,12 @@ tempfile = "3.10.1"

[[example]]
name = "simple_run"
path = "examples/simple_run.rs"
path = "examples/simple_run.rs"

[[example]]
name = "simulate_node_failure"
path = "examples/simulate_node_failure.rs"

[[example]]
name = "simulate_add_node"
path = "examples/simulate_add_node.rs"
110 changes: 110 additions & 0 deletions examples/simulate_add_node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Organization: SpacewalkHq
// License: MIT License

use raft_rs::log::get_logger;
use slog::error;
use std::collections::HashMap;
use std::thread;
use tokio::runtime::Runtime;
use tokio::time::Duration;

use raft_rs::network::{NetworkLayer, TCPManager};
use raft_rs::server::{Server, ServerConfig};

#[tokio::main]
async fn main() {
// Define cluster configuration
let mut cluster_nodes = vec![1, 2, 3, 4, 5];
let mut id_to_address_mapping = HashMap::new();
id_to_address_mapping.insert(1, "127.0.0.1:5001".to_string());
id_to_address_mapping.insert(2, "127.0.0.1:5002".to_string());
id_to_address_mapping.insert(3, "127.0.0.1:5003".to_string());
id_to_address_mapping.insert(4, "127.0.0.1:5004".to_string());
id_to_address_mapping.insert(5, "127.0.0.1:5005".to_string());

// Create server configs
let configs: Vec<_> = cluster_nodes
.iter()
.map(|&id| ServerConfig {
election_timeout: Duration::from_millis(1000),
address: "127.0.0.1".to_string(),
port: 5000 + id as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
default_leader: Some(1 as u32),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
})
.collect();

// Start servers in separate threads
let mut handles = vec![];
for (i, config) in configs.into_iter().enumerate() {
let id = cluster_nodes[i];
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(id, config);
rt.block_on(server.start());
}));
}

// Simulate adding a new node
// The following defines the basic configuration of the new node
tokio::time::sleep(Duration::from_secs(10)).await;
let new_node_id = 6;
let new_node_ip = "127.0.0.1";
let new_node_port = 5006;
let new_node_address = format!("{}:{}", new_node_ip, new_node_port);
cluster_nodes.push(new_node_id);
id_to_address_mapping.insert(new_node_id, new_node_address.to_string());
let new_node_conf = ServerConfig {
election_timeout: Duration::from_millis(1000),
address: new_node_ip.to_string(),
port: new_node_port as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
default_leader: Some(1 as u32),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
};

// Launching a new node
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(6, new_node_conf);
rt.block_on(server.start());
}));

// Simulate sending a Raft Join request after a few seconds
// Because we need to wait until the new node has started
tokio::time::sleep(Duration::from_secs(3)).await;
add_node_request(new_node_id, new_node_address, new_node_port).await;

// Wait for all servers to finish
for handle in handles {
handle.join().unwrap();
}
}

async fn add_node_request(new_node_id: u32, addr: String, port: u32) {
let log = get_logger();

let server_address = "127.0.0.1";
let network_manager = TCPManager::new(server_address.to_string(), port.try_into().unwrap());

let request_data = vec![
new_node_id.to_be_bytes().to_vec(),
0u32.to_be_bytes().to_vec(),
10u32.to_be_bytes().to_vec(),
addr.as_bytes().to_vec(),
]
.concat();

// Let's assume that 5001 is the port of the leader node.
if let Err(e) = network_manager
.send(server_address, "5001", &request_data)
.await
{
error!(log, "Failed to send client request: {}", e);
}
}
33 changes: 22 additions & 11 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,8 @@ impl Server {
}

async fn handle_rpc(&mut self, data: Vec<u8>) {
let term = u32::from_be_bytes(data[4..8].try_into().unwrap());
let message_type: u32 = u32::from_be_bytes(data[8..12].try_into().unwrap());

if term < self.state.current_term && message_type != 3 {
return;
}

let message_type = match message_type {
0 => MessageType::RequestVote,
1 => MessageType::RequestVoteResponse,
Expand Down Expand Up @@ -501,7 +496,11 @@ impl Server {
self.handle_repair_response(&data).await;
}
MessageType::JoinRequest => {
info!(self.log, "Received join request: {:?}", data);
info!(
self.log,
"Received join request: {:?}",
String::from_utf8_lossy(&data)
);
self.handle_join_request(&data).await;
}
MessageType::JoinResponse => {
Expand Down Expand Up @@ -841,7 +840,18 @@ impl Server {

let node_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
let term = u32::from_be_bytes(data[4..8].try_into().unwrap());
let node_ip_address = String::from_utf8(data[8..].to_vec()).unwrap();
let node_ip_address = String::from_utf8(data[12..].to_vec()).unwrap();

info!(
self.log,
"Current cluster nodes: {:?}, want join node: {}",
self.config
.id_to_address_mapping
.values()
.cloned()
.collect::<Vec<_>>(),
node_ip_address
);

if self.config.cluster_nodes.contains(&node_id) {
error!(
Expand All @@ -856,10 +866,11 @@ impl Server {
return;
}

self.peers.push(node_id);
self.config.cluster_nodes.push(node_id);
self.config
.id_to_address_mapping
.insert(node_id, node_ip_address);
.insert(node_id, node_ip_address.clone());

let mut response = [
self.id.to_be_bytes(),
Expand Down Expand Up @@ -889,9 +900,9 @@ impl Server {

let leader_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
let current_term = u32::from_be_bytes(data[4..8].try_into().unwrap());
let commit_index = u32::from_be_bytes(data[8..12].try_into().unwrap());
let previous_log_index = u32::from_be_bytes(data[12..16].try_into().unwrap());
let peers_count = u32::from_be_bytes(data[16..].try_into().unwrap());
let commit_index = u32::from_be_bytes(data[12..16].try_into().unwrap());
let previous_log_index = u32::from_be_bytes(data[16..20].try_into().unwrap());
let peers_count = u32::from_be_bytes(data[20..24].try_into().unwrap());

self.state.current_term = current_term;
self.state.commit_index = commit_index;
Expand Down

0 comments on commit adc94b3

Please sign in to comment.