Skip to content

Commit

Permalink
implement broadcast job
Browse files Browse the repository at this point in the history
  • Loading branch information
eaneto committed Apr 26, 2024
1 parent e6ee61c commit c409093
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 44 deletions.
52 changes: 49 additions & 3 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration};

use bytes::{BufMut, BytesMut};
use clap::Parser;
use clokwerk::{AsyncScheduler, TimeUnits};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{TcpListener, TcpStream},
Expand Down Expand Up @@ -69,11 +68,12 @@ async fn main() {
// other node's response and become a follower.
server.lock().await.start_election().await;

// TODO: Health job
// TODO: Broadcast job
let server_clone = server.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
//server.lock().await.
server_clone.lock().await.broadcast_current_log().await;
}
});

Expand Down Expand Up @@ -182,6 +182,25 @@ async fn handle_connection(
log_length,
last_term,
} => request_vote(server, node_id, current_term, log_length, last_term).await,
Command::LogRequest {
leader_id,
term,
prefix_length,
prefix_term,
leader_commit,
suffix,
} => {
log_request(
server,
leader_id,
term,
prefix_length,
prefix_term,
leader_commit,
suffix,
)
.await
}
_ => {
debug!("command not created yet");
let message = "Command not created yet";
Expand Down Expand Up @@ -336,6 +355,33 @@ async fn request_vote(
buf
}

async fn log_request(
server: Arc<Mutex<raft::Server>>,
leader_id: u64,
term: u64,
prefix_length: usize,
prefix_term: u64,
leader_commit: u64,
suffix: Vec<raft::LogEntry>,
) -> Vec<u8> {
let log_request = raft::LogRequest {
leader_id,
term,
prefix_length,
prefix_term,
leader_commit,
suffix,
};
let log_response = server.lock().await.receive_log_request(log_request).await;
let encoded_log_response = bincode::serialize(&log_response).unwrap();

let mut buf = Vec::new();
buf.extend((0_u8).to_be_bytes());
buf.extend(encoded_log_response.len().to_be_bytes());
buf.extend(encoded_log_response);
buf
}

/// Builds the standard error response, the first byte is always an u8
/// 1 to represent the error, the next 8 bytes are the message size
/// followed by the actual error message.
Expand Down
23 changes: 23 additions & 0 deletions src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ pub fn parse_command(buf: &[u8]) -> Result<Command, &str> {
FETCH_COMMAND_BYTE => parse_fetch_log_command(buf),
ACK_COMMAND_BYTE => parse_ack_command(buf),
REQUEST_VOTE_COMMAND_BYTE => parse_request_vote_command(buf),
LOG_REQUEST_COMMAND_BYTE => parse_log_request_command(buf),
_ => Ok(Command::Unknown),
}
}
Expand Down Expand Up @@ -244,6 +245,28 @@ fn parse_request_vote_command(buf: &[u8]) -> Result<Command, &str> {
});
}

fn parse_log_request_command(buf: &[u8]) -> Result<Command, &str> {
let length = match buf.get(1..9) {
Some(length) => length,
None => {
return Err("Unparseable command, unable to parse length");
}
};

let length = usize::from_be_bytes(length.try_into().unwrap());
let log_request_buffer = buf.get(9..(9 + length)).unwrap();
let log_request: raft::LogRequest = bincode::deserialize(log_request_buffer).unwrap();

return Ok(Command::LogRequest {
leader_id: log_request.leader_id,
term: log_request.term,
prefix_length: log_request.prefix_length,
prefix_term: log_request.prefix_term,
leader_commit: log_request.leader_commit,
suffix: log_request.suffix,
});
}

#[cfg(test)]
mod tests {
use crate::command::*;
Expand Down
144 changes: 103 additions & 41 deletions src/raft.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct Server {
// Need to be stored on disk
current_term: u64,
voted_for: Option<u64>,
// TODO: Maybe this could be a skiplist
log: Vec<LogEntry>,
// Can be stored in-memory
state: State,
Expand Down Expand Up @@ -230,23 +231,31 @@ impl Server {

// Log replication

async fn broadcast_message(&mut self, message: BytesMut) {
pub async fn broadcast_message(&mut self, message: BytesMut) {
if matches!(self.state, State::Leader) {
self.log.push(LogEntry {
term: self.current_term,
message,
});
self.acked_length.insert(self.id, self.current_term);
for (_, node) in &self.nodes {
self.replicate_log(node).await;
}
self.broadcast_current_log().await;
} else {
unimplemented!("Leader forwarding not implemented yet")
}
}

pub async fn broadcast_current_log(&mut self) {
if matches!(self.state, State::Leader) {
for (_, node) in &self.nodes.clone() {
if let Ok(response) = self.replicate_log(node).await {
self.process_log_response(response).await;
}
}
}
}

// Can only be called by the leader
async fn replicate_log(&self, node: &Node) {
async fn replicate_log(&self, node: &Node) -> Result<LogResponse, &str> {
let prefix_length = self.sent_length.get(&node.id).unwrap().clone() as usize;
let suffix = &self.log[prefix_length..];
let prefix_term = if prefix_length > 0 {
Expand All @@ -262,44 +271,63 @@ impl Server {
leader_commit: self.commit_length,
suffix: suffix.to_vec(),
};
// TODO: Send request

self.send_log_request(&node, &request).await
}

async fn _receive_log_request(&mut self, log_request: LogRequest) {
if log_request.term > self.current_term {
self.current_term = log_request.term;
self.voted_for = None;
// TODO: Cancel election timer
return;
}
if log_request.term == self.current_term {
self.state = State::Follower;
self.current_leader = log_request.leader_id;
}
async fn send_log_request(
&self,
node: &Node,
log_request: &LogRequest,
) -> Result<LogResponse, &str> {
let command_byte = 5_u8.to_be_bytes();
let encoded_request = bincode::serialize(log_request).unwrap();
let mut buf = Vec::new();
buf.extend(command_byte);
buf.extend(encoded_request.len().to_be_bytes());
buf.extend(encoded_request);

let ok = (self.log.len() >= log_request.prefix_length)
&& (log_request.prefix_length == 0
|| self.log[log_request.prefix_length - 1].term == log_request.prefix_term);
if log_request.term == self.current_term && ok {
let ack = log_request.prefix_length + log_request.suffix.len();
self.send_append_entries(log_request).await;
let response = LogResponse {
node_id: self.id,
term: self.current_term,
ack: ack as u64,
successful: true,
// TODO: Retry
let mut stream = match TcpStream::connect(&node.address).await {
Ok(stream) => stream,
Err(_) => {
error!("Can't connect to node at {}", &node.address);
return Err("Can't connect to node");
}
};

// What should be done in case of failure?
match stream.write_all(&buf).await {
Ok(()) => {
trace!("Successfully sent request to node {}", &node.id)
}
Err(_) => {
error!("Unable to send request to node {}", &node.id);
return Err("Unable to send request to node");
}
};
let mut buf = [0; 1024];
match stream.read(&mut buf).await {
Ok(_) => (),
Err(_) => {
error!("Can't read response from client {}", &node.id);
return Err("Can't read response from client");
}
};
if buf[0] == 0 {
let length = buf.get(1..9).unwrap();
let length = usize::from_be_bytes(length.try_into().unwrap());
let encoded_response = match buf.get(9..(9 + length)) {
Some(response) => response,
None => return Err("Incomplete response, unable to parse log response"),
};
Ok(bincode::deserialize(encoded_response).unwrap())
} else {
let response = LogResponse {
node_id: self.id,
term: self.current_term,
ack: 0,
successful: false,
};
Err("Response is not successful")
}
}

async fn _receive_log_response(&mut self, log_response: LogResponse) {
async fn process_log_response(&mut self, log_response: LogResponse) {
if log_response.term > self.current_term {
self.current_term = log_response.term;
self.state = State::Follower;
Expand All @@ -323,11 +351,45 @@ impl Server {
self.sent_length.get(&log_response.node_id).unwrap() - 1,
);
let node = self.nodes.get(&log_response.node_id).unwrap();
// TODO: ?
self.replicate_log(node).await;

Check warning on line 355 in src/raft.rs

View workflow job for this annotation

GitHub Actions / Rog

unused `Result` that must be used

Check warning on line 355 in src/raft.rs

View workflow job for this annotation

GitHub Actions / Rog

unused `Result` that must be used
}
}
}

pub async fn receive_log_request(&mut self, log_request: LogRequest) -> LogResponse {
if log_request.term > self.current_term {
self.current_term = log_request.term;
self.voted_for = None;
// TODO: Cancel election timer
}
if log_request.term == self.current_term {
self.state = State::Follower;
self.current_leader = log_request.leader_id;
}

let ok = (self.log.len() >= log_request.prefix_length)
&& (log_request.prefix_length == 0
|| self.log[log_request.prefix_length - 1].term == log_request.prefix_term);
if log_request.term == self.current_term && ok {
let ack = log_request.prefix_length + log_request.suffix.len();
self.send_append_entries(log_request).await;
LogResponse {
node_id: self.id,
term: self.current_term,
ack: ack as u64,
successful: true,
}
} else {
LogResponse {
node_id: self.id,
term: self.current_term,
ack: 0,
successful: false,
}
}
}

async fn _commit_log_entries(&mut self) {
while self.commit_length < self.log.len() as u64 {
let mut acks = 0;
Expand Down Expand Up @@ -402,12 +464,12 @@ pub struct LogEntry {
// RPC append entries
#[derive(Serialize, Deserialize)]
pub struct LogRequest {
leader_id: u64,
term: u64,
prefix_length: usize,
prefix_term: u64,
leader_commit: u64,
suffix: Vec<LogEntry>,
pub leader_id: u64,
pub term: u64,
pub prefix_length: usize,
pub prefix_term: u64,
pub leader_commit: u64,
pub suffix: Vec<LogEntry>,
}

#[derive(Serialize, Deserialize)]
Expand Down

0 comments on commit c409093

Please sign in to comment.