Skip to content

Commit

Permalink
feat(consensus): add sync sender to context to broadcast on decision (#…
Browse files Browse the repository at this point in the history
…413)

This is optional since outside of testing mode sync should be its own component of the node.
  • Loading branch information
matan-starkware authored Aug 15, 2024
1 parent 010f5d4 commit 8e9908b
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 24 deletions.
1 change: 1 addition & 0 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ fn run_consensus(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender,
config.num_validators,
None,
);
// TODO(matan): connect this to an actual channel.
if let Some(test_config) = config.test.as_ref() {
Expand Down
5 changes: 3 additions & 2 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ pub struct Proposal {
pub block_hash: BlockHash,
}

#[derive(Debug, Hash, Clone, Eq, PartialEq)]
#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)]
pub enum VoteType {
Prevote,
#[default]
Precommit,
}

#[derive(Debug, Hash, Clone, Eq, PartialEq)]
#[derive(Debug, Default, Hash, Clone, Eq, PartialEq)]
pub struct Vote {
pub vote_type: VoteType,
pub height: u64,
Expand Down
2 changes: 2 additions & 0 deletions crates/papyrus_protobuf/src/converters/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ impl From<Vote> for protobuf::Vote {
}
}

auto_impl_into_and_try_from_vec_u8!(Vote, protobuf::Vote);

impl TryFrom<protobuf::ConsensusMessage> for ConsensusMessage {
type Error = ProtobufConversionError;

Expand Down
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ mock! {
) -> Result<(), ConsensusError>;

async fn notify_decision(
&self,
&mut self,
block: TestBlock,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::ProposalWrapper;

// TODO: add debug messages and span to the tasks.

#[derive(Debug, PartialEq, Eq, Clone)]
#[derive(Debug, Default, PartialEq, Eq, Clone)]
pub struct PapyrusConsensusBlock {
content: Vec<Transaction>,
id: BlockHash,
Expand All @@ -45,22 +45,25 @@ impl ConsensusBlock for PapyrusConsensusBlock {

pub struct PapyrusConsensusContext {
storage_reader: StorageReader,
broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
validators: Vec<ValidatorId>,
sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
}

impl PapyrusConsensusContext {
// TODO(dvir): remove the dead code attribute after we will use this function.
#[allow(dead_code)]
pub fn new(
storage_reader: StorageReader,
broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
network_broadcast_sender: BroadcastSubscriberSender<ConsensusMessage>,
num_validators: u64,
sync_broadcast_sender: Option<BroadcastSubscriberSender<Vote>>,
) -> Self {
Self {
storage_reader,
broadcast_sender,
network_broadcast_sender,
validators: (0..num_validators).map(ContractAddress::from).collect(),
sync_broadcast_sender,
}
}
}
Expand Down Expand Up @@ -160,9 +163,13 @@ impl ConsensusContext for PapyrusConsensusContext {
panic!("Block in {height} was not found in storage despite waiting for it")
})
.block_hash;

// This can happen as a result of sync interrupting `run_height`.
fin_sender
.send(PapyrusConsensusBlock { content: transactions, id: block_hash })
.expect("Send should succeed");
.unwrap_or_else(|_| {
warn!("Failed to send block to consensus. height={height}");
})
}
.instrument(debug_span!("consensus_validate_proposal")),
);
Expand All @@ -180,7 +187,7 @@ impl ConsensusContext for PapyrusConsensusContext {

async fn broadcast(&mut self, message: ConsensusMessage) -> Result<(), ConsensusError> {
debug!("Broadcasting message: {message:?}");
self.broadcast_sender.send(message).await?;
self.network_broadcast_sender.send(message).await?;
Ok(())
}

Expand All @@ -190,7 +197,7 @@ impl ConsensusContext for PapyrusConsensusContext {
mut content_receiver: mpsc::Receiver<Transaction>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<(), ConsensusError> {
let mut broadcast_sender = self.broadcast_sender.clone();
let mut network_broadcast_sender = self.network_broadcast_sender.clone();

tokio::spawn(
async move {
Expand Down Expand Up @@ -219,7 +226,7 @@ impl ConsensusContext for PapyrusConsensusContext {
proposal.block_hash
);

broadcast_sender
network_broadcast_sender
.send(ConsensusMessage::Proposal(proposal))
.await
.expect("Failed to send proposal");
Expand All @@ -230,7 +237,7 @@ impl ConsensusContext for PapyrusConsensusContext {
}

async fn notify_decision(
&self,
&mut self,
block: Self::Block,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError> {
Expand All @@ -239,6 +246,10 @@ impl ConsensusContext for PapyrusConsensusContext {
"Finished consensus for height: {height}. Agreed on block with id: {:x}",
block.id().0
);
if let Some(sender) = &mut self.sync_broadcast_sender {
sender.send(precommits[0].clone()).await?;
}

Ok(())
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_network::network_manager::{mock_register_broadcast_subscriber, BroadcastNetworkMock};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal};
use papyrus_protobuf::consensus::{ConsensusMessage, Proposal, Vote};
use papyrus_storage::body::BodyStorageWriter;
use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
Expand All @@ -10,7 +10,7 @@ use starknet_api::block::Block;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;

use crate::papyrus_consensus_context::PapyrusConsensusContext;
use crate::papyrus_consensus_context::{PapyrusConsensusBlock, PapyrusConsensusContext};
use crate::types::{ConsensusBlock, ConsensusContext, ProposalInit};

// TODO(dvir): consider adding tests for times, i.e, the calls are returned immediately and nothing
Expand All @@ -20,7 +20,7 @@ const TEST_CHANNEL_SIZE: usize = 10;

#[tokio::test]
async fn build_proposal() {
let (block, papyrus_context, _mock_network) = test_setup();
let (block, papyrus_context, _mock_network, _) = test_setup();
let block_number = block.header.block_number;

let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await;
Expand All @@ -38,7 +38,7 @@ async fn build_proposal() {

#[tokio::test]
async fn validate_proposal_success() {
let (block, papyrus_context, _mock_network) = test_setup();
let (block, papyrus_context, _mock_network, _) = test_setup();
let block_number = block.header.block_number;

let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
Expand All @@ -56,7 +56,7 @@ async fn validate_proposal_success() {

#[tokio::test]
async fn validate_proposal_fail() {
let (block, papyrus_context, _mock_network) = test_setup();
let (block, papyrus_context, _mock_network, _) = test_setup();
let block_number = block.header.block_number;

let different_block = get_test_block(4, None, None, None);
Expand All @@ -72,7 +72,7 @@ async fn validate_proposal_fail() {

#[tokio::test]
async fn propose() {
let (block, papyrus_context, mut mock_network) = test_setup();
let (block, papyrus_context, mut mock_network, _) = test_setup();
let block_number = block.header.block_number;

let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
Expand All @@ -99,7 +99,21 @@ async fn propose() {
assert_eq!(mock_network.messages_to_broadcast_receiver.next().await.unwrap(), expected_message);
}

fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock<ConsensusMessage>) {
#[tokio::test]
async fn decision() {
let (_, mut papyrus_context, _, mut sync_network) = test_setup();
let block = PapyrusConsensusBlock::default();
let precommit = Vote::default();
papyrus_context.notify_decision(block, vec![precommit.clone()]).await.unwrap();
assert_eq!(sync_network.messages_to_broadcast_receiver.next().await.unwrap(), precommit);
}

fn test_setup() -> (
Block,
PapyrusConsensusContext,
BroadcastNetworkMock<ConsensusMessage>,
BroadcastNetworkMock<Vote>,
) {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let block = get_test_block(5, None, None, None);
let block_number = block.header.block_number;
Expand All @@ -113,11 +127,13 @@ fn test_setup() -> (Block, PapyrusConsensusContext, BroadcastNetworkMock<Consens
.commit()
.unwrap();

let test_channels = mock_register_broadcast_subscriber().unwrap();
let network_channels = mock_register_broadcast_subscriber().unwrap();
let sync_channels = mock_register_broadcast_subscriber().unwrap();
let papyrus_context = PapyrusConsensusContext::new(
storage_reader.clone(),
test_channels.subscriber_channels.messages_to_broadcast_sender,
network_channels.subscriber_channels.messages_to_broadcast_sender,
4,
Some(sync_channels.subscriber_channels.messages_to_broadcast_sender),
);
(block, papyrus_context, test_channels.mock_network)
(block, papyrus_context, network_channels.mock_network, sync_channels.mock_network)
}
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ mock! {
) -> Result<(), ConsensusError>;

async fn notify_decision(
&self,
&mut self,
block: TestBlock,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;
Expand Down
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub trait ConsensusContext {
/// - `precommits` - All precommits must be for the same `(block.id(), height, round)` and form
/// a quorum (>2/3 of the voting power) for this height.
async fn notify_decision(
&self,
&mut self,
block: Self::Block,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;
Expand Down

0 comments on commit 8e9908b

Please sign in to comment.