Skip to content

Commit

Permalink
dynamic cluster membership (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
vaibhawvipul authored Aug 1, 2024
1 parent 6c5c352 commit 7462d6b
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 10 deletions.
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,22 @@ It is asynchronous(built on tokio runtime) and supports zero-copy. It does not a
- [x] Leadership preference
- [x] Log compaction
- [x] Tigerbeetle style replica repair
- [x] Dynamic cluster membership changes support

## To-Do
- [ ] Production-ready
- [ ] Test replica repair thoroughly
- [ ] Test for dynamic cluster membership changes
- [ ] io_uring support
- [ ] CI integration improvements
- [ ] Complete batch write implementation
- [ ] Improve Log compaction
- [ ] Improve error handling
- [ ] RDMA support
- [ ] Add more comprehensive tests
- [ ] Enhance documentation
- [ ] Implement dynamic cluster membership changes
- [ ] Deterministic Simulation Testing
- [ ] Benchmarking


## How to Run the Project
1. Ensure you have Rust installed. If not, follow the instructions [here](https://www.rust-lang.org/tools/install).
2. Clone the repository:
Expand Down
120 changes: 113 additions & 7 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ enum MesageType {
ClientResponse,
RepairRequest,
RepairResponse,
// dynamic membership changes
JoinRequest,
JoinResponse,
}

#[derive(Debug)]
Expand Down Expand Up @@ -439,6 +442,8 @@ impl Server {
7 => MesageType::ClientResponse,
8 => MesageType::RepairRequest,
9 => MesageType::RepairResponse,
10 => MesageType::JoinRequest,
11 => MesageType::JoinResponse,
_ => return,
};

Expand Down Expand Up @@ -475,12 +480,17 @@ impl Server {
}
}
MesageType::RepairRequest => {
// TODO: get implementation from user based on the application
info!(self.log, "Received repair request: {:?}", data);
self.handle_repair_request(&data).await;
}
MesageType::RepairResponse => {
// TODO: get implementation from user based on the application
info!(self.log, "Received repair response: {:?}", data);
self.handle_repair_response(&data).await;
}
MesageType::JoinRequest => {
info!(self.log, "Received join request: {:?}", data);
self.handle_join_request(&data).await;
}
MesageType::JoinResponse => {
self.handle_join_response(&data).await;
}
}
}
Expand Down Expand Up @@ -716,8 +726,11 @@ impl Server {
// Noop
}

#[allow(dead_code)]
async fn handle_repair_request(&mut self, data: &[u8]) {
if self.state.state != RaftState::Follower || self.state.state != RaftState::Leader {
return;
}

let peer_id = u32::from_be_bytes(data[0..4].try_into().unwrap());

let log_byte = self.storage.retrieve().await;
Expand All @@ -736,7 +749,6 @@ impl Server {
repair_data.extend_from_slice(entry);
}

// send log entries to peer
let mut response = [
self.id.to_be_bytes(),
self.state.current_term.to_be_bytes(),
Expand All @@ -760,9 +772,13 @@ impl Server {
}
}

#[allow(dead_code)]
async fn handle_repair_response(&mut self, data: &[u8]) {
if self.state.state != RaftState::Leader {
return;
}

if self.storage.turned_malicious().await.is_err() {
self.state.state = RaftState::Follower;
return;
}

Expand All @@ -777,6 +793,96 @@ impl Server {
}
}

async fn handle_join_request(&mut self, data: &[u8]) {
if self.state.state != RaftState::Leader {
return;
}

let node_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
let term = u32::from_be_bytes(data[4..8].try_into().unwrap());
let node_ip_address = String::from_utf8(data[8..].to_vec()).unwrap();

if self.config.cluster_nodes.contains(&node_id) {
error!(
self.log,
"Node already exists in the cluster, Ignoring join request."
);
return;
}

if term != 0 {
error!(self.log, "Invalid term for join request, term should be 0.");
return;
}

self.config.cluster_nodes.push(node_id);
self.config
.id_to_address_mapping
.insert(node_id, node_ip_address);

let mut response = [
self.id.to_be_bytes(),
self.state.current_term.to_be_bytes(),
11u32.to_be_bytes(),
]
.concat();
response.extend_from_slice(&self.state.commit_index.to_be_bytes());
response.extend_from_slice(&self.state.previous_log_index.to_be_bytes());
response.extend_from_slice(&self.peers.len().to_be_bytes());

let peer_address = self.config.id_to_address_mapping.get(&node_id).unwrap();
let (peer_ip, peer_port) = parse_ip_address(peer_address);
if let Err(e) = self
.network_manager
.send(peer_ip, peer_port, &response)
.await
{
error!(self.log, "Failed to send join response: {}", e);
}
}

async fn handle_join_response(&mut self, data: &[u8]) {
if self.state.state != RaftState::Follower {
return;
}

let leader_id = u32::from_be_bytes(data[0..4].try_into().unwrap());
let current_term = u32::from_be_bytes(data[4..8].try_into().unwrap());
let commit_index = u32::from_be_bytes(data[8..12].try_into().unwrap());
let previous_log_index = u32::from_be_bytes(data[12..16].try_into().unwrap());
let peers_count = u32::from_be_bytes(data[16..].try_into().unwrap());

self.state.current_term = current_term;
self.state.commit_index = commit_index;
self.state.previous_log_index = previous_log_index;

self.peers.clear();
for i in 0..peers_count {
self.peers.push(i);
}

let request_data = [
self.id.to_be_bytes(),
self.state.current_term.to_be_bytes(),
8u32.to_be_bytes(),
]
.concat();
let leader_address = self.config.id_to_address_mapping.get(&leader_id).unwrap();
let (leader_ip, leader_port) = parse_ip_address(leader_address);
if let Err(e) = self
.network_manager
.send(leader_ip, leader_port, &request_data)
.await
{
error!(self.log, "Failed to send repair request: {}", e);
}

info!(
self.log,
"Joined the cluster with leader: {}, own id: {}", leader_id, self.id
);
}

async fn persist_to_disk(&mut self, id: u32, data: &[u8]) {
info!(
self.log,
Expand Down

0 comments on commit 7462d6b

Please sign in to comment.