Skip to content

Commit

Permalink
fix amnesia recovery boot run (#4768)
Browse files Browse the repository at this point in the history
  • Loading branch information
bingyanglin authored Jan 23, 2025
1 parent 656e74b commit 07983d5
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 13 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/commit_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl CommitConsumerMonitor {
}
}

pub(crate) fn highest_handled_commit(&self) -> CommitIndex {
pub fn highest_handled_commit(&self) -> CommitIndex {
*self.highest_handled_commit.borrow()
}

Expand Down
44 changes: 35 additions & 9 deletions crates/iota-core/src/consensus_manager/mysticeti_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority};
use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority};
use fastcrypto::ed25519;
use iota_config::NodeConfig;
use iota_metrics::{RegistryID, RegistryService, monitored_mpsc::unbounded_channel};
Expand Down Expand Up @@ -49,6 +49,7 @@ pub struct MysticetiManager {
client: Arc<LazyMysticetiClient>,
// TODO: switch to parking_lot::Mutex.
consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,
consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
}

impl MysticetiManager {
Expand All @@ -74,6 +75,7 @@ impl MysticetiManager {
client,
consensus_handler: Mutex::new(None),
boot_counter: Mutex::new(0),
consumer_monitor: ArcSwapOption::empty(),
}
}

Expand Down Expand Up @@ -158,9 +160,36 @@ impl ConsensusManagerTrait for MysticetiManager {
);
let monitor = consumer.monitor();

// TODO(mysticeti): Investigate if we need to return potential errors from
// AuthorityNode and add retries here?
let boot_counter = *self.boot_counter.lock().await;
// If there is a previous consumer monitor, it indicates that the consensus
// engine has been restarted, due to an epoch change. However, that on its
// own doesn't tell us much whether it participated on an active epoch or an old
// one. We need to check if it has handled any commits to determine this.
// If indeed any commits did happen, then we assume that node did participate on
// previous run.
let participated_on_previous_run =
if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
previous_monitor.highest_handled_commit() > 0
} else {
false
};

// Increment the boot counter only if the consensus successfully participated in
// the previous run. This is typical during normal epoch changes, where
// the node restarts as expected, and the boot counter is incremented to prevent
// amnesia recovery on the next start. If the node is recovering from a
// restore process and catching up across multiple epochs, it won't handle any
// commits until it reaches the last active epoch. In this scenario, we
// do not increment the boot counter, as we need amnesia recovery to run.
let mut boot_counter = self.boot_counter.lock().await;
if participated_on_previous_run {
*boot_counter += 1;
} else {
info!(
"Node has not participated in previous run. Boot counter will not increment {}",
*boot_counter
);
}

let authority = ConsensusAuthority::start(
network_type,
own_index,
Expand All @@ -172,15 +201,11 @@ impl ConsensusManagerTrait for MysticetiManager {
Arc::new(tx_validator.clone()),
consumer,
registry.clone(),
boot_counter,
*boot_counter,
)
.await;
let client = authority.transaction_client();

// Now increment the boot counter
let mut boot_counter = self.boot_counter.lock().await;
*boot_counter += 1;

let registry_id = self.registry_service.add(registry.clone());

let registered_authority = Arc::new((authority, registry_id));
Expand All @@ -191,6 +216,7 @@ impl ConsensusManagerTrait for MysticetiManager {

// spin up the new mysticeti consensus handler to listen for committed sub dags
let handler = MysticetiConsensusHandler::new(consensus_handler, commit_receiver, monitor);

let mut consensus_handler = self.consensus_handler.lock().await;
*consensus_handler = Some(handler);

Expand Down
21 changes: 18 additions & 3 deletions crates/iota-core/src/unit_tests/mysticeti_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,32 @@ async fn test_mysticeti_manager() {

// THEN
assert!(manager.is_running().await);
let boot_counter = *manager.boot_counter.lock().await;
if i == 1 || i == 2 {
assert_eq!(boot_counter, 0);
} else {
assert_eq!(boot_counter, 1);
}

// Now try to shut it down
sleep(Duration::from_secs(1)).await;

// Simulate a commit by bumping the handled commit index so we can ensure that
// boot counter increments only after the first run. Practically we want
// to simulate a case where consensus engine restarts when no commits have
// happened before for first run.
if i > 1 {
let monitor = manager
.consumer_monitor
.load_full()
.expect("A consumer monitor should have been initialised");
monitor.set_highest_handled_commit(100);
}

// WHEN
manager.shutdown().await;

// THEN
assert!(!manager.is_running().await);

let boot_counter = *manager.boot_counter.lock().await;
assert_eq!(boot_counter, i);
}
}

0 comments on commit 07983d5

Please sign in to comment.