diff --git a/src/chain/chain.rs b/src/chain/chain.rs
index ba62a00..103e2a9 100644
--- a/src/chain/chain.rs
+++ b/src/chain/chain.rs
@@ -35,6 +35,7 @@ use crate::{
 };

 const MAX_REORG_DEPTH: u32 = 5_000;
+const REORG_LOOKBACK: u32 = 7;

 #[derive(Debug)]
 pub(crate) struct Chain {
@@ -189,11 +190,28 @@ impl Chain {
     }

     // The "locators" are the headers we inform our peers we know about
-    pub(crate) fn locators(&mut self) -> Vec<BlockHash> {
+    pub(crate) async fn locators(&mut self) -> Vec<BlockHash> {
+        // If a peer is sending us a fork at this point they are faulty.
         if !self.checkpoints_complete() {
             vec![self.tip()]
         } else {
-            self.header_chain.locators()
+            // We should try to catch any reorgs if we are on a fresh start.
+            // The database may have a header that is useful to the remote node
+            // that is not currently in memory.
+            if self.header_chain.inner_len() < REORG_LOOKBACK as usize {
+                let older_locator = self.height().saturating_sub(REORG_LOOKBACK);
+                let mut db_lock = self.db.lock().await;
+                let hash = db_lock.hash_at(older_locator).await;
+                if let Ok(Some(locator)) = hash {
+                    vec![self.tip(), locator]
+                } else {
+                    // We couldn't find a header deep enough to send over. Just proceed as usual
+                    self.header_chain.locators()
+                }
+            } else {
+                // We have enough headers in memory to catch a reorg.
+                self.header_chain.locators()
+            }
         }
     }

@@ -231,7 +249,7 @@ impl Chain {
     pub(crate) async fn sync_chain(&mut self, message: Vec<Header>) -> Result<(), HeaderSyncError> {
         let header_batch = HeadersBatch::new(message).map_err(|_| HeaderSyncError::EmptyMessage)?;
         // If our chain already has the last header in the message there is no new information
-        if self.contains_header(header_batch.last()) {
+        if self.contains_hash(header_batch.last().block_hash()) {
             return Ok(());
         }
         let initially_syncing = !self.checkpoints.is_exhausted();
@@ -252,7 +270,7 @@ impl Chain {
             if !self.contains_hash(fork_start_hash) {
                 self.load_fork(&header_batch).await?;
             }
-            //
+            // Check if the fork has more work.
             self.evaluate_fork(&header_batch).await?;
         }
         Ok(())
diff --git a/src/db/sqlite/headers.rs b/src/db/sqlite/headers.rs
index 1e379ba..5be156e 100644
--- a/src/db/sqlite/headers.rs
+++ b/src/db/sqlite/headers.rs
@@ -183,4 +183,19 @@ impl HeaderStore for SqliteHeaderDb {
             .map_err(|_| DatabaseError::Load)?;
         Ok(row)
     }
+
+    async fn hash_at(&mut self, height: u32) -> Result<Option<BlockHash>, DatabaseError> {
+        let write_lock = self.conn.lock().await;
+        let stmt = "SELECT block_hash FROM headers WHERE height = ?1";
+        let row: Option<String> = write_lock
+            .query_row(stmt, params![height], |row| row.get(0))
+            .map_err(|_| DatabaseError::Load)?;
+        match row {
+            Some(row) => match BlockHash::from_str(&row) {
+                Ok(hash) => Ok(Some(hash)),
+                Err(_) => Err(DatabaseError::Deserialization),
+            },
+            None => Ok(None),
+        }
+    }
 }
diff --git a/src/db/traits.rs b/src/db/traits.rs
index b5345b3..c865654 100644
--- a/src/db/traits.rs
+++ b/src/db/traits.rs
@@ -26,6 +26,9 @@ pub trait HeaderStore {

     /// Return the height of a block hash in the database, if it exists.
     async fn height_of<'a>(&mut self, hash: &'a BlockHash) -> Result<Option<u32>, DatabaseError>;
+
+    /// Return the hash at the height in the database, if it exists.
+    async fn hash_at(&mut self, height: u32) -> Result<Option<BlockHash>, DatabaseError>;
 }

 /// This is a simple wrapper for the unit type, signifying that no headers will be stored between sessions.
@@ -56,6 +59,10 @@ impl HeaderStore for () {
     ) -> Result<Option<u32>, DatabaseError> {
         Ok(None)
     }
+
+    async fn hash_at(&mut self, _height: u32) -> Result<Option<BlockHash>, DatabaseError> {
+        Ok(None)
+    }
 }

 /// Methods that define a list of peers on the Bitcoin P2P network.
diff --git a/src/node/node.rs b/src/node/node.rs
index bec5b84..82f00e4 100644
--- a/src/node/node.rs
+++ b/src/node/node.rs
@@ -108,7 +108,7 @@ impl Node {
         let checkpoint = header_checkpoint.unwrap_or_else(|| checkpoints.last());
         checkpoints.prune_up_to(checkpoint);
         // A structured way to talk to the client
-        let mut dialog = Dialog::new(ntx);
+        let dialog = Dialog::new(ntx);
         // Build the chain
         let loaded_chain = Chain::new(
             &network,
@@ -121,12 +121,8 @@ impl Node {
         )
         .await
         .map_err(NodeError::HeaderDatabase)?;
-        // Initialize the height of the chain
-        let best_known_height = loaded_chain.height();
+        // Initialize the chain with the headers we loaded
         let chain = Arc::new(Mutex::new(loaded_chain));
-        dialog
-            .send_dialog(format!("Starting sync from block {}", best_known_height))
-            .await;
         Ok((
             Self {
                 state,
@@ -410,7 +406,7 @@ impl Node {
         }
         // Even if we start the node as caught up in terms of height, we need to check for reorgs. So we can send this unconditionally.
         let next_headers = GetHeaderConfig {
-            locators: chain.locators(),
+            locators: chain.locators().await,
             stop_hash: None,
         };
         MainThreadMessage::GetHeaders(next_headers)
@@ -474,7 +470,7 @@ impl Node {
         }
         if !chain.is_synced() {
             let next_headers = GetHeaderConfig {
-                locators: chain.locators(),
+                locators: chain.locators().await,
                 stop_hash: None,
             };
             return Some(MainThreadMessage::GetHeaders(next_headers));
@@ -598,7 +594,7 @@ impl Node {
         *state = NodeState::Behind;
         let mut chain = self.chain.lock().await;
         let next_headers = GetHeaderConfig {
-            locators: chain.locators(),
+            locators: chain.locators().await,
             stop_hash: None,
         };
         if chain.height().le(&new_height) {
diff --git a/tests/node.rs b/tests/node.rs
index d040c38..f925087 100644
--- a/tests/node.rs
+++ b/tests/node.rs
@@ -439,7 +439,6 @@ async fn test_two_deep_reorg() {

 // This test requires a clean Bitcoin Core regtest instance or unchange headers from Bitcoin Core since the last test.
 #[tokio::test]
-#[ignore = "broken"]
 async fn test_sql_stale_anchor() {
     let rpc = bitcoincore_rpc::Client::new(
         HOST,
@@ -447,8 +446,9 @@ async fn test_sql_stale_anchor() {
     )
     .unwrap();
     // Do a call that will only fail if we are not connected to RPC.
-    if let Err(_) = rpc.get_best_block_hash() {
-        println!("Bitcoin Core is not running. Skipping this test...");
+    if let Err(_) = rpc.get_new_address(None, None) {
+        println!("There is no wallet loaded. Have you run `mine.sh`?");
+        return;
     }
     // Get an address and the tip of the chain.
     let miner = rpc.get_new_address(None, None).unwrap().assume_checked();
@@ -507,12 +507,44 @@ async fn test_sql_stale_anchor() {
         }
     }
     client.shutdown().await.unwrap();
-    // Mine more blocks
+    // Don't do anything, but reload the node from the checkpoint
+    let cp = rpc.get_best_block_hash().unwrap();
+    let old_height = rpc.get_block_count().unwrap();
+    let best = rpc.get_best_block_hash().unwrap();
+    // Make sure the node does not have any corrupted headers
+    let (mut node, mut client) = new_node_anchor_sql(
+        scripts.clone(),
+        HeaderCheckpoint::new(old_height as u32, cp),
+    )
+    .await;
+    tokio::task::spawn(async move { node.run().await });
+    let (_, mut recv) = client.split();
+    // The node properly syncs after persisting a reorg
+    while let Ok(message) = recv.recv().await {
+        match message {
+            kyoto::node::messages::NodeMessage::Dialog(d) => println!("{d}"),
+            kyoto::node::messages::NodeMessage::Warning(e) => println!("{e}"),
+            kyoto::node::messages::NodeMessage::Synced(update) => {
+                println!("Done");
+                assert_eq!(update.tip().hash, best);
+                break;
+            }
+            _ => {}
+        }
+    }
+    client.shutdown().await.unwrap();
+    // Mine more blocks and reload from the checkpoint
+    let cp = rpc.get_best_block_hash().unwrap();
+    let old_height = rpc.get_block_count().unwrap();
     rpc.generate_to_address(2, &miner).unwrap();
     tokio::time::sleep(Duration::from_secs(2)).await;
     let best = rpc.get_best_block_hash().unwrap();
     // Make sure the node does not have any corrupted headers
-    let (mut node, mut client) = new_node_sql(scripts.clone()).await;
+    let (mut node, mut client) = new_node_anchor_sql(
+        scripts.clone(),
+        HeaderCheckpoint::new(old_height as u32, cp),
+    )
+    .await;
     tokio::task::spawn(async move { node.run().await });
     let (_, mut recv) = client.split();
     // The node properly syncs after persisting a reorg
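For reference, below is a minimal sketch (not part of the patch) of the `HeaderStore::hash_at` contract added in src/db/traits.rs, implemented for a hypothetical in-memory store. The trait and a trimmed `DatabaseError` are re-declared only to keep the snippet self-contained; `MemoryStore` and `main` are illustrative names, and native async-fn-in-trait (Rust 1.75+) is assumed. The point is that a missing height surfaces as `Ok(None)`, which `Chain::locators` treats as a cue to fall back to its in-memory locators.

// Sketch only: mirrors the `hash_at` signature from src/db/traits.rs above.
use std::collections::BTreeMap;

use bitcoin::BlockHash;

#[derive(Debug)]
#[allow(dead_code)]
enum DatabaseError {
    Load,
}

trait HeaderStore {
    /// Return the hash at the height in the database, if it exists.
    async fn hash_at(&mut self, height: u32) -> Result<Option<BlockHash>, DatabaseError>;
}

/// Hypothetical store keeping height-to-hash mappings in memory.
struct MemoryStore {
    headers: BTreeMap<u32, BlockHash>,
}

impl HeaderStore for MemoryStore {
    async fn hash_at(&mut self, height: u32) -> Result<Option<BlockHash>, DatabaseError> {
        // A missing height is not an error: the caller falls back to its
        // in-memory locators when it sees `Ok(None)`.
        Ok(self.headers.get(&height).copied())
    }
}

#[tokio::main]
async fn main() -> Result<(), DatabaseError> {
    let mut store = MemoryStore {
        headers: BTreeMap::new(),
    };
    // Nothing was persisted at this height, so the lookup yields `None` and
    // a caller such as `Chain::locators` proceeds with its usual locators.
    assert!(store.hash_at(42).await?.is_none());
    Ok(())
}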