Skip to content

Commit

Permalink
feat(consensus): handle proposals from different rounds by context
Browse files Browse the repository at this point in the history
  • Loading branch information
asmaastarkware committed Nov 20, 2024
1 parent b652ffa commit 1818e79
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 56 deletions.
18 changes: 13 additions & 5 deletions crates/sequencing/papyrus_consensus/src/manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mock! {
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
timeout: Duration,
content: mpsc::Receiver<Transaction>
) -> oneshot::Receiver<ProposalContentId>;
Expand All @@ -69,6 +70,9 @@ mock! {
block: ProposalContentId,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);

}
}

Expand All @@ -95,14 +99,15 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 1.
context
.expect_validate_proposal()
.return_once(move |_, _, _| {
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
})
.times(1);
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_broadcast().returning(move |_| Ok(()));

let mut manager = MultiHeightManager::new(*VALIDATOR_ID, TIMEOUTS.clone());
Expand All @@ -114,7 +119,7 @@ async fn manager_multiple_heights_unordered() {
// Run the manager for height 2.
context
.expect_validate_proposal()
.return_once(move |_, _, _| {
.return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_receiver
Expand All @@ -131,13 +136,14 @@ async fn run_consensus_sync() {
let mut context = MockTestContext::new();
let (decision_tx, decision_rx) = oneshot::channel();

context.expect_validate_proposal().return_once(move |_, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::TWO)).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_broadcast().returning(move |_| Ok(()));
context.expect_decision_reached().return_once(move |block, votes| {
assert_eq!(block, BlockHash(Felt::TWO));
Expand Down Expand Up @@ -190,13 +196,14 @@ async fn run_consensus_sync_cancellation_safety() {
let (proposal_handled_tx, proposal_handled_rx) = oneshot::channel();
let (decision_tx, decision_rx) = oneshot::channel();

context.expect_validate_proposal().return_once(move |_, _, _| {
context.expect_validate_proposal().return_once(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
});
context.expect_validators().returning(move |_| vec![*PROPOSER_ID, *VALIDATOR_ID]);
context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_broadcast().with(eq(prevote(Some(Felt::ONE), 1, 0, *VALIDATOR_ID))).return_once(
move |_| {
proposal_handled_tx.send(()).unwrap();
Expand Down Expand Up @@ -260,7 +267,8 @@ async fn test_timeouts() {
send(&mut sender, precommit(None, 1, 0, *VALIDATOR_ID_3)).await;

let mut context = MockTestContext::new();
context.expect_validate_proposal().returning(move |_, _, _| {
context.expect_set_height_and_round().returning(move |_, _| ());
context.expect_validate_proposal().returning(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BlockHash(Felt::ONE)).unwrap();
block_receiver
Expand Down
32 changes: 25 additions & 7 deletions crates/sequencing/papyrus_consensus/src/single_height_consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ impl ShcTask {
ShcEvent::Precommit(event)
}
ShcTask::BuildProposal(round, receiver) => {
println!("Building proposal for round {}", round);
let proposal_id = receiver.await.expect("Block building failed.");
ShcEvent::BuildProposal(StateMachineEvent::GetProposal(Some(proposal_id), round))
}
Expand All @@ -125,14 +124,19 @@ impl ShcTask {
) => {
let proposal_id = match id_built_from_content_receiver.await {
Ok(proposal_id) => Some(proposal_id),
// Proposal never received from peer.
// Proposal never received from peer, or proposal was interrupted.
Err(_) => None,
};
let fin = match fin_from_proposer_receiver.await {
Ok(fin) => Some(fin),
// ProposalFin never received from peer.
Err(_) => None,
};
// Avoid voting NIL due to an interrupt, as this does not indicate a failure of the
// Proposal itself. To prevent this, we ensure the current round's proposal is
// never interrupted. Allowing interruptions would require exposing interrupts
// to consensus, enabling it to distinguish between an interrupt and an actual
// failure.
ShcEvent::ValidateProposal(
StateMachineEvent::Proposal(proposal_id, init.round, init.valid_round),
fin,
Expand Down Expand Up @@ -192,7 +196,9 @@ impl SingleHeightConsensus {
info!("Starting consensus with validators {:?}", self.validators);
let leader_fn = |round: Round| -> ValidatorId { context.proposer(self.height, round) };
let events = self.state_machine.start(&leader_fn);
self.handle_state_machine_events(context, events).await
let ret = self.handle_state_machine_events(context, events).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
ret
}

/// Process the proposal init and initiate block validation. See [`ShcTask::ValidateProposal`]
Expand Down Expand Up @@ -231,8 +237,14 @@ impl SingleHeightConsensus {
// twice in parallel. This could be caused by a network repeat or a malicious spam attack.
proposal_entry.insert(None);
let block_receiver = context
.validate_proposal(self.height, self.timeouts.proposal_timeout, p2p_messages_receiver)
.validate_proposal(
self.height,
init.round,
self.timeouts.proposal_timeout,
p2p_messages_receiver,
)
.await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
Ok(ShcReturn::Tasks(vec![ShcTask::ValidateProposal(init, block_receiver, fin_receiver)]))
}

Expand All @@ -258,7 +270,11 @@ impl SingleHeightConsensus {
ConsensusMessage::Proposal(_) => {
unimplemented!("Proposals should use `handle_proposal` due to fake streaming")
}
ConsensusMessage::Vote(vote) => self.handle_vote(context, vote).await,
ConsensusMessage::Vote(vote) => {
let ret = self.handle_vote(context, vote).await;
context.set_height_and_round(self.height, self.state_machine.round()).await;
ret
}
}
}

Expand All @@ -268,7 +284,7 @@ impl SingleHeightConsensus {
event: ShcEvent,
) -> Result<ShcReturn, ConsensusError> {
debug!("Received ShcEvent: {:?}", event);
match event {
let ret = match event {
ShcEvent::TimeoutPropose(event)
| ShcEvent::TimeoutPrevote(event)
| ShcEvent::TimeoutPrecommit(event) => {
Expand Down Expand Up @@ -342,7 +358,9 @@ impl SingleHeightConsensus {
self.handle_state_machine_events(context, sm_events).await
}
_ => unimplemented!("Unexpected event: {:?}", event),
}
};
context.set_height_and_round(self.height, self.state_machine.round()).await;
ret
}

#[instrument(skip_all)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ async fn proposer() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -173,11 +174,12 @@ async fn validator(repeat_proposal: bool) {
);

context.expect_proposer().returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -251,11 +253,12 @@ async fn vote_twice(same_vote: bool) {
);

context.expect_proposer().times(1).returning(move |_, _| *PROPOSER_ID);
context.expect_validate_proposal().times(1).returning(move |_, _, _| {
context.expect_validate_proposal().times(1).returning(move |_, _, _, _| {
let (block_sender, block_receiver) = oneshot::channel();
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context
.expect_broadcast()
.times(1) // Shows the repeat vote is ignored.
Expand Down Expand Up @@ -324,6 +327,7 @@ async fn rebroadcast_votes() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context
.expect_broadcast()
.times(1)
Expand Down Expand Up @@ -385,6 +389,7 @@ async fn repropose() {
block_sender.send(BLOCK.id).unwrap();
block_receiver
});
context.expect_set_height_and_round().returning(move |_, _| ());
context
.expect_broadcast()
.times(1)
Expand Down
4 changes: 4 additions & 0 deletions crates/sequencing/papyrus_consensus/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ impl StateMachine {
}
}

pub fn round(&self) -> Round {
self.round
}

pub fn quorum_size(&self) -> u32 {
self.quorum
}
Expand Down
3 changes: 3 additions & 0 deletions crates/sequencing/papyrus_consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ mock! {
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
timeout: Duration,
content: mpsc::Receiver<u32>
) -> oneshot::Receiver<ProposalContentId>;
Expand All @@ -54,6 +55,8 @@ mock! {
block: ProposalContentId,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
}
}

Expand Down
5 changes: 5 additions & 0 deletions crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub trait ConsensusContext {
/// Params:
/// - `height`: The height of the block to be built. Specifically this indicates the initial
/// state of the block.
/// - `round`: The round of the block to be built.
/// - `timeout`: The maximum time to wait for the block to be built.
/// - `content`: A receiver for the stream of the block's content.
///
Expand All @@ -68,6 +69,7 @@ pub trait ConsensusContext {
async fn validate_proposal(
&mut self,
height: BlockNumber,
round: Round,
timeout: Duration,
content: mpsc::Receiver<Self::ProposalChunk>,
) -> oneshot::Receiver<ProposalContentId>;
Expand Down Expand Up @@ -101,6 +103,9 @@ pub trait ConsensusContext {
block: ProposalContentId,
precommits: Vec<Vote>,
) -> Result<(), ConsensusError>;

/// Update the context with the current height and round.
async fn set_height_and_round(&mut self, height: BlockNumber, round: Round);
}

#[derive(PartialEq)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ papyrus_protobuf.workspace = true
papyrus_storage.workspace = true
starknet-types-core.workspace = true
starknet_api.workspace = true
starknet_batcher_types.workspace = true
starknet_batcher_types = { workspace = true, features = ["testing"] }
tokio = { workspace = true, features = ["full"] }
tracing.workspace = true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ impl ConsensusContext for PapyrusConsensusContext {
async fn validate_proposal(
&mut self,
height: BlockNumber,
_round: Round,
_timeout: Duration,
mut content: mpsc::Receiver<Transaction>,
) -> oneshot::Receiver<ProposalContentId> {
Expand Down Expand Up @@ -259,6 +260,10 @@ impl ConsensusContext for PapyrusConsensusContext {
proposals.retain(|&h, _| h > BlockNumber(height));
Ok(())
}

async fn set_height_and_round(&mut self, _height: BlockNumber, _round: Round) {
// No-op
}
}

const SLEEP_BETWEEN_CHECK_FOR_BLOCK: Duration = Duration::from_secs(10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ async fn validate_proposal_success() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, Duration::MAX, validate_receiver)
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.await
.await
.unwrap();
Expand All @@ -72,7 +72,7 @@ async fn validate_proposal_fail() {
validate_sender.close_channel();

let fin = papyrus_context
.validate_proposal(block_number, Duration::MAX, validate_receiver)
.validate_proposal(block_number, 0, Duration::MAX, validate_receiver)
.await
.await;
assert_eq!(fin, Err(oneshot::Canceled));
Expand Down
Loading

0 comments on commit 1818e79

Please sign in to comment.