From 09725f4300285166f268fb7da6c6dd0340160243 Mon Sep 17 00:00:00 2001 From: gonzalezzfelipe Date: Wed, 19 Feb 2025 14:40:06 -0300 Subject: [PATCH] feat: Chain --- src/bin/dolos/bootstrap/mithril.rs | 2 +- src/bin/dolos/common.rs | 19 +- src/bin/dolos/daemon.rs | 4 +- src/bin/dolos/data/copy_wal.rs | 2 +- src/bin/dolos/data/dump_wal.rs | 2 +- src/bin/dolos/data/export.rs | 29 +- src/bin/dolos/data/prune_wal.rs | 2 +- src/bin/dolos/data/summary.rs | 2 +- src/bin/dolos/doctor/mod.rs | 12 +- .../doctor/{rebuild_ledger.rs => rebuild.rs} | 36 +- src/bin/dolos/doctor/wal_integrity.rs | 2 +- src/bin/dolos/eval.rs | 2 +- src/bin/dolos/serve.rs | 4 +- src/bin/dolos/sync.rs | 3 +- src/chain/mod.rs | 168 ++++++++ src/chain/redb/indexes.rs | 374 ++++++++++++++++++ src/chain/redb/mod.rs | 213 ++++++++++ src/chain/redb/tables.rs | 53 +++ src/chain/redb/v1.rs | 142 +++++++ src/ledger/mod.rs | 37 +- src/lib.rs | 1 + src/model.rs | 4 + src/serve/grpc/mod.rs | 4 +- src/serve/grpc/sync.rs | 38 +- src/serve/mod.rs | 3 + src/state/mod.rs | 35 +- src/state/redb/mod.rs | 7 +- src/state/redb/v2light.rs | 7 +- src/sync/apply.rs | 21 +- src/sync/mod.rs | 3 + 30 files changed, 1154 insertions(+), 77 deletions(-) rename src/bin/dolos/doctor/{rebuild_ledger.rs => rebuild.rs} (81%) create mode 100644 src/chain/mod.rs create mode 100644 src/chain/redb/indexes.rs create mode 100644 src/chain/redb/mod.rs create mode 100644 src/chain/redb/tables.rs create mode 100644 src/chain/redb/v1.rs diff --git a/src/bin/dolos/bootstrap/mithril.rs b/src/bin/dolos/bootstrap/mithril.rs index 00da4a30..428ff1df 100644 --- a/src/bin/dolos/bootstrap/mithril.rs +++ b/src/bin/dolos/bootstrap/mithril.rs @@ -252,7 +252,7 @@ pub fn run(config: &crate::Config, args: &Args, feedback: &Feedback) -> miette:: import_hardano_into_wal(config, &immutable_path, feedback)?; - crate::doctor::run_rebuild_ledger(config, feedback).context("rebuilding ledger")?; + crate::doctor::run_rebuild(config, feedback).context("rebuilding ledger and chain")?; if !args.retain_snapshot { info!("deleting downloaded snapshot"); diff --git a/src/bin/dolos/common.rs b/src/bin/dolos/common.rs index 2f4a20d5..85795807 100644 --- a/src/bin/dolos/common.rs +++ b/src/bin/dolos/common.rs @@ -1,4 +1,4 @@ -use dolos::{ledger::pparams::Genesis, state, wal}; +use dolos::{chain, ledger::pparams::Genesis, state, wal}; use miette::{Context as _, IntoDiagnostic}; use std::{path::PathBuf, time::Duration}; use tokio::task::JoinHandle; @@ -10,7 +10,7 @@ use dolos::prelude::*; use crate::{GenesisConfig, LoggingConfig}; -pub type Stores = (wal::redb::WalStore, state::LedgerStore); +pub type Stores = (wal::redb::WalStore, state::LedgerStore, chain::ChainStore); pub fn open_wal(config: &crate::Config) -> Result { let root = &config.storage.path; @@ -36,6 +36,15 @@ pub fn define_ledger_path(config: &crate::Config) -> Result { Ok(ledger) } +pub fn define_chain_path(config: &crate::Config) -> Result { + let root = &config.storage.path; + std::fs::create_dir_all(root).map_err(Error::storage)?; + + let ledger = root.join("chain"); + + Ok(ledger) +} + pub fn open_data_stores(config: &crate::Config) -> Result { let root = &config.storage.path; @@ -52,7 +61,11 @@ pub fn open_data_stores(config: &crate::Config) -> Result { .map_err(Error::storage)? .into(); - Ok((wal, ledger)) + let chain = chain::redb::ChainStore::open(root.join("chain"), config.storage.chain_cache) + .map_err(Error::storage)? 
+ .into(); + + Ok((wal, ledger, chain)) } pub fn setup_tracing(config: &LoggingConfig) -> miette::Result<()> { diff --git a/src/bin/dolos/daemon.rs b/src/bin/dolos/daemon.rs index baadcc2a..f57ff8c5 100644 --- a/src/bin/dolos/daemon.rs +++ b/src/bin/dolos/daemon.rs @@ -10,7 +10,7 @@ pub struct Args {} pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (wal, ledger) = crate::common::open_data_stores(&config)?; + let (wal, ledger, chain) = crate::common::open_data_stores(&config)?; let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?); let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone()); let exit = crate::common::hook_exit_token(); @@ -21,6 +21,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> { &config.storage, wal.clone(), ledger.clone(), + chain.clone(), genesis.clone(), mempool.clone(), &config.retries, @@ -40,6 +41,7 @@ pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> { genesis.clone(), wal.clone(), ledger.clone(), + chain.clone(), mempool.clone(), exit.clone(), )); diff --git a/src/bin/dolos/data/copy_wal.rs b/src/bin/dolos/data/copy_wal.rs index 3ffeb6bd..b6cb1ac4 100644 --- a/src/bin/dolos/data/copy_wal.rs +++ b/src/bin/dolos/data/copy_wal.rs @@ -22,7 +22,7 @@ pub struct Args { pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (source, _) = crate::common::open_data_stores(config).context("opening data stores")?; + let source = crate::common::open_wal(config).context("opening data stores")?; let mut target = dolos::wal::redb::WalStore::open(&args.output, None, None) .into_diagnostic() diff --git a/src/bin/dolos/data/dump_wal.rs b/src/bin/dolos/data/dump_wal.rs index cfb44def..e2d75737 100644 --- a/src/bin/dolos/data/dump_wal.rs +++ b/src/bin/dolos/data/dump_wal.rs @@ -99,7 +99,7 @@ impl Formatter { pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (wal, _) = crate::common::open_data_stores(config).context("opening data stores")?; + let wal = crate::common::open_wal(config).context("opening data stores")?; let mut formatter = Formatter::new_table(); diff --git a/src/bin/dolos/data/export.rs b/src/bin/dolos/data/export.rs index b0b30a0b..952e5d70 100644 --- a/src/bin/dolos/data/export.rs +++ b/src/bin/dolos/data/export.rs @@ -47,6 +47,25 @@ fn prepare_ledger( Ok(()) } +fn prepare_chain( + chain: dolos::chain::ChainStore, + pb: &crate::feedback::ProgressBar, +) -> miette::Result<()> { + let mut chain = match chain { + dolos::chain::ChainStore::Redb(x) => x, + _ => miette::bail!("Only redb is supported for export"), + }; + + let db = chain.db_mut().unwrap(); + pb.set_message("compacting ledger"); + db.compact().into_diagnostic()?; + + pb.set_message("checking ledger integrity"); + db.check_integrity().into_diagnostic()?; + + Ok(()) +} + pub fn run( config: &crate::Config, args: &Args, @@ -58,7 +77,7 @@ pub fn run( let encoder = GzEncoder::new(export_file, Compression::default()); let mut archive = Builder::new(encoder); - let (wal, ledger) = crate::common::open_data_stores(config)?; + let (wal, ledger, chain) = crate::common::open_data_stores(config)?; prepare_wal(wal, &pb)?; @@ -76,6 +95,14 @@ pub fn run( .append_path_with_name(&path, "ledger") .into_diagnostic()?; + prepare_chain(chain, &pb)?; + + let path = config.storage.path.join("chain"); + + archive + 
.append_path_with_name(&path, "chain") + .into_diagnostic()?; + pb.set_message("creating archive"); archive.finish().into_diagnostic()?; diff --git a/src/bin/dolos/data/prune_wal.rs b/src/bin/dolos/data/prune_wal.rs index 1c01725f..be0a08a5 100644 --- a/src/bin/dolos/data/prune_wal.rs +++ b/src/bin/dolos/data/prune_wal.rs @@ -15,7 +15,7 @@ pub struct Args { pub fn run(config: &crate::Config, args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (mut wal, _) = crate::common::open_data_stores(config).context("opening data stores")?; + let mut wal = crate::common::open_wal(config).context("opening data stores")?; let max_slots = match args.max_slots { Some(x) => x, diff --git a/src/bin/dolos/data/summary.rs b/src/bin/dolos/data/summary.rs index e89d444b..3ec41a57 100644 --- a/src/bin/dolos/data/summary.rs +++ b/src/bin/dolos/data/summary.rs @@ -40,7 +40,7 @@ pub struct Args {} pub fn run(config: &crate::Config, _args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (wal, ledger) = crate::common::open_data_stores(config)?; + let (wal, ledger, _) = crate::common::open_data_stores(config)?; if let Some((seq, point)) = wal.crawl_from(None).unwrap().next() { println!("found WAL start"); diff --git a/src/bin/dolos/doctor/mod.rs b/src/bin/dolos/doctor/mod.rs index be6bdb62..2e3825cf 100644 --- a/src/bin/dolos/doctor/mod.rs +++ b/src/bin/dolos/doctor/mod.rs @@ -2,13 +2,13 @@ use clap::{Parser, Subcommand}; use crate::feedback::Feedback; -mod rebuild_ledger; +mod rebuild; mod wal_integrity; #[derive(Debug, Subcommand)] pub enum Command { - /// rebuilds the whole ledger from chain data - RebuildLedger(rebuild_ledger::Args), + /// rebuilds ledger and chain from WAL + Rebuild(rebuild::Args), /// checks the integrity of the WAL records WalIntegrity(wal_integrity::Args), } @@ -21,13 +21,13 @@ pub struct Args { pub fn run(config: &super::Config, args: &Args, feedback: &Feedback) -> miette::Result<()> { match &args.command { - Command::RebuildLedger(x) => rebuild_ledger::run(config, x, feedback)?, + Command::Rebuild(x) => rebuild::run(config, x, feedback)?, Command::WalIntegrity(x) => wal_integrity::run(config, x)?, } Ok(()) } -pub fn run_rebuild_ledger(config: &super::Config, feedback: &Feedback) -> miette::Result<()> { - rebuild_ledger::run(config, &rebuild_ledger::Args, feedback) +pub fn run_rebuild(config: &super::Config, feedback: &Feedback) -> miette::Result<()> { + rebuild::run(config, &rebuild::Args, feedback) } diff --git a/src/bin/dolos/doctor/rebuild_ledger.rs b/src/bin/dolos/doctor/rebuild.rs similarity index 81% rename from src/bin/dolos/doctor/rebuild_ledger.rs rename to src/bin/dolos/doctor/rebuild.rs index 6a5fb29b..abe15502 100644 --- a/src/bin/dolos/doctor/rebuild_ledger.rs +++ b/src/bin/dolos/doctor/rebuild.rs @@ -1,7 +1,4 @@ -use dolos::{ - ledger, - wal::{self, RawBlock, ReadUtils, WalReader as _}, -}; +use dolos::wal::{self, RawBlock, ReadUtils, WalReader as _}; use itertools::Itertools; use miette::{Context, IntoDiagnostic}; use pallas::ledger::traverse::MultiEraBlock; @@ -43,6 +40,11 @@ pub fn run(config: &crate::Config, _args: &Args, feedback: &Feedback) -> miette: .context("applying origin utxos")?; } + let chain_path = crate::common::define_chain_path(config).context("finding chain path")?; + let chain = dolos::chain::redb::ChainStore::open(chain_path, None) + .into_diagnostic() + .context("opening chain store.")?; + let (_, tip) = wal .find_tip() .into_diagnostic() @@ -54,24 +56,15 @@ pub fn run(config: 
&crate::Config, _args: &Args, feedback: &Feedback) -> miette: wal::ChainPoint::Specific(slot, _) => progress.set_length(slot), } - let wal_seq = light - .cursor() - .into_diagnostic() - .context("finding ledger cursor")? - .map(|ledger::ChainPoint(s, h)| wal.assert_point(&wal::ChainPoint::Specific(s, h))) - .transpose() - .into_diagnostic() - .context("locating wal sequence")?; - let remaining = wal - .crawl_from(wal_seq) + .crawl_from(None) .into_diagnostic() .context("crawling wal")? .filter_forward() .into_blocks() .flatten(); - for chunk in remaining.chunks(100).into_iter() { + for chunk in remaining.chunks(500).into_iter() { let bodies = chunk.map(|RawBlock { body, .. }| body).collect_vec(); let blocks: Vec<_> = bodies @@ -81,8 +74,17 @@ pub fn run(config: &crate::Config, _args: &Args, feedback: &Feedback) -> miette: .into_diagnostic() .context("decoding blocks")?; - dolos::state::apply_block_batch( - &blocks, + let deltas = dolos::state::calculate_block_batch_deltas(&blocks, &light) + .into_diagnostic() + .context("calculating batch deltas.")?; + + chain + .apply(&deltas) + .into_diagnostic() + .context("applying deltas to chain")?; + + dolos::state::apply_delta_batch( + deltas, &light, &genesis, config.storage.max_ledger_history, diff --git a/src/bin/dolos/doctor/wal_integrity.rs b/src/bin/dolos/doctor/wal_integrity.rs index f099d152..5bbf98b0 100644 --- a/src/bin/dolos/doctor/wal_integrity.rs +++ b/src/bin/dolos/doctor/wal_integrity.rs @@ -37,7 +37,7 @@ pub fn run(config: &crate::Config, _args: &Args) -> miette::Result<()> { let feedback = Feedback::default(); - let (wal, _) = crate::common::open_data_stores(config).context("opening data stores")?; + let wal = crate::common::open_wal(config).context("opening data stores")?; let (_, tip) = wal .find_tip() diff --git a/src/bin/dolos/eval.rs b/src/bin/dolos/eval.rs index 11e9d6d1..5fdd683d 100644 --- a/src/bin/dolos/eval.rs +++ b/src/bin/dolos/eval.rs @@ -28,7 +28,7 @@ pub struct Args { pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (_, ledger) = crate::common::open_data_stores(config)?; + let (_, ledger, _) = crate::common::open_data_stores(config)?; let cbor = std::fs::read_to_string(&args.file) .into_diagnostic() diff --git a/src/bin/dolos/serve.rs b/src/bin/dolos/serve.rs index 9c4f1927..ded888c4 100644 --- a/src/bin/dolos/serve.rs +++ b/src/bin/dolos/serve.rs @@ -10,12 +10,12 @@ pub struct Args {} pub async fn run(config: super::Config, _args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (wal, ledger) = crate::common::open_data_stores(&config)?; + let (wal, ledger, chain) = crate::common::open_data_stores(&config)?; let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?); let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone()); let exit = crate::common::hook_exit_token(); - dolos::serve::serve(config.serve, genesis, wal, ledger, mempool, exit) + dolos::serve::serve(config.serve, genesis, wal, ledger, chain, mempool, exit) .await .context("serving clients")?; diff --git a/src/bin/dolos/sync.rs b/src/bin/dolos/sync.rs index d1669a6a..52b4ad3a 100644 --- a/src/bin/dolos/sync.rs +++ b/src/bin/dolos/sync.rs @@ -12,7 +12,7 @@ pub struct Args { pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> { crate::common::setup_tracing(&config.logging)?; - let (wal, ledger) = crate::common::open_data_stores(config)?; + let (wal, ledger, chain) = 
crate::common::open_data_stores(config)?; let genesis = Arc::new(crate::common::open_genesis_files(&config.genesis)?); let mempool = dolos::mempool::Mempool::new(genesis.clone(), ledger.clone()); @@ -22,6 +22,7 @@ pub fn run(config: &super::Config, args: &Args) -> miette::Result<()> { &config.storage, wal, ledger, + chain, genesis, mempool, &config.retries, diff --git a/src/chain/mod.rs b/src/chain/mod.rs new file mode 100644 index 00000000..1d3c4e52 --- /dev/null +++ b/src/chain/mod.rs @@ -0,0 +1,168 @@ +use pallas::ledger::traverse::MultiEraBlock; +use thiserror::Error; + +use crate::ledger::{BrokenInvariant, LedgerDelta}; +use crate::model::{BlockBody, BlockSlot}; + +pub mod redb; + +#[derive(Debug, Error)] +pub enum ChainError { + #[error("broken invariant")] + BrokenInvariant(#[source] BrokenInvariant), + + #[error("storage error")] + StorageError(#[source] ::redb::Error), + + #[error("address decoding error")] + AddressDecoding(pallas::ledger::addresses::Error), + + #[error("query not supported")] + QueryNotSupported, + + #[error("invalid store version")] + InvalidStoreVersion, + + #[error("decoding error")] + DecodingError(#[source] pallas::codec::minicbor::decode::Error), + + #[error("block decoding error")] + BlockDecodingError(#[source] pallas::ledger::traverse::Error), +} + +impl From<::redb::TableError> for ChainError { + fn from(value: ::redb::TableError) -> Self { + Self::StorageError(value.into()) + } +} + +impl From<::redb::CommitError> for ChainError { + fn from(value: ::redb::CommitError) -> Self { + Self::StorageError(value.into()) + } +} + +impl From<::redb::StorageError> for ChainError { + fn from(value: ::redb::StorageError) -> Self { + Self::StorageError(value.into()) + } +} + +impl From<::redb::TransactionError> for ChainError { + fn from(value: ::redb::TransactionError) -> Self { + Self::StorageError(value.into()) + } +} + +impl From for ChainError { + fn from(value: pallas::ledger::addresses::Error) -> Self { + Self::AddressDecoding(value) + } +} + +/// A persistent store for ledger state +#[derive(Clone)] +#[non_exhaustive] +pub enum ChainStore { + Redb(redb::ChainStore), +} + +impl ChainStore { + pub fn get_possible_block_slots_by_address( + &self, + address: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::Redb(x) => x.get_possible_block_slots_by_address(address), + } + } + + pub fn get_possible_block_slots_by_tx_hash( + &self, + tx_hash: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::Redb(x) => x.get_possible_block_slots_by_tx_hash(tx_hash), + } + } + + pub fn get_possible_block_slots_by_block_hash( + &self, + block_hash: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::Redb(x) => x.get_possible_block_slots_by_block_hash(block_hash), + } + } + + pub fn get_possible_blocks_by_address( + &self, + address: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::Redb(x) => x.get_possible_blocks_by_address(address), + } + } + + pub fn get_possible_blocks_by_tx_hash( + &self, + tx_hash: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::Redb(x) => x.get_possible_blocks_by_tx_hash(tx_hash), + } + } + + pub fn get_possible_blocks_by_block_hash( + &self, + block_hash: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::Redb(x) => x.get_possible_blocks_by_block_hash(block_hash), + } + } + + pub fn get_block_by_block_hash( + &self, + block_hash: &[u8], + ) -> Result, ChainError> { + let possible = self.get_possible_blocks_by_block_hash(block_hash)?; + for raw in possible { + let block = 
MultiEraBlock::decode(&raw).map_err(ChainError::BlockDecodingError)?; + if *block.hash() == *block_hash { + return Ok(Some(raw)); + } + } + Ok(None) + } + + pub fn get_block_by_slot(&self, slot: &BlockSlot) -> Result, ChainError> { + match self { + ChainStore::Redb(x) => x.get_block_by_slot(slot), + } + } + + pub fn apply(&self, deltas: &[LedgerDelta]) -> Result<(), ChainError> { + match self { + ChainStore::Redb(x) => x.apply(deltas), + } + } + + pub fn finalize(&self, until: BlockSlot) -> Result<(), ChainError> { + match self { + ChainStore::Redb(x) => x.finalize(until), + } + } + + pub fn copy(&self, target: &Self) -> Result<(), ChainError> { + match (self, target) { + (Self::Redb(x), Self::Redb(target)) => x.copy(target), + } + } +} + +impl From for ChainStore { + fn from(value: redb::ChainStore) -> Self { + Self::Redb(value) + } +} diff --git a/src/chain/redb/indexes.rs b/src/chain/redb/indexes.rs new file mode 100644 index 00000000..38f8f126 --- /dev/null +++ b/src/chain/redb/indexes.rs @@ -0,0 +1,374 @@ +use ::redb::{ReadTransaction, ReadableTable as _}; +use ::redb::{TableDefinition, WriteTransaction}; +use pallas::ledger::addresses::Address; +use pallas::ledger::traverse::MultiEraOutput; +use std::hash::{DefaultHasher, Hash as _, Hasher}; + +use crate::ledger::LedgerDelta; +use crate::model::BlockSlot; + +type Error = crate::chain::ChainError; + +pub struct AddressApproxIndexTable; +impl AddressApproxIndexTable { + pub const DEF: TableDefinition<'static, u64, Vec> = + TableDefinition::new("addressapproxindex"); + + pub fn initialize(wx: &WriteTransaction) -> Result<(), Error> { + wx.open_table(Self::DEF)?; + + Ok(()) + } + + pub fn compute_key(address: &[u8]) -> u64 { + let mut hasher = DefaultHasher::new(); + address.hash(&mut hasher); + hasher.finish() + } + + pub fn get_by_address(rx: &ReadTransaction, address: &[u8]) -> Result, Error> { + let table = rx.open_table(Self::DEF)?; + let default = Ok(vec![]); + let key = Self::compute_key(address); + match table.get(key)? { + Some(value) => Ok(value.value().clone()), + None => default, + } + } + + pub fn insert(wx: &WriteTransaction, addresses: Vec>, slot: u64) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + for address in addresses { + let key = Self::compute_key(&address); + + let maybe_new = match table.get(key)? { + Some(value) => { + let mut previous = value.value().clone(); + if !previous.contains(&slot) { + previous.push(slot); + Some(previous) + } else { + None + } + } + None => Some(vec![slot]), + }; + if let Some(new) = maybe_new { + table.insert(key, new)?; + } + } + + Ok(()) + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + if let Some(point) = &delta.new_position { + // Produced + let produced = delta + .produced_utxo + .values() + .map(|body| { + let body = MultiEraOutput::try_from(body).map_err(Error::DecodingError)?; + match body.address()? { + Address::Shelley(add) => Ok(add.to_vec()), + Address::Byron(add) => Ok(add.to_vec()), + Address::Stake(add) => Ok(add.to_vec()), + } + }) + .collect::>, Error>>()?; + + // Consumed + let consumed = delta + .consumed_utxo + .values() + .map(|body| { + let body = MultiEraOutput::try_from(body).map_err(Error::DecodingError)?; + match body.address()? 
{ + Address::Shelley(add) => Ok(add.to_vec()), + Address::Byron(add) => Ok(add.to_vec()), + Address::Stake(add) => Ok(add.to_vec()), + } + }) + .collect::>, Error>>()?; + + Self::insert( + wx, + produced.into_iter().chain(consumed.into_iter()).collect(), + point.0, + )?; + } + + if let Some(point) = &delta.undone_position { + // Produced + let recovered = delta + .recovered_stxi + .values() + .map(|body| { + let body = MultiEraOutput::try_from(body).map_err(Error::DecodingError)?; + match body.address()? { + Address::Shelley(add) => Ok(add.to_vec()), + Address::Byron(add) => Ok(add.to_vec()), + Address::Stake(add) => Ok(add.to_vec()), + } + }) + .collect::>, Error>>()?; + + // Consumed + let undone = delta + .undone_utxo + .values() + .map(|body| { + let body = MultiEraOutput::try_from(body).map_err(Error::DecodingError)?; + match body.address()? { + Address::Shelley(add) => Ok(add.to_vec()), + Address::Byron(add) => Ok(add.to_vec()), + Address::Stake(add) => Ok(add.to_vec()), + } + }) + .collect::>, Error>>()?; + + Self::remove( + wx, + recovered.into_iter().chain(undone.into_iter()).collect(), + point.0, + )?; + } + + Ok(()) + } + + pub fn remove(wx: &WriteTransaction, addresses: Vec>, slot: u64) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + + for address in addresses { + let key = Self::compute_key(&address); + + let maybe_new = match table.get(key)? { + Some(value) => { + let mut previous = value.value().clone(); + match previous.iter().position(|x| *x == slot) { + Some(index) => { + previous.remove(index); + Some(previous) + } + None => None, + } + } + None => None, + }; + if let Some(new) = maybe_new { + table.insert(key, new)?; + } + } + + Ok(()) + } + + pub fn copy(rx: &ReadTransaction, wx: &WriteTransaction) -> Result<(), Error> { + let source = rx.open_table(Self::DEF)?; + let mut target = wx.open_table(Self::DEF)?; + + for entry in source.iter()? { + let (k, v) = entry?; + target.insert(k.value(), v.value())?; + } + + Ok(()) + } +} + +pub struct TxsApproxIndexTable; +impl TxsApproxIndexTable { + pub const DEF: TableDefinition<'static, u64, Vec> = TableDefinition::new("txsapproxindex"); + + pub fn initialize(wx: &WriteTransaction) -> Result<(), Error> { + wx.open_table(Self::DEF)?; + + Ok(()) + } + + pub fn compute_key(tx_hash: &[u8]) -> u64 { + let mut hasher = DefaultHasher::new(); + tx_hash.hash(&mut hasher); + hasher.finish() + } + + pub fn get_by_tx_hash(rx: &ReadTransaction, tx_hash: &[u8]) -> Result, Error> { + let table = rx.open_table(Self::DEF)?; + let default = Ok(vec![]); + let key = Self::compute_key(tx_hash); + match table.get(key)? { + Some(value) => Ok(value.value().clone()), + None => default, + } + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + + if let Some(point) = &delta.new_position { + let slot = point.0; + let tx_hashes = delta + .new_txs + .keys() + .map(|hash| hash.to_vec()) + .collect::>>(); + + for tx_hash in tx_hashes { + let key = Self::compute_key(&tx_hash); + + let maybe_new = match table.get(key)? 
{ + Some(value) => { + let mut previous = value.value().clone(); + if !previous.contains(&slot) { + previous.push(slot); + Some(previous) + } else { + None + } + } + None => Some(vec![slot]), + }; + if let Some(new) = maybe_new { + table.insert(key, new)?; + } + } + } + + if let Some(point) = &delta.undone_position { + let slot = point.0; + let tx_hashes = delta + .undone_txs + .keys() + .map(|hash| hash.to_vec()) + .collect::>>(); + + for tx_hash in tx_hashes { + let key = Self::compute_key(&tx_hash); + + let maybe_new = match table.get(key)? { + Some(value) => { + let mut previous = value.value().clone(); + match previous.iter().position(|x| *x == slot) { + Some(index) => { + previous.remove(index); + Some(previous) + } + None => None, + } + } + None => None, + }; + if let Some(new) = maybe_new { + table.insert(key, new)?; + } + } + } + + Ok(()) + } + + pub fn copy(rx: &ReadTransaction, wx: &WriteTransaction) -> Result<(), Error> { + let source = rx.open_table(Self::DEF)?; + let mut target = wx.open_table(Self::DEF)?; + + for entry in source.iter()? { + let (k, v) = entry?; + target.insert(k.value(), v.value())?; + } + + Ok(()) + } +} + +pub struct BlockHashApproxIndexTable; +impl BlockHashApproxIndexTable { + pub const DEF: TableDefinition<'static, u64, Vec> = + TableDefinition::new("blockhashapproxindex"); + + pub fn initialize(wx: &WriteTransaction) -> Result<(), Error> { + wx.open_table(Self::DEF)?; + + Ok(()) + } + + pub fn compute_key(block_hash: &[u8]) -> u64 { + let mut hasher = DefaultHasher::new(); + block_hash.hash(&mut hasher); + hasher.finish() + } + + pub fn get_by_block_hash( + rx: &ReadTransaction, + block_hash: &[u8], + ) -> Result, Error> { + let table = rx.open_table(Self::DEF)?; + let default = Ok(vec![]); + let key = Self::compute_key(block_hash); + match table.get(key)? { + Some(value) => Ok(value.value().clone()), + None => default, + } + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + + if let Some(point) = &delta.new_position { + let key = Self::compute_key(point.1.as_ref()); + let slot = point.0; + + let maybe_new = match table.get(key)? { + Some(value) => { + let mut previous = value.value().clone(); + if !previous.contains(&slot) { + previous.push(slot); + Some(previous) + } else { + None + } + } + None => Some(vec![slot]), + }; + if let Some(new) = maybe_new { + table.insert(key, new)?; + } + } + + if let Some(point) = &delta.undone_position { + let key = Self::compute_key(point.1.as_ref()); + let slot = point.0; + + let maybe_new = match table.get(key)? { + Some(value) => { + let mut previous = value.value().clone(); + match previous.iter().position(|x| *x == slot) { + Some(index) => { + previous.remove(index); + Some(previous) + } + None => None, + } + } + None => None, + }; + if let Some(new) = maybe_new { + table.insert(key, new)?; + } + } + + Ok(()) + } + + pub fn copy(rx: &ReadTransaction, wx: &WriteTransaction) -> Result<(), Error> { + let source = rx.open_table(Self::DEF)?; + let mut target = wx.open_table(Self::DEF)?; + + for entry in source.iter()? 
{ + let (k, v) = entry?; + target.insert(k.value(), v.value())?; + } + + Ok(()) + } +} diff --git a/src/chain/redb/mod.rs b/src/chain/redb/mod.rs new file mode 100644 index 00000000..bf8b4590 --- /dev/null +++ b/src/chain/redb/mod.rs @@ -0,0 +1,213 @@ +use ::redb::{Database, MultimapTableHandle as _, TableHandle as _}; +use itertools::Itertools; +use log::info; +use std::path::Path; + +use tracing::{debug, warn}; + +mod indexes; +mod tables; +mod v1; + +use super::*; + +const DEFAULT_CACHE_SIZE_MB: usize = 500; + +fn compute_schema_hash(db: &Database) -> Result, ChainError> { + let mut hasher = pallas::crypto::hash::Hasher::<160>::new(); + + let rx = db + .begin_read() + .map_err(|e| ChainError::StorageError(e.into()))?; + + let names_1 = rx + .list_tables() + .map_err(|e| ChainError::StorageError(e.into()))? + .map(|t| t.name().to_owned()); + + let names_2 = rx + .list_multimap_tables() + .map_err(|e| ChainError::StorageError(e.into()))? + .map(|t| t.name().to_owned()); + + let mut names = names_1.chain(names_2).collect_vec(); + + debug!(tables = ?names, "tables names used to compute hash"); + + if names.is_empty() { + // this db hasn't been initialized, we can't compute hash + return Ok(None); + } + + // sort to make sure we don't depend on some redb implementation regarding order + // of the tables. + names.sort(); + + names.into_iter().for_each(|n| hasher.input(n.as_bytes())); + + let hash = hasher.finalize(); + + Ok(Some(hash.to_string())) +} + +fn open_db(path: impl AsRef, cache_size: Option) -> Result { + let db = Database::builder() + .set_repair_callback(|x| warn!(progress = x.progress() * 100f64, "ledger db is repairing")) + .set_cache_size(1024 * 1024 * cache_size.unwrap_or(DEFAULT_CACHE_SIZE_MB)) + .create(path) + .map_err(|x| ChainError::StorageError(x.into()))?; + + Ok(db) +} + +impl From<::redb::Error> for ChainError { + fn from(value: ::redb::Error) -> Self { + ChainError::StorageError(value) + } +} + +const V1_HASH: &str = "ac7a7fbaf084bc058c753a07cc86849db28c051c"; + +#[derive(Clone)] +pub enum ChainStore { + SchemaV1(v1::ChainStore), +} + +impl ChainStore { + pub fn open(path: impl AsRef, cache_size: Option) -> Result { + let db = open_db(path, cache_size)?; + let hash = compute_schema_hash(&db)?; + + let schema = match hash.as_deref() { + // use stable schema if no hash + None => { + info!("no state db schema, initializing as v1"); + v1::ChainStore::initialize(db)?.into() + } + Some(V1_HASH) => { + info!("detected state db schema v1"); + v1::ChainStore::from(db).into() + } + Some(x) => panic!("can't recognize db hash {}", x), + }; + + Ok(schema) + } + + pub fn in_memory_v1() -> Result { + let db = ::redb::Database::builder() + .create_with_backend(::redb::backends::InMemoryBackend::new()) + .unwrap(); + + let store = v1::ChainStore::initialize(db)?; + Ok(store.into()) + } + + pub fn db(&self) -> &Database { + match self { + ChainStore::SchemaV1(x) => x.db(), + } + } + + pub fn db_mut(&mut self) -> Option<&mut Database> { + match self { + ChainStore::SchemaV1(x) => x.db_mut(), + } + } + + pub fn get_possible_block_slots_by_address( + &self, + address: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.get_possible_block_slots_by_address(address)?), + } + } + + pub fn get_possible_block_slots_by_tx_hash( + &self, + tx_hash: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.get_possible_block_slots_by_tx_hash(tx_hash)?), + } + } + + pub fn get_possible_block_slots_by_block_hash( + &self, + block_hash: 
&[u8], + ) -> Result, ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.get_possible_block_slots_by_block_hash(block_hash)?), + } + } + + pub fn get_possible_blocks_by_address( + &self, + address: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.get_possible_blocks_by_address(address)?), + } + } + + pub fn get_possible_blocks_by_tx_hash( + &self, + tx_hash: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.get_possible_blocks_by_tx_hash(tx_hash)?), + } + } + + pub fn get_possible_blocks_by_block_hash( + &self, + block_hash: &[u8], + ) -> Result, ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.get_possible_blocks_by_block_hash(block_hash)?), + } + } + + pub fn get_block_by_slot(&self, slot: &BlockSlot) -> Result, ChainError> { + match self { + ChainStore::SchemaV1(x) => x.get_block_by_slot(slot), + } + } + + pub fn apply(&self, deltas: &[LedgerDelta]) -> Result<(), ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.apply(deltas)?), + } + } + + pub fn finalize(&self, until: BlockSlot) -> Result<(), ChainError> { + match self { + ChainStore::SchemaV1(x) => Ok(x.finalize(until)?), + } + } + + pub fn copy(&self, target: &Self) -> Result<(), ChainError> { + match (self, target) { + (ChainStore::SchemaV1(x), ChainStore::SchemaV1(target)) => Ok(x.copy(target)?), + } + } +} + +impl From for ChainStore { + fn from(value: v1::ChainStore) -> Self { + Self::SchemaV1(value) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn schema_hash_computation() { + let store = ChainStore::in_memory_v1().unwrap(); + let hash = compute_schema_hash(store.db()).unwrap(); + assert_eq!(hash.unwrap(), V1_HASH); + } +} diff --git a/src/chain/redb/tables.rs b/src/chain/redb/tables.rs new file mode 100644 index 00000000..e65f6573 --- /dev/null +++ b/src/chain/redb/tables.rs @@ -0,0 +1,53 @@ +use ::redb::{ReadTransaction, ReadableTable as _}; +use ::redb::{TableDefinition, WriteTransaction}; + +use crate::ledger::LedgerDelta; +use crate::model::{BlockBody, BlockSlot}; + +type Error = crate::chain::ChainError; + +pub struct BlocksTable; +impl BlocksTable { + pub const DEF: TableDefinition<'static, BlockSlot, BlockBody> = TableDefinition::new("blocks"); + + pub fn initialize(wx: &WriteTransaction) -> Result<(), Error> { + wx.open_table(Self::DEF)?; + + Ok(()) + } + + pub fn get_by_slot(rx: &ReadTransaction, slot: BlockSlot) -> Result, Error> { + let table = rx.open_table(Self::DEF)?; + match table.get(slot)? { + Some(value) => Ok(Some(value.value().clone())), + None => Ok(None), + } + } + + pub fn apply(wx: &WriteTransaction, delta: &LedgerDelta) -> Result<(), Error> { + let mut table = wx.open_table(Self::DEF)?; + if let Some(point) = &delta.new_position { + let slot = point.0; + table.insert(slot, delta.new_block.clone())?; + } + + if let Some(point) = &delta.undone_position { + let slot = point.0; + table.remove(slot)?; + } + + Ok(()) + } + + pub fn copy(rx: &ReadTransaction, wx: &WriteTransaction) -> Result<(), Error> { + let source = rx.open_table(Self::DEF)?; + let mut target = wx.open_table(Self::DEF)?; + + for entry in source.iter()? 
{ + let (k, v) = entry?; + target.insert(k.value(), v.value())?; + } + + Ok(()) + } +} diff --git a/src/chain/redb/v1.rs b/src/chain/redb/v1.rs new file mode 100644 index 00000000..db4d8792 --- /dev/null +++ b/src/chain/redb/v1.rs @@ -0,0 +1,142 @@ +use ::redb::{Database, Durability}; +use std::sync::Arc; + +type Error = crate::chain::ChainError; + +use super::{indexes, tables, LedgerDelta}; +use crate::model::{BlockBody, BlockSlot}; + +#[derive(Clone)] +pub struct ChainStore(pub Arc); + +impl ChainStore { + pub fn initialize(db: Database) -> Result { + let mut wx = db.begin_write()?; + wx.set_durability(Durability::Immediate); + + indexes::AddressApproxIndexTable::initialize(&wx)?; + indexes::BlockHashApproxIndexTable::initialize(&wx)?; + indexes::TxsApproxIndexTable::initialize(&wx)?; + tables::BlocksTable::initialize(&wx)?; + + wx.commit()?; + + Ok(db.into()) + } + + pub(crate) fn db(&self) -> &Database { + &self.0 + } + + pub(crate) fn db_mut(&mut self) -> Option<&mut Database> { + Arc::get_mut(&mut self.0) + } + + pub fn apply(&self, deltas: &[LedgerDelta]) -> Result<(), Error> { + let mut wx = self.db().begin_write()?; + wx.set_durability(Durability::Eventual); + + for delta in deltas { + indexes::AddressApproxIndexTable::apply(&wx, delta)?; + indexes::BlockHashApproxIndexTable::apply(&wx, delta)?; + indexes::TxsApproxIndexTable::apply(&wx, delta)?; + tables::BlocksTable::apply(&wx, delta)?; + } + + wx.commit()?; + + Ok(()) + } + + pub fn copy(&self, target: &Self) -> Result<(), Error> { + let rx = self.db().begin_read()?; + let wx = target.db().begin_write()?; + + indexes::AddressApproxIndexTable::copy(&rx, &wx)?; + indexes::BlockHashApproxIndexTable::copy(&rx, &wx)?; + indexes::TxsApproxIndexTable::copy(&rx, &wx)?; + tables::BlocksTable::copy(&rx, &wx)?; + + wx.commit()?; + + Ok(()) + } + + pub fn finalize(&self, _: BlockSlot) -> Result<(), Error> { + Ok(()) + } + + pub fn get_possible_block_slots_by_address( + &self, + address: &[u8], + ) -> Result, Error> { + let rx = self.db().begin_read()?; + indexes::AddressApproxIndexTable::get_by_address(&rx, address) + } + + pub fn get_possible_block_slots_by_tx_hash( + &self, + tx_hash: &[u8], + ) -> Result, Error> { + let rx = self.db().begin_read()?; + indexes::TxsApproxIndexTable::get_by_tx_hash(&rx, tx_hash) + } + + pub fn get_possible_block_slots_by_block_hash( + &self, + block_hash: &[u8], + ) -> Result, Error> { + let rx = self.db().begin_read()?; + indexes::BlockHashApproxIndexTable::get_by_block_hash(&rx, block_hash) + } + + pub fn get_block_by_slot(&self, slot: &BlockSlot) -> Result, Error> { + let rx = self.db().begin_read()?; + tables::BlocksTable::get_by_slot(&rx, *slot) + } + + pub fn get_possible_blocks_by_address(&self, address: &[u8]) -> Result, Error> { + let rx = self.db().begin_read()?; + indexes::AddressApproxIndexTable::get_by_address(&rx, address)? + .iter() + .flat_map(|slot| match self.get_block_by_slot(slot) { + Ok(Some(block)) => Some(Ok(block)), + Ok(None) => None, + Err(e) => Some(Err(e)), + }) + .collect() + } + + pub fn get_possible_blocks_by_tx_hash(&self, tx_hash: &[u8]) -> Result, Error> { + let rx = self.db().begin_read()?; + indexes::TxsApproxIndexTable::get_by_tx_hash(&rx, tx_hash)? 
+ .iter() + .flat_map(|slot| match self.get_block_by_slot(slot) { + Ok(Some(block)) => Some(Ok(block)), + Ok(None) => None, + Err(e) => Some(Err(e)), + }) + .collect() + } + + pub fn get_possible_blocks_by_block_hash( + &self, + block_hash: &[u8], + ) -> Result, Error> { + let rx = self.db().begin_read()?; + indexes::BlockHashApproxIndexTable::get_by_block_hash(&rx, block_hash)? + .iter() + .flat_map(|slot| match self.get_block_by_slot(slot) { + Ok(Some(block)) => Some(Ok(block)), + Ok(None) => None, + Err(e) => Some(Err(e)), + }) + .collect() + } +} + +impl From for ChainStore { + fn from(value: Database) -> Self { + Self(Arc::new(value)) + } +} diff --git a/src/ledger/mod.rs b/src/ledger/mod.rs index 57a125ec..cc9676ab 100644 --- a/src/ledger/mod.rs +++ b/src/ledger/mod.rs @@ -1,10 +1,13 @@ -use pallas::ledger::traverse::{Era, MultiEraBlock, MultiEraInput, MultiEraUpdate}; +use pallas::codec::minicbor; +use pallas::ledger::traverse::{Era, MultiEraBlock, MultiEraInput, MultiEraTx, MultiEraUpdate}; use pallas::{crypto::hash::Hash, ledger::traverse::MultiEraOutput}; use pparams::Genesis; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use thiserror::Error; +use crate::model::BlockBody; + pub mod pparams; //pub mod validate; @@ -43,6 +46,14 @@ impl<'a> TryFrom<&'a EraCbor> for MultiEraOutput<'a> { } } +impl<'a> TryFrom<&'a EraCbor> for MultiEraTx<'a> { + type Error = pallas::codec::minicbor::decode::Error; + + fn try_from(value: &'a EraCbor) -> Result { + MultiEraTx::decode_for_era(value.0, &value.1) + } +} + impl TryFrom for MultiEraUpdate<'_> { type Error = pallas::codec::minicbor::decode::Error; @@ -106,6 +117,9 @@ pub struct LedgerDelta { pub recovered_stxi: HashMap, pub undone_utxo: HashMap, pub new_pparams: Vec, + pub new_txs: HashMap, + pub undone_txs: HashMap, + pub new_block: BlockBody, } /// Computes the ledger delta of applying a particular block. 
@@ -126,8 +140,17 @@ pub fn compute_delta( block: &MultiEraBlock, mut context: LedgerSlice, ) -> Result { + let era: u16 = block.era().into(); let mut delta = LedgerDelta { new_position: Some(ChainPoint(block.slot(), block.hash())), + new_block: match block { + MultiEraBlock::Byron(x) => minicbor::to_vec((era, x)).unwrap(), + MultiEraBlock::Conway(x) => minicbor::to_vec((era, x)).unwrap(), + MultiEraBlock::Babbage(x) => minicbor::to_vec((era, x)).unwrap(), + MultiEraBlock::AlonzoCompatible(x, _) => minicbor::to_vec((era, x)).unwrap(), + MultiEraBlock::EpochBoundary(x) => minicbor::to_vec((0_u16, x)).unwrap(), + _ => Default::default(), + }, ..Default::default() }; @@ -156,6 +179,12 @@ pub fn compute_delta( } } + delta.new_txs = block + .txs() + .into_iter() + .map(|tx| (tx.hash(), EraCbor::from((tx.era(), tx.encode())))) + .collect(); + // check block-level updates (because of f#!@#@ byron) if let Some(update) = block.update() { delta @@ -197,6 +226,12 @@ pub fn compute_undo_delta( } } + delta.undone_txs = block + .txs() + .into_iter() + .map(|tx| (tx.hash(), EraCbor::from((tx.era(), tx.encode())))) + .collect(); + Ok(delta) } diff --git a/src/lib.rs b/src/lib.rs index ae027179..7f20ef7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,4 @@ +pub mod chain; pub mod ledger; pub mod mempool; pub mod model; diff --git a/src/model.rs b/src/model.rs index a14d1bfa..709b85f0 100644 --- a/src/model.rs +++ b/src/model.rs @@ -53,6 +53,9 @@ pub struct StorageConfig { /// Size (in Mb) of memory allocated for ledger caching pub ledger_cache: Option, + /// Size (in Mb) of memory allocated for chain caching + pub chain_cache: Option, + /// Maximum number of slots (not blocks) to keep in the WAL pub max_wal_history: Option, @@ -66,6 +69,7 @@ impl Default for StorageConfig { path: std::path::PathBuf::from("data"), wal_cache: None, ledger_cache: None, + chain_cache: None, max_wal_history: None, max_ledger_history: None, } diff --git a/src/serve/grpc/mod.rs b/src/serve/grpc/mod.rs index 8b63d154..2e534132 100644 --- a/src/serve/grpc/mod.rs +++ b/src/serve/grpc/mod.rs @@ -7,6 +7,7 @@ use tonic::transport::{Certificate, Server, ServerTlsConfig}; use tower_http::cors::CorsLayer; use tracing::info; +use crate::chain::ChainStore; use crate::ledger::pparams::Genesis; use crate::mempool::Mempool; use crate::prelude::*; @@ -31,12 +32,13 @@ pub async fn serve( genesis: Arc, wal: WalStore, ledger: LedgerStore, + chain: ChainStore, mempool: Mempool, exit: CancellationToken, ) -> Result<(), Error> { let addr = config.listen_address.parse().unwrap(); - let sync_service = sync::SyncServiceImpl::new(wal.clone(), ledger.clone()); + let sync_service = sync::SyncServiceImpl::new(wal.clone(), ledger.clone(), chain); let sync_service = u5c::sync::sync_service_server::SyncServiceServer::new(sync_service); let query_service = query::QueryServiceImpl::new(ledger.clone(), genesis.clone()); diff --git a/src/serve/grpc/sync.rs b/src/serve/grpc/sync.rs index 923807e0..9f0233fa 100644 --- a/src/serve/grpc/sync.rs +++ b/src/serve/grpc/sync.rs @@ -8,6 +8,7 @@ use pallas::interop::utxorpc::{spec as u5c, Mapper}; use std::pin::Pin; use tonic::{Request, Response, Status}; +use crate::chain::ChainStore; use crate::state::LedgerStore; use crate::wal::{self, ChainPoint, RawBlock, WalReader as _}; @@ -81,14 +82,16 @@ fn point_to_reset_tip_response(point: ChainPoint) -> u5c::sync::FollowTipRespons pub struct SyncServiceImpl { wal: wal::redb::WalStore, + chain: ChainStore, mapper: interop::Mapper, } impl SyncServiceImpl { - pub fn new(wal: 
wal::redb::WalStore, ledger: LedgerStore) -> Self { + pub fn new(wal: wal::redb::WalStore, ledger: LedgerStore, chain: ChainStore) -> Self { Self { wal, mapper: Mapper::new(ledger), + chain, } } } @@ -104,19 +107,28 @@ impl u5c::sync::sync_service_server::SyncService for SyncServiceImpl { ) -> Result, Status> { let message = request.into_inner(); - let points: Vec<_> = message + let out: Vec<_> = message .r#ref - .into_iter() - .map(u5c_to_chain_point) - .try_collect()?; - - let out = self - .wal - .read_sparse_blocks(&points) - .map_err(|_err| Status::internal("can't query block"))? - .into_iter() - .map(|x| raw_to_anychain(&self.mapper, &x)) - .collect(); + .iter() + .map(|br| { + let maybe_raw = self + .chain + .get_block_by_slot(&br.index) + .map_err(|_| Status::internal("Failed to query chain service."))?; + + match maybe_raw { + Some(body) => { + let block = self.mapper.map_block_cbor(&body); + + Ok(u5c::sync::AnyChainBlock { + native_bytes: body.to_vec().into(), + chain: u5c::sync::any_chain_block::Chain::Cardano(block).into(), + }) + } + None => Err(Status::not_found(format!("Failed to find block: {:?}", br))), + } + }) + .collect::, Status>>()?; let response = u5c::sync::FetchBlockResponse { block: out }; diff --git a/src/serve/mod.rs b/src/serve/mod.rs index 72c6754c..b63363fc 100644 --- a/src/serve/mod.rs +++ b/src/serve/mod.rs @@ -6,6 +6,7 @@ use serde::{Deserialize, Serialize}; use tokio_util::sync::CancellationToken; use tracing::info; +use crate::chain::ChainStore; use crate::ledger::pparams::Genesis; use crate::mempool::Mempool; use crate::state::LedgerStore; @@ -41,6 +42,7 @@ pub async fn serve( genesis: Arc, wal: WalStore, ledger: LedgerStore, + chain: ChainStore, mempool: Mempool, exit: CancellationToken, ) -> miette::Result<()> { @@ -53,6 +55,7 @@ pub async fn serve( genesis.clone(), wal.clone(), ledger, + chain, mempool, exit.clone(), ) diff --git a/src/state/mod.rs b/src/state/mod.rs index d3acc969..aaf00a5e 100644 --- a/src/state/mod.rs +++ b/src/state/mod.rs @@ -184,6 +184,12 @@ pub fn load_slice_for_block( .map(|utxo| TxoRef(*utxo.hash(), utxo.index() as u32)) .collect(); + let refferenced: HashSet<_> = txs + .values() + .flat_map(MultiEraTx::reference_inputs) + .map(|utxo| TxoRef(*utxo.hash(), utxo.index() as u32)) + .collect(); + let consumed_same_block: HashMap<_, _> = txs .iter() .flat_map(|(tx_hash, tx)| { @@ -191,18 +197,19 @@ pub fn load_slice_for_block( .into_iter() .map(|(idx, utxo)| (TxoRef(*tx_hash, idx as u32), utxo.into())) }) - .filter(|(x, _)| consumed.contains(x)) + .filter(|(x, _)| consumed.contains(x) || refferenced.contains(x)) .collect(); let consumed_unapplied_deltas: HashMap<_, _> = unapplied_deltas .iter() .flat_map(|d| d.produced_utxo.iter().chain(d.recovered_stxi.iter())) - .filter(|(x, _)| consumed.contains(x)) + .filter(|(x, _)| consumed.contains(x) || refferenced.contains(x)) .map(|(k, v)| (k.clone(), v.clone())) .collect(); let to_fetch = consumed .into_iter() + .chain(refferenced) .filter(|x| !consumed_same_block.contains_key(x)) .filter(|x| !consumed_unapplied_deltas.contains_key(x)) .collect_vec(); @@ -216,12 +223,10 @@ pub fn load_slice_for_block( Ok(LedgerSlice { resolved_inputs }) } -pub fn apply_block_batch<'a>( +pub fn calculate_block_batch_deltas<'a>( blocks: impl IntoIterator>, store: &LedgerStore, - genesis: &Genesis, - max_ledger_history: Option, -) -> Result<(), LedgerError> { +) -> Result, LedgerError> { let mut deltas: Vec = vec![]; for block in blocks { @@ -230,7 +235,15 @@ pub fn apply_block_batch<'a>( 
deltas.push(delta); } + Ok(deltas) +} +pub fn apply_delta_batch( + deltas: Vec, + store: &LedgerStore, + genesis: &Genesis, + max_ledger_history: Option, +) -> Result<(), LedgerError> { store.apply(&deltas)?; let tip = deltas @@ -247,3 +260,13 @@ pub fn apply_block_batch<'a>( Ok(()) } + +pub fn apply_block_batch<'a>( + blocks: impl IntoIterator>, + store: &LedgerStore, + genesis: &Genesis, + max_ledger_history: Option, +) -> Result<(), LedgerError> { + let deltas = calculate_block_batch_deltas(blocks, store)?; + apply_delta_batch(deltas, store, genesis, max_ledger_history) +} diff --git a/src/state/redb/mod.rs b/src/state/redb/mod.rs index d6708dcc..b6e5fcf7 100644 --- a/src/state/redb/mod.rs +++ b/src/state/redb/mod.rs @@ -325,12 +325,7 @@ mod tests { 1, pallas::crypto::hash::Hash::new(b"01010101010101010101010101010101".to_owned()), )), - undone_position: Default::default(), - produced_utxo: Default::default(), - consumed_utxo: Default::default(), - recovered_stxi: Default::default(), - undone_utxo: Default::default(), - new_pparams: Default::default(), + ..Default::default() }; store.apply(&[delta]).unwrap(); diff --git a/src/state/redb/v2light.rs b/src/state/redb/v2light.rs index b5c8b41d..1c91a331 100644 --- a/src/state/redb/v2light.rs +++ b/src/state/redb/v2light.rs @@ -129,12 +129,7 @@ impl LedgerStore { let delta = LedgerDelta { produced_utxo: chunk.into_iter().collect(), - new_position: Default::default(), - undone_position: Default::default(), - consumed_utxo: Default::default(), - recovered_stxi: Default::default(), - undone_utxo: Default::default(), - new_pparams: Default::default(), + ..Default::default() }; tables::FilterIndexes::apply(&wx, &delta)?; diff --git a/src/sync/apply.rs b/src/sync/apply.rs index 8f72c7a0..5e56e19e 100644 --- a/src/sync/apply.rs +++ b/src/sync/apply.rs @@ -15,6 +15,7 @@ pub type UpstreamPort = gasket::messaging::InputPort; pub struct Stage { wal: crate::wal::redb::WalStore, ledger: crate::state::LedgerStore, + chain: crate::chain::ChainStore, genesis: Arc, mempool: crate::mempool::Mempool, // Add this line @@ -33,6 +34,7 @@ impl Stage { pub fn new( wal: crate::wal::redb::WalStore, ledger: crate::state::LedgerStore, + chain: crate::chain::ChainStore, mempool: crate::mempool::Mempool, genesis: Arc, max_ledger_history: Option, @@ -40,6 +42,7 @@ impl Stage { Self { wal, ledger, + chain, mempool, genesis, max_ledger_history, @@ -52,8 +55,9 @@ impl Stage { fn process_origin(&self) -> Result<(), WorkerError> { info!("applying origin"); - let delta = crate::ledger::compute_origin_delta(&self.genesis); - self.ledger.apply(&[delta]).or_panic()?; + let deltas = vec![crate::ledger::compute_origin_delta(&self.genesis)]; + self.ledger.apply(&deltas).or_panic()?; + self.chain.apply(&deltas).or_panic()?; Ok(()) } @@ -66,8 +70,9 @@ impl Stage { let block = MultiEraBlock::decode(body).or_panic()?; let context = crate::state::load_slice_for_block(&block, &self.ledger, &[]).or_panic()?; - let delta = crate::ledger::compute_undo_delta(&block, context).or_panic()?; - self.ledger.apply(&[delta]).or_panic()?; + let deltas = vec![crate::ledger::compute_undo_delta(&block, context).or_panic()?]; + self.ledger.apply(&deltas).or_panic()?; + self.chain.apply(&deltas).or_panic()?; self.mempool.undo_block(&block); @@ -81,8 +86,12 @@ impl Stage { let block = MultiEraBlock::decode(body).or_panic()?; - crate::state::apply_block_batch( - [&block], + let deltas = + crate::state::calculate_block_batch_deltas([&block], &self.ledger).or_panic()?; + + self.chain.apply(&deltas).or_panic()?; 
+        crate::state::apply_delta_batch(
+            deltas,
             &self.ledger,
             &self.genesis,
             self.max_ledger_history,
diff --git a/src/sync/mod.rs b/src/sync/mod.rs
index 2e9cd6da..5052ab7a 100644
--- a/src/sync/mod.rs
+++ b/src/sync/mod.rs
@@ -1,3 +1,4 @@
+use crate::chain::ChainStore;
 use crate::ledger::pparams::Genesis;
 use crate::state::LedgerStore;
 use crate::wal::redb::WalStore;
@@ -51,6 +52,7 @@ pub fn pipeline(
     storage: &StorageConfig,
     wal: WalStore,
     ledger: LedgerStore,
+    chain: ChainStore,
     genesis: Arc<Genesis>,
     mempool: Mempool,
     retries: &Option<gasket::retries::Policy>,
@@ -69,6 +71,7 @@ pub fn pipeline(
     let mut apply = apply::Stage::new(
         wal.clone(),
         ledger,
+        chain,
         mempool.clone(),
         genesis,
         storage.max_ledger_history,
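
A minimal usage sketch of the chain store added by this patch, based only on the signatures visible in the diff above (`chain::redb::ChainStore::open`, the `From` impl into `chain::ChainStore`, and the slot/hash lookups). The `data/chain` path mirrors the default storage layout built in `common.rs`; the `None` cache size, the slot number, and the zeroed hash are placeholders, and `BlockSlot`/`BlockBody` are assumed to be the `u64`/`Vec<u8>` aliases from `src/model.rs`.

    use dolos::chain::{self, ChainError, ChainStore};

    fn chain_store_example() -> Result<(), ChainError> {
        // Open (or initialize) the chain db next to the other stores. `None`
        // falls back to the default cache size defined in src/chain/redb/mod.rs.
        let store: ChainStore = chain::redb::ChainStore::open("data/chain", None)?.into();

        // Blocks are keyed by slot, with hash-bucketed "approximate" indexes
        // by address, tx hash and block hash layered on top.
        if let Some(body) = store.get_block_by_slot(&1234)? {
            println!("block at slot 1234 is {} bytes", body.len());
        }

        // Lookup by block hash scans the approximate index and re-checks the
        // decoded block hash, filtering out bucket collisions. The zeroed hash
        // here is a placeholder value.
        let _block = store.get_block_by_block_hash(&[0u8; 32])?;

        Ok(())
    }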