feat: Add ClusterConfig (#27)
dharanad authored Aug 11, 2024
1 parent d99a863 commit 83a2e52
Showing 7 changed files with 253 additions and 210 deletions.
36 changes: 18 additions & 18 deletions examples/simple_run.rs
@@ -3,9 +3,12 @@

// make this file executable with `chmod +x examples/simple_run.rs`

use raft_rs::cluster::{ClusterConfig, NodeMeta};
use raft_rs::log::get_logger;
use slog::{error, info};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::thread;
use tokio::runtime::Runtime;
use tokio::time::Duration;
@@ -17,23 +20,22 @@ use raft_rs::server::{Server, ServerConfig};
async fn main() {
// Define cluster configuration
let cluster_nodes = vec![1, 2, 3, 4, 5];
let mut id_to_address_mapping = HashMap::new();
id_to_address_mapping.insert(1, "127.0.0.1:5001".to_string());
id_to_address_mapping.insert(2, "127.0.0.1:5002".to_string());
id_to_address_mapping.insert(3, "127.0.0.1:5003".to_string());
id_to_address_mapping.insert(4, "127.0.0.1:5004".to_string());
id_to_address_mapping.insert(5, "127.0.0.1:5005".to_string());

let peers = vec![
NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())),
NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())),
NodeMeta::from((3, SocketAddr::from_str("127.0.0.1:5003").unwrap())),
NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())),
NodeMeta::from((5, SocketAddr::from_str("127.0.0.1:5005").unwrap())),
];
let cluster_config = ClusterConfig::new(peers);
// Create server configs
let configs: Vec<_> = cluster_nodes
.iter()
.map(|&id| ServerConfig {
election_timeout: Duration::from_millis(200),
address: "127.0.0.1".to_string(),
port: 5000 + id as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
default_leader: Some(1 as u32),
address: SocketAddr::from_str(format!("127.0.0.1:{}", 5000 + id).as_str()).unwrap(),
default_leader: Some(1u32),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
})
@@ -43,9 +45,10 @@ async fn main() {
let mut handles = vec![];
for (i, config) in configs.into_iter().enumerate() {
let id = cluster_nodes[i];
let cc = cluster_config.clone();
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(id, config);
let mut server = Server::new(id, config, cc);
rt.block_on(server.start());
}));
}
@@ -63,8 +66,8 @@ async fn main() {
async fn client_request(client_id: u32, data: u32) {
let log = get_logger();

let server_address = "127.0.0.1"; // Assuming server 1 is the leader
let network_manager = TCPManager::new(server_address.to_string(), 5001);
let server_address = SocketAddr::from_str("127.0.0.1:5001").unwrap(); // Assuming server 1 is the leader
let network_manager = TCPManager::new(server_address);

let request_data = vec![
client_id.to_be_bytes().to_vec(),
@@ -74,10 +77,7 @@ async fn client_request(client_id: u32, data: u32) {
]
.concat();

if let Err(e) = network_manager
.send(server_address, "5001", &request_data)
.await
{
if let Err(e) = network_manager.send(&server_address, &request_data).await {
error!(log, "Failed to send client request: {}", e);
}

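The client request above packs u32 fields as big-endian bytes and concatenates them before handing the buffer to TCPManager. A minimal, self-contained sketch of that encode/decode round trip; the field meanings are illustrative only, since the full payload layout is truncated in this diff:

```rust
// Round-trip sketch of the big-endian u32 packing used by the example's
// request payloads. Field meanings are illustrative, not raft-rs's spec.
fn encode(fields: &[u32]) -> Vec<u8> {
    fields.iter().flat_map(|f| f.to_be_bytes()).collect()
}

fn decode(buf: &[u8]) -> Vec<u32> {
    buf.chunks_exact(4)
        .map(|c| u32::from_be_bytes(c.try_into().unwrap()))
        .collect()
}

fn main() {
    let payload = encode(&[1, 0, 2]); // e.g. client_id plus two protocol fields
    assert_eq!(decode(&payload), vec![1, 0, 2]);
}
```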
58 changes: 29 additions & 29 deletions examples/simulate_add_node.rs
@@ -1,9 +1,12 @@
// Organization: SpacewalkHq
// License: MIT License

use raft_rs::cluster::{ClusterConfig, NodeMeta};
use raft_rs::log::get_logger;
use slog::error;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use std::thread;
use tokio::runtime::Runtime;
use tokio::time::Duration;
@@ -15,22 +18,20 @@ use raft_rs::server::{Server, ServerConfig};
async fn main() {
// Define cluster configuration
let mut cluster_nodes = vec![1, 2, 3, 4, 5];
let mut id_to_address_mapping = HashMap::new();
id_to_address_mapping.insert(1, "127.0.0.1:5001".to_string());
id_to_address_mapping.insert(2, "127.0.0.1:5002".to_string());
id_to_address_mapping.insert(3, "127.0.0.1:5003".to_string());
id_to_address_mapping.insert(4, "127.0.0.1:5004".to_string());
id_to_address_mapping.insert(5, "127.0.0.1:5005".to_string());

let peers = vec![
NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())),
NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())),
NodeMeta::from((3, SocketAddr::from_str("127.0.0.1:5003").unwrap())),
NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())),
NodeMeta::from((5, SocketAddr::from_str("127.0.0.1:5005").unwrap())),
];
let mut cluster_config = ClusterConfig::new(peers);
// Create server configs
let configs: Vec<_> = cluster_nodes
.iter()
.map(|&id| ServerConfig {
election_timeout: Duration::from_millis(1000),
address: "127.0.0.1".to_string(),
port: 5000 + id as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
address: SocketAddr::from_str(format!("127.0.0.1:{}", 5000 + id).as_str()).unwrap(),
default_leader: Some(1 as u32),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
@@ -41,9 +42,10 @@ async fn main() {
let mut handles = vec![];
for (i, config) in configs.into_iter().enumerate() {
let id = cluster_nodes[i];
let cc = cluster_config.clone();
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(id, config);
let mut server = Server::new(id, config, cc);
rt.block_on(server.start());
}));
}
@@ -52,17 +54,18 @@ async fn main() {
// The following defines the basic configuration of the new node
tokio::time::sleep(Duration::from_secs(10)).await;
let new_node_id = 6;
let new_node_ip = "127.0.0.1";
let new_node_port = 5006;
let new_node_address = format!("{}:{}", new_node_ip, new_node_port);
let new_node_address =
SocketAddr::from_str(format!("127.0.0.1:{}", new_node_port).as_str()).unwrap();
cluster_nodes.push(new_node_id);
id_to_address_mapping.insert(new_node_id, new_node_address.to_string());
// id_to_address_mapping.insert(new_node_id, new_node_address.to_string());
cluster_config.add_server(NodeMeta::from((
new_node_id,
new_node_address.clone().into(),
)));
let new_node_conf = ServerConfig {
election_timeout: Duration::from_millis(1000),
address: new_node_ip.to_string(),
port: new_node_port as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
address: new_node_address.clone().into(),
default_leader: Some(1 as u32),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
@@ -71,40 +74,37 @@ async fn main() {
// Launching a new node
handles.push(thread::spawn(move || {
let rt = Runtime::new().unwrap();
let mut server = Server::new(6, new_node_conf);
let mut server = Server::new(6, new_node_conf, cluster_config);
rt.block_on(server.start());
}));

// Simulate sending a Raft Join request after a few seconds
// Because we need to wait until the new node has started
tokio::time::sleep(Duration::from_secs(3)).await;
add_node_request(new_node_id, new_node_address, new_node_port).await;
add_node_request(new_node_id, new_node_address).await;

// Wait for all servers to finish
for handle in handles {
handle.join().unwrap();
}
}

async fn add_node_request(new_node_id: u32, addr: String, port: u32) {
async fn add_node_request(new_node_id: u32, addr: SocketAddr) {
let log = get_logger();

let server_address = "127.0.0.1";
let network_manager = TCPManager::new(server_address.to_string(), port.try_into().unwrap());
let server_address = addr;
let network_manager = TCPManager::new(server_address);

let request_data = vec![
new_node_id.to_be_bytes().to_vec(),
0u32.to_be_bytes().to_vec(),
10u32.to_be_bytes().to_vec(),
addr.as_bytes().to_vec(),
addr.ip().to_string().as_bytes().to_vec(),
]
.concat();

// Let's assume that 5001 is the port of the leader node.
if let Err(e) = network_manager
.send(server_address, "5001", &request_data)
.await
{
if let Err(e) = network_manager.send(&server_address, &request_data).await {
error!(log, "Failed to send client request: {}", e);
}
}
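The join payload assembled in add_node_request is three big-endian u32s followed by the UTF-8 text of the new node's IP. A sketch of a decoder that mirrors only that encoding side; the server's actual parsing logic is not part of this commit's visible diff, so the field names and layout here are assumptions:

```rust
use std::net::IpAddr;
use std::str::FromStr;

// Hypothetical decoder mirroring the payload built in add_node_request:
// three big-endian u32s, then the IP address as UTF-8 text.
fn decode_join(payload: &[u8]) -> Option<(u32, u32, u32, IpAddr)> {
    if payload.len() < 12 {
        return None;
    }
    let id = u32::from_be_bytes(payload[0..4].try_into().ok()?);
    let a = u32::from_be_bytes(payload[4..8].try_into().ok()?);
    let b = u32::from_be_bytes(payload[8..12].try_into().ok()?);
    let ip = IpAddr::from_str(std::str::from_utf8(&payload[12..]).ok()?).ok()?;
    Some((id, a, b, ip))
}

fn main() {
    let payload = [
        6u32.to_be_bytes().to_vec(),
        0u32.to_be_bytes().to_vec(),
        10u32.to_be_bytes().to_vec(),
        "127.0.0.1".as_bytes().to_vec(),
    ]
    .concat();
    assert_eq!(
        decode_join(&payload),
        Some((6, 0, 10, IpAddr::from_str("127.0.0.1").unwrap()))
    );
}
```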
34 changes: 18 additions & 16 deletions examples/simulate_node_failure.rs
@@ -1,11 +1,14 @@
// Organization: SpacewalkHq
// License: MIT License

use raft_rs::cluster::{ClusterConfig, NodeMeta};
use raft_rs::log::get_logger;
use raft_rs::server::{Server, ServerConfig};
use rand::Rng;
use slog::{info, warn};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::str::FromStr;
use tokio::time::{sleep, Duration};

#[tokio::main]
@@ -14,22 +17,21 @@ async fn main() {

// Define cluster configuration
let cluster_nodes = vec![1, 2, 3, 4, 5];
let mut id_to_address_mapping = HashMap::new();
id_to_address_mapping.insert(1, "127.0.0.1:5001".to_string());
id_to_address_mapping.insert(2, "127.0.0.1:5002".to_string());
id_to_address_mapping.insert(3, "127.0.0.1:5003".to_string());
id_to_address_mapping.insert(4, "127.0.0.1:5004".to_string());
id_to_address_mapping.insert(5, "127.0.0.1:5005".to_string());
let peers = vec![
NodeMeta::from((1, SocketAddr::from_str("127.0.0.1:5001").unwrap())),
NodeMeta::from((2, SocketAddr::from_str("127.0.0.1:5002").unwrap())),
NodeMeta::from((3, SocketAddr::from_str("127.0.0.1:5003").unwrap())),
NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())),
NodeMeta::from((5, SocketAddr::from_str("127.0.0.1:5005").unwrap())),
];
let cluster_config = ClusterConfig::new(peers);

// Create server configs
let configs: Vec<_> = cluster_nodes
.iter()
.map(|&id| ServerConfig {
election_timeout: Duration::from_millis(200),
address: "127.0.0.1".to_string(),
port: 5000 + id as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
address: SocketAddr::from_str(format!("127.0.0.1:{}", 5000 + id).as_str()).unwrap(),
default_leader: Some(1),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
@@ -40,8 +42,9 @@ async fn main() {
let mut server_handles = vec![];
for (i, config) in configs.into_iter().enumerate() {
let id = cluster_nodes[i];
let cc = cluster_config.clone();
let server_handle = tokio::spawn(async move {
let mut server = Server::new(id, config);
let mut server = Server::new(id, config, cc);
server.start().await;
});
server_handles.push(server_handle);
@@ -65,16 +68,15 @@ async fn main() {
warn!(log, "Restarting server {}", server_to_stop);
let config = ServerConfig {
election_timeout: Duration::from_millis(200),
address: "127.0.0.1".to_string(),
port: 5000 + server_to_stop as u16,
cluster_nodes: cluster_nodes.clone(),
id_to_address_mapping: id_to_address_mapping.clone(),
address: SocketAddr::from_str(format!("127.0.0.1:{}", 5000 + server_to_stop).as_str())
.unwrap(),
default_leader: Some(1),
leadership_preferences: HashMap::new(),
storage_location: Some("logs/".to_string()),
};
let cc = cluster_config.clone();
let server_handle = tokio::spawn(async move {
let mut server = Server::new(server_to_stop.try_into().unwrap(), config);
let mut server = Server::new(server_to_stop.try_into().unwrap(), config, cc);
server.start().await;
});
server_handles[server_to_stop - 1] = server_handle;
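The restart path above moves a fresh ServerConfig and a clone of the ClusterConfig into a new tokio task and stores the handle back in the same slot. A stripped-down sketch of that pattern, with run_server standing in for Server::new(..).start() and the raft-rs types omitted:

```rust
use tokio::task::JoinHandle;

// Stand-in for Server::new(id, config, cluster_config).start().await.
async fn run_server(_id: usize) {
    std::future::pending::<()>().await;
}

#[tokio::main]
async fn main() {
    // Spawn three "servers" as independent tasks.
    let mut handles: Vec<JoinHandle<()>> =
        (1..=3).map(|id| tokio::spawn(run_server(id))).collect();

    // Simulate a failure of server 2, then respawn it in place,
    // mirroring `server_handles[server_to_stop - 1] = server_handle;`.
    handles[1].abort();
    handles[1] = tokio::spawn(run_server(2));
}
```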
75 changes: 75 additions & 0 deletions src/cluster.rs
@@ -0,0 +1,75 @@
use std::collections::HashMap;
use std::net::SocketAddr;

#[derive(Debug, Clone)]
pub struct NodeMeta {
pub id: u32,
pub address: SocketAddr,
}

impl NodeMeta {
fn new(id: u32, address: SocketAddr) -> NodeMeta {
Self { id, address }
}
}

impl From<(u32, SocketAddr)> for NodeMeta {
fn from((id, address): (u32, SocketAddr)) -> Self {
Self::new(id, address)
}
}

#[derive(Debug, Clone)]
pub struct ClusterConfig {
peers: Vec<NodeMeta>,
id_node_map: HashMap<u32, NodeMeta>,
}

impl ClusterConfig {
pub fn new(peers: Vec<NodeMeta>) -> ClusterConfig {
let id_node_map = peers
.clone()
.into_iter()
.map(|x| (x.id, x))
.collect::<HashMap<u32, NodeMeta>>();
ClusterConfig { peers, id_node_map }
}

pub fn peers(&self) -> &[NodeMeta] {
&self.peers
}

// Return meta of peers for a node
pub fn peers_for(&self, id: u32) -> Vec<&NodeMeta> {
self.peers.iter().filter(|x| x.id != id).collect::<Vec<_>>()
}

// Return address of peers for a node
pub fn peer_address_for(&self, id: u32) -> Vec<SocketAddr> {
self.peers
.iter()
.filter(|x| x.id != id)
.map(|x| x.address)
.collect::<Vec<_>>()
}

pub fn address(&self, id: u32) -> Option<SocketAddr> {
self.id_node_map.get(&id).map(|x| x.address)
}
pub fn meta(&self, node_id: u32) -> Option<&NodeMeta> {
self.id_node_map.get(&node_id)
}

pub fn contains_server(&self, node_id: u32) -> bool {
self.id_node_map.contains_key(&node_id)
}

pub fn add_server(&mut self, n: NodeMeta) {
self.peers.push(n.clone());
self.id_node_map.insert(n.id, n);
}

pub fn peer_count(&self, id: u32) -> usize {
self.peers_for(id).len()
}
}
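A short usage sketch of the ClusterConfig API defined above; the node IDs and addresses are illustrative:

```rust
use std::net::SocketAddr;
use std::str::FromStr;

use raft_rs::cluster::{ClusterConfig, NodeMeta};

fn main() {
    // Build peers from (id, address) tuples via the From impl.
    let peers: Vec<NodeMeta> = (1u32..=3)
        .map(|id| {
            let addr = SocketAddr::from_str(&format!("127.0.0.1:{}", 5000 + id)).unwrap();
            NodeMeta::from((id, addr))
        })
        .collect();

    let mut config = ClusterConfig::new(peers);

    // Resolve a node's address and count the peers node 1 talks to.
    assert_eq!(config.address(2), SocketAddr::from_str("127.0.0.1:5002").ok());
    assert_eq!(config.peer_count(1), 2);

    // Membership changes go through add_server, as in simulate_add_node.rs.
    config.add_server(NodeMeta::from((4, SocketAddr::from_str("127.0.0.1:5004").unwrap())));
    assert!(config.contains_server(4));
    assert_eq!(config.peer_address_for(1).len(), 3);
}
```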
7 changes: 1 addition & 6 deletions src/lib.rs
@@ -1,14 +1,9 @@
// organization : SpacewalkHq
// License : MIT License

pub mod cluster;
pub mod error;
pub mod log;
pub mod network;
pub mod server;
pub mod storage;

/// Helper function to parse the IP address delimited by ':' and return tuple of (ip, port)
fn parse_ip_address(addr: &str) -> (&str, &str) {
let tokens = addr.split(':').collect::<Vec<&str>>();
(tokens[0], tokens[1])
}
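The removed parse_ip_address helper split an "ip:port" string into raw tokens; the updated examples and ServerConfig now rely on std's SocketAddr parsing instead. A minimal illustration of that replacement:

```rust
use std::net::SocketAddr;
use std::str::FromStr;

fn main() {
    // The parsing and validation parse_ip_address did by hand.
    let addr = SocketAddr::from_str("127.0.0.1:5001").unwrap();
    assert_eq!(addr.ip().to_string(), "127.0.0.1");
    assert_eq!(addr.port(), 5001);

    // Malformed input is rejected instead of being silently split on ':'.
    assert!(SocketAddr::from_str("127.0.0.1").is_err());
}
```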