diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index a858180de..47725b405 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -2,6 +2,7 @@ ## 1.2.0 +- Add timestamp to v2 header API - Increase maximum kademlia record size to allow row with 512 cells - Fix issue with multiple telemetry gauge callbacks diff --git a/core/src/api/types.rs b/core/src/api/types.rs index 0650643fd..c7cca8673 100644 --- a/core/src/api/types.rs +++ b/core/src/api/types.rs @@ -324,11 +324,11 @@ impl Reply for Block { } } -impl TryFrom for HeaderMessage { +impl TryFrom<(AvailHeader, u64)> for HeaderMessage { type Error = Report; - fn try_from(header: AvailHeader) -> Result { - let header: Header = header.try_into()?; + fn try_from((header, received_at): (AvailHeader, u64)) -> Result { + let header: Header = (header, received_at).try_into()?; Ok(Self { block_number: header.number, header, @@ -345,6 +345,7 @@ pub struct Header { extrinsics_root: H256, extension: Extension, digest: Digest, + received_at: u64, } impl Reply for Header { @@ -401,10 +402,10 @@ struct Extension { app_lookup: CompactDataLookup, } -impl TryFrom for Header { +impl TryFrom<(AvailHeader, u64)> for Header { type Error = Report; - fn try_from(header: AvailHeader) -> Result { + fn try_from((header, received_at): (AvailHeader, u64)) -> Result { Ok(Header { hash: Encode::using_encoded(&header, blake2_256).into(), parent_hash: header.parent_hash, @@ -413,6 +414,7 @@ impl TryFrom for Header { extrinsics_root: header.extrinsics_root, extension: header.extension.try_into()?, digest: header.digest.try_into()?, + received_at, }) } } @@ -483,7 +485,11 @@ impl TryFrom for Option { fn try_from(value: RpcEvent) -> Result { match value { - RpcEvent::HeaderUpdate { header, .. } => header + RpcEvent::HeaderUpdate { + header, + received_at: _, + received_at_timestamp, + } => (header, received_at_timestamp) .try_into() .map(Box::new) .map(PublishMessage::HeaderVerified) @@ -903,6 +909,7 @@ mod tests { digest: Digest { logs: vec![DigestItem::RuntimeEnvironmentUpdated], }, + received_at: 0, }, })) } diff --git a/core/src/api/v2/README.md b/core/src/api/v2/README.md index 7d87234ec..84f716e58 100644 --- a/core/src/api/v2/README.md +++ b/core/src/api/v2/README.md @@ -168,6 +168,7 @@ Content-Type: application/json "{log}", ... ] }, + "received_at": "{received-at-timestamp}" } ``` @@ -595,6 +596,7 @@ When header verification is finished, the message is pushed to the light client ] } } + "received_at": "{received-at-timestamp}", } } } diff --git a/core/src/api/v2/handlers.rs b/core/src/api/v2/handlers.rs index 98705db1d..3775cdb71 100644 --- a/core/src/api/v2/handlers.rs +++ b/core/src/api/v2/handlers.rs @@ -14,7 +14,10 @@ use crate::{ Subscription, SubscriptionId, Transaction, Version, WsClients, }, }, - data::{AppDataKey, BlockHeaderKey, Database, RpcNodeKey, VerifiedCellCountKey}, + data::{ + AppDataKey, BlockHeaderKey, BlockHeaderReceivedAtKey, Database, RpcNodeKey, + VerifiedCellCountKey, + }, utils::calculate_confidence, }; @@ -109,6 +112,10 @@ pub async fn block_header( .and_then(|extension| block_status(sync_start_block, db.clone(), block_number, extension)) .ok_or(Error::not_found())?; + let received_at = db + .get(BlockHeaderReceivedAtKey(block_number)) + .ok_or_else(Error::not_found)?; + if matches!( block_status, BlockStatus::Unavailable | BlockStatus::Pending | BlockStatus::VerifyingHeader @@ -118,7 +125,7 @@ pub async fn block_header( db.get(BlockHeaderKey(block_number)) .ok_or_else(|| eyre!("Header not found")) - .and_then(|header| header.try_into()) + .and_then(|header| (header, received_at).try_into()) .map_err(Error::internal_server_error) } diff --git a/core/src/api/v2/mod.rs b/core/src/api/v2/mod.rs index 98a68b189..375640449 100644 --- a/core/src/api/v2/mod.rs +++ b/core/src/api/v2/mod.rs @@ -217,8 +217,9 @@ mod tests { }, data::{ self, AchievedConfidenceKey, AchievedSyncConfidenceKey, AppDataKey, BlockHeaderKey, - Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, RpcNodeKey, - VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey, + BlockHeaderReceivedAtKey, Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, + MemoryDB, RpcNodeKey, VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, + VerifiedSyncDataKey, }, network::rpc::Node, types::BlockRange, @@ -236,6 +237,7 @@ mod tests { }; use hyper::StatusCode; use std::{collections::HashSet, str::FromStr, sync::Arc}; + use test_case::test_case; use uuid::Uuid; @@ -405,6 +407,7 @@ mod tests { db.put(VerifiedHeaderKey, BlockRange::init(9)); db.put(LatestSyncKey, 5); db.put(BlockHeaderKey(block_number), header()); + db.put(BlockHeaderReceivedAtKey(block_number), 1737039274); let route = super::block_header_route(config, db); let response = warp::test::request() .method("GET") @@ -473,6 +476,7 @@ mod tests { db.put(LatestHeaderKey, 1); db.put(VerifiedHeaderKey, BlockRange::init(1)); db.put(BlockHeaderKey(1), header()); + db.put(BlockHeaderReceivedAtKey(1), 1737039274); let route = super::block_header_route(config, db); let response = warp::test::request() .method("GET") @@ -481,7 +485,7 @@ mod tests { .await; assert_eq!( response.body(), - r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]}}"# + r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]},"received_at":1737039274}"# ); } diff --git a/core/src/crawl_client.rs b/core/src/crawl_client.rs index e2270c8cd..ebf278faf 100644 --- a/core/src/crawl_client.rs +++ b/core/src/crawl_client.rs @@ -79,6 +79,7 @@ pub async fn run( while let Ok(rpc::OutputEvent::HeaderUpdate { header, received_at, + .. }) = message_rx.recv().await { let block = match types::BlockVerified::try_from((header, None)) { diff --git a/core/src/data.rs b/core/src/data.rs index d4daef4c5..502a7cbbc 100644 --- a/core/src/data.rs +++ b/core/src/data.rs @@ -310,3 +310,18 @@ impl RecordKey for SignerNonceKey { SIGNER_NONCE.into() } } + +pub struct BlockHeaderReceivedAtKey(pub u32); + +impl RecordKey for BlockHeaderReceivedAtKey { + type Type = u64; + + fn space(&self) -> Option<&'static str> { + Some(APP_STATE_CF) + } + + fn key(&self) -> String { + let BlockHeaderReceivedAtKey(block_num) = self; + format!("{BLOCK_HEADER_RECEIVED_AT}:{block_num}") + } +} diff --git a/core/src/data/keys.rs b/core/src/data/keys.rs index 499e2b020..134d3a35c 100644 --- a/core/src/data/keys.rs +++ b/core/src/data/keys.rs @@ -35,3 +35,5 @@ pub const CLIENT_ID_KEY: &str = "client_id"; pub const P2P_KEYPAIR_KEY: &str = "p2p_keypair"; /// Key for storing signer nonce pub const SIGNER_NONCE: &str = "signer_nonce"; +/// Key for storing block header received at timestamp +pub const BLOCK_HEADER_RECEIVED_AT: &str = "block_header_received_at"; diff --git a/core/src/fat_client.rs b/core/src/fat_client.rs index 53268103a..1d35cb0e2 100644 --- a/core/src/fat_client.rs +++ b/core/src/fat_client.rs @@ -249,6 +249,7 @@ pub async fn run( RpcEvent::HeaderUpdate { header, received_at, + .. } => (header, received_at), // skip ConnectedHost event RpcEvent::ConnectedHost(_) => continue, diff --git a/core/src/light_client.rs b/core/src/light_client.rs index 8caee33be..54258c489 100644 --- a/core/src/light_client.rs +++ b/core/src/light_client.rs @@ -209,6 +209,7 @@ pub async fn run( RpcEvent::HeaderUpdate { header, received_at, + .. } => (header, received_at), // skip ConnectedHost event RpcEvent::ConnectedHost(_) => continue, diff --git a/core/src/network/rpc.rs b/core/src/network/rpc.rs index 59c15e4fc..848ca9e39 100644 --- a/core/src/network/rpc.rs +++ b/core/src/network/rpc.rs @@ -46,6 +46,7 @@ pub enum OutputEvent { HeaderUpdate { header: AvailHeader, received_at: Instant, + received_at_timestamp: u64, }, } diff --git a/core/src/network/rpc/subscriptions.rs b/core/src/network/rpc/subscriptions.rs index 38bfead47..38b2ed2e5 100644 --- a/core/src/network/rpc/subscriptions.rs +++ b/core/src/network/rpc/subscriptions.rs @@ -6,18 +6,18 @@ use avail_rust::{ use codec::Encode; use color_eyre::{eyre::eyre, Result}; #[cfg(not(target_arch = "wasm32"))] -use std::time::Instant; +use std::time::{Instant, SystemTime, UNIX_EPOCH}; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use tracing::{debug, info, trace}; #[cfg(target_arch = "wasm32")] -use web_time::Instant; +use web_time::{Instant, SystemTime, UNIX_EPOCH}; use super::{Client, OutputEvent, Subscription}; use crate::{ data::{ - Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, IsFinalitySyncedKey, - LatestHeaderKey, VerifiedHeaderKey, + BlockHeaderReceivedAtKey, Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, + IsFinalitySyncedKey, LatestHeaderKey, VerifiedHeaderKey, }, finality::{check_finality, ValidatorSet}, types::{BlockRange, GrandpaJustification}, @@ -98,9 +98,18 @@ impl SubscriptionLoop { match subscription { Subscription::Header(header) => { let received_at = Instant::now(); + let received_at_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Time went backwards") + .as_secs(); self.db.put(LatestHeaderKey, header.clone().number); info!("Header no.: {}", header.number); + self.db.put( + BlockHeaderReceivedAtKey(header.number), + received_at_timestamp, + ); + // if new validator set becomes active, replace the current one if self.block_data.next_valset.is_some() { self.block_data.current_valset = self.block_data.next_valset.take().unwrap(); @@ -163,6 +172,11 @@ impl SubscriptionLoop { let (header, received_at, valset) = self.block_data.unverified_headers.swap_remove(pos); + let received_at_timestamp = self + .db + .get(BlockHeaderReceivedAtKey(header.number)) + .expect("Block header timestamp is in the database"); + let is_final = check_finality(&valset, &justification); is_final.expect("Finality check failed"); @@ -216,6 +230,7 @@ impl SubscriptionLoop { .send(OutputEvent::HeaderUpdate { header, received_at, + received_at_timestamp, }) .unwrap(); } @@ -240,6 +255,7 @@ impl SubscriptionLoop { .send(OutputEvent::HeaderUpdate { header, received_at, + received_at_timestamp, }) .unwrap(); } else {