Skip to content

Commit

Permalink
test: add consensus to e2e flow test
Browse files Browse the repository at this point in the history
  • Loading branch information
dan-starkware committed Nov 5, 2024
1 parent 7a3a602 commit cb12e5b
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 84 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/batcher_types/src/batcher_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct BuildProposalInput {
pub deadline: chrono::DateTime<Utc>,
pub retrospective_block_hash: Option<BlockHashAndNumber>,
// TODO: Should we get the gas price here?
// proposer address
// kzg
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -56,6 +58,7 @@ pub enum GetProposalContent {
}

#[derive(Clone, Debug, Serialize, Deserialize)]
// Same as propose?
pub struct ValidateProposalInput {
pub proposal_id: ProposalId,
pub deadline: chrono::DateTime<Utc>,
Expand Down
5 changes: 5 additions & 0 deletions crates/tests-integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mempool_test_utils.workspace = true
papyrus_common.workspace = true
papyrus_config.workspace = true
papyrus_consensus.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
papyrus_rpc.workspace = true
papyrus_storage = { workspace = true, features = ["testing"] }
reqwest.workspace = true
Expand All @@ -42,6 +44,9 @@ tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
futures.workspace = true
papyrus_network.workspace = true
papyrus_protobuf.workspace = true
pretty_assertions.workspace = true
rstest.workspace = true
starknet_sequencer_infra.workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn main() -> anyhow::Result<()> {
.await;

// Derive the configuration for the sequencer node.
let (config, required_params) =
let (config, required_params, _) =
create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await;

// Note: the batcher storage file handle is passed as a reference to maintain its ownership in
Expand Down
8 changes: 7 additions & 1 deletion crates/tests-integration/src/integration_test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::SocketAddr;

use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::communication::SharedBatcherClient;
Expand Down Expand Up @@ -32,6 +34,9 @@ pub struct IntegrationTestSetup {

// Handle of the sequencer node.
pub sequencer_node_handle: JoinHandle<Result<(), anyhow::Error>>,

// Channels for consensus proposals, used for asserting the right transactions are proposed.
pub consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
}

impl IntegrationTestSetup {
Expand All @@ -53,7 +58,7 @@ impl IntegrationTestSetup {
.await;

// Derive the configuration for the mempool node.
let (config, _required_params) =
let (config, _required_params, consensus_proposals_channels) =
create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await;

let (clients, servers) = create_node_modules(&config);
Expand All @@ -77,6 +82,7 @@ impl IntegrationTestSetup {
batcher_client: clients.get_batcher_client().unwrap(),
rpc_storage_file_handle: storage_for_test.rpc_storage_handle,
sequencer_node_handle,
consensus_proposals_channels,
}
}

Expand Down
49 changes: 28 additions & 21 deletions crates/tests-integration/src/integration_test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;

use axum::body::Body;
use blockifier::context::ChainInfo;
Expand All @@ -11,6 +12,9 @@ use mempool_test_utils::starknet_api_test_utils::{
MultiAccountTransactionGenerator,
};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_storage::StorageConfig;
use reqwest::{Client, Response};
use starknet_api::block::BlockNumber;
Expand All @@ -28,29 +32,14 @@ use starknet_gateway::config::{
};
use starknet_gateway_types::errors::GatewaySpecError;
use starknet_http_server::config::HttpServerConfig;
use starknet_sequencer_node::config::component_config::ComponentConfig;
use starknet_sequencer_node::config::test_utils::RequiredParams;
use starknet_sequencer_node::config::{
ComponentExecutionConfig,
ComponentExecutionMode,
SequencerNodeConfig,
};
use starknet_sequencer_node::config::SequencerNodeConfig;
use tokio::net::TcpListener;

pub async fn create_config(
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
) -> (SequencerNodeConfig, RequiredParams) {
// TODO(Arni/ Matan): Enable the consensus in the end to end test.
let components = ComponentConfig {
consensus_manager: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
..Default::default()
};

) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<ProposalPart>) {
let chain_id = batcher_storage_config.db_config.chain_id.clone();
// TODO(Tsabary): create chain_info in setup, and pass relevant values throughout.
let mut chain_info = ChainInfo::create_for_testing();
Expand All @@ -60,12 +49,10 @@ pub async fn create_config(
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let consensus_manager_config = ConsensusManagerConfig {
consensus_config: ConsensusConfig { start_height: BlockNumber(1), ..Default::default() },
};
let (consensus_manager_config, consensus_proposals_channels) =
create_consensus_manager_config();
(
SequencerNodeConfig {
components,
batcher_config,
consensus_manager_config,
gateway_config,
Expand All @@ -78,9 +65,29 @@ pub async fn create_config(
eth_fee_token_address: fee_token_addresses.eth_fee_token_address,
strk_fee_token_address: fee_token_addresses.strk_fee_token_address,
},
consensus_proposals_channels,
)
}

fn create_consensus_manager_config()
-> (ConsensusManagerConfig, BroadcastTopicChannels<ProposalPart>) {
let (network_config, broadcast_channels) =
create_network_config_connected_to_broadcast_channels(
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC,
),
);
let consensus_manager_config = ConsensusManagerConfig {
consensus_config: ConsensusConfig {
start_height: BlockNumber(1),
consensus_delay: Duration::from_secs(3),
network_config,
..Default::default()
},
};
(consensus_manager_config, broadcast_channels)
}

pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig {
// TODO(Tsabary): get the latest version from the RPC crate.
const RPC_SPEC_VERSION: &str = "V0_8";
Expand Down
102 changes: 41 additions & 61 deletions crates/tests-integration/tests/end_to_end_test.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use pretty_assertions::assert_eq;
use rstest::{fixture, rstest};
use starknet_api::block::BlockNumber;
use starknet_api::core::ChainId;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{
BuildProposalInput,
DecisionReachedInput,
GetProposalContent,
GetProposalContentInput,
ProposalId,
StartHeightInput,
};
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_integration_tests::integration_test_setup::IntegrationTestSetup;
use starknet_integration_tests::integration_test_utils::{
create_integration_test_tx_generator,
Expand All @@ -25,7 +19,9 @@ fn tx_generator() -> MultiAccountTransactionGenerator {

#[rstest]
#[tokio::test]
async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) {
async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) {
const LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(5);
// Setup.
let mock_running_system = IntegrationTestSetup::new_from_tx_generator(&tx_generator).await;

Expand All @@ -34,60 +30,44 @@ async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) {
mock_running_system.assert_add_tx_success(tx)
})
.await;

// Test.
run_consensus_for_end_to_end_test(
&mock_running_system.batcher_client,
&expected_batched_tx_hashes,
)
.await;
let join_handle = tokio::spawn(async move {
tokio::time::timeout(
LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT,
listen_to_broadcasted_messages(
mock_running_system.consensus_proposals_channels,
&expected_batched_tx_hashes,
),
)
.await
.expect("listen to broadcasted messages should finish in time");
});
join_handle.await.expect("Task should succeed");
}

/// This function should mirror
/// [`run_consensus`](papyrus_consensus::manager::run_consensus). It makes requests
/// from the batcher client and asserts the expected responses were received.
pub async fn run_consensus_for_end_to_end_test(
batcher_client: &SharedBatcherClient,
async fn listen_to_broadcasted_messages(
consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
expected_batched_tx_hashes: &[TransactionHash],
) {
// Start height.
// TODO(Arni): Get the current height and retrospective_block_hash from the rpc storage or use
// consensus directly.
let current_height = BlockNumber(1);
batcher_client.start_height(StartHeightInput { height: current_height }).await.unwrap();

// Build proposal.
let proposal_id = ProposalId(0);
let retrospective_block_hash = None;
let build_proposal_duaration = chrono::TimeDelta::new(1, 0).unwrap();
batcher_client
.build_proposal(BuildProposalInput {
proposal_id,
deadline: chrono::Utc::now() + build_proposal_duaration,
retrospective_block_hash,
})
.await
.unwrap();

// Get proposal content.
let mut executed_tx_hashes: Vec<TransactionHash> = vec![];
let _proposal_commitment = loop {
let response = batcher_client
.get_proposal_content(GetProposalContentInput { proposal_id })
// TODO(Dan, Guy): retrieve chian ID (from the config?).
const CHAIN_ID_NAME: &str = "CHAIN_ID_SUBDIR";
let chain_id = ChainId::Other(CHAIN_ID_NAME.to_string());
let mut broadcasted_messages_receiver =
consensus_proposals_channels.broadcasted_messages_receiver;
let mut received_tx_hashes = vec![];
while received_tx_hashes.len() < expected_batched_tx_hashes.len() {
let (message, _broadcasted_message_metadata) = broadcasted_messages_receiver
.next()
.await
.unwrap();
match response.content {
GetProposalContent::Txs(batched_txs) => {
executed_tx_hashes.append(&mut batched_txs.iter().map(|tx| tx.tx_hash()).collect());
}
GetProposalContent::Finished(proposal_commitment) => {
break proposal_commitment;
}
.unwrap_or_else(|| panic!("Expected to receive a message from the broadcast topic"));
if let ProposalPart::Transactions(transactions) = message.unwrap() {
received_tx_hashes.append(
&mut transactions
.transactions
.iter()
.map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap())
.collect(),
);
}
};

// Decision reached.
batcher_client.decision_reached(DecisionReachedInput { proposal_id }).await.unwrap();

assert_eq!(expected_batched_tx_hashes, executed_tx_hashes);
}
assert_eq!(received_tx_hashes, expected_batched_tx_hashes);
}

0 comments on commit cb12e5b

Please sign in to comment.