Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync): stop sync at block number from config for profiling #1838

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion config/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@
"privacy": "Public",
"value": 10000
},
"p2p_sync.stop_sync_at_block_number": {
"description": "Stops the sync at given block number and closes the node cleanly. Used to run profiling on the node.",
"privacy": "Public",
"value": 1000
},
"p2p_sync.stop_sync_at_block_number.#is_none": {
"description": "Flag for an optional field",
"privacy": "TemporaryValue",
"value": true
},
"p2p_sync.wait_period_for_new_data": {
"description": "Time in seconds to wait when a query returned with partial data before sending a new query",
"privacy": "Public",
Expand Down Expand Up @@ -314,4 +324,4 @@
"privacy": "Public",
"value": true
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,18 @@ expression: dumped_default_config
},
"privacy": "Public"
},
"p2p_sync.stop_sync_at_block_number": {
"description": "Stops the sync at given block number and closes the node cleanly. Used to run profiling on the node.",
"value": {
"$serde_json::private::Number": "1000"
},
"privacy": "Public"
},
"p2p_sync.stop_sync_at_block_number.#is_none": {
"description": "Flag for an optional field",
"value": true,
"privacy": "TemporaryValue"
},
"p2p_sync.wait_period_for_new_data": {
"description": "Time in seconds to wait when a query returned with partial data before sending a new query",
"value": {
Expand Down
8 changes: 7 additions & 1 deletion crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,5 +298,11 @@ async fn main() -> anyhow::Result<()> {
}

info!("Booting up.");
run_threads(config).await
let res = run_threads(config.clone()).await;
if config.p2p_sync.is_some_and(|c| c.stop_sync_at_block_number.is_some()) {
if let Err(err) = res {
error!("Error: {err}");
};
}
Ok(())
}
19 changes: 16 additions & 3 deletions crates/papyrus_p2p_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Duration;

use futures::channel::mpsc::{SendError, Sender};
use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::dumping::{ser_optional_param, ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::{DataType, Query, ResponseReceivers};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
Expand All @@ -37,11 +37,12 @@ pub struct P2PSyncConfig {
pub num_block_state_diffs_per_query: usize,
#[serde(deserialize_with = "deserialize_seconds_to_duration")]
pub wait_period_for_new_data: Duration,
pub stop_sync_at_block_number: Option<BlockNumber>,
}

impl SerializeConfig for P2PSyncConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([
let mut config = BTreeMap::from_iter([
ser_param(
"num_headers_per_query",
&self.num_headers_per_query,
Expand All @@ -61,7 +62,16 @@ impl SerializeConfig for P2PSyncConfig {
new query",
ParamPrivacyInput::Public,
),
])
]);
config.extend(ser_optional_param(
&self.stop_sync_at_block_number,
BlockNumber(1000),
"stop_sync_at_block_number",
"Stops the sync at given block number and closes the node cleanly. Used to run \
profiling on the node.",
ParamPrivacyInput::Public,
));
config
}
}

Expand All @@ -73,6 +83,7 @@ impl Default for P2PSyncConfig {
// messages in the network buffers.
num_block_state_diffs_per_query: 100,
wait_period_for_new_data: Duration::from_secs(5),
stop_sync_at_block_number: None,
}
}
}
Expand Down Expand Up @@ -153,6 +164,7 @@ impl P2PSync {
self.storage_reader.clone(),
self.config.wait_period_for_new_data,
self.config.num_headers_per_query,
self.config.stop_sync_at_block_number,
);

let state_diff_stream = StateDiffStreamFactory::create_stream(
Expand All @@ -163,6 +175,7 @@ impl P2PSync {
self.storage_reader,
self.config.wait_period_for_new_data,
self.config.num_block_state_diffs_per_query,
self.config.stop_sync_at_block_number,
);

let mut data_stream = header_stream.merge(state_diff_stream);
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_p2p_sync/src/p2p_sync_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

7 changes: 7 additions & 0 deletions crates/papyrus_p2p_sync/src/stream_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub(crate) trait DataStreamFactory {
storage_reader: StorageReader,
wait_period_for_new_data: Duration,
num_blocks_per_query: usize,
stop_sync_at_block_number: Option<BlockNumber>,
) -> BoxStream<'static, Result<Box<dyn BlockData>, P2PSyncError>> {
stream! {
let mut current_block_number = Self::get_start_block_number(&storage_reader)?;
Expand Down Expand Up @@ -109,6 +110,12 @@ pub(crate) trait DataStreamFactory {
}
info!("Added {:?} for block {}.", Self::DATA_TYPE, current_block_number);
current_block_number = current_block_number.unchecked_next();
if stop_sync_at_block_number.is_some_and(|stop_sync_at_block_number| {
current_block_number >= stop_sync_at_block_number
}) {
info!("{:?} hit the stop sync block number.", Self::DATA_TYPE);
return;
}
}

// Consume the None message signaling the end of the query.
Expand Down
3 changes: 2 additions & 1 deletion crates/papyrus_p2p_sync/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ lazy_static! {
static ref TEST_CONFIG: P2PSyncConfig = P2PSyncConfig {
num_headers_per_query: HEADER_QUERY_LENGTH,
num_block_state_diffs_per_query: STATE_DIFF_QUERY_LENGTH,
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA
wait_period_for_new_data: WAIT_PERIOD_FOR_NEW_DATA,
stop_sync_at_block_number: None,
};
}

Expand Down
Loading