Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: add consensus to e2e flow test #1811

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/batcher_types/src/batcher_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub struct BuildProposalInput {
pub deadline: chrono::DateTime<Utc>,
pub retrospective_block_hash: Option<BlockHashAndNumber>,
// TODO: Should we get the gas price here?
// TODO: add proposer address.
// TODO: add whether the kzg mechanism is used for DA.
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -56,6 +58,7 @@ pub enum GetProposalContent {
}

#[derive(Clone, Debug, Serialize, Deserialize)]
// TODO(Dan): Consider unifying with BuildProposalInput as they have the same fields.
pub struct ValidateProposalInput {
pub proposal_id: ProposalId,
pub deadline: chrono::DateTime<Utc>,
Expand Down
2 changes: 1 addition & 1 deletion crates/blockifier/cairo_native
Submodule cairo_native updated 48 files
+5 −5 .github/workflows/bench-hyperfine.yml
+12 −12 .github/workflows/ci.yml
+2 −2 .github/workflows/publish.yml
+2 −2 .github/workflows/release.yml
+2 −2 .github/workflows/rustdoc.yml
+0 −132 .github/workflows/starknet-blocks.yml
+51 −51 Cargo.lock
+80 −6 benches/benches.rs
+0 −17 benches/compile_time.rs
+3 −54 benches/libfuncs.rs
+45 −33 docs/implementing_libfuncs.md
+5 −0 programs/benches/factorial_2M.c
+5 −0 programs/benches/fib_2M.c
+5 −0 programs/benches/logistic_map.c
+48 −42 programs/compile_benches/dijkstra.cairo
+17 −8 programs/compile_benches/extended_euclidean_algorithm.cairo
+27 −78 programs/compile_benches/fast_power.cairo
+285 −0 programs/compile_benches/sha256.cairo
+0 −539 programs/compile_benches/sha512.cairo
+2 −5 runtime/Cargo.toml
+6 −236 runtime/src/lib.rs
+1 −1 rust-toolchain.toml
+0 −32 scripts/diff-check.sh
+6 −6 src/arch.rs
+26 −2 src/compiler.rs
+1 −15 src/context.rs
+3 −0 src/error.rs
+58 −30 src/executor.rs
+12 −0 src/executor/aot.rs
+66 −13 src/executor/contract.rs
+13 −0 src/executor/jit.rs
+0 −25 src/ffi.rs
+6 −5 src/libfuncs/array.rs
+3 −5 src/libfuncs/circuit.rs
+121 −18 src/libfuncs/gas.rs
+595 −290 src/libfuncs/starknet.rs
+1,508 −677 src/libfuncs/starknet/secp256.rs
+9 −6 src/metadata/gas.rs
+6 −4 src/types.rs
+24 −19 src/types/array.rs
+6 −2 src/types/builtin_costs.rs
+45 −3 src/utils.rs
+1 −87 src/utils/block_ext.rs
+2 −2 src/values.rs
+6 −6 tests/alexandria/Scarb.lock
+4 −4 tests/alexandria/Scarb.toml
+12 −6 tests/common.rs
+2 −0 tests/tests/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use papyrus_consensus::types::{
use papyrus_network::network_manager::{BroadcastTopicClient, BroadcastTopicClientTrait};
use papyrus_protobuf::consensus::{
ConsensusMessage,
ProposalFin,
ProposalInit,
ProposalPart,
TransactionBatch,
Expand Down Expand Up @@ -318,6 +319,11 @@ async fn stream_build_proposal(
content.len(),
height
);
debug!("Broadcasting proposal fin: {proposal_content_id:?}");
broadcast_client
.broadcast_message(ProposalPart::Fin(ProposalFin { proposal_content_id }))
.await
.expect("Failed to broadcast proposal fin");
// Update valid_proposals before sending fin to avoid a race condition
// with `repropose` being called before `valid_proposals` is updated.
let mut valid_proposals = valid_proposals.lock().expect("Lock was poisoned");
Expand Down
4 changes: 3 additions & 1 deletion crates/tests_integration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ assert_matches.workspace = true
axum.workspace = true
blockifier.workspace = true
cairo-lang-starknet-classes.workspace = true
chrono.workspace = true
indexmap.workspace = true
mempool_test_utils.workspace = true
papyrus_common.workspace = true
papyrus_consensus.workspace = true
papyrus_network = { workspace = true, features = ["testing"] }
papyrus_protobuf.workspace = true
papyrus_rpc.workspace = true
papyrus_storage = { workspace = true, features = ["testing"] }
reqwest.workspace = true
Expand All @@ -42,6 +43,7 @@ tokio.workspace = true
tracing.workspace = true

[dev-dependencies]
futures.workspace = true
pretty_assertions.workspace = true
rstest.workspace = true
starknet_sequencer_infra.workspace = true
10 changes: 8 additions & 2 deletions crates/tests_integration/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::SocketAddr;

use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use starknet_api::rpc_transaction::RpcTransaction;
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::communication::SharedBatcherClient;
Expand Down Expand Up @@ -32,6 +34,9 @@ pub struct FlowTestSetup {

// Handle of the sequencer node.
pub sequencer_node_handle: JoinHandle<Result<(), anyhow::Error>>,

// Channels for consensus proposals, used for asserting the right transactions are proposed.
pub consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
}

impl FlowTestSetup {
Expand All @@ -52,8 +57,8 @@ impl FlowTestSetup {
)
.await;

// Derive the configuration for the mempool node.
let (config, _required_params) =
// Derive the configuration for the sequencer node.
let (config, _required_params, consensus_proposals_channels) =
create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await;

let (clients, servers) = create_node_modules(&config);
Expand All @@ -77,6 +82,7 @@ impl FlowTestSetup {
batcher_client: clients.get_batcher_client().unwrap(),
rpc_storage_file_handle: storage_for_test.rpc_storage_handle,
sequencer_node_handle,
consensus_proposals_channels,
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/tests_integration/src/integration_test_setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl IntegrationTestSetup {
.await;

// Derive the configuration for the sequencer node.
let (config, required_params) =
let (config, required_params, _) =
create_config(rpc_server_addr, storage_for_test.batcher_storage_config).await;

let node_config_dir_handle = tempdir().unwrap();
Expand Down
49 changes: 28 additions & 21 deletions crates/tests_integration/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::future::Future;
use std::net::SocketAddr;
use std::time::Duration;

use axum::body::Body;
use blockifier::context::ChainInfo;
Expand All @@ -11,6 +12,9 @@ use mempool_test_utils::starknet_api_test_utils::{
MultiAccountTransactionGenerator,
};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_network::network_manager::test_utils::create_network_config_connected_to_broadcast_channels;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::ProposalPart;
use papyrus_storage::StorageConfig;
use reqwest::{Client, Response};
use starknet_api::block::BlockNumber;
Expand All @@ -28,29 +32,14 @@ use starknet_gateway::config::{
};
use starknet_gateway_types::errors::GatewaySpecError;
use starknet_http_server::config::HttpServerConfig;
use starknet_sequencer_node::config::component_config::ComponentConfig;
use starknet_sequencer_node::config::test_utils::RequiredParams;
use starknet_sequencer_node::config::{
ComponentExecutionConfig,
ComponentExecutionMode,
SequencerNodeConfig,
};
use starknet_sequencer_node::config::SequencerNodeConfig;
use tokio::net::TcpListener;

pub async fn create_config(
rpc_server_addr: SocketAddr,
batcher_storage_config: StorageConfig,
) -> (SequencerNodeConfig, RequiredParams) {
// TODO(Arni/ Matan): Enable the consensus in the end to end test.
let components = ComponentConfig {
consensus_manager: ComponentExecutionConfig {
execution_mode: ComponentExecutionMode::Disabled,
local_server_config: None,
..Default::default()
},
..Default::default()
};

) -> (SequencerNodeConfig, RequiredParams, BroadcastTopicChannels<ProposalPart>) {
let chain_id = batcher_storage_config.db_config.chain_id.clone();
// TODO(Tsabary): create chain_info in setup, and pass relevant values throughout.
let mut chain_info = ChainInfo::create_for_testing();
Expand All @@ -60,12 +49,10 @@ pub async fn create_config(
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config = create_http_server_config().await;
let rpc_state_reader_config = test_rpc_state_reader_config(rpc_server_addr);
let consensus_manager_config = ConsensusManagerConfig {
consensus_config: ConsensusConfig { start_height: BlockNumber(1), ..Default::default() },
};
let (consensus_manager_config, consensus_proposals_channels) =
create_consensus_manager_config_and_channels();
(
SequencerNodeConfig {
components,
batcher_config,
consensus_manager_config,
gateway_config,
Expand All @@ -78,9 +65,29 @@ pub async fn create_config(
eth_fee_token_address: fee_token_addresses.eth_fee_token_address,
strk_fee_token_address: fee_token_addresses.strk_fee_token_address,
},
consensus_proposals_channels,
)
}

fn create_consensus_manager_config_and_channels()
-> (ConsensusManagerConfig, BroadcastTopicChannels<ProposalPart>) {
let (network_config, broadcast_channels) =
create_network_config_connected_to_broadcast_channels(
papyrus_network::gossipsub_impl::Topic::new(
starknet_consensus_manager::consensus_manager::NETWORK_TOPIC,
),
);
let consensus_manager_config = ConsensusManagerConfig {
consensus_config: ConsensusConfig {
start_height: BlockNumber(1),
consensus_delay: Duration::from_secs(1),
network_config,
..Default::default()
},
};
(consensus_manager_config, broadcast_channels)
}

pub fn test_rpc_state_reader_config(rpc_server_addr: SocketAddr) -> RpcStateReaderConfig {
// TODO(Tsabary): get the latest version from the RPC crate.
const RPC_SPEC_VERSION: &str = "V0_8";
Expand Down
117 changes: 58 additions & 59 deletions crates/tests_integration/tests/end_to_end_flow_test.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
use futures::StreamExt;
use mempool_test_utils::starknet_api_test_utils::MultiAccountTransactionGenerator;
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, ProposalPart};
use pretty_assertions::assert_eq;
use rstest::{fixture, rstest};
use starknet_api::block::BlockNumber;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::{ChainId, ContractAddress};
use starknet_api::transaction::TransactionHash;
use starknet_batcher_types::batcher_types::{
BuildProposalInput,
DecisionReachedInput,
GetProposalContent,
GetProposalContentInput,
ProposalId,
StartHeightInput,
};
use starknet_batcher_types::communication::SharedBatcherClient;
use starknet_integration_tests::flow_test_setup::FlowTestSetup;
use starknet_integration_tests::utils::{
create_integration_test_tx_generator,
run_integration_test_scenario,
};
use starknet_types_core::felt::Felt;

#[fixture]
fn tx_generator() -> MultiAccountTransactionGenerator {
Expand All @@ -25,7 +21,9 @@ fn tx_generator() -> MultiAccountTransactionGenerator {

#[rstest]
#[tokio::test]
async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) {
async fn end_to_end(tx_generator: MultiAccountTransactionGenerator) {
const LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT: std::time::Duration =
std::time::Duration::from_secs(5);
// Setup.
let mock_running_system = FlowTestSetup::new_from_tx_generator(&tx_generator).await;

Expand All @@ -34,60 +32,61 @@ async fn test_end_to_end(tx_generator: MultiAccountTransactionGenerator) {
mock_running_system.assert_add_tx_success(tx)
})
.await;

// Test.
run_consensus_for_end_to_end_test(
&mock_running_system.batcher_client,
&expected_batched_tx_hashes,
// TODO(Dan, Itay): Consider adding a utility function that waits for something to happen.
tokio::time::timeout(
LISTEN_TO_BROADCAST_MESSAGES_TIMEOUT,
listen_to_broadcasted_messages(
mock_running_system.consensus_proposals_channels,
&expected_batched_tx_hashes,
),
)
.await;
.await
.expect("listen to broadcasted messages should finish in time");
}

/// This function should mirror
/// [`run_consensus`](papyrus_consensus::manager::run_consensus). It makes requests
/// from the batcher client and asserts the expected responses were received.
pub async fn run_consensus_for_end_to_end_test(
batcher_client: &SharedBatcherClient,
async fn listen_to_broadcasted_messages(
consensus_proposals_channels: BroadcastTopicChannels<ProposalPart>,
expected_batched_tx_hashes: &[TransactionHash],
) {
// Start height.
// TODO(Arni): Get the current height and retrospective_block_hash from the rpc storage or use
// consensus directly.
let current_height = BlockNumber(1);
batcher_client.start_height(StartHeightInput { height: current_height }).await.unwrap();

// Build proposal.
let proposal_id = ProposalId(0);
let retrospective_block_hash = None;
let build_proposal_duaration = chrono::TimeDelta::new(1, 0).unwrap();
batcher_client
.build_proposal(BuildProposalInput {
proposal_id,
deadline: chrono::Utc::now() + build_proposal_duaration,
retrospective_block_hash,
})
.await
.unwrap();

// Get proposal content.
let mut executed_tx_hashes: Vec<TransactionHash> = vec![];
let _proposal_commitment = loop {
let response = batcher_client
.get_proposal_content(GetProposalContentInput { proposal_id })
.await
.unwrap();
match response.content {
GetProposalContent::Txs(batched_txs) => {
executed_tx_hashes.append(&mut batched_txs.iter().map(|tx| tx.tx_hash()).collect());
// TODO(Dan, Guy): retrieve chain ID. Maybe by modifying IntegrationTestSetup to hold it as a
// member, and instantiate the value using StorageTestSetup.
const CHAIN_ID_NAME: &str = "CHAIN_ID_SUBDIR";
let chain_id = ChainId::Other(CHAIN_ID_NAME.to_string());
let mut broadcasted_messages_receiver =
consensus_proposals_channels.broadcasted_messages_receiver;
let mut received_tx_hashes = vec![];
// TODO (Dan, Guy): retrieve / calculate the expected proposal init and fin.
let expected_proposal_init = ProposalInit {
height: BlockNumber(1),
round: 0,
valid_round: None,
proposer: ContractAddress::default(),
};
let expected_proposal_fin = ProposalFin {
proposal_content_id: BlockHash(Felt::from_hex_unchecked(
"0x4597ceedbef644865917bf723184538ef70d43954d63f5b7d8cb9d1bd4c2c32",
)),
dan-starkware marked this conversation as resolved.
Show resolved Hide resolved
};
assert_eq!(
broadcasted_messages_receiver.next().await.unwrap().0.unwrap(),
ProposalPart::Init(expected_proposal_init)
);
loop {
match broadcasted_messages_receiver.next().await.unwrap().0.unwrap() {
ProposalPart::Init(init) => panic!("Unexpected init: {:?}", init),
ProposalPart::Fin(proposal_fin) => {
assert_eq!(proposal_fin, expected_proposal_fin);
break;
}
GetProposalContent::Finished(proposal_commitment) => {
break proposal_commitment;
ProposalPart::Transactions(transactions) => {
received_tx_hashes.extend(
transactions
.transactions
.iter()
.map(|tx| tx.calculate_transaction_hash(&chain_id).unwrap()),
);
}
}
};

// Decision reached.
batcher_client.decision_reached(DecisionReachedInput { proposal_id }).await.unwrap();

assert_eq!(expected_batched_tx_hashes, executed_tx_hashes);
}
assert_eq!(received_tx_hashes, expected_batched_tx_hashes);
}
Loading