Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestamp to v2 header API #772

Merged
merged 10 commits into from
Jan 21, 2025
1 change: 1 addition & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.2.0

- Add timestamp to v2 header API
- Increase maximum kademlia record size to allow row with 512 cells
- Fix issue with multiple telemetry gauge callbacks

Expand Down
19 changes: 13 additions & 6 deletions core/src/api/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ impl Reply for Block {
}
}

impl TryFrom<AvailHeader> for HeaderMessage {
impl TryFrom<(AvailHeader, u64)> for HeaderMessage {
type Error = Report;

fn try_from(header: AvailHeader) -> Result<Self, Self::Error> {
let header: Header = header.try_into()?;
fn try_from((header, received_at): (AvailHeader, u64)) -> Result<Self, Self::Error> {
let header: Header = (header, received_at).try_into()?;
Ok(Self {
block_number: header.number,
header,
Expand All @@ -345,6 +345,7 @@ pub struct Header {
extrinsics_root: H256,
extension: Extension,
digest: Digest,
received_at: u64,
}

impl Reply for Header {
Expand Down Expand Up @@ -401,10 +402,10 @@ struct Extension {
app_lookup: CompactDataLookup,
}

impl TryFrom<AvailHeader> for Header {
impl TryFrom<(AvailHeader, u64)> for Header {
type Error = Report;

fn try_from(header: AvailHeader) -> Result<Self> {
fn try_from((header, received_at): (AvailHeader, u64)) -> Result<Self> {
Ok(Header {
hash: Encode::using_encoded(&header, blake2_256).into(),
parent_hash: header.parent_hash,
Expand All @@ -413,6 +414,7 @@ impl TryFrom<AvailHeader> for Header {
extrinsics_root: header.extrinsics_root,
extension: header.extension.try_into()?,
digest: header.digest.try_into()?,
received_at,
})
}
}
Expand Down Expand Up @@ -483,7 +485,11 @@ impl TryFrom<RpcEvent> for Option<PublishMessage> {

fn try_from(value: RpcEvent) -> Result<Self, Self::Error> {
match value {
RpcEvent::HeaderUpdate { header, .. } => header
RpcEvent::HeaderUpdate {
header,
received_at: _,
received_at_timestamp,
} => (header, received_at_timestamp)
.try_into()
.map(Box::new)
.map(PublishMessage::HeaderVerified)
Expand Down Expand Up @@ -903,6 +909,7 @@ mod tests {
digest: Digest {
logs: vec![DigestItem::RuntimeEnvironmentUpdated],
},
received_at: 0,
},
}))
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/api/v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ Content-Type: application/json
"{log}", ...
]
},
"received_at": "{received-at-timestamp}"
}
```

Expand Down Expand Up @@ -595,6 +596,7 @@ When header verification is finished, the message is pushed to the light client
]
}
}
"received_at": "{received-at-timestamp}",
}
}
}
Expand Down
11 changes: 9 additions & 2 deletions core/src/api/v2/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use crate::{
Subscription, SubscriptionId, Transaction, Version, WsClients,
},
},
data::{AppDataKey, BlockHeaderKey, Database, RpcNodeKey, VerifiedCellCountKey},
data::{
AppDataKey, BlockHeaderKey, BlockHeaderReceivedAtKey, Database, RpcNodeKey,
VerifiedCellCountKey,
},
utils::calculate_confidence,
};

Expand Down Expand Up @@ -109,6 +112,10 @@ pub async fn block_header(
.and_then(|extension| block_status(sync_start_block, db.clone(), block_number, extension))
.ok_or(Error::not_found())?;

let received_at = db
.get(BlockHeaderReceivedAtKey(block_number))
.ok_or_else(Error::not_found)?;

if matches!(
block_status,
BlockStatus::Unavailable | BlockStatus::Pending | BlockStatus::VerifyingHeader
Expand All @@ -118,7 +125,7 @@ pub async fn block_header(

db.get(BlockHeaderKey(block_number))
.ok_or_else(|| eyre!("Header not found"))
.and_then(|header| header.try_into())
.and_then(|header| (header, received_at).try_into())
.map_err(Error::internal_server_error)
}

Expand Down
10 changes: 7 additions & 3 deletions core/src/api/v2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,9 @@ mod tests {
},
data::{
self, AchievedConfidenceKey, AchievedSyncConfidenceKey, AppDataKey, BlockHeaderKey,
Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey, MemoryDB, RpcNodeKey,
VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey, VerifiedSyncDataKey,
BlockHeaderReceivedAtKey, Database, IsSyncedKey, LatestHeaderKey, LatestSyncKey,
MemoryDB, RpcNodeKey, VerifiedCellCountKey, VerifiedDataKey, VerifiedHeaderKey,
VerifiedSyncDataKey,
},
network::rpc::Node,
types::BlockRange,
Expand All @@ -236,6 +237,7 @@ mod tests {
};
use hyper::StatusCode;
use std::{collections::HashSet, str::FromStr, sync::Arc};

use test_case::test_case;
use uuid::Uuid;

Expand Down Expand Up @@ -405,6 +407,7 @@ mod tests {
db.put(VerifiedHeaderKey, BlockRange::init(9));
db.put(LatestSyncKey, 5);
db.put(BlockHeaderKey(block_number), header());
db.put(BlockHeaderReceivedAtKey(block_number), 1737039274);
let route = super::block_header_route(config, db);
let response = warp::test::request()
.method("GET")
Expand Down Expand Up @@ -473,6 +476,7 @@ mod tests {
db.put(LatestHeaderKey, 1);
db.put(VerifiedHeaderKey, BlockRange::init(1));
db.put(BlockHeaderKey(1), header());
db.put(BlockHeaderReceivedAtKey(1), 1737039274);
let route = super::block_header_route(config, db);
let response = warp::test::request()
.method("GET")
Expand All @@ -481,7 +485,7 @@ mod tests {
.await;
assert_eq!(
response.body(),
r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]}}"#
r#"{"hash":"0xadf25a1a5d969bb9c9bb9b2e95fe74b0093f0a49ac61e96a1cf41783127f9d1b","parent_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","number":1,"state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extrinsics_root":"0x0000000000000000000000000000000000000000000000000000000000000000","extension":{"rows":0,"cols":0,"data_root":"0x0000000000000000000000000000000000000000000000000000000000000000","commitments":[],"app_lookup":{"size":1,"index":[]}},"digest":{"logs":[]},"received_at":1737039274}"#
);
}

Expand Down
1 change: 1 addition & 0 deletions core/src/crawl_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ pub async fn run(
while let Ok(rpc::OutputEvent::HeaderUpdate {
header,
received_at,
..
}) = message_rx.recv().await
{
let block = match types::BlockVerified::try_from((header, None)) {
Expand Down
15 changes: 15 additions & 0 deletions core/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,18 @@ impl RecordKey for SignerNonceKey {
SIGNER_NONCE.into()
}
}

pub struct BlockHeaderReceivedAtKey(pub u32);

impl RecordKey for BlockHeaderReceivedAtKey {
type Type = u64;

fn space(&self) -> Option<&'static str> {
Some(APP_STATE_CF)
}

fn key(&self) -> String {
let BlockHeaderReceivedAtKey(block_num) = self;
format!("{BLOCK_HEADER_RECEIVED_AT}:{block_num}")
}
}
2 changes: 2 additions & 0 deletions core/src/data/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ pub const CLIENT_ID_KEY: &str = "client_id";
pub const P2P_KEYPAIR_KEY: &str = "p2p_keypair";
/// Key for storing signer nonce
pub const SIGNER_NONCE: &str = "signer_nonce";
/// Key for storing block header received at timestamp
pub const BLOCK_HEADER_RECEIVED_AT: &str = "block_header_received_at";
1 change: 1 addition & 0 deletions core/src/fat_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ pub async fn run(
RpcEvent::HeaderUpdate {
header,
received_at,
..
} => (header, received_at),
// skip ConnectedHost event
RpcEvent::ConnectedHost(_) => continue,
Expand Down
1 change: 1 addition & 0 deletions core/src/light_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ pub async fn run(
RpcEvent::HeaderUpdate {
header,
received_at,
..
} => (header, received_at),
// skip ConnectedHost event
RpcEvent::ConnectedHost(_) => continue,
Expand Down
1 change: 1 addition & 0 deletions core/src/network/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum OutputEvent {
HeaderUpdate {
header: AvailHeader,
received_at: Instant,
received_at_timestamp: u64,
},
}

Expand Down
24 changes: 20 additions & 4 deletions core/src/network/rpc/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ use avail_rust::{
use codec::Encode;
use color_eyre::{eyre::eyre, Result};
#[cfg(not(target_arch = "wasm32"))]
use std::time::Instant;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast::Sender;
use tokio_stream::StreamExt;
use tracing::{debug, info, trace};
#[cfg(target_arch = "wasm32")]
use web_time::Instant;
use web_time::{Instant, SystemTime, UNIX_EPOCH};

use super::{Client, OutputEvent, Subscription};
use crate::{
data::{
Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey, IsFinalitySyncedKey,
LatestHeaderKey, VerifiedHeaderKey,
BlockHeaderReceivedAtKey, Database, FinalitySyncCheckpoint, FinalitySyncCheckpointKey,
IsFinalitySyncedKey, LatestHeaderKey, VerifiedHeaderKey,
},
finality::{check_finality, ValidatorSet},
types::{BlockRange, GrandpaJustification},
Expand Down Expand Up @@ -98,9 +98,18 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
match subscription {
Subscription::Header(header) => {
let received_at = Instant::now();
let received_at_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_secs();
self.db.put(LatestHeaderKey, header.clone().number);
info!("Header no.: {}", header.number);

self.db.put(
BlockHeaderReceivedAtKey(header.number),
received_at_timestamp,
);

// if new validator set becomes active, replace the current one
if self.block_data.next_valset.is_some() {
self.block_data.current_valset = self.block_data.next_valset.take().unwrap();
Expand Down Expand Up @@ -163,6 +172,11 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
let (header, received_at, valset) =
self.block_data.unverified_headers.swap_remove(pos);

let received_at_timestamp = self
.db
.get(BlockHeaderReceivedAtKey(header.number))
.expect("Block header timestamp is in the database");

let is_final = check_finality(&valset, &justification);

is_final.expect("Finality check failed");
Expand Down Expand Up @@ -216,6 +230,7 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
.send(OutputEvent::HeaderUpdate {
header,
received_at,
received_at_timestamp,
})
.unwrap();
}
Expand All @@ -240,6 +255,7 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
.send(OutputEvent::HeaderUpdate {
header,
received_at,
received_at_timestamp,
})
.unwrap();
} else {
Expand Down
Loading