From 671118d014715f8bf5a19f52c77151812e894089 Mon Sep 17 00:00:00 2001 From: Dan Brownstein Date: Tue, 5 Nov 2024 00:42:37 +0200 Subject: [PATCH] test: add consensus to e2e flow test --- Cargo.lock | 4 +- crates/batcher_types/src/batcher_types.rs | 3 + crates/blockifier/cairo_native | 2 +- .../src/sequencer_consensus_context.rs | 6 + crates/tests_integration/Cargo.toml | 4 +- .../tests_integration/src/flow_test_setup.rs | 10 +- .../src/integration_test_setup.rs | 2 +- crates/tests_integration/src/utils.rs | 49 ++++---- .../tests/end_to_end_flow_test.rs | 117 +++++++++--------- 9 files changed, 111 insertions(+), 86 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cdf85f605d..b56fc565cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10288,11 +10288,13 @@ dependencies = [ "axum", "blockifier", "cairo-lang-starknet-classes", - "chrono", + "futures", "indexmap 2.6.0", "mempool_test_utils", "papyrus_common", "papyrus_consensus", + "papyrus_network", + "papyrus_protobuf", "papyrus_rpc", "papyrus_storage", "pretty_assertions", diff --git a/crates/batcher_types/src/batcher_types.rs b/crates/batcher_types/src/batcher_types.rs index 49b3f3f1af..4ce45a2b98 100644 --- a/crates/batcher_types/src/batcher_types.rs +++ b/crates/batcher_types/src/batcher_types.rs @@ -36,6 +36,8 @@ pub struct BuildProposalInput { pub deadline: chrono::DateTime, pub retrospective_block_hash: Option, // TODO: Should we get the gas price here? + // TODO: add proposer address. + // TODO: add whether the kzg mechanism is used for DA. } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -56,6 +58,7 @@ pub enum GetProposalContent { } #[derive(Clone, Debug, Serialize, Deserialize)] +// TODO(Dan): Consider unifying with BuildProposalInput as they have the same fields. pub struct ValidateProposalInput { pub proposal_id: ProposalId, pub deadline: chrono::DateTime, diff --git a/crates/blockifier/cairo_native b/crates/blockifier/cairo_native index ab478323d6..b5769e4f6b 160000 --- a/crates/blockifier/cairo_native +++ b/crates/blockifier/cairo_native @@ -1 +1 @@ -Subproject commit ab478323d6aee5e0424712bbde98de443b8cc72f +Subproject commit b5769e4f6ba914b36eef68e0b1f71c791d7d075c diff --git a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs index 40e23fe204..d0c2137b4c 100644 --- a/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs +++ b/crates/sequencing/papyrus_consensus_orchestrator/src/sequencer_consensus_context.rs @@ -22,6 +22,7 @@ use papyrus_consensus::types::{ use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait}; use papyrus_protobuf::consensus::{ ConsensusMessage, + ProposalFin, ProposalInit, ProposalPart, TransactionBatch, @@ -318,6 +319,11 @@ async fn stream_build_proposal( content.len(), height ); + debug!("Broadcasting proposal fin: {proposal_content_id:?}"); + broadcast_client + .broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id })) + .await + .expect("Failed to broadcast proposal fin"); // Update valid_proposals before sending fin to avoid a race condition // with `repropose` being called before `valid_proposals` is updated. let mut valid_proposals = valid_proposals.lock().expect("Lock was poisoned"); diff --git a/crates/tests_integration/Cargo.toml b/crates/tests_integration/Cargo.toml index 7260d3dc62..0aac6c931b 100644 --- a/crates/tests_integration/Cargo.toml +++ b/crates/tests_integration/Cargo.toml @@ -14,11 +14,12 @@ assert_matches.workspace = true axum.workspace = true blockifier.workspace = true cairo-lang-starknet-classes.workspace = true -chrono.workspace = true indexmap.workspace = true mempool_test_utils.workspace = true papyrus_common.workspace = true papyrus_consensus.workspace = true +papyrus_network = { workspace = true, features = ["testing"] } +papyrus_protobuf.workspace = true papyrus_rpc.workspace = true papyrus_storage = { workspace = true, features = ["testing"] } reqwest.workspace = true @@ -42,6 +43,7 @@ tokio.workspace = true tracing.workspace = true [dev-dependencies] +futures.workspace = true pretty_assertions.workspace = true rstest.workspace = true starknet_sequencer_infra.workspace = true diff --git a/crates/tests_integration/src/flow_test_setup.rs b/crates/tests_integration/src/flow_test_setup.rs index 037663dd59..dfd683f6cb 100644 --- a/crates/tests_integration/src/flow_test_setup.rs +++ b/crates/tests_integration/src/flow_test_setup.rs @@ -1,6 +1,8 @@ use std::net::SocketAddr; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::ProposalPart; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_batcher_types::communication::SharedBatcherClient; @@ -32,6 +34,9 @@ pub struct FlowTestSetup { // Handle of the sequencer node. pub sequencer_node_handle: JoinHandle>, + + // Channels for consensus proposals, used for asserting the right transactions are proposed. + pub consensus_proposals_channels: BroadcastTopicChannels, } impl FlowTestSetup { @@ -52,8 +57,8 @@ impl FlowTestSetup { ) .await; - // Derive the configuration for the mempool node. - let (config, _required_params) = + // Derive the configuration for the sequencer node. + let (config, _required_params, consensus_proposals_channels) = create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await; let (clients, servers) = create_node_modules(&config); @@ -77,6 +82,7 @@ impl FlowTestSetup { batcher_client: clients.get_batcher_client().unwrap(), rpc_storage_file_handle: storage_for_test.rpc_storage_handle, sequencer_node_handle, + consensus_proposals_channels, } } diff --git a/crates/tests_integration/src/integration_test_setup.rs b/crates/tests_integration/src/integration_test_setup.rs index fb4d516d72..0f5c437d41 100644 --- a/crates/tests_integration/src/integration_test_setup.rs +++ b/crates/tests_integration/src/integration_test_setup.rs @@ -45,7 +45,7 @@ impl IntegrationTestSetup { .await; // Derive the configuration for the sequencer node. - let (config, required_params) = + let (config, required_params, _) = create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await; let node_config_dir_handle = tempdir().unwrap(); diff --git a/crates/tests_integration/src/utils.rs b/crates/tests_integration/src/utils.rs index 4d8620e9cf..f5fb162f7b 100644 --- a/crates/tests_integration/src/utils.rs +++ b/crates/tests_integration/src/utils.rs @@ -1,5 +1,6 @@ use std::future::Future; use std::net::SocketAddr; +use std::time::Duration; use axum::body::Body; use blockifier::context::ChainInfo; @@ -11,6 +12,9 @@ use mempool_test_utils::starknet_api_test_utils::{ MultiAccountTransactionGenerator, }; use papyrus_consensus::config::ConsensusConfig; +use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::ProposalPart; use papyrus_storage::StorageConfig; use reqwest::{Client, Response}; use starknet_api::block::BlockNumber; @@ -28,29 +32,14 @@ use starknet_gateway::config::{ }; use starknet_gateway_types::errors::GatewaySpecError; use starknet_http_server::config::HttpServerConfig; -use starknet_sequencer_node::config::component_config::ComponentConfig; use starknet_sequencer_node::config::test_utils::RequiredParams; -use starknet_sequencer_node::config::{ - ComponentExecutionConfig, - ComponentExecutionMode, - SequencerNodeConfig, -}; +use starknet_sequencer_node::config::SequencerNodeConfig; use tokio::net::TcpListener; pub async fn create_config( rpc_server_addr: SocketAddr, batcher_storage_config: StorageConfig, -) -> (SequencerNodeConfig, RequiredParams) { - // TODO(Arni/ Matan): Enable the consensus in the end to end test. - let components = ComponentConfig { - consensus_manager: ComponentExecutionConfig { - execution_mode: ComponentExecutionMode::Disabled, - local_server_config: None, - ..Default::default() - }, - ..Default::default() - }; - +) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels) { let chain_id = batcher_storage_config.db_config.chain_id.clone(); // TODO(Tsabary): create chain_info in setup, and pass relevant values throughout. let mut chain_info = ChainInfo::create_for_testing(); @@ -60,12 +49,10 @@ pub async fn create_config( let gateway_config = create_gateway_config(chain_info).await; let http_server_config = create_http_server_config().await; let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr); - let consensus_manager_config = ConsensusManagerConfig { - consensus_config: ConsensusConfig { start_height: BlockNumber(1), ..Default::default() }, - }; + let (consensus_manager_config, consensus_proposals_channels) = + create_consensus_manager_config_and_channels(); ( SequencerNodeConfig { - components, batcher_config, consensus_manager_config, gateway_config, @@ -78,9 +65,29 @@ pub async fn create_config( eth_fee_token_address: fee_token_addresses.eth_fee_token_address, strk_fee_token_address: fee_token_addresses.strk_fee_token_address, }, + consensus_proposals_channels, ) } +fn create_consensus_manager_config_and_channels() +-> (ConsensusManagerConfig, BroadcastTopicChannels) { + let (network_config, broadcast_channels) = + create_network_config_connected_to_broadcast_channels( + papyrus_network::gossipsub_impl::Topic::new( + starknet_consensus_manager::consensus_manager::NETWORK_TOPIC, + ), + ); + let consensus_manager_config = ConsensusManagerConfig { + consensus_config: ConsensusConfig { + start_height: BlockNumber(1), + consensus_delay: Duration::from_secs(1), + network_config, + ..Default::default() + }, + }; + (consensus_manager_config, broadcast_channels) +} + pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig { // TODO(Tsabary): get the latest version from the RPC crate. const RPC_SPEC_VERSION: &str = "V0_8"; diff --git a/crates/tests_integration/tests/end_to_end_flow_test.rs b/crates/tests_integration/tests/end_to_end_flow_test.rs index 47a647e99a..fcca679160 100644 --- a/crates/tests_integration/tests/end_to_end_flow_test.rs +++ b/crates/tests_integration/tests/end_to_end_flow_test.rs @@ -1,22 +1,18 @@ +use futures::StreamExt; use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator; +use papyrus_network::network_manager::BroadcastTopicChannels; +use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart}; use pretty_assertions::assert_eq; use rstest::{fixture, rstest}; -use starknet_api::block::BlockNumber; +use starknet_api::block::{BlockHash, BlockNumber}; +use starknet_api::core::{ChainId, ContractAddress}; use starknet_api::transaction::TransactionHash; -use starknet_batcher_types::batcher_types::{ - BuildProposalInput, - DecisionReachedInput, - GetProposalContent, - GetProposalContentInput, - ProposalId, - StartHeightInput, -}; -use starknet_batcher_types::communication::SharedBatcherClient; use starknet_integration_tests::flow_test_setup::FlowTestSetup; use starknet_integration_tests::utils::{ create_integration_test_tx_generator, run_integration_test_scenario, }; +use starknet_types_core::felt::Felt; #[fixture] fn tx_generator() -> MultiAccountTransactionGenerator { @@ -25,7 +21,9 @@ fn tx_generator() -> MultiAccountTransactionGenerator { #[rstest] #[tokio::test] -async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) { +async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) { + const LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT: std::time::Duration = + std::time::Duration::from_secs(5); // Setup. let mock_running_system = FlowTestSetup::new_from_tx_generator(&tx_generator).await; @@ -34,60 +32,61 @@ async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) { mock_running_system.assert_add_tx_success(tx) }) .await; - - // Test. - run_consensus_for_end_to_end_test( - &mock_running_system.batcher_client, - &expected_batched_tx_hashes, + // TODO(Dan, Itay): Consider adding a utility function that waits for something to happen. + tokio::time::timeout( + LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT, + listen_to_broadcasted_messages( + mock_running_system.consensus_proposals_channels, + &expected_batched_tx_hashes, + ), ) - .await; + .await + .expect("listen to broadcasted messages should finish in time"); } -/// This function should mirror -/// [`run_consensus`](papyrus_consensus::manager::run_consensus). It makes requests -/// from the batcher client and asserts the expected responses were received. -pub async fn run_consensus_for_end_to_end_test( - batcher_client: &SharedBatcherClient, +async fn listen_to_broadcasted_messages( + consensus_proposals_channels: BroadcastTopicChannels, expected_batched_tx_hashes: &[TransactionHash], ) { - // Start height. - // TODO(Arni): Get the current height and retrospective_block_hash from the rpc storage or use - // consensus directly. - let current_height = BlockNumber(1); - batcher_client.start_height(StartHeightInput { height: current_height }).await.unwrap(); - - // Build proposal. - let proposal_id = ProposalId(0); - let retrospective_block_hash = None; - let build_proposal_duaration = chrono::TimeDelta::new(1, 0).unwrap(); - batcher_client - .build_proposal(BuildProposalInput { - proposal_id, - deadline: chrono::Utc::now() + build_proposal_duaration, - retrospective_block_hash, - }) - .await - .unwrap(); - - // Get proposal content. - let mut executed_tx_hashes: Vec = vec![]; - let _proposal_commitment = loop { - let response = batcher_client - .get_proposal_content(GetProposalContentInput { proposal_id }) - .await - .unwrap(); - match response.content { - GetProposalContent::Txs(batched_txs) => { - executed_tx_hashes.append(&mut batched_txs.iter().map(|tx| tx.tx_hash()).collect()); + // TODO(Dan, Guy): retrieve chain ID. Maybe by modifying IntegrationTestSetup to hold it as a + // member, and instantiate the value using StorageTestSetup. + const CHAIN_ID_NAME: &str = "CHAIN_ID_SUBDIR"; + let chain_id = ChainId::Other(CHAIN_ID_NAME.to_string()); + let mut broadcasted_messages_receiver = + consensus_proposals_channels.broadcasted_messages_receiver; + let mut received_tx_hashes = vec![]; + // TODO (Dan, Guy): retrieve / calculate the expected proposal init and fin. + let expected_proposal_init = ProposalInit { + height: BlockNumber(1), + round: 0, + valid_round: None, + proposer: ContractAddress::default(), + }; + let expected_proposal_fin = ProposalFin { + proposal_content_id: BlockHash(Felt::from_hex_unchecked( + "0x4597ceedbef644865917bf723184538ef70d43954d63f5b7d8cb9d1bd4c2c32", + )), + }; + assert_eq!( + broadcasted_messages_receiver.next().await.unwrap().0.unwrap(), + ProposalPart::Init(expected_proposal_init) + ); + loop { + match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() { + ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init), + ProposalPart::Fin(proposal_fin) => { + assert_eq!(proposal_fin, expected_proposal_fin); + break; } - GetProposalContent::Finished(proposal_commitment) => { - break proposal_commitment; + ProposalPart::Transactions(transactions) => { + received_tx_hashes.extend( + transactions + .transactions + .iter() + .map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()), + ); } } - }; - - // Decision reached. - batcher_client.decision_reached(DecisionReachedInput { proposal_id }).await.unwrap(); - - assert_eq!(expected_batched_tx_hashes, executed_tx_hashes); + } + assert_eq!(received_tx_hashes, expected_batched_tx_hashes); }