From 9d89f5c8c8ef2a6f50e4ccafaa56512b50daf157 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 16 Jan 2025 17:28:19 +0400 Subject: [PATCH 1/9] add timestamp to header --- core/src/api/types.rs | 28 ++++++++++++++++++++++++++- core/src/api/v2/handlers.rs | 11 +++++++++-- core/src/data.rs | 15 ++++++++++++++ core/src/data/keys.rs | 2 ++ core/src/network/rpc/client.rs | 19 ++++++++++++++++++ core/src/network/rpc/subscriptions.rs | 13 +++++++++++-- 6 files changed, 83 insertions(+), 5 deletions(-) diff --git a/core/src/api/types.rs b/core/src/api/types.rs index 0650643fd..bb6351cbe 100644 --- a/core/src/api/types.rs +++ b/core/src/api/types.rs @@ -345,6 +345,7 @@ pub struct Header { extrinsics_root: H256, extension: Extension, digest: Digest, + timestamp: Option, } impl Reply for Header { @@ -401,6 +402,24 @@ struct Extension { app_lookup: CompactDataLookup, } +impl Header { + pub fn from_avail_header_and_timestamp( + header: AvailHeader, + timestamp: Option, + ) -> Result { + Ok(Header { + hash: Encode::using_encoded(&header, blake2_256).into(), + parent_hash: header.parent_hash, + number: header.number, + state_root: header.state_root, + extrinsics_root: header.extrinsics_root, + extension: header.extension.try_into()?, + digest: header.digest.try_into()?, + timestamp, + }) + } +} + impl TryFrom for Header { type Error = Report; @@ -413,6 +432,7 @@ impl TryFrom for Header { extrinsics_root: header.extrinsics_root, extension: header.extension.try_into()?, digest: header.digest.try_into()?, + timestamp: None, }) } } @@ -851,7 +871,7 @@ pub enum WsError { #[cfg(test)] mod tests { - use std::time::Duration; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; use avail_rust::{ avail::runtime_types::avail_core::data_lookup::compact::CompactDataLookup, H256, @@ -903,6 +923,12 @@ mod tests { digest: Digest { logs: vec![DigestItem::RuntimeEnvironmentUpdated], }, + timestamp: Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(), + ), }, })) } diff --git a/core/src/api/v2/handlers.rs b/core/src/api/v2/handlers.rs index 98705db1d..206695544 100644 --- a/core/src/api/v2/handlers.rs +++ b/core/src/api/v2/handlers.rs @@ -14,7 +14,9 @@ use crate::{ Subscription, SubscriptionId, Transaction, Version, WsClients, }, }, - data::{AppDataKey, BlockHeaderKey, Database, RpcNodeKey, VerifiedCellCountKey}, + data::{ + AppDataKey, BlockHeaderKey, BlockTimestampKey, Database, RpcNodeKey, VerifiedCellCountKey, + }, utils::calculate_confidence, }; @@ -109,6 +111,11 @@ pub async fn block_header( .and_then(|extension| block_status(sync_start_block, db.clone(), block_number, extension)) .ok_or(Error::not_found())?; + let block_timestamp = db + .get(BlockTimestampKey(block_number)) + .map(|timestamp| timestamp) + .ok_or_else(|| Error::not_found())?; + if matches!( block_status, BlockStatus::Unavailable | BlockStatus::Pending | BlockStatus::VerifyingHeader @@ -118,7 +125,7 @@ pub async fn block_header( db.get(BlockHeaderKey(block_number)) .ok_or_else(|| eyre!("Header not found")) - .and_then(|header| header.try_into()) + .and_then(|header| Header::from_avail_header_and_timestamp(header, Some(block_timestamp))) .map_err(Error::internal_server_error) } diff --git a/core/src/data.rs b/core/src/data.rs index d4daef4c5..60c3abce2 100644 --- a/core/src/data.rs +++ b/core/src/data.rs @@ -310,3 +310,18 @@ impl RecordKey for SignerNonceKey { SIGNER_NONCE.into() } } + +pub struct BlockTimestampKey(pub u32); + +impl RecordKey for BlockTimestampKey { + type Type = u64; + + fn space(&self) -> Option<&'static str> { + Some(&BLOCK_TIMESTAMP_KEY) + } + + fn key(&self) -> String { + let BlockTimestampKey(block_num) = self; + format!("{BLOCK_TIMESTAMP_KEY}:{block_num}") + } +} diff --git a/core/src/data/keys.rs b/core/src/data/keys.rs index 499e2b020..d19b1f0b8 100644 --- a/core/src/data/keys.rs +++ b/core/src/data/keys.rs @@ -35,3 +35,5 @@ pub const CLIENT_ID_KEY: &str = "client_id"; pub const P2P_KEYPAIR_KEY: &str = "p2p_keypair"; /// Key for storing signer nonce pub const SIGNER_NONCE: &str = "signer_nonce"; +/// Key for storing block timestamp +pub const BLOCK_TIMESTAMP_KEY: &str = "block_timestamp"; diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index 85a0e1c5a..ea9ab8ef6 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -547,6 +547,25 @@ impl Client { )) } + pub async fn get_block_timestamp(&self, block_hash: H256) -> Result { + self.with_retries(|client| async move { + let storage_query = avail::storage().timestamp().now(); + let storage = client.api.storage().at(block_hash); + storage + .fetch(&storage_query) // Clone the value for each use + .await? + .ok_or_else(|| { + subxt::Error::Other(format!("Block Header with hash: {block_hash:?} not found")) + }) + .map_err(Into::into) + }) + .await + .wrap_err(format!( + "Timestamp for Block Header with hash: {:?} not found", + block_hash + )) + } + pub async fn get_validator_set_by_hash(&self, block_hash: H256) -> Result> { let res = self .with_retries(|client| async move { diff --git a/core/src/network/rpc/subscriptions.rs b/core/src/network/rpc/subscriptions.rs index 38bfead47..34a6fa475 100644 --- a/core/src/network/rpc/subscriptions.rs +++ b/core/src/network/rpc/subscriptions.rs @@ -16,8 +16,8 @@ use web_time::Instant; use super::{Client, OutputEvent, Subscription}; use crate::{ data::{ - Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, IsFinalitySyncedKey, - LatestHeaderKey, VerifiedHeaderKey, + BlockTimestampKey, Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, + IsFinalitySyncedKey, LatestHeaderKey, VerifiedHeaderKey, }, finality::{check_finality, ValidatorSet}, types::{BlockRange, GrandpaJustification}, @@ -101,6 +101,15 @@ impl SubscriptionLoop { self.db.put(LatestHeaderKey, header.clone().number); info!("Header no.: {}", header.number); + let block_hash = self.rpc_client.get_block_hash(header.number).await.unwrap(); + let block_timestamp = self + .rpc_client + .get_block_timestamp(block_hash) + .await + .unwrap(); + self.db + .put(BlockTimestampKey(header.number), block_timestamp); + // if new validator set becomes active, replace the current one if self.block_data.next_valset.is_some() { self.block_data.current_valset = self.block_data.next_valset.take().unwrap(); From 963a28308caf5c97e3528a2c866fce0eba91a63a Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:41:30 +0400 Subject: [PATCH 2/9] add tests --- core/src/api/v2/mod.rs | 20 +++++++++++++++++--- core/src/data.rs | 2 +- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core/src/api/v2/mod.rs b/core/src/api/v2/mod.rs index 98a68b189..ec41f400c 100644 --- a/core/src/api/v2/mod.rs +++ b/core/src/api/v2/mod.rs @@ -217,8 +217,9 @@ mod tests { }, data::{ self, AchievedConfidenceKey, AchievedSyncConfidenceKey, AppDataKey, BlockHeaderKey, - Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, RpcNodeKey, - VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey, + BlockTimestampKey, Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, + RpcNodeKey, VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, + VerifiedSyncDataKey, }, network::rpc::Node, types::BlockRange, @@ -235,7 +236,13 @@ mod tests { AvailHeader, H256, }; use hyper::StatusCode; - use std::{collections::HashSet, str::FromStr, sync::Arc}; + use std::{ + collections::HashSet, + str::FromStr, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, + }; + use test_case::test_case; use uuid::Uuid; @@ -405,6 +412,13 @@ mod tests { db.put(VerifiedHeaderKey, BlockRange::init(9)); db.put(LatestSyncKey, 5); db.put(BlockHeaderKey(block_number), header()); + db.put( + BlockTimestampKey(block_number), + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs(), + ); let route = super::block_header_route(config, db); let response = warp::test::request() .method("GET") diff --git a/core/src/data.rs b/core/src/data.rs index 60c3abce2..dad73e312 100644 --- a/core/src/data.rs +++ b/core/src/data.rs @@ -317,7 +317,7 @@ impl RecordKey for BlockTimestampKey { type Type = u64; fn space(&self) -> Option<&'static str> { - Some(&BLOCK_TIMESTAMP_KEY) + Some(APP_STATE_CF) } fn key(&self) -> String { From ae681993fe09253fb3e5c47a25cf40bc92e6be15 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:42:33 +0400 Subject: [PATCH 3/9] update changelog --- core/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 6f14ee578..27dcade45 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -2,6 +2,7 @@ ## 1.2.0 +- Add block timestamp to v2 header API - Fix issue with multiple telemetry gauge callbacks ## [1.1.0](https://github.com/availproject/avail-light/tree/avail-light-core-v1.1.0) - 2024-12-20 From fce7c871e57d5553a82b8164f232ed2cdb6b95fa Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:57:52 +0400 Subject: [PATCH 4/9] fix tests --- core/src/api/v2/handlers.rs | 3 +-- core/src/api/v2/mod.rs | 3 ++- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/api/v2/handlers.rs b/core/src/api/v2/handlers.rs index 206695544..ab61b1112 100644 --- a/core/src/api/v2/handlers.rs +++ b/core/src/api/v2/handlers.rs @@ -113,8 +113,7 @@ pub async fn block_header( let block_timestamp = db .get(BlockTimestampKey(block_number)) - .map(|timestamp| timestamp) - .ok_or_else(|| Error::not_found())?; + .ok_or_else(Error::not_found)?; if matches!( block_status, diff --git a/core/src/api/v2/mod.rs b/core/src/api/v2/mod.rs index ec41f400c..4fcf93684 100644 --- a/core/src/api/v2/mod.rs +++ b/core/src/api/v2/mod.rs @@ -487,6 +487,7 @@ mod tests { db.put(LatestHeaderKey, 1); db.put(VerifiedHeaderKey, BlockRange::init(1)); db.put(BlockHeaderKey(1), header()); + db.put(BlockTimestampKey(1), 1737039274); let route = super::block_header_route(config, db); let response = warp::test::request() .method("GET") @@ -495,7 +496,7 @@ mod tests { .await; assert_eq!( response.body(), - r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]}}"# + r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]},"timestamp":1737039274}"# ); } From 9c42ecd0c1bf822396fbfa372e8f25b526ee027b Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Thu, 16 Jan 2025 20:00:47 +0400 Subject: [PATCH 5/9] remove redundant docs --- core/src/network/rpc/client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index ea9ab8ef6..a7ee96672 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -552,7 +552,7 @@ impl Client { let storage_query = avail::storage().timestamp().now(); let storage = client.api.storage().at(block_hash); storage - .fetch(&storage_query) // Clone the value for each use + .fetch(&storage_query) .await? .ok_or_else(|| { subxt::Error::Other(format!("Block Header with hash: {block_hash:?} not found")) From 73caa53ddba1043abc413626866f055a54913094 Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Fri, 17 Jan 2025 15:54:52 +0400 Subject: [PATCH 6/9] use received at as timestamp --- core/src/network/rpc/client.rs | 19 ------------------- core/src/network/rpc/subscriptions.rs | 17 ++++++++--------- 2 files changed, 8 insertions(+), 28 deletions(-) diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index a7ee96672..85a0e1c5a 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -547,25 +547,6 @@ impl Client { )) } - pub async fn get_block_timestamp(&self, block_hash: H256) -> Result { - self.with_retries(|client| async move { - let storage_query = avail::storage().timestamp().now(); - let storage = client.api.storage().at(block_hash); - storage - .fetch(&storage_query) - .await? - .ok_or_else(|| { - subxt::Error::Other(format!("Block Header with hash: {block_hash:?} not found")) - }) - .map_err(Into::into) - }) - .await - .wrap_err(format!( - "Timestamp for Block Header with hash: {:?} not found", - block_hash - )) - } - pub async fn get_validator_set_by_hash(&self, block_hash: H256) -> Result> { let res = self .with_retries(|client| async move { diff --git a/core/src/network/rpc/subscriptions.rs b/core/src/network/rpc/subscriptions.rs index 34a6fa475..aac63a59b 100644 --- a/core/src/network/rpc/subscriptions.rs +++ b/core/src/network/rpc/subscriptions.rs @@ -6,7 +6,7 @@ use avail_rust::{ use codec::Encode; use color_eyre::{eyre::eyre, Result}; #[cfg(not(target_arch = "wasm32"))] -use std::time::Instant; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use tracing::{debug, info, trace}; @@ -98,17 +98,16 @@ impl SubscriptionLoop { match subscription { Subscription::Header(header) => { let received_at = Instant::now(); + let current_system_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); self.db.put(LatestHeaderKey, header.clone().number); info!("Header no.: {}", header.number); - let block_hash = self.rpc_client.get_block_hash(header.number).await.unwrap(); - let block_timestamp = self - .rpc_client - .get_block_timestamp(block_hash) - .await - .unwrap(); - self.db - .put(BlockTimestampKey(header.number), block_timestamp); + self.db.put( + BlockTimestampKey(header.number), + current_system_time.as_secs(), + ); // if new validator set becomes active, replace the current one if self.block_data.next_valset.is_some() { From bb916b63b86ccbb5225d04d7ad977c5b2354b68a Mon Sep 17 00:00:00 2001 From: vbhattaccmu <53374818+vbhattaccmu@users.noreply.github.com> Date: Fri, 17 Jan 2025 16:53:24 +0400 Subject: [PATCH 7/9] add crates to web-time --- core/src/network/rpc/subscriptions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/network/rpc/subscriptions.rs b/core/src/network/rpc/subscriptions.rs index aac63a59b..a9af0c7bd 100644 --- a/core/src/network/rpc/subscriptions.rs +++ b/core/src/network/rpc/subscriptions.rs @@ -11,7 +11,7 @@ use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use tracing::{debug, info, trace}; #[cfg(target_arch = "wasm32")] -use web_time::Instant; +use web_time::{Instant, SystemTime, UNIX_EPOCH}; use super::{Client, OutputEvent, Subscription}; use crate::{ From 193c684b9030c49b1abca4706da33797dde2a591 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?= Date: Tue, 21 Jan 2025 08:12:41 +0100 Subject: [PATCH 8/9] Rename timestamp to received_at for Header --- core/src/api/types.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/api/types.rs b/core/src/api/types.rs index bb6351cbe..1a0f47b25 100644 --- a/core/src/api/types.rs +++ b/core/src/api/types.rs @@ -345,7 +345,7 @@ pub struct Header { extrinsics_root: H256, extension: Extension, digest: Digest, - timestamp: Option, + received_at: Option, } impl Reply for Header { @@ -415,7 +415,7 @@ impl Header { extrinsics_root: header.extrinsics_root, extension: header.extension.try_into()?, digest: header.digest.try_into()?, - timestamp, + received_at: timestamp, }) } } @@ -432,7 +432,7 @@ impl TryFrom for Header { extrinsics_root: header.extrinsics_root, extension: header.extension.try_into()?, digest: header.digest.try_into()?, - timestamp: None, + received_at: None, }) } } @@ -923,7 +923,7 @@ mod tests { digest: Digest { logs: vec![DigestItem::RuntimeEnvironmentUpdated], }, - timestamp: Some( + received_at: Some( SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() From 4f4d5ba3c73c5674e9a0df1386f4dd051d03c3b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?= Date: Tue, 21 Jan 2025 10:53:43 +0100 Subject: [PATCH 9/9] Add received_at timestamp to header message --- core/src/api/types.rs | 47 ++++++++------------------- core/src/api/v2/README.md | 2 ++ core/src/api/v2/handlers.rs | 9 ++--- core/src/api/v2/mod.rs | 23 ++++--------- core/src/crawl_client.rs | 1 + core/src/data.rs | 8 ++--- core/src/data/keys.rs | 4 +-- core/src/fat_client.rs | 1 + core/src/light_client.rs | 1 + core/src/network/rpc.rs | 1 + core/src/network/rpc/subscriptions.rs | 18 +++++++--- 11 files changed, 50 insertions(+), 65 deletions(-) diff --git a/core/src/api/types.rs b/core/src/api/types.rs index 1a0f47b25..c7cca8673 100644 --- a/core/src/api/types.rs +++ b/core/src/api/types.rs @@ -324,11 +324,11 @@ impl Reply for Block { } } -impl TryFrom for HeaderMessage { +impl TryFrom<(AvailHeader, u64)> for HeaderMessage { type Error = Report; - fn try_from(header: AvailHeader) -> Result { - let header: Header = header.try_into()?; + fn try_from((header, received_at): (AvailHeader, u64)) -> Result { + let header: Header = (header, received_at).try_into()?; Ok(Self { block_number: header.number, header, @@ -345,7 +345,7 @@ pub struct Header { extrinsics_root: H256, extension: Extension, digest: Digest, - received_at: Option, + received_at: u64, } impl Reply for Header { @@ -402,28 +402,10 @@ struct Extension { app_lookup: CompactDataLookup, } -impl Header { - pub fn from_avail_header_and_timestamp( - header: AvailHeader, - timestamp: Option, - ) -> Result { - Ok(Header { - hash: Encode::using_encoded(&header, blake2_256).into(), - parent_hash: header.parent_hash, - number: header.number, - state_root: header.state_root, - extrinsics_root: header.extrinsics_root, - extension: header.extension.try_into()?, - digest: header.digest.try_into()?, - received_at: timestamp, - }) - } -} - -impl TryFrom for Header { +impl TryFrom<(AvailHeader, u64)> for Header { type Error = Report; - fn try_from(header: AvailHeader) -> Result { + fn try_from((header, received_at): (AvailHeader, u64)) -> Result { Ok(Header { hash: Encode::using_encoded(&header, blake2_256).into(), parent_hash: header.parent_hash, @@ -432,7 +414,7 @@ impl TryFrom for Header { extrinsics_root: header.extrinsics_root, extension: header.extension.try_into()?, digest: header.digest.try_into()?, - received_at: None, + received_at, }) } } @@ -503,7 +485,11 @@ impl TryFrom for Option { fn try_from(value: RpcEvent) -> Result { match value { - RpcEvent::HeaderUpdate { header, .. } => header + RpcEvent::HeaderUpdate { + header, + received_at: _, + received_at_timestamp, + } => (header, received_at_timestamp) .try_into() .map(Box::new) .map(PublishMessage::HeaderVerified) @@ -871,7 +857,7 @@ pub enum WsError { #[cfg(test)] mod tests { - use std::time::{Duration, SystemTime, UNIX_EPOCH}; + use std::time::Duration; use avail_rust::{ avail::runtime_types::avail_core::data_lookup::compact::CompactDataLookup, H256, @@ -923,12 +909,7 @@ mod tests { digest: Digest { logs: vec![DigestItem::RuntimeEnvironmentUpdated], }, - received_at: Some( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(), - ), + received_at: 0, }, })) } diff --git a/core/src/api/v2/README.md b/core/src/api/v2/README.md index 7d87234ec..84f716e58 100644 --- a/core/src/api/v2/README.md +++ b/core/src/api/v2/README.md @@ -168,6 +168,7 @@ Content-Type: application/json "{log}", ... ] }, + "received_at": "{received-at-timestamp}" } ``` @@ -595,6 +596,7 @@ When header verification is finished, the message is pushed to the light client ] } } + "received_at": "{received-at-timestamp}", } } } diff --git a/core/src/api/v2/handlers.rs b/core/src/api/v2/handlers.rs index ab61b1112..3775cdb71 100644 --- a/core/src/api/v2/handlers.rs +++ b/core/src/api/v2/handlers.rs @@ -15,7 +15,8 @@ use crate::{ }, }, data::{ - AppDataKey, BlockHeaderKey, BlockTimestampKey, Database, RpcNodeKey, VerifiedCellCountKey, + AppDataKey, BlockHeaderKey, BlockHeaderReceivedAtKey, Database, RpcNodeKey, + VerifiedCellCountKey, }, utils::calculate_confidence, }; @@ -111,8 +112,8 @@ pub async fn block_header( .and_then(|extension| block_status(sync_start_block, db.clone(), block_number, extension)) .ok_or(Error::not_found())?; - let block_timestamp = db - .get(BlockTimestampKey(block_number)) + let received_at = db + .get(BlockHeaderReceivedAtKey(block_number)) .ok_or_else(Error::not_found)?; if matches!( @@ -124,7 +125,7 @@ pub async fn block_header( db.get(BlockHeaderKey(block_number)) .ok_or_else(|| eyre!("Header not found")) - .and_then(|header| Header::from_avail_header_and_timestamp(header, Some(block_timestamp))) + .and_then(|header| (header, received_at).try_into()) .map_err(Error::internal_server_error) } diff --git a/core/src/api/v2/mod.rs b/core/src/api/v2/mod.rs index 4fcf93684..375640449 100644 --- a/core/src/api/v2/mod.rs +++ b/core/src/api/v2/mod.rs @@ -217,8 +217,8 @@ mod tests { }, data::{ self, AchievedConfidenceKey, AchievedSyncConfidenceKey, AppDataKey, BlockHeaderKey, - BlockTimestampKey, Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, - RpcNodeKey, VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, + BlockHeaderReceivedAtKey, Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, + MemoryDB, RpcNodeKey, VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey, }, network::rpc::Node, @@ -236,12 +236,7 @@ mod tests { AvailHeader, H256, }; use hyper::StatusCode; - use std::{ - collections::HashSet, - str::FromStr, - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, - }; + use std::{collections::HashSet, str::FromStr, sync::Arc}; use test_case::test_case; use uuid::Uuid; @@ -412,13 +407,7 @@ mod tests { db.put(VerifiedHeaderKey, BlockRange::init(9)); db.put(LatestSyncKey, 5); db.put(BlockHeaderKey(block_number), header()); - db.put( - BlockTimestampKey(block_number), - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_secs(), - ); + db.put(BlockHeaderReceivedAtKey(block_number), 1737039274); let route = super::block_header_route(config, db); let response = warp::test::request() .method("GET") @@ -487,7 +476,7 @@ mod tests { db.put(LatestHeaderKey, 1); db.put(VerifiedHeaderKey, BlockRange::init(1)); db.put(BlockHeaderKey(1), header()); - db.put(BlockTimestampKey(1), 1737039274); + db.put(BlockHeaderReceivedAtKey(1), 1737039274); let route = super::block_header_route(config, db); let response = warp::test::request() .method("GET") @@ -496,7 +485,7 @@ mod tests { .await; assert_eq!( response.body(), - r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]},"timestamp":1737039274}"# + r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]},"received_at":1737039274}"# ); } diff --git a/core/src/crawl_client.rs b/core/src/crawl_client.rs index e2270c8cd..ebf278faf 100644 --- a/core/src/crawl_client.rs +++ b/core/src/crawl_client.rs @@ -79,6 +79,7 @@ pub async fn run( while let Ok(rpc::OutputEvent::HeaderUpdate { header, received_at, + .. }) = message_rx.recv().await { let block = match types::BlockVerified::try_from((header, None)) { diff --git a/core/src/data.rs b/core/src/data.rs index dad73e312..502a7cbbc 100644 --- a/core/src/data.rs +++ b/core/src/data.rs @@ -311,9 +311,9 @@ impl RecordKey for SignerNonceKey { } } -pub struct BlockTimestampKey(pub u32); +pub struct BlockHeaderReceivedAtKey(pub u32); -impl RecordKey for BlockTimestampKey { +impl RecordKey for BlockHeaderReceivedAtKey { type Type = u64; fn space(&self) -> Option<&'static str> { @@ -321,7 +321,7 @@ impl RecordKey for BlockTimestampKey { } fn key(&self) -> String { - let BlockTimestampKey(block_num) = self; - format!("{BLOCK_TIMESTAMP_KEY}:{block_num}") + let BlockHeaderReceivedAtKey(block_num) = self; + format!("{BLOCK_HEADER_RECEIVED_AT}:{block_num}") } } diff --git a/core/src/data/keys.rs b/core/src/data/keys.rs index d19b1f0b8..134d3a35c 100644 --- a/core/src/data/keys.rs +++ b/core/src/data/keys.rs @@ -35,5 +35,5 @@ pub const CLIENT_ID_KEY: &str = "client_id"; pub const P2P_KEYPAIR_KEY: &str = "p2p_keypair"; /// Key for storing signer nonce pub const SIGNER_NONCE: &str = "signer_nonce"; -/// Key for storing block timestamp -pub const BLOCK_TIMESTAMP_KEY: &str = "block_timestamp"; +/// Key for storing block header received at timestamp +pub const BLOCK_HEADER_RECEIVED_AT: &str = "block_header_received_at"; diff --git a/core/src/fat_client.rs b/core/src/fat_client.rs index 53268103a..1d35cb0e2 100644 --- a/core/src/fat_client.rs +++ b/core/src/fat_client.rs @@ -249,6 +249,7 @@ pub async fn run( RpcEvent::HeaderUpdate { header, received_at, + .. } => (header, received_at), // skip ConnectedHost event RpcEvent::ConnectedHost(_) => continue, diff --git a/core/src/light_client.rs b/core/src/light_client.rs index 8caee33be..54258c489 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -209,6 +209,7 @@ pub async fn run( RpcEvent::HeaderUpdate { header, received_at, + .. } => (header, received_at), // skip ConnectedHost event RpcEvent::ConnectedHost(_) => continue, diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs index 59c15e4fc..848ca9e39 100644 --- a/core/src/network/rpc.rs +++ b/core/src/network/rpc.rs @@ -46,6 +46,7 @@ pub enum OutputEvent { HeaderUpdate { header: AvailHeader, received_at: Instant, + received_at_timestamp: u64, }, } diff --git a/core/src/network/rpc/subscriptions.rs b/core/src/network/rpc/subscriptions.rs index a9af0c7bd..38b2ed2e5 100644 --- a/core/src/network/rpc/subscriptions.rs +++ b/core/src/network/rpc/subscriptions.rs @@ -16,7 +16,7 @@ use web_time::{Instant, SystemTime, UNIX_EPOCH}; use super::{Client, OutputEvent, Subscription}; use crate::{ data::{ - BlockTimestampKey, Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, + BlockHeaderReceivedAtKey, Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, IsFinalitySyncedKey, LatestHeaderKey, VerifiedHeaderKey, }, finality::{check_finality, ValidatorSet}, @@ -98,15 +98,16 @@ impl SubscriptionLoop { match subscription { Subscription::Header(header) => { let received_at = Instant::now(); - let current_system_time = SystemTime::now() + let received_at_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) - .expect("Time went backwards"); + .expect("Time went backwards") + .as_secs(); self.db.put(LatestHeaderKey, header.clone().number); info!("Header no.: {}", header.number); self.db.put( - BlockTimestampKey(header.number), - current_system_time.as_secs(), + BlockHeaderReceivedAtKey(header.number), + received_at_timestamp, ); // if new validator set becomes active, replace the current one @@ -171,6 +172,11 @@ impl SubscriptionLoop { let (header, received_at, valset) = self.block_data.unverified_headers.swap_remove(pos); + let received_at_timestamp = self + .db + .get(BlockHeaderReceivedAtKey(header.number)) + .expect("Block header timestamp is in the database"); + let is_final = check_finality(&valset, &justification); is_final.expect("Finality check failed"); @@ -224,6 +230,7 @@ impl SubscriptionLoop { .send(OutputEvent::HeaderUpdate { header, received_at, + received_at_timestamp, }) .unwrap(); } @@ -248,6 +255,7 @@ impl SubscriptionLoop { .send(OutputEvent::HeaderUpdate { header, received_at, + received_at_timestamp, }) .unwrap(); } else {