Skip to content

Commit

Permalink
Refactor main loop in raft node thread for better readability
Browse files Browse the repository at this point in the history
Signed-off-by: hhwyt <[email protected]>
  • Loading branch information
hhwyt committed Sep 4, 2024
1 parent 2aefbf6 commit 44c14f6
Showing 1 changed file with 40 additions and 37 deletions.
77 changes: 40 additions & 37 deletions examples/five_mem_node/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,51 +64,54 @@ fn main() {
let rx_stop_clone = Arc::clone(&rx_stop);
let logger = logger.clone();
// Here we spawn the node on a new thread and keep a handle so we can join on them later.
let handle = thread::spawn(move || loop {
thread::sleep(Duration::from_millis(10));
let handle = thread::spawn(move || {
// The main loop of the node.
loop {
// Step raft messages.
match node.my_mailbox.try_recv() {
Ok(msg) => node.step(msg, &logger),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return,
thread::sleep(Duration::from_millis(10));
loop {
// Step raft messages.
match node.my_mailbox.try_recv() {
Ok(msg) => node.step(msg, &logger),
Err(TryRecvError::Empty) => break,
Err(TryRecvError::Disconnected) => return,
}
}
}

let raft_group = match node.raft_group {
Some(ref mut r) => r,
// When Node::raft_group is `None` it means the node is not initialized.
_ => continue,
};
let raft_group = match node.raft_group {
Some(ref mut r) => r,
// When Node::raft_group is `None` it means the node is not initialized.
_ => continue,
};

if t.elapsed() >= Duration::from_millis(100) {
// Tick the raft.
raft_group.tick();
t = Instant::now();
}
if t.elapsed() >= Duration::from_millis(100) {
// Tick the raft.
raft_group.tick();
t = Instant::now();
}

// Let the leader pick pending proposals from the global queue.
if raft_group.raft.state == StateRole::Leader {
// Handle new proposals.
let mut proposals = proposals.lock().unwrap();
for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {
propose(raft_group, p);
// Let the leader pick pending proposals from the global queue.
if raft_group.raft.state == StateRole::Leader {
// Handle new proposals.
let mut proposals = proposals.lock().unwrap();
for p in proposals.iter_mut().skip_while(|p| p.proposed > 0) {
propose(raft_group, p);
}
}
}

// Handle readies from the raft.
on_ready(
raft_group,
&mut node.kv_pairs,
&node.mailboxes,
&proposals,
&logger,
);
// Handle readies from the raft.
on_ready(
raft_group,
&mut node.kv_pairs,
&node.mailboxes,
&proposals,
&logger,
);

// Check control signals from the main thread.
if check_signals(&rx_stop_clone) {
return;
};
// Check control signals from the main thread.
if check_signals(&rx_stop_clone) {
return;
};
}
});
handles.push(handle);
}
Expand Down

0 comments on commit 44c14f6

Please sign in to comment.