diff --git a/harness/tests/integration_cases/test_raft_flow_control.rs b/harness/tests/integration_cases/test_raft_flow_control.rs index bc55583..5f05f27 100644 --- a/harness/tests/integration_cases/test_raft_flow_control.rs +++ b/harness/tests/integration_cases/test_raft_flow_control.rs @@ -259,3 +259,34 @@ fn test_msg_app_flow_control_with_freeing_resources() { 3: cap=256/start=0/count=2/buffer=[2,3] */ } + +// Test progress can be disabled with `adjust_max_inflight_msgs(, 0)`. +#[test] +fn test_disable_progress() { + let l = default_logger(); + let mut r = new_test_raft(1, vec![1, 2], 5, 1, new_storage(), &l); + r.become_candidate(); + r.become_leader(); + + r.mut_prs().get_mut(2).unwrap().become_replicate(); + + // Disable the progress 2. Internal `free`s shouldn't fail. + r.adjust_max_inflight_msgs(2, 0); + r.step(new_message(2, 1, MessageType::MsgHeartbeatResponse, 0)) + .unwrap(); + assert!(r.prs().get(2).unwrap().ins.full()); + assert_eq!(r.prs().get(2).unwrap().ins.count(), 0); + + // Progress 2 is disabled. + let msgs = r.read_messages(); + assert_eq!(msgs.len(), 0); + + // After the progress gets enabled and a heartbeat response is received, + // its leader can continue to append entries to it. + r.adjust_max_inflight_msgs(2, 10); + r.step(new_message(2, 1, MessageType::MsgHeartbeatResponse, 0)) + .unwrap(); + let msgs = r.read_messages(); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].get_msg_type(), MessageType::MsgAppend); +} diff --git a/src/tracker/inflights.rs b/src/tracker/inflights.rs index a9626d3..42447fc 100644 --- a/src/tracker/inflights.rs +++ b/src/tracker/inflights.rs @@ -154,8 +154,10 @@ impl Inflights { /// Frees the first buffer entry. #[inline] pub fn free_first_one(&mut self) { - let start = self.buffer[self.start]; - self.free_to(start); + if self.count > 0 { + let start = self.buffer[self.start]; + self.free_to(start); + } } /// Frees all inflights.