
Commit

cargo fmt
tbro committed Nov 20, 2024
1 parent 229e1b6 commit 0e1c8e9
Showing 3 changed files with 131 additions and 132 deletions.
254 changes: 127 additions & 127 deletions crates/hotshot/src/types/handle.rs
@@ -80,19 +80,19 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
{
    /// Adds a hotshot consensus-related task to the `SystemContextHandle`.
    pub fn add_task<S: TaskState<Event = HotShotEvent<TYPES>> + 'static>(&mut self, task_state: S) {
        let task = Task::new(
            task_state,
            self.internal_event_stream.0.clone(),
            self.internal_event_stream.1.activate_cloned(),
        );

        self.consensus_registry.run_task(task);
    }
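
A quick usage sketch (here `MyTaskState` is a hypothetical type implementing `TaskState<Event = HotShotEvent<TYPES>>`; it is not part of this change):

    // Register an extra consensus-related task on the handle.
    let my_task_state = MyTaskState::default();
    handle.add_task(my_task_state);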

    /// obtains a stream to expose to the user
    pub fn event_stream(&self) -> impl Stream<Item = Event<TYPES>> {
        self.output_event_stream.1.activate_cloned()
    }
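
For context, a sketch of consuming this stream (assumes a running `handle` and the `futures` crate; the `EventType::Decide` fields follow HotShot's public event API but should be treated as illustrative):

    use futures::StreamExt;

    // React to user-facing events as consensus emits them.
    let mut events = handle.event_stream().boxed();
    while let Some(event) = events.next().await {
        if let EventType::Decide { leaf_chain, .. } = event.event {
            tracing::info!("decided a chain of {} leaves", leaf_chain.len());
        }
    }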

    /// Message other participants with a serialized message from the application
    /// Receivers of this message will get an `Event::ExternalMessageReceived` via
@@ -101,35 +101,35 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    /// # Errors
    /// Errors if serializing the request fails, or the request fails to be sent
    pub async fn send_external_message(
        &self,
        msg: Vec<u8>,
        recipients: RecipientList<TYPES::SignatureKey>,
    ) -> Result<()> {
        let message = Message {
            sender: self.public_key().clone(),
            kind: MessageKind::External(msg),
        };
        let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;

        match recipients {
            RecipientList::Broadcast => {
                self.network
                    .broadcast_message(serialized_message, Topic::Global, BroadcastDelay::None)
                    .await?;
            }
            RecipientList::Direct(recipient) => {
                self.network
                    .direct_message(serialized_message, recipient)
                    .await?;
            }
            RecipientList::Many(recipients) => {
                self.network
                    .da_broadcast_message(serialized_message, recipients, BroadcastDelay::None)
                    .await?;
            }
        }
        Ok(())
    }
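
A hedged caller-side example (the payload and the surrounding async/Result context are assumptions; `RecipientList::Direct` and `RecipientList::Many` work the same way through the other arms above):

    // The payload is opaque to HotShot; receivers observe it as
    // `Event::ExternalMessageReceived` on their event streams.
    let payload = b"application-level ping".to_vec();
    handle
        .send_external_message(payload, RecipientList::Broadcast)
        .await?;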

    /// Request a proposal from all other nodes. Will block until some node
    /// returns a valid proposal with the requested commitment. If nobody has the
@@ -138,70 +138,70 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    /// # Errors
    /// Errors if signing the request for proposal fails
    pub fn request_proposal(
        &self,
        view: TYPES::View,
        epoch: TYPES::Epoch,
        leaf_commitment: Commitment<Leaf2<TYPES>>,
    ) -> Result<impl futures::Future<Output = Result<Proposal<TYPES, QuorumProposal2<TYPES>>>>>
    {
        // We need to be able to sign this request before submitting it to the network. Compute the
        // payload first.
        let signed_proposal_request = ProposalRequestPayload {
            view_number: view,
            key: self.public_key().clone(),
        };

        // Finally, compute the signature for the payload.
        let signature = TYPES::SignatureKey::sign(
            self.private_key(),
            signed_proposal_request.commit().as_ref(),
        )?;

        let mem = (*self.memberships).clone();
        let receiver = self.internal_event_stream.1.activate_cloned();
        let sender = self.internal_event_stream.0.clone();
        Ok(async move {
            // First, broadcast that we need a proposal
            broadcast_event(
                HotShotEvent::QuorumProposalRequestSend(signed_proposal_request, signature).into(),
                &sender,
            )
            .await;
            loop {
                let hs_event = EventDependency::new(
                    receiver.clone(),
                    Box::new(move |event| {
                        let event = event.as_ref();
                        if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = event {
                            quorum_proposal.data.view_number() == view
                        } else {
                            false
                        }
                    }),
                )
                .completed()
                .await
                .ok_or(anyhow!("Event dependency failed to get event"))?;

                // Then, if it's `Some`, make sure that the data is correct

                if let HotShotEvent::QuorumProposalResponseRecv(quorum_proposal) = hs_event.as_ref()
                {
                    // Make sure that the quorum_proposal is valid
                    if let Err(err) = quorum_proposal.validate_signature(&mem, epoch) {
                        tracing::warn!("Invalid Proposal Received after Request. Err {:?}", err);
                        continue;
                    }
                    let proposed_leaf = Leaf2::from_quorum_proposal(&quorum_proposal.data);
                    let commit = proposed_leaf.commit();
                    if commit == leaf_commitment {
                        return Ok(quorum_proposal.clone());
                    }
                    tracing::warn!("Proposal received from request has different commitment than expected.\nExpected = {:?}\nReceived = {:?}", leaf_commitment, commit);
                }
            }
        })
    }
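
A sketch of the two-stage call this signature implies (`view`, `epoch`, and `leaf_commitment` are assumed to be in scope; the outer `?` covers the signing step, the inner `.await?` the network round trip):

    // Sign and broadcast the request, then wait for a response whose
    // proposal matches the requested leaf commitment.
    let proposal = handle
        .request_proposal(view, epoch, leaf_commitment)?
        .await?;

Because the returned future loops until a valid matching proposal arrives, a caller that cannot block indefinitely would wrap the final `.await` in a timeout.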

    /// HACK so we can know the types when running tests...
    /// there are two cleaner solutions:
@@ -234,8 +234,8 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    /// # Panics
    /// If the internal consensus is in an inconsistent state.
    pub async fn decided_state(&self) -> Arc<TYPES::ValidatedState> {
        self.hotshot.decided_state().await
    }

    /// Get the validated state from a given `view`.
    ///
@@ -245,16 +245,16 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    /// [`decided_state`](Self::decided_state)) or if there is no path for the requested
    /// view to ever be decided.
    pub async fn state(&self, view: TYPES::View) -> Option<Arc<TYPES::ValidatedState>> {
        self.hotshot.state(view).await
    }

    /// Get the last decided leaf of the [`SystemContext`] instance.
    ///
    /// # Panics
    /// If the internal consensus is in an inconsistent state.
    pub async fn decided_leaf(&self) -> Leaf2<TYPES> {
        self.hotshot.decided_leaf().await
    }
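
Taken together, a small sketch of these getters (the `view_number()` accessor on the leaf is assumed from the wider leaf API, not shown in this diff):

    // Snapshot the latest decided state and leaf.
    let decided_state = handle.decided_state().await;
    let decided_leaf = handle.decided_leaf().await;

    // Historical lookups can miss: `state` returns `None` once a view is
    // pruned or when it can never be decided.
    let state_at_view = handle.state(decided_leaf.view_number()).await;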

    /// Tries to get the most recent decided leaf, returning instantly
    /// if we can't acquire the lock.
@@ -275,11 +275,11 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
    /// Will return a [`HotShotError`] if some error occurs in the underlying
    /// [`SystemContext`] instance.
    pub async fn submit_transaction(
        &self,
        tx: TYPES::Transaction,
    ) -> Result<(), HotShotError<TYPES>> {
        self.hotshot.publish_transaction_async(tx).await
    }
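
A minimal caller-side sketch (the transaction value `tx` is an assumption):

    // Publish a transaction and log the outcome instead of unwrapping.
    match handle.submit_transaction(tx).await {
        Ok(()) => tracing::debug!("transaction submitted"),
        Err(err) => tracing::warn!("transaction submission failed: {err:?}"),
    }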

    /// Get the underlying consensus state for this [`SystemContext`]
    #[must_use]
@@ -289,25 +289,25 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>

    /// Shut down the inner hotshot and wait until all background threads are closed.
    pub async fn shut_down(&mut self) {
        // this is required because `SystemContextHandle` holds an inactive receiver and
        // `broadcast_direct` below can wait indefinitely
        self.internal_event_stream.0.set_await_active(false);
        let _ = self
            .internal_event_stream
            .0
            .broadcast_direct(Arc::new(HotShotEvent::Shutdown))
            .await
            .inspect_err(|err| tracing::error!("Failed to send shutdown event: {err}"));

        tracing::error!("Shutting down the network!");
        self.hotshot.network.shut_down().await;

        tracing::error!("Shutting down network tasks!");
        self.network_registry.shutdown().await;

        tracing::error!("Shutting down consensus!");
        self.consensus_registry.shutdown().await;
    }
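
The ordering above (shutdown event, then network, then network tasks, then the consensus registry) is handled internally, so a caller only needs something like:

    // Tear down a node at the end of its lifetime; this waits for
    // background tasks before returning.
    handle.shut_down().await;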

    /// return the timeout for a view of the underlying `SystemContext`
    #[must_use]
4 changes: 1 addition & 3 deletions crates/libp2p-networking/src/network/transport.rs
@@ -522,9 +522,7 @@ mod test {

    use hotshot_example_types::node_types::TestTypes;
    use hotshot_types::{
        light_client::StateVerKey, signature_key::BLSPubKey, traits::signature_key::SignatureKey,
        PeerConfig,
    };
    use libp2p::{core::transport::dummy::DummyTransport, quic::Connection};
5 changes: 3 additions & 2 deletions crates/types/src/simple_certificate.rs
@@ -24,8 +24,9 @@ use crate::{
    data::serialize_signature2,
    message::UpgradeLock,
    simple_vote::{
        DaData, QuorumData, QuorumData2, QuorumMaker, TimeoutData, UpgradeProposalData,
        VersionedVoteData, ViewSyncCommitData, ViewSyncFinalizeData, ViewSyncPreCommitData,
        Voteable,
    },
    traits::{
        election::Membership,
