Skip to content

Commit

Permalink
chore(blockifier_reexecution): add parallel offline reexecution
Browse files Browse the repository at this point in the history
  • Loading branch information
aner-starkware authored Nov 18, 2024
1 parent a624338 commit 846e4ea
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/blockifier_reexecution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ starknet-types-core.workspace = true
starknet_api.workspace = true
starknet_gateway.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }

[dev-dependencies]
rstest.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
"example_declare_v2": 822636,
"example_declare_v3": 825013,
"example_l1_handler": 868429
}
}
109 changes: 84 additions & 25 deletions crates/blockifier_reexecution/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ enum Command {

/// Block numbers. If not specified, blocks are retrieved from
/// get_block_numbers_for_reexecution().
#[clap(long, short = 'b', num_args = 0..)]
block_numbers: Vec<u64>,
#[clap(long, short = 'b', num_args = 1.., default_value = None)]
block_numbers: Option<Vec<u64>>,

// Directory path to json files directory. Default:
// "./crates/blockifier_reexecution/resources".
Expand All @@ -83,14 +83,35 @@ enum Command {
#[clap(long, short = 'd', default_value = None)]
full_file_path: Option<String>,
},

// Reexecute all (selected) blocks
ReexecuteAll {
/// Block numbers. If not specified, blocks are retrieved from
/// get_block_numbers_for_reexecution().
#[clap(long, short = 'b', num_args = 1.., default_value = None)]
block_numbers: Option<Vec<u64>>,

// Directory path to json files directory. Default:
// "./crates/blockifier_reexecution/resources".
// TODO(Aner): add possibility to retrieve files from gc bucket.
#[clap(long, short = 'd', default_value = None)]
directory_path: Option<String>,
},
}

fn parse_block_numbers_args(block_numbers: Option<Vec<u64>>) -> Vec<BlockNumber> {
block_numbers
.map(|block_numbers| block_numbers.into_iter().map(BlockNumber).collect())
.unwrap_or(get_block_numbers_for_reexecution())
}

#[derive(Debug, Args)]
struct GlobalOptions {}

/// Main entry point of the blockifier reexecution CLI.
/// TODO(Aner): Add concurrency to the reexecution tests (using tokio).
fn main() {
#[tokio::main]
async fn main() {
let args = BlockifierReexecutionCliArgs::parse();

match args.command {
Expand All @@ -102,11 +123,18 @@ fn main() {
json_rpc_version: JSON_RPC_VERSION.to_string(),
};

reexecute_and_verify_correctness(ConsecutiveTestStateReaders::new(
BlockNumber(block_number - 1),
Some(config),
false,
));
// RPC calls are "synchronous IO" (see, e.g., https://stackoverflow.com/questions/74547541/when-should-you-use-tokios-spawn-blocking)
// for details), so should be executed in a blocking thread.
// TODO(Aner): make only the RPC calls blocking, not the whole function.
tokio::task::spawn_blocking(move || {
reexecute_and_verify_correctness(ConsecutiveTestStateReaders::new(
BlockNumber(block_number - 1),
Some(config),
false,
))
})
.await
.unwrap();

// Compare the expected and actual state differences
// by avoiding discrepancies caused by insertion order
Expand All @@ -119,34 +147,42 @@ fn main() {
json"
));

write_block_reexecution_data_to_file(
BlockNumber(block_number),
&full_file_path,
node_url,
);
// RPC calls are "synchronous IO" (see, e.g., https://stackoverflow.com/questions/74547541/when-should-you-use-tokios-spawn-blocking
// for details), so should be executed in a blocking thread.
// TODO(Aner): make only the RPC calls blocking, not the whole function.
tokio::task::spawn_blocking(move || {
write_block_reexecution_data_to_file(
BlockNumber(block_number),
full_file_path,
node_url,
);
})
.await
.unwrap();
}

Command::WriteAll { node_url, block_numbers, directory_path } => {
let directory_path =
directory_path.unwrap_or("./crates/blockifier_reexecution/resources".to_string());

let block_numbers = match block_numbers.len() {
0 => get_block_numbers_for_reexecution(),
_ => block_numbers.into_iter().map(BlockNumber).collect(),
};

let block_numbers = parse_block_numbers_args(block_numbers);
println!("Computing reexecution data for blocks {block_numbers:?}.");

// TODO(Aner): Execute in parallel.
// TODO(Aner): Execute in parallel. Requires making the function async, and only the RPC
// calls blocking.
for block_number in block_numbers {
let node_url = node_url.clone();
let full_file_path =
format!("{directory_path}/block_{block_number}/reexecution_data.json");

write_block_reexecution_data_to_file(
block_number,
&full_file_path,
node_url.clone(),
);
// RPC calls are "synchronous IO" (see, e.g., https://stackoverflow.com/questions/74547541/when-should-you-use-tokios-spawn-blocking
// for details), so should be executed in a blocking thread.
// TODO(Aner): make only the RPC calls blocking, not the whole function.
tokio::task::spawn_blocking(move || {
println!("Computing reexecution data for block {block_number}.");
write_block_reexecution_data_to_file(block_number, full_file_path, node_url)
})
.await
.unwrap();
}
}

Expand All @@ -162,5 +198,28 @@ fn main() {

println!("Reexecution test for block {block_number} passed successfully.");
}

Command::ReexecuteAll { block_numbers, directory_path } => {
let directory_path =
directory_path.unwrap_or("./crates/blockifier_reexecution/resources".to_string());

let block_numbers = parse_block_numbers_args(block_numbers);
println!("Reexecuting blocks {block_numbers:?}.");

let mut threads = vec![];
for block in block_numbers {
let full_file_path =
format!("{directory_path}/block_{block}/reexecution_data.json");
threads.push(tokio::task::spawn(async move {
reexecute_and_verify_correctness(
OfflineConsecutiveStateReaders::new_from_file(&full_file_path).unwrap(),
);
println!("Reexecution test for block {block} passed successfully.");
}));
}
for thread in threads {
thread.await.unwrap();
}
}
}
}
4 changes: 2 additions & 2 deletions crates/blockifier_reexecution/src/state_reader/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ pub fn reexecute_block_for_testing(block_number: u64) {

pub fn write_block_reexecution_data_to_file(
block_number: BlockNumber,
full_file_path: &str,
full_file_path: String,
node_url: String,
) {
let config =
Expand Down Expand Up @@ -272,7 +272,7 @@ pub fn write_block_reexecution_data_to_file(
serializable_data_next_block,
old_block_hash,
}
.write_to_file(full_file_path)
.write_to_file(&full_file_path)
.unwrap();

println!("RPC replies required for reexecuting block {block_number} written to json file.");
Expand Down

0 comments on commit 846e4ea

Please sign in to comment.