Skip to content

Commit

Permalink
Merge branch 'cuprated-blockchain' into cuprated-startup
Browse files Browse the repository at this point in the history
  • Loading branch information
Boog900 committed Oct 6, 2024
2 parents 1fa4099 + c87edf2 commit dc3a3c5
Show file tree
Hide file tree
Showing 21 changed files with 1,223 additions and 31 deletions.
10 changes: 8 additions & 2 deletions binaries/cuprated/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ cuprate-p2p-core = { path = "../../p2p/p2p-core", features = ["serde"] }
cuprate-dandelion-tower = { path = "../../p2p/dandelion-tower" }
cuprate-async-buffer = { path = "../../p2p/async-buffer" }
cuprate-address-book = { path = "../../p2p/address-book", features = ["serde_config"] }
cuprate-blockchain = { path = "../../storage/blockchain", features = ["serde"] }
cuprate-blockchain = { path = "../../storage/blockchain", features = ["service", "serde"] }
cuprate-database-service = { path = "../../storage/service" }
cuprate-txpool = { path = "../../storage/txpool", features = ["serde"] }
cuprate-database = { path = "../../storage/database" }
Expand Down Expand Up @@ -70,8 +70,14 @@ tokio-util = { workspace = true }
tokio-stream = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
tracing-subscriber = { workspace = true, features = ["std", "fmt", "ansi"] }
tracing-subscriber = { workspace = true }
tracing = { workspace = true }

[lints]
workspace = true

[profile.dev]
panic = "abort"

[profile.release]
panic = "abort"
97 changes: 96 additions & 1 deletion binaries/cuprated/src/blockchain.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,101 @@
//! Blockchain
//!
//! Will contain the chain manager and syncer.
//! Contains the blockchain manager, syncer and an interface to mutate the blockchain.
use std::sync::Arc;

use futures::FutureExt;
use tokio::sync::{mpsc, Notify};
use tower::{BoxError, Service, ServiceExt};

use cuprate_blockchain::service::{BlockchainReadHandle, BlockchainWriteHandle};
use cuprate_consensus::{generate_genesis_block, BlockChainContextService, ContextConfig};
use cuprate_cryptonight::cryptonight_hash_v0;
use cuprate_p2p::{block_downloader::BlockDownloaderConfig, NetworkInterface};
use cuprate_p2p_core::{ClearNet, Network};
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainWriteRequest},
VerifiedBlockInformation,
};

use crate::constants::PANIC_CRITICAL_SERVICE_ERROR;

mod chain_service;
pub mod interface;
mod manager;
mod syncer;
mod types;

use types::{
ConcreteBlockVerifierService, ConcreteTxVerifierService, ConsensusBlockchainReadHandle,
};

/// Checks if the genesis block is in the blockchain and adds it if not.
pub async fn check_add_genesis(
blockchain_read_handle: &mut BlockchainReadHandle,
blockchain_write_handle: &mut BlockchainWriteHandle,
network: Network,
) {
// Try to get the chain height, will fail if the genesis block is not in the DB.
if blockchain_read_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainReadRequest::ChainHeight)
.await
.is_ok()
{
return;
}

let genesis = generate_genesis_block(network);

assert_eq!(genesis.miner_transaction.prefix().outputs.len(), 1);
assert!(genesis.transactions.is_empty());

blockchain_write_handle
.ready()
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
.call(BlockchainWriteRequest::WriteBlock(
VerifiedBlockInformation {
block_blob: genesis.serialize(),
txs: vec![],
block_hash: genesis.hash(),
pow_hash: cryptonight_hash_v0(&genesis.serialize_pow_hash()),
height: 0,
generated_coins: genesis.miner_transaction.prefix().outputs[0]
.amount
.unwrap(),
weight: genesis.miner_transaction.weight(),
long_term_weight: genesis.miner_transaction.weight(),
cumulative_difficulty: 1,
block: genesis,
},
))
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR);
}

/// Initializes the consensus services.
pub async fn init_consensus(
blockchain_read_handle: BlockchainReadHandle,
context_config: ContextConfig,
) -> Result<
(
ConcreteBlockVerifierService,
ConcreteTxVerifierService,
BlockChainContextService,
),
BoxError,
> {
let read_handle = ConsensusBlockchainReadHandle::new(blockchain_read_handle, BoxError::from);

let ctx_service =
cuprate_consensus::initialize_blockchain_context(context_config, read_handle.clone())
.await?;

let (block_verifier_svc, tx_verifier_svc) =
cuprate_consensus::initialize_verifier(read_handle, ctx_service.clone());

Ok((block_verifier_svc, tx_verifier_svc, ctx_service))
}
72 changes: 72 additions & 0 deletions binaries/cuprated/src/blockchain/chain_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::task::{Context, Poll};

use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use tower::Service;

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_p2p::block_downloader::{ChainSvcRequest, ChainSvcResponse};
use cuprate_types::blockchain::{BlockchainReadRequest, BlockchainResponse};

/// That service that allows retrieving the chain state to give to the P2P crates, so we can figure out
/// what blocks we need.
///
/// This has a more minimal interface than [`BlockchainReadRequest`] to make using the p2p crates easier.
#[derive(Clone)]
pub struct ChainService(pub BlockchainReadHandle);

impl Service<ChainSvcRequest> for ChainService {
type Response = ChainSvcResponse;
type Error = tower::BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: ChainSvcRequest) -> Self::Future {
let map_res = |res: BlockchainResponse| match res {
BlockchainResponse::CompactChainHistory {
block_ids,
cumulative_difficulty,
} => ChainSvcResponse::CompactHistory {
block_ids,
cumulative_difficulty,
},
BlockchainResponse::FindFirstUnknown(res) => ChainSvcResponse::FindFirstUnknown(res),
_ => unreachable!(),
};

match req {
ChainSvcRequest::CompactHistory => self
.0
.call(BlockchainReadRequest::CompactChainHistory)
.map_ok(map_res)
.map_err(Into::into)
.boxed(),
ChainSvcRequest::FindFirstUnknown(req) => self
.0
.call(BlockchainReadRequest::FindFirstUnknown(req))
.map_ok(map_res)
.map_err(Into::into)
.boxed(),
ChainSvcRequest::CumulativeDifficulty => self
.0
.call(BlockchainReadRequest::CompactChainHistory)
.map_ok(|res| {
// TODO create a custom request instead of hijacking this one.
// TODO: use the context cache.
let BlockchainResponse::CompactChainHistory {
cumulative_difficulty,
..
} = res
else {
unreachable!()
};

ChainSvcResponse::CumulativeDifficulty(cumulative_difficulty)
})
.map_err(Into::into)
.boxed(),
}
}
}
161 changes: 161 additions & 0 deletions binaries/cuprated/src/blockchain/interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
//! The blockchain manager interface.
//!
//! This module contains all the functions to mutate the blockchain's state in any way, through the
//! blockchain manager.
use std::{
collections::{HashMap, HashSet},
sync::{LazyLock, Mutex, OnceLock},
};

use monero_serai::{block::Block, transaction::Transaction};
use rayon::prelude::*;
use tokio::sync::{mpsc, oneshot};
use tower::{Service, ServiceExt};

use cuprate_blockchain::service::BlockchainReadHandle;
use cuprate_consensus::transactions::new_tx_verification_data;
use cuprate_helper::cast::usize_to_u64;
use cuprate_types::{
blockchain::{BlockchainReadRequest, BlockchainResponse},
Chain,
};

use crate::{
blockchain::manager::{BlockchainManagerCommand, IncomingBlockOk},
constants::PANIC_CRITICAL_SERVICE_ERROR,
};

/// The channel used to send [`BlockchainManagerCommand`]s to the blockchain manager.
///
/// This channel is initialized in [`init_blockchain_manager`](super::manager::init_blockchain_manager), the functions
/// in this file document what happens if this is not initialized when they are called.
pub(super) static COMMAND_TX: OnceLock<mpsc::Sender<BlockchainManagerCommand>> = OnceLock::new();

/// An error that can be returned from [`handle_incoming_block`].
#[derive(Debug, thiserror::Error)]
pub enum IncomingBlockError {
/// Some transactions in the block were unknown.
///
/// The inner values are the block hash and the indexes of the missing txs in the block.
#[error("Unknown transactions in block.")]
UnknownTransactions([u8; 32], Vec<u64>),
/// We are missing the block's parent.
#[error("The block has an unknown parent.")]
Orphan,
/// The block was invalid.
#[error(transparent)]
InvalidBlock(anyhow::Error),
}

/// Try to add a new block to the blockchain.
///
/// On success returns [`IncomingBlockOk`].
///
/// # Errors
///
/// This function will return an error if:
/// - the block was invalid
/// - we are missing transactions
/// - the block's parent is unknown
pub async fn handle_incoming_block(
block: Block,
given_txs: Vec<Transaction>,
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<IncomingBlockOk, IncomingBlockError> {
/// A [`HashSet`] of block hashes that the blockchain manager is currently handling.
///
/// This lock prevents sending the same block to the blockchain manager from multiple connections
/// before one of them actually gets added to the chain, allowing peers to do other things.
///
/// This is used over something like a dashmap as we expect a lot of collisions in a short amount of
/// time for new blocks, so we would lose the benefit of sharded locks. A dashmap is made up of `RwLocks`
/// which are also more expensive than `Mutex`s.
static BLOCKS_BEING_HANDLED: LazyLock<Mutex<HashSet<[u8; 32]>>> =
LazyLock::new(|| Mutex::new(HashSet::new()));
// FIXME: we should look in the tx-pool for txs when that is ready.

if !block_exists(block.header.previous, blockchain_read_handle)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
{
return Err(IncomingBlockError::Orphan);
}

let block_hash = block.hash();

if block_exists(block_hash, blockchain_read_handle)
.await
.expect(PANIC_CRITICAL_SERVICE_ERROR)
{
return Ok(IncomingBlockOk::AlreadyHave);
}

// TODO: remove this when we have a working tx-pool.
if given_txs.len() != block.transactions.len() {
return Err(IncomingBlockError::UnknownTransactions(
block_hash,
(0..usize_to_u64(block.transactions.len())).collect(),
));
}

// TODO: check we actually got given the right txs.
let prepped_txs = given_txs
.into_par_iter()
.map(|tx| {
let tx = new_tx_verification_data(tx)?;
Ok((tx.tx_hash, tx))
})
.collect::<Result<_, anyhow::Error>>()
.map_err(IncomingBlockError::InvalidBlock)?;

let Some(incoming_block_tx) = COMMAND_TX.get() else {
// We could still be starting up the blockchain manager.
return Ok(IncomingBlockOk::NotReady);
};

// Add the blocks hash to the blocks being handled.
if !BLOCKS_BEING_HANDLED.lock().unwrap().insert(block_hash) {
// If another place is already adding this block then we can stop.
return Ok(IncomingBlockOk::AlreadyHave);
}

// From this point on we MUST not early return without removing the block hash from `BLOCKS_BEING_HANDLED`.

let (response_tx, response_rx) = oneshot::channel();

incoming_block_tx
.send(BlockchainManagerCommand::AddBlock {
block,
prepped_txs,
response_tx,
})
.await
.expect("TODO: don't actually panic here, an err means we are shutting down");

let res = response_rx
.await
.expect("The blockchain manager will always respond")
.map_err(IncomingBlockError::InvalidBlock);

// Remove the block hash from the blocks being handled.
BLOCKS_BEING_HANDLED.lock().unwrap().remove(&block_hash);

res
}

/// Check if we have a block with the given hash.
async fn block_exists(
block_hash: [u8; 32],
blockchain_read_handle: &mut BlockchainReadHandle,
) -> Result<bool, anyhow::Error> {
let BlockchainResponse::FindBlock(chain) = blockchain_read_handle
.ready()
.await?
.call(BlockchainReadRequest::FindBlock(block_hash))
.await?
else {
unreachable!();
};

Ok(chain.is_some())
}
Loading

0 comments on commit dc3a3c5

Please sign in to comment.