Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Membership Catchup Mechanism #4109

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions crates/examples/infra/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use hotshot_testing::block_builder::{
use hotshot_types::{
consensus::ConsensusMetricsValue,
data::{Leaf, TestableLeaf},
epoch_membership::EpochMembershipCoordinator,
event::{Event, EventType},
network::{BuilderType, NetworkConfig, NetworkConfigFile, NetworkConfigSource},
traits::{
Expand Down Expand Up @@ -386,13 +387,14 @@ pub trait RunDa<
// TODO: we need to pass a valid fallback builder url here somehow
fallback_builder_url: config.config.builder_urls.first().clone(),
};
let epoch_height = config.config.epoch_height;

SystemContext::init(
pk,
sk,
config.node_index,
config.config,
membership,
EpochMembershipCoordinator::new(membership, epoch_height),
Arc::from(network),
initializer,
ConsensusMetricsValue::default(),
Expand Down Expand Up @@ -524,13 +526,11 @@ pub trait RunDa<
}
let num_eligible_leaders = context
.hotshot
.memberships
.read()
.membership_coordinator
.membership_for_epoch(genesis_epoch_from_version::<V, TYPES>())
.await
.committee_leaders(TYPES::View::genesis())
.await
.committee_leaders(
TYPES::View::genesis(),
genesis_epoch_from_version::<V, TYPES>(),
)
.len();
let consensus_lock = context.hotshot.consensus();
let consensus_reader = consensus_lock.read().await;
Expand Down
31 changes: 16 additions & 15 deletions crates/hotshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod documentation;
use committable::Committable;
use futures::future::{select, Either};
use hotshot_types::{
epoch_membership::EpochMembershipCoordinator,
message::UpgradeLock,
traits::{network::BroadcastDelay, node_implementation::Versions},
};
Expand Down Expand Up @@ -56,7 +57,6 @@ use hotshot_types::{
simple_certificate::{NextEpochQuorumCertificate2, QuorumCertificate2, UpgradeCertificate},
traits::{
consensus_api::ConsensusApi,
election::Membership,
network::ConnectedNetwork,
node_implementation::{ConsensusTime, NodeType},
signature_key::SignatureKey,
Expand Down Expand Up @@ -108,7 +108,7 @@ pub struct SystemContext<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
pub network: Arc<I::Network>,

/// Memberships used by consensus
pub memberships: Arc<RwLock<TYPES::Membership>>,
pub membership_coordinator: EpochMembershipCoordinator<TYPES>,

/// the metrics that the implementor is using.
metrics: Arc<ConsensusMetricsValue>,
Expand Down Expand Up @@ -163,7 +163,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> Clone
private_key: self.private_key.clone(),
config: self.config.clone(),
network: Arc::clone(&self.network),
memberships: Arc::clone(&self.memberships),
membership_coordinator: self.membership_coordinator.clone(),
metrics: Arc::clone(&self.metrics),
consensus: self.consensus.clone(),
instance_state: Arc::clone(&self.instance_state),
Expand Down Expand Up @@ -199,7 +199,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
memberships: EpochMembershipCoordinator<TYPES>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -253,7 +253,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
membership_coordinator: EpochMembershipCoordinator<TYPES>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -358,7 +358,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
start_view: initializer.start_view,
start_epoch: initializer.start_epoch,
network,
memberships,
membership_coordinator,
metrics: Arc::clone(&consensus_metrics),
internal_event_stream: (internal_tx, internal_rx.deactivate()),
output_event_stream: (external_tx.clone(), external_rx.clone().deactivate()),
Expand Down Expand Up @@ -509,10 +509,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T

spawn(async move {
let memberships_da_committee_members = api
.memberships
.read()
.membership_coordinator
.membership_for_epoch(epoch)
.await
.da_committee_members(view_number)
.await
.da_committee_members(view_number, epoch)
.iter()
.cloned()
.collect();
Expand Down Expand Up @@ -613,7 +614,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
node_id: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
memberships: EpochMembershipCoordinator<TYPES>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand Down Expand Up @@ -671,7 +672,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> SystemContext<T
hotshot: self.clone().into(),
storage: Arc::clone(&self.storage),
network: Arc::clone(&self.network),
memberships: Arc::clone(&self.memberships),
membership_coordinator: self.membership_coordinator.clone(),
epoch_height: self.config.epoch_height,
};

Expand Down Expand Up @@ -776,7 +777,7 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
memberships: EpochMembershipCoordinator<TYPES>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
Expand All @@ -792,7 +793,7 @@ where
private_key.clone(),
nonce,
config.clone(),
Arc::clone(&memberships),
memberships.clone(),
Arc::clone(&network),
initializer.clone(),
metrics.clone(),
Expand Down Expand Up @@ -853,7 +854,7 @@ where
hotshot: Arc::clone(&left_system_context),
storage: Arc::clone(&left_system_context.storage),
network: Arc::clone(&left_system_context.network),
memberships: Arc::clone(&left_system_context.memberships),
membership_coordinator: left_system_context.membership_coordinator.clone(),
epoch_height,
};

Expand All @@ -865,7 +866,7 @@ where
hotshot: Arc::clone(&right_system_context),
storage: Arc::clone(&right_system_context.storage),
network: Arc::clone(&right_system_context.network),
memberships: Arc::clone(&right_system_context.memberships),
membership_coordinator: right_system_context.membership_coordinator.clone(),
epoch_height,
};

Expand Down
25 changes: 10 additions & 15 deletions crates/hotshot/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
pub mod task_state;
use std::{collections::BTreeMap, fmt::Debug, num::NonZeroUsize, sync::Arc, time::Duration};

use crate::EpochMembershipCoordinator;
use async_broadcast::{broadcast, RecvError};
use async_lock::RwLock;
use async_trait::async_trait;
Expand Down Expand Up @@ -82,7 +83,7 @@ pub fn add_response_task<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versi
) {
let state = NetworkResponseState::<TYPES, V>::new(
handle.hotshot.consensus(),
Arc::clone(&handle.memberships),
handle.membership_coordinator.clone(),
handle.public_key().clone(),
handle.private_key().clone(),
handle.hotshot.id,
Expand Down Expand Up @@ -196,13 +197,12 @@ pub fn add_network_event_task<
>(
handle: &mut SystemContextHandle<TYPES, I, V>,
network: Arc<NET>,
membership: Arc<RwLock<TYPES::Membership>>,
) {
let network_state: NetworkEventTaskState<_, V, _, _> = NetworkEventTaskState {
network,
view: TYPES::View::genesis(),
epoch: genesis_epoch_from_version::<V, TYPES>(),
membership,
membership_coordinator: handle.membership_coordinator.clone(),
storage: Arc::clone(&handle.storage()),
consensus: OuterConsensus::new(handle.consensus()),
upgrade_lock: handle.hotshot.upgrade_lock.clone(),
Expand Down Expand Up @@ -328,20 +328,21 @@ where
private_key: <TYPES::SignatureKey as SignatureKey>::PrivateKey,
nonce: u64,
config: HotShotConfig<TYPES::SignatureKey>,
memberships: Arc<RwLock<TYPES::Membership>>,
memberships: EpochMembershipCoordinator<TYPES>,
network: Arc<I::Network>,
initializer: HotShotInitializer<TYPES>,
metrics: ConsensusMetricsValue,
storage: I::Storage,
marketplace_config: MarketplaceConfig<TYPES, I>,
) -> SystemContextHandle<TYPES, I, V> {
let epoch_height = config.epoch_height;

let hotshot = SystemContext::new(
public_key,
private_key,
nonce,
config,
memberships,
memberships.clone(),
network,
initializer,
metrics,
Expand All @@ -363,7 +364,7 @@ where
hotshot: Arc::clone(&hotshot),
storage: Arc::clone(&hotshot.storage),
network: Arc::clone(&hotshot.network),
memberships: Arc::clone(&hotshot.memberships),
membership_coordinator: memberships.clone(),
epoch_height,
};

Expand Down Expand Up @@ -523,19 +524,17 @@ where
/// Adds the `NetworkEventTaskState` tasks possibly modifying them as well.
fn add_network_event_tasks(&self, handle: &mut SystemContextHandle<TYPES, I, V>) {
let network = Arc::clone(&handle.network);
let memberships = Arc::clone(&handle.memberships);

self.add_network_event_task(handle, network, memberships);
self.add_network_event_task(handle, network);
}

/// Adds a `NetworkEventTaskState` task. Can be reimplemented to modify its behaviour.
fn add_network_event_task(
&self,
handle: &mut SystemContextHandle<TYPES, I, V>,
channel: Arc<<I as NodeImplementation<TYPES>>::Network>,
membership: Arc<RwLock<TYPES::Membership>>,
) {
add_network_event_task(handle, channel, membership);
add_network_event_task(handle, channel);
}
}

Expand Down Expand Up @@ -568,9 +567,5 @@ pub async fn add_network_message_and_request_receiver_tasks<
pub fn add_network_event_tasks<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions>(
handle: &mut SystemContextHandle<TYPES, I, V>,
) {
add_network_event_task(
handle,
Arc::clone(&handle.network),
Arc::clone(&handle.memberships),
);
add_network_event_task(handle, Arc::clone(&handle.network));
}
20 changes: 10 additions & 10 deletions crates/hotshot/src/tasks/task_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
consensus: OuterConsensus::new(handle.hotshot.consensus()),
view: handle.cur_view().await,
delay: handle.hotshot.config.data_request_delay,
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
id: handle.hotshot.id,
Expand All @@ -72,7 +72,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
vote_collectors: BTreeMap::default(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
Expand Down Expand Up @@ -122,7 +122,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
network: Arc::clone(&handle.hotshot.network),
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
id: handle.hotshot.id,
Expand All @@ -140,7 +140,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
Self {
consensus: OuterConsensus::new(handle.hotshot.consensus()),
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
network: Arc::clone(&handle.hotshot.network),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
Expand All @@ -165,7 +165,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
cur_view,
next_view: cur_view,
cur_epoch: handle.cur_epoch().await,
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
num_timeouts_tracked: 0,
Expand All @@ -192,7 +192,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
consensus: OuterConsensus::new(handle.hotshot.consensus()),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
instance_state: handle.hotshot.instance_state(),
Expand Down Expand Up @@ -237,7 +237,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
latest_voted_view: handle.cur_view().await,
vote_dependencies: BTreeMap::new(),
network: Arc::clone(&handle.hotshot.network),
membership: Arc::clone(&handle.hotshot.memberships),
membership: handle.hotshot.membership_coordinator.clone(),
drb_computation: None,
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
id: handle.hotshot.id,
Expand All @@ -262,7 +262,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
proposal_dependencies: BTreeMap::new(),
consensus: OuterConsensus::new(consensus),
instance_state: handle.hotshot.instance_state(),
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
public_key: handle.public_key().clone(),
private_key: handle.private_key().clone(),
storage: Arc::clone(&handle.storage),
Expand All @@ -289,7 +289,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
consensus: OuterConsensus::new(consensus),
cur_view: handle.cur_view().await,
cur_epoch: handle.cur_epoch().await,
membership: Arc::clone(&handle.hotshot.memberships),
membership: handle.hotshot.membership_coordinator.clone(),
timeout: handle.hotshot.config.next_view_timeout,
output_event_stream: handle.hotshot.external_event_stream.0.clone(),
storage: Arc::clone(&handle.storage),
Expand All @@ -313,7 +313,7 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES>, V: Versions> CreateTaskState
private_key: handle.private_key().clone(),
instance_state: handle.hotshot.instance_state(),
network: Arc::clone(&handle.hotshot.network),
membership: Arc::clone(&handle.hotshot.memberships),
membership_coordinator: handle.hotshot.membership_coordinator.clone(),
vote_collectors: BTreeMap::default(),
next_epoch_vote_collectors: BTreeMap::default(),
timeout_vote_collectors: BTreeMap::default(),
Expand Down
Loading
Loading