Skip to content

Commit

Permalink
adding simulate_node_failure example
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhawvipul committed Aug 2, 2024
1 parent 50b9f16 commit 857e899
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ hex = "0.4"
sha2 = "0.10.8"
slog = "2.7.0"
slog-term = "2.9.1"
rand = "0.8"
chrono = "0.4"

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion examples/simple_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() {
let configs: Vec<_> = cluster_nodes
.iter()
.map(|&id| ServerConfig {
election_timeout: Duration::from_secs(5),
election_timeout: Duration::from_millis(200),
address: "127.0.0.1".to_string(),
port: 5000 + id as u16,
cluster_nodes: cluster_nodes.clone(),
Expand Down
89 changes: 89 additions & 0 deletions examples/simulate_node_failure.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Organization: SpacewalkHq
// License: MIT License

use raft_rs::log::get_logger;
use slog::{info, warn};
use std::collections::HashMap;
use rand::Rng;
use tokio::time::{sleep, Duration};
use raft_rs::server::{Server, ServerConfig};

#[tokio::main]
async fn main() {
let log = get_logger();

// Define cluster configuration
let cluster_nodes = vec![1, 2, 3, 4, 5];
let mut id_to_address_mapping = HashMap::new();
id_to_address_mapping.insert(1, "127.0.0.1:5001".to_string());
id_to_address_mapping.insert(2, "127.0.0.1:5002".to_string());
id_to_address_mapping.insert(3, "127.0.0.1:5003".to_string());
id_to_address_mapping.insert(4, "127.0.0.1:5004".to_string());
id_to_address_mapping.insert(5, "127.0.0.1:5005".to_string());

// Create server configs
let configs: Vec<_> = cluster_nodes
.iter()
.map(|&id| ServerConfig {
election_timeout: Duration::from_millis(200),
address: "127.0.0.1".to_string(),
port: 5000 + id as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
default_leader: Some(1),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
})
.collect();

// Start servers asynchronously
let mut server_handles = vec![];
for (i, config) in configs.into_iter().enumerate() {
let id = cluster_nodes[i];
let server_handle = tokio::spawn(async move {
let mut server = Server::new(id, config);
server.start().await;
});
server_handles.push(server_handle);
}

// Simulate stopping and restarting servers
let mut rng = rand::thread_rng();
for _ in 0..10 {
let sleep_time = rng.gen_range(3..=5);
sleep(Duration::from_secs(sleep_time)).await;

let server_to_stop = rng.gen_range(1..=5);
warn!(log, "Stopping server {}", server_to_stop);

// Cancel the selected server's task
server_handles[server_to_stop - 1].abort();

// Simulate Raft leader election process
sleep(Duration::from_secs(3)).await;

warn!(log, "Restarting server {}", server_to_stop);
let config = ServerConfig {
election_timeout: Duration::from_millis(200),
address: "127.0.0.1".to_string(),
port: 5000 + server_to_stop as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
default_leader: Some(1),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
};
let server_handle = tokio::spawn(async move {
let mut server = Server::new(server_to_stop.try_into().unwrap(), config);
server.start().await;
});
server_handles[server_to_stop - 1] = server_handle;
}

// Wait for all server tasks to complete (if they haven't been aborted)
for handle in server_handles {
let _ = handle.await;
}

info!(log, "Test completed successfully.");
}
43 changes: 38 additions & 5 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Server {
previous_log_index: 0,
next_index: vec![0; peers.len()],
match_index: vec![0; peers.len()],
election_timeout: config.election_timeout + Duration::from_secs(2 * id as u64),
election_timeout: config.election_timeout + Duration::from_millis(20 * id as u64),
last_heartbeat: Instant::now(),
votes_received: HashMap::new(),
};
Expand Down Expand Up @@ -161,6 +161,10 @@ impl Server {
}
}

pub fn is_leader(&self) -> bool {
self.state.state == RaftState::Leader
}

async fn follower(&mut self) {
if self.state.state != RaftState::Follower {
return;
Expand Down Expand Up @@ -286,6 +290,7 @@ impl Server {
.clone()
})
.collect();
info!(self.log, "Starting election, id: {}, term: {}", self.id, self.state.current_term);
let _ = self.network_manager.broadcast(&data, addresses).await;

loop {
Expand All @@ -308,6 +313,7 @@ impl Server {
_ = rpc_future => {
if self.is_quorum(self.state.votes_received.len() as u32) {
info!(self.log, "Quorum reached");
info!(self.log, "I am the leader {}", self.id);
self.state.state = RaftState::Leader;
break;
}
Expand All @@ -328,7 +334,6 @@ impl Server {
return;
}
info!(self.log, "Server {} is the leader", self.id);
info!(self.log, "Leader state: {:?}", self.state);

let mut heartbeat_interval = tokio::time::interval(Duration::from_millis(300));

Expand Down Expand Up @@ -377,7 +382,7 @@ impl Server {
self.debounce_timer = Instant::now();
}

info!(self.log, "Leader state: {:?}", self.state);
// debug!(self.log, "Leader id: {}, Leader state: {:?}", self.id, self.state);
},
}
}
Expand Down Expand Up @@ -462,7 +467,7 @@ impl Server {
self.handle_append_entries_response(&data).await;
}
MessageType::Heartbeat => {
self.handle_heartbeat().await;
self.handle_heartbeat(&data).await;
}
MessageType::HeartbeatResponse => {
self.handle_heartbeat_response().await;
Expand Down Expand Up @@ -716,10 +721,38 @@ impl Server {
}
}

async fn handle_heartbeat(&mut self) {
async fn handle_heartbeat(&mut self, data: &[u8]) {
if self.state.state != RaftState::Follower || self.state.state != RaftState::Candidate {
return;
}
let term = u32::from_be_bytes(data[4..8].try_into().unwrap());
if term < self.state.current_term {
return;
}

// if a leader gets a heartbeat from a leader with a higher term, it should step down
if term > self.state.current_term {
self.state.state = RaftState::Follower;
}

// if a leader gets a heartbeat from a leader same term, it should step down if it has a higher id
if term == self.state.current_term {
let leader_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
if self.config.default_leader.is_none() {
if self.id < leader_id {
self.state.state = RaftState::Follower;
self.state.current_term = term;
}
} else {
if self.id != self.config.default_leader.unwrap() {
self.state.state = RaftState::Follower;
self.state.current_term = term;
} else {
self.state.state = RaftState::Leader;
}
}
}

self.state.last_heartbeat = Instant::now();
}

Expand Down

0 comments on commit 857e899

Please sign in to comment.