Skip to content

Commit

Permalink
Merge pull request #45 from mempool/junderw/perf-precache-speed
Browse files Browse the repository at this point in the history
Increase performance for precache operation
  • Loading branch information
wiz authored Oct 3, 2023
2 parents e663693 + d49f752 commit 18e041c
Show file tree
Hide file tree
Showing 12 changed files with 223 additions and 11,591 deletions.
11,365 changes: 0 additions & 11,365 deletions contrib/popular-scripts.txt

This file was deleted.

33 changes: 0 additions & 33 deletions electrs-start-liquid

This file was deleted.

33 changes: 0 additions & 33 deletions electrs-start-liquidtestnet

This file was deleted.

44 changes: 0 additions & 44 deletions electrs-start-mainnet

This file was deleted.

45 changes: 0 additions & 45 deletions electrs-start-signet

This file was deleted.

45 changes: 0 additions & 45 deletions electrs-start-testnet

This file was deleted.

16 changes: 10 additions & 6 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,6 @@ fn run_server(config: Arc<Config>) -> Result<()> {
&metrics,
));

if let Some(ref precache_file) = config.precache_scripts {
let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string())
.expect("cannot load scripts to precache");
precache::precache(&chain, precache_scripthashes);
}

let mempool = Arc::new(RwLock::new(Mempool::new(
Arc::clone(&chain),
&metrics,
Expand Down Expand Up @@ -102,6 +96,16 @@ fn run_server(config: Arc<Config>) -> Result<()> {
let rest_server = rest::start(Arc::clone(&config), Arc::clone(&query));
let electrum_server = ElectrumRPC::start(Arc::clone(&config), Arc::clone(&query), &metrics);

if let Some(ref precache_file) = config.precache_scripts {
let precache_scripthashes = precache::scripthashes_from_file(precache_file.to_string())
.expect("cannot load scripts to precache");
precache::precache(
Arc::clone(&chain),
precache_scripthashes,
config.precache_threads,
);
}

loop {
if let Err(err) = signal.wait(Duration::from_millis(500), true) {
info!("stopping server: {}", err);
Expand Down
23 changes: 23 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub struct Config {
pub index_unspendables: bool,
pub cors: Option<String>,
pub precache_scripts: Option<String>,
pub precache_threads: usize,
pub utxos_limit: usize,
pub electrum_txs_limit: usize,
pub electrum_banner: String,
Expand Down Expand Up @@ -188,6 +189,12 @@ impl Config {
.help("Path to file with list of scripts to pre-cache")
.takes_value(true)
)
.arg(
Arg::with_name("precache_threads")
.long("precache-threads")
.help("Non-zero number of threads to use for precache threadpool. [default: 4 * CORE_COUNT]")
.takes_value(true)
)
.arg(
Arg::with_name("utxos_limit")
.long("utxos-limit")
Expand Down Expand Up @@ -483,6 +490,22 @@ impl Config {
index_unspendables: m.is_present("index_unspendables"),
cors: m.value_of("cors").map(|s| s.to_string()),
precache_scripts: m.value_of("precache_scripts").map(|s| s.to_string()),
precache_threads: m.value_of("precache_threads").map_or_else(
|| {
std::thread::available_parallelism()
.expect("Can't get core count")
.get()
* 4
},
|s| match s.parse::<usize>() {
Ok(v) if v > 0 => v,
_ => clap::Error::value_validation_auto(format!(
"The argument '{}' isn't a valid value",
s
))
.exit(),
},
),

#[cfg(feature = "liquid")]
parent_network,
Expand Down
53 changes: 39 additions & 14 deletions src/new_index/precache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,52 @@ use hex;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Instant;

pub fn precache(chain: &ChainQuery, scripthashes: Vec<FullHash>) {
pub fn precache(chain: Arc<ChainQuery>, scripthashes: Vec<FullHash>, threads: usize) {
let total = scripthashes.len();
info!("Pre-caching stats and utxo set for {} scripthashes", total);
info!(
"Pre-caching stats and utxo set on {} threads for {} scripthashes",
threads, total
);

let pool = rayon::ThreadPoolBuilder::new()
.num_threads(16)
.num_threads(threads)
.thread_name(|i| format!("precache-{}", i))
.build()
.unwrap();
pool.install(|| {
scripthashes
.par_iter()
.enumerate()
.for_each(|(i, scripthash)| {
if i % 5 == 0 {
info!("running pre-cache for scripthash {}/{}", i + 1, total);
}
chain.stats(&scripthash[..]);
//chain.utxo(&scripthash[..]);
})
let now = Instant::now();
let counter = AtomicUsize::new(0);
std::thread::spawn(move || {
pool.install(|| {
scripthashes
.par_iter()
.for_each(|scripthash| {
// First, cache
chain.stats(&scripthash[..], crate::new_index::db::DBFlush::Disable);
let _ = chain.utxo(&scripthash[..], usize::MAX, crate::new_index::db::DBFlush::Disable);

// Then, increment the counter
let pre_increment = counter.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let post_increment_counter = pre_increment + 1;

// Then, log
if post_increment_counter % 500 == 0 {
let now_millis = now.elapsed().as_millis();
info!("{post_increment_counter}/{total} Processed in {now_millis} ms running pre-cache for scripthash");
}

// Every 10k counts, flush the DB to disk
if post_increment_counter % 10000 == 0 {
info!("Flushing cache_db... {post_increment_counter}");
chain.store().cache_db().flush();
info!("Done Flushing cache_db!!! {post_increment_counter}");
}
})
});
// After everything is done, flush the cache
chain.store().cache_db().flush();
});
}

Expand Down
8 changes: 6 additions & 2 deletions src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ impl Query {
}

pub fn utxo(&self, scripthash: &[u8]) -> Result<Vec<Utxo>> {
let mut utxos = self.chain.utxo(scripthash, self.config.utxos_limit)?;
let mut utxos = self.chain.utxo(
scripthash,
self.config.utxos_limit,
super::db::DBFlush::Enable,
)?;
let mempool = self.mempool();
utxos.retain(|utxo| !mempool.has_spend(&OutPoint::from(utxo)));
utxos.extend(mempool.utxo(scripthash));
Expand All @@ -111,7 +115,7 @@ impl Query {

pub fn stats(&self, scripthash: &[u8]) -> (ScriptStats, ScriptStats) {
(
self.chain.stats(scripthash),
self.chain.stats(scripthash, super::db::DBFlush::Enable),
self.mempool().stats(scripthash),
)
}
Expand Down
Loading

0 comments on commit 18e041c

Please sign in to comment.