Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chain: change CBF header abstraction #25

Merged
merged 1 commit into from
Jun 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 59 additions & 30 deletions src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
db::traits::HeaderStore,
filters::{
cfheader_batch::CFHeaderBatch,
cfheader_chain::{AppendAttempt, CFHeaderChain, CFHeaderSyncResult},
cfheader_chain::{AppendAttempt, CFHeaderChain, QueuedCFHeader},
error::{CFHeaderSyncError, CFilterSyncError},
filter::Filter,
filter_chain::FilterChain,
Expand Down Expand Up @@ -142,6 +142,13 @@ impl Chain {
self.header_chain.header_at_height(height)
}

// The hash at the given height, potentially checking on disk
pub(crate) fn block_hash_at_height(&self, height: u32) -> Option<BlockHash> {
self.header_chain
.header_at_height(height)
.map(|header| header.block_hash())
}

// This header chain contains a block hash
pub(crate) fn contains_header(&self, header: &Header) -> bool {
self.header_chain.contains_header(header)
Expand Down Expand Up @@ -396,6 +403,7 @@ impl Chain {
self.dialog
.send_data(NodeMessage::BlocksDisconnected(reorged))
.await;
self.clear_compact_filter_queue();
self.clear_filter_headers().await;
self.clear_filters().await;
self.flush_over_height(stem).await;
Expand All @@ -413,6 +421,10 @@ impl Chain {
}
}

// We don't have a header in memory that we need to evaluate a fork.
// We check if we have it on disk, and load some more headers into memory.
// This call occurs if we sync to a block that is later reorganized out of the chain,
// but we have restarted our node in between these events.
async fn load_fork(&mut self, header_batch: &HeadersBatch) -> Result<(), HeaderSyncError> {
let mut db_lock = self.db.lock().await;
let prev_hash = header_batch.first().prev_blockhash;
Expand Down Expand Up @@ -448,9 +460,9 @@ impl Chain {
// Sync the compact filter headers, possibly encountering conflicts
pub(crate) async fn sync_cf_headers(
&mut self,
peer_id: u32,
_peer_id: u32,
cf_headers: CFHeaders,
) -> Result<CFHeaderSyncResult, CFHeaderSyncError> {
) -> Result<AppendAttempt, CFHeaderSyncError> {
let batch: CFHeaderBatch = cf_headers.into();
self.dialog
.chain_update(
Expand All @@ -469,46 +481,50 @@ impl Chain {
// A new block was mined and we ended up asking for this batch twice,
// or the quorum required is less than our connected peers.
if batch_last.eq(&prev_header) {
return Ok(CFHeaderSyncResult::AddedToQueue);
return Ok(AppendAttempt::AddedToQueue);
}
}
}
None => return Err(CFHeaderSyncError::EmptyMessage),
}
// Check for any obvious faults
self.audit_cf_headers(&batch).await?;
match self.cf_header_chain.append(peer_id, batch).await? {
AppendAttempt::AddedToQueue => Ok(CFHeaderSyncResult::AddedToQueue),
AppendAttempt::Extended => Ok(CFHeaderSyncResult::ReadyForNext),
AppendAttempt::Conflict(height) => match self.header_at_height(height) {
Some(header) => Ok(CFHeaderSyncResult::Dispute(header.block_hash())),
None => Err(CFHeaderSyncError::HeaderChainIndexOverflow),
},
// We already have a message like this. Verify they are the same
if self.cf_header_chain.has_queue() {
Ok(self.cf_header_chain.verify(batch).await)
} else {
// Associate the block hashes with the filter hashes and add them to the queue
let queue = self.construct_cf_header_queue(&batch).await?;
Ok(self.cf_header_chain.set_queue(queue).await)
}
}

/// Audit the validity of a batch of compact filter headers
async fn audit_cf_headers(&mut self, batch: &CFHeaderBatch) -> Result<(), CFHeaderSyncError> {
// Does this stop hash even exist in our chain
if !self.contains_hash(*batch.stop_hash()) {
return Err(CFHeaderSyncError::UnknownStophash);
// We need to associate the block hash with the incoming filter hashes
async fn construct_cf_header_queue(
&self,
batch: &CFHeaderBatch,
) -> Result<Vec<QueuedCFHeader>, CFHeaderSyncError> {
let mut queue = Vec::new();
let ref_height = self.cf_header_chain.height();
for (index, (filter_header, filter_hash)) in batch.inner().into_iter().enumerate() {
let block_hash = self
// This call may or may not retrieve the hash from disk
.block_hash_at_height(ref_height + index as u32 + 1)
.ok_or(CFHeaderSyncError::HeaderChainIndexOverflow)?;
queue.push(QueuedCFHeader::new(block_hash, filter_header, filter_hash))
}
Ok(queue)
}

// Audit the validity of a batch of compact filter headers
async fn audit_cf_headers(&mut self, batch: &CFHeaderBatch) -> Result<(), CFHeaderSyncError> {
// Does the filter header line up with our current chain of filter headers
if let Some(prev_header) = self.cf_header_chain.prev_header() {
if batch.prev_header().ne(&prev_header) {
return Err(CFHeaderSyncError::PrevHeaderMismatch);
}
}
// Did they send us the right amount of headers
let expected_stop_header =
self.header_at_height(self.cf_header_chain.height() + batch.len() as u32);
if let Some(stop_header) = expected_stop_header {
if stop_header.block_hash().ne(batch.stop_hash()) {
return Err(CFHeaderSyncError::StopHashMismatch);
}
} else {
return Err(CFHeaderSyncError::HeaderChainIndexOverflow);
}
// Did we request up to this stop hash
// Did we request up to this stop hash. We should have caught if this was a repeated message.
if let Some(prev_stophash) = self.cf_header_chain.last_stop_hash_request() {
if prev_stophash.ne(batch.stop_hash()) {
return Err(CFHeaderSyncError::StopHashMismatch);
Expand All @@ -517,6 +533,17 @@ impl Chain {
// If we never asked for a stophash before this was unsolitited
return Err(CFHeaderSyncError::UnexpectedCFHeaderMessage);
}
// Did they send us the right amount of headers
let expected_stop_header =
// This call may or may not retrieve the hash from disk
self.block_hash_at_height(self.cf_header_chain.height() + batch.len() as u32);
if let Some(stop_header) = expected_stop_header {
if stop_header.ne(batch.stop_hash()) {
return Err(CFHeaderSyncError::StopHashMismatch);
}
} else {
return Err(CFHeaderSyncError::HeaderChainIndexOverflow);
}
Ok(())
}

Expand All @@ -529,9 +556,6 @@ impl Chain {
self.tip()
};
self.cf_header_chain.set_last_stop_hash(stop_hash);
if self.is_cf_headers_synced() {
self.cf_header_chain.join(&self.header_chain.values()).await;
}
GetCFHeaders {
filter_type: 0x00,
start_height: self.cf_header_chain.height() + 1,
Expand Down Expand Up @@ -663,6 +687,11 @@ impl Chain {
}
}

// Reset the compact filter queue because we received a new block
pub(crate) fn clear_compact_filter_queue(&mut self) {
self.cf_header_chain.clear_queue();
}

// We found a reorg and some filters are no longer valid.
async fn clear_filter_headers(&mut self) {
self.cf_header_chain.clear_queue();
Expand Down
Loading
Loading