Merge pull request #22 from rustaceanrob/fix-locators
node: fetch locator from db on start
rustaceanrob authored Jun 27, 2024
2 parents a522f83 + 07f616b commit a375f9b
Showing 5 changed files with 86 additions and 18 deletions.
26 changes: 22 additions & 4 deletions src/chain/chain.rs
@@ -35,6 +35,7 @@ use crate::{
};

const MAX_REORG_DEPTH: u32 = 5_000;
const REORG_LOOKBACK: u32 = 7;

#[derive(Debug)]
pub(crate) struct Chain {
@@ -189,11 +190,28 @@ impl Chain {
}

// The "locators" are the headers we inform our peers we know about
pub(crate) fn locators(&mut self) -> Vec<BlockHash> {
pub(crate) async fn locators(&mut self) -> Vec<BlockHash> {
// If a peer is sending us a fork at this point they are faulty.
if !self.checkpoints_complete() {
vec![self.tip()]
} else {
self.header_chain.locators()
// We should try to catch any reorgs if we are on a fresh start.
// The database may have a header that is useful to the remote node
// that is not currently in memory.
if self.header_chain.inner_len() < REORG_LOOKBACK as usize {
let older_locator = self.height().saturating_sub(REORG_LOOKBACK);
let mut db_lock = self.db.lock().await;
let hash = db_lock.hash_at(older_locator).await;
if let Ok(Some(locator)) = hash {
vec![self.tip(), locator]
} else {
// We couldn't find a header deep enough to send over. Just proceed as usual
self.header_chain.locators()
}
} else {
// We have enough headers in memory to catch a reorg.
self.header_chain.locators()
}
}
}

@@ -231,7 +249,7 @@ impl Chain {
pub(crate) async fn sync_chain(&mut self, message: Vec<Header>) -> Result<(), HeaderSyncError> {
let header_batch = HeadersBatch::new(message).map_err(|_| HeaderSyncError::EmptyMessage)?;
// If our chain already has the last header in the message there is no new information
if self.contains_header(header_batch.last()) {
if self.contains_hash(header_batch.last().block_hash()) {
return Ok(());
}
let initially_syncing = !self.checkpoints.is_exhausted();
@@ -252,7 +270,7 @@
if !self.contains_hash(fork_start_hash) {
self.load_fork(&header_batch).await?;
}
//
// Check if the fork has more work.
self.evaluate_fork(&header_batch).await?;
}
Ok(())
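
For reference, a minimal, self-contained sketch of the lookback decision added to locators above. Hash and MapStore are hypothetical stand-ins for the crate's BlockHash and header database, so this only illustrates the control flow, not the real Chain internals (which lock a tokio Mutex around the store and fall back to header_chain.locators()).

use std::collections::HashMap;

// Sketch only: `Hash` and `MapStore` are hypothetical stand-ins for the crate's
// `BlockHash` and database handle.
type Hash = [u8; 32];

const REORG_LOOKBACK: u32 = 7;

struct MapStore {
    by_height: HashMap<u32, Hash>,
}

impl MapStore {
    fn hash_at(&self, height: u32) -> Option<Hash> {
        self.by_height.get(&height).copied()
    }
}

// Mirrors the branch added to `Chain::locators`: with few headers in memory
// (a fresh start), include a locator REORG_LOOKBACK blocks below the tip so the
// remote peer can serve headers from before a reorg that happened while offline.
fn locators_sketch(
    store: &MapStore,
    in_memory_len: usize,
    height: u32,
    tip: Hash,
    default_locators: Vec<Hash>,
) -> Vec<Hash> {
    if in_memory_len < REORG_LOOKBACK as usize {
        let older = height.saturating_sub(REORG_LOOKBACK);
        match store.hash_at(older) {
            Some(locator) => vec![tip, locator],
            None => default_locators,
        }
    } else {
        default_locators
    }
}

fn main() {
    let mut by_height = HashMap::new();
    by_height.insert(93, [0u8; 32]);
    let store = MapStore { by_height };
    let locators = locators_sketch(&store, 3, 100, [1u8; 32], vec![[1u8; 32]]);
    assert_eq!(locators.len(), 2); // the tip plus the lookback hash at height 93
}
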
15 changes: 15 additions & 0 deletions src/db/sqlite/headers.rs
@@ -183,4 +183,19 @@ impl HeaderStore for SqliteHeaderDb {
.map_err(|_| DatabaseError::Load)?;
Ok(row)
}

async fn hash_at(&mut self, height: u32) -> Result<Option<BlockHash>, DatabaseError> {
let write_lock = self.conn.lock().await;
let stmt = "SELECT block_hash FROM headers WHERE height = ?1";
let row: Option<String> = write_lock
.query_row(stmt, params![height], |row| row.get(0))
.map_err(|_| DatabaseError::Load)?;
match row {
Some(row) => match BlockHash::from_str(&row) {
Ok(hash) => Ok(Some(hash)),
Err(_) => Err(DatabaseError::Deserialization),
},
None => Ok(None),
}
}
}
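
A small, self-contained sketch of the same SELECT block_hash FROM headers WHERE height = ?1 lookup against an in-memory rusqlite database. The schema is a throwaway assumption for illustration only (the crate's real table stores full header fields), and .optional() is used here so a missing height maps to None rather than an error.

use rusqlite::{params, Connection, OptionalExtension};

fn main() -> rusqlite::Result<()> {
    // Throwaway schema, assumed for illustration only.
    let conn = Connection::open_in_memory()?;
    conn.execute(
        "CREATE TABLE headers (height INTEGER PRIMARY KEY, block_hash TEXT NOT NULL)",
        [],
    )?;
    // Regtest genesis hash used as a placeholder value.
    conn.execute(
        "INSERT INTO headers (height, block_hash) VALUES (?1, ?2)",
        params![0u32, "0f9188f13cb7b2c71f2a335e3a4fc328bf5beb436012afca590b1a11466e2206"],
    )?;
    // Same query shape as `hash_at` above; `.optional()` turns "no row" into None.
    let found: Option<String> = conn
        .query_row(
            "SELECT block_hash FROM headers WHERE height = ?1",
            params![0u32],
            |row| row.get(0),
        )
        .optional()?;
    let missing: Option<String> = conn
        .query_row(
            "SELECT block_hash FROM headers WHERE height = ?1",
            params![42u32],
            |row| row.get(0),
        )
        .optional()?;
    println!("height 0: {:?}, height 42: {:?}", found, missing);
    Ok(())
}
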
7 changes: 7 additions & 0 deletions src/db/traits.rs
@@ -26,6 +26,9 @@ pub trait HeaderStore {

/// Return the height of a block hash in the database, if it exists.
async fn height_of<'a>(&mut self, hash: &'a BlockHash) -> Result<Option<u32>, DatabaseError>;

/// Return the hash at the height in the database, if it exists.
async fn hash_at(&mut self, height: u32) -> Result<Option<BlockHash>, DatabaseError>;
}

/// This is a simple wrapper for the unit type, signifying that no headers will be stored between sessions.
@@ -56,6 +59,10 @@ impl HeaderStore for () {
) -> Result<Option<u32>, DatabaseError> {
Ok(None)
}

async fn hash_at(&mut self, _height: u32) -> Result<Option<BlockHash>, DatabaseError> {
Ok(None)
}
}

/// Methods that define a list of peers on the Bitcoin P2P network.
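
To show what the unit implementation buys a caller, here is a self-contained sketch built on a trimmed, hypothetical copy of the trait (only hash_at, returning Option<String> instead of Result<Option<BlockHash>, DatabaseError>): code generic over the store asks for a lookback hash and simply receives None when persistence is disabled. This assumes Rust 1.75+ for async fn in traits and the tokio runtime (already a dependency of the crate); the crate itself may wire the trait differently.

// Trimmed, hypothetical copy of `HeaderStore`: only `hash_at`, no error type.
trait HashAt {
    async fn hash_at(&mut self, height: u32) -> Option<String>;
}

// Mirrors the unit implementation above: no persistence, so never a hash.
impl HashAt for () {
    async fn hash_at(&mut self, _height: u32) -> Option<String> {
        None
    }
}

// A caller generic over the store works the same whether it is SQLite-backed or `()`.
async fn lookback_locator<S: HashAt>(store: &mut S, height: u32) -> Option<String> {
    store.hash_at(height.saturating_sub(7)).await
}

#[tokio::main]
async fn main() {
    let mut no_db = ();
    assert_eq!(lookback_locator(&mut no_db, 100).await, None);
    println!("no database configured: lookback locator is None");
}
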
14 changes: 5 additions & 9 deletions src/node/node.rs
@@ -108,7 +108,7 @@ impl Node {
let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last());
checkpoints.prune_up_to(checkpoint);
// A structured way to talk to the client
let mut dialog = Dialog::new(ntx);
let dialog = Dialog::new(ntx);
// Build the chain
let loaded_chain = Chain::new(
&network,
@@ -121,12 +121,8 @@
)
.await
.map_err(NodeError::HeaderDatabase)?;
// Initialize the height of the chain
let best_known_height = loaded_chain.height();
// Initialize the chain with the headers we loaded
let chain = Arc::new(Mutex::new(loaded_chain));
dialog
.send_dialog(format!("Starting sync from block {}", best_known_height))
.await;
Ok((
Self {
state,
@@ -410,7 +406,7 @@ impl Node {
}
// Even if we start the node as caught up in terms of height, we need to check for reorgs. So we can send this unconditionally.
let next_headers = GetHeaderConfig {
locators: chain.locators(),
locators: chain.locators().await,
stop_hash: None,
};
MainThreadMessage::GetHeaders(next_headers)
@@ -474,7 +470,7 @@ impl Node {
}
if !chain.is_synced() {
let next_headers = GetHeaderConfig {
locators: chain.locators(),
locators: chain.locators().await,
stop_hash: None,
};
return Some(MainThreadMessage::GetHeaders(next_headers));
@@ -598,7 +594,7 @@ impl Node {
*state = NodeState::Behind;
let mut chain = self.chain.lock().await;
let next_headers = GetHeaderConfig {
locators: chain.locators(),
locators: chain.locators().await,
stop_hash: None,
};
if chain.height().le(&new_height) {
42 changes: 37 additions & 5 deletions tests/node.rs
@@ -439,16 +439,16 @@ async fn test_two_deep_reorg() {

// This test requires a clean Bitcoin Core regtest instance or unchanged headers from Bitcoin Core since the last test.
#[tokio::test]
#[ignore = "broken"]
async fn test_sql_stale_anchor() {
let rpc = bitcoincore_rpc::Client::new(
HOST,
bitcoincore_rpc::Auth::UserPass(RPC_USER.into(), RPC_PASSWORD.into()),
)
.unwrap();
// Do a call that will only fail if we are not connected to RPC.
if let Err(_) = rpc.get_best_block_hash() {
println!("Bitcoin Core is not running. Skipping this test...");
if let Err(_) = rpc.get_new_address(None, None) {
println!("There is no wallet loaded. Have you ran `mine.sh`?");
return;
}
// Get an address and the tip of the chain.
let miner = rpc.get_new_address(None, None).unwrap().assume_checked();
@@ -507,12 +507,44 @@
}
}
client.shutdown().await.unwrap();
// Mine more blocks
// Don't do anything, but reload the node from the checkpoint
let cp = rpc.get_best_block_hash().unwrap();
let old_height = rpc.get_block_count().unwrap();
let best = rpc.get_best_block_hash().unwrap();
// Make sure the node does not have any corrupted headers
let (mut node, mut client) = new_node_anchor_sql(
scripts.clone(),
HeaderCheckpoint::new(old_height as u32, cp),
)
.await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
// The node properly syncs after persisting a reorg
while let Ok(message) = recv.recv().await {
match message {
kyoto::node::messages::NodeMessage::Dialog(d) => println!("{d}"),
kyoto::node::messages::NodeMessage::Warning(e) => println!("{e}"),
kyoto::node::messages::NodeMessage::Synced(update) => {
println!("Done");
assert_eq!(update.tip().hash, best);
break;
}
_ => {}
}
}
client.shutdown().await.unwrap();
// Mine more blocks and reload from the checkpoint
let cp = rpc.get_best_block_hash().unwrap();
let old_height = rpc.get_block_count().unwrap();
rpc.generate_to_address(2, &miner).unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let best = rpc.get_best_block_hash().unwrap();
// Make sure the node does not have any corrupted headers
let (mut node, mut client) = new_node_sql(scripts.clone()).await;
let (mut node, mut client) = new_node_anchor_sql(
scripts.clone(),
HeaderCheckpoint::new(old_height as u32, cp),
)
.await;
tokio::task::spawn(async move { node.run().await });
let (_, mut recv) = client.split();
// The node properly syncs after persisting a reorg
