Skip to content

Commit

Permalink
Fix logging and refactor reorg function
Browse files Browse the repository at this point in the history
  • Loading branch information
junderw committed Dec 15, 2024
1 parent d0a2a94 commit 737d725
Showing 1 changed file with 53 additions and 29 deletions.
82 changes: 53 additions & 29 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,15 @@ pub enum Operation {
DeleteBlocksWithHistory(crossbeam_channel::Sender<[u8; 32]>),
}

impl std::fmt::Display for Operation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Operation::AddBlocks => "Adding",
Operation::DeleteBlocks | Operation::DeleteBlocksWithHistory(_) => "Deleting",
})
}
}

// TODO: &[Block] should be an iterator / a queue.
impl Indexer {
pub fn open(store: Arc<Store>, from: FetchFrom, config: &Config, metrics: &Metrics) -> Self {
Expand Down Expand Up @@ -275,6 +284,40 @@ impl Indexer {
Ok(result)
}

fn reorg(&self, reorged: Vec<HeaderEntry>, daemon: &Daemon) -> Result<()> {
if reorged.len() > 10 {
warn!(
"reorg of over 10 blocks ({}) detected! Wonky stuff might happen!",
reorged.len()
);
}
// This channel holds a Vec of [u8; 32] scripts found in the blocks (with duplicates)
// if we reorg the whole mainnet chain it should come out to about 145 GB of memory.
let (tx, rx) = crossbeam_channel::unbounded();
// Delete txstore
start_fetcher(self.from, daemon, reorged.clone())?
.map(|blocks| self.add(&blocks, Operation::DeleteBlocks));
// Delete history_db
start_fetcher(self.from, daemon, reorged)?
.map(|blocks| self.index(&blocks, Operation::DeleteBlocksWithHistory(tx.clone())));
// All senders must be dropped for receiver iterator to finish
drop(tx);

// All senders are dropped by now, so the receiver will iterate until the
// end of the unbounded queue.
let scripts = rx.into_iter().collect::<HashSet<_>>();
for script in scripts {
// cancel the script cache DB for these scripts. They might get incorrect data mixed in.
self.store.cache_db.delete(vec![
StatsCacheRow::key(&script),
UtxoCacheRow::key(&script),
#[cfg(feature = "liquid")]
[b"z", &script[..]].concat(), // asset cache key
]);
}
Ok(())
}

pub fn update(&mut self, daemon: &Daemon) -> Result<BlockHash> {
let daemon = daemon.reconnect()?;
let tip = daemon.getbestblockhash()?;
Expand All @@ -289,33 +332,9 @@ impl Indexer {
drop(headers);

if !reorged.is_empty() {
if reorged.len() > 10 {
warn!("reorg of over 10 blocks detected! Wonky stuff might happen!");
}
let (tx, rx) = crossbeam_channel::unbounded();
// Delete txstore
start_fetcher(self.from, &daemon, reorged.clone())?
.map(|blocks| self.add(&blocks, Operation::DeleteBlocks));
// Delete history_db
start_fetcher(self.from, &daemon, reorged)?.map(|blocks| {
self.index(&blocks, Operation::DeleteBlocksWithHistory(tx.clone()))
});
// Needed to clone in order to pass into the closure otherwise
drop(tx);

// All senders are dropped by now, so the receiver will iterate until the
// end of the unbounded queue.
let scripts = rx.into_iter().collect::<HashSet<_>>();
for script in scripts {
// cancel the script cache DB for these scripts. They might get incorrect data mixed in.
self.store.cache_db.delete(vec![
StatsCacheRow::key(&script),
UtxoCacheRow::key(&script),
#[cfg(feature = "liquid")]
[b"z", &script[..]].concat(), // asset cache key
]);
}
self.reorg(reorged, &daemon)?;
}

headers_len
};

Expand Down Expand Up @@ -360,14 +379,19 @@ impl Indexer {
}

fn add(&self, blocks: &[BlockEntry], op: Operation) {
debug!("Adding {} blocks to Indexer", blocks.len());
debug!("{} {} blocks to Indexer", op, blocks.len());
let write_label = match &op {
Operation::AddBlocks => "add_write",
_ => "delete_write",
};

// TODO: skip orphaned blocks?
let rows = {
let _timer = self.start_timer("add_process");
add_blocks(blocks, &self.iconfig)
};
{
let _timer = self.start_timer("add_write");
let _timer = self.start_timer(write_label);
if let Operation::AddBlocks = op {
self.store.txstore_db.write(rows, self.flush);
} else {
Expand Down Expand Up @@ -397,7 +421,7 @@ impl Indexer {
}

fn index(&self, blocks: &[BlockEntry], op: Operation) {
debug!("Indexing {} blocks with Indexer", blocks.len());
debug!("Indexing ({}) {} blocks with Indexer", op, blocks.len());
let previous_txos_map = {
let _timer = self.start_timer("index_lookup");
lookup_txos(&self.store.txstore_db, &get_previous_txos(blocks), false)
Expand Down

0 comments on commit 737d725

Please sign in to comment.