diff --git a/CHANGELOG.md b/CHANGELOG.md index dea8996f90f..af4106660e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,20 @@ All notable changes to Zebra are documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org). +## [Zebra 1.6.0](https://github.com/ZcashFoundation/zebra/releases/tag/v1.6.0) - TODO: 2024-01-?? + +This release: +- TODO: summary of other important changes +- adds an experimental `internal-miner` feature, which mines blocks within `zebrad`. This feature + is only supported on testnet. Use a more efficient GPU or ASIC for mainnet mining. + +TODO: the rest of the changelog + + ## [Zebra 1.5.0](https://github.com/ZcashFoundation/zebra/releases/tag/v1.5.0) - 2023-11-28 This release: -- fixes a panic that was introduced in Zebra v1.4.0, which happens in rare circumstances when reading cached sprout or history trees. +- fixes a panic that was introduced in Zebra v1.4.0, which happens in rare circumstances when reading cached sprout or history trees. - further improves how Zebra recovers from network interruptions and prevents potential network hangs. - limits the ability of synthetic nodes to spread throughout the network through Zebra to address some of the Ziggurat red team report. diff --git a/Cargo.lock b/Cargo.lock index a83e74c1f3c..e23547a956a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1346,6 +1346,16 @@ dependencies = [ "byteorder", ] +[[package]] +name = "equihash" +version = "0.2.0" +source = "git+https://github.com/ZcashFoundation/librustzcash.git?branch=equihash-solver-tromp#251098313920466958fcd05b25e151d4edd3a1b1" +dependencies = [ + "blake2b_simd", + "byteorder", + "cc", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -4353,6 +4363,20 @@ dependencies = [ "syn 2.0.40", ] +[[package]] +name = "thread-priority" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b72cb4958060ee2d9540cef68bb3871fd1e547037772c7fe7650d5d1cbec53b3" +dependencies = [ + "bitflags 1.3.2", + "cfg-if 1.0.0", + "libc", + "log", + "rustversion", + "winapi", +] + [[package]] name = "thread_local" version = "1.1.7" @@ -5619,7 +5643,7 @@ dependencies = [ "blake2s_simd", "bls12_381", "byteorder", - "equihash", + "equihash 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "ff", "fpe", "group", @@ -5717,7 +5741,8 @@ dependencies = [ "criterion", "displaydoc", "ed25519-zebra", - "equihash", + "equihash 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", + "equihash 0.2.0 (git+https://github.com/ZcashFoundation/librustzcash.git?branch=equihash-solver-tromp)", "futures", "group", "halo2_proofs", @@ -5873,7 +5898,6 @@ dependencies = [ "jsonrpc-core", "jsonrpc-derive", "jsonrpc-http-server", - "num_cpus", "proptest", "rand 0.8.5", "serde", @@ -6072,6 +6096,7 @@ dependencies = [ "serde_json", "tempfile", "thiserror", + "thread-priority", "tinyvec", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index e2c5e03372a..05a7d6ece9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,12 @@ opt-level = 3 [profile.dev.package.bls12_381] opt-level = 3 +[profile.dev.package.byteorder] +opt-level = 3 + +[profile.dev.package.equihash] +opt-level = 3 + [profile.dev.package.zcash_proofs] opt-level = 3 diff --git a/deny.toml b/deny.toml index 428084835be..4fc06769919 100644 --- a/deny.toml +++ b/deny.toml @@ -86,6 +86,11 @@ skip-tree = [ # wait for hdwallet to upgrade { name = "ring", version = "=0.16.20" }, + # wait for the equihash/solver feature to merge + # https://github.com/zcash/librustzcash/pull/1083 + # https://github.com/zcash/librustzcash/pull/1088 + { name = "equihash", version = "=0.2.0" }, + # zebra-utils dependencies # wait for structopt upgrade (or upgrade to clap 4) @@ -137,6 +142,10 @@ unknown-git = "deny" allow-registry = ["https://github.com/rust-lang/crates.io-index"] # List of URLs for allowed Git repositories allow-git = [ + # TODO: remove this after the equihash solver branch is merged and released. + # + # "cargo deny" will log a warning in builds without the internal-miner feature. That's ok. + "https://github.com/ZcashFoundation/librustzcash.git" ] [sources.allow-org] diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index a68ab42581c..7b6d716ce78 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -29,11 +29,19 @@ async-error = [ "tokio", ] -# Experimental mining RPC support +# Mining RPC support getblocktemplate-rpcs = [ "zcash_address", ] +# Experimental internal miner support +internal-miner = [ + # TODO: replace with "equihash/solver" when that feature is merged and released: + # https://github.com/zcash/librustzcash/pull/1083 + # https://github.com/zcash/librustzcash/pull/1088 + "equihash-solver", +] + # Experimental elasticsearch support elasticsearch = [] @@ -61,7 +69,21 @@ blake2s_simd = "1.0.2" bridgetree = "0.4.0" bs58 = { version = "0.5.0", features = ["check"] } byteorder = "1.5.0" + equihash = "0.2.0" +# Experimental internal miner support +# +# TODO: remove "equihash-solver" when the "equihash/solver" feature is merged and released: +# https://github.com/zcash/librustzcash/pull/1083 +# https://github.com/zcash/librustzcash/pull/1088 +# +# Use the solver PR: +# - latest: branch = "equihash-solver-tromp", +# - crashing with double-frees: rev = "da26c34772f4922eb13b4a1e7d88a969bbcf6a91", +equihash-solver = { version = "0.2.0", git = "https://github.com/ZcashFoundation/librustzcash.git", branch = "equihash-solver-tromp", features = ["solver"], package = "equihash", optional = true } +# or during development, use the locally checked out and modified version of equihash: +#equihash-solver = { version = "0.2.0", path = "../../librustzcash/components/equihash", features = ["solver"], package = "equihash", optional = true } + group = "0.13.0" incrementalmerkletree = "0.5.0" jubjub = "0.10.0" diff --git a/zebra-chain/src/block/header.rs b/zebra-chain/src/block/header.rs index c59d3972ff5..1bbec3b471c 100644 --- a/zebra-chain/src/block/header.rs +++ b/zebra-chain/src/block/header.rs @@ -123,6 +123,11 @@ impl Header { ))? } } + + /// Compute the hash of this header. + pub fn hash(&self) -> Hash { + Hash::from(self) + } } /// A header with a count of the number of transactions in its block. diff --git a/zebra-chain/src/primitives.rs b/zebra-chain/src/primitives.rs index 9b5056bc620..d074463286e 100644 --- a/zebra-chain/src/primitives.rs +++ b/zebra-chain/src/primitives.rs @@ -12,6 +12,8 @@ mod address; #[cfg(feature = "getblocktemplate-rpcs")] pub use address::Address; +pub mod byte_array; + pub use ed25519_zebra as ed25519; pub use reddsa; pub use redjubjub; diff --git a/zebra-chain/src/primitives/byte_array.rs b/zebra-chain/src/primitives/byte_array.rs new file mode 100644 index 00000000000..7484864e8d0 --- /dev/null +++ b/zebra-chain/src/primitives/byte_array.rs @@ -0,0 +1,14 @@ +//! Functions for modifying byte arrays. + +/// Increments `byte_array` by 1, interpreting it as a big-endian integer. +/// If the big-endian integer overflowed, sets all the bytes to zero, and returns `true`. +pub fn increment_big_endian(byte_array: &mut [u8]) -> bool { + // Increment the last byte in the array that is less than u8::MAX, and clear any bytes after it + // to increment the next value in big-endian (lexicographic) order. + let is_wrapped_overflow = byte_array.iter_mut().rev().all(|v| { + *v = v.wrapping_add(1); + v == &0 + }); + + is_wrapped_overflow +} diff --git a/zebra-chain/src/work/equihash.rs b/zebra-chain/src/work/equihash.rs index f65438a5314..18903c935f1 100644 --- a/zebra-chain/src/work/equihash.rs +++ b/zebra-chain/src/work/equihash.rs @@ -12,16 +12,24 @@ use crate::{ }, }; -/// The error type for Equihash +#[cfg(feature = "internal-miner")] +use crate::serialization::AtLeastOne; + +/// The error type for Equihash validation. #[non_exhaustive] #[derive(Debug, thiserror::Error)] #[error("invalid equihash solution for BlockHeader")] pub struct Error(#[from] equihash::Error); +/// The error type for Equihash solving. +#[derive(Copy, Clone, Debug, Eq, PartialEq, thiserror::Error)] +#[error("solver was cancelled")] +pub struct SolverCancelled; + /// The size of an Equihash solution in bytes (always 1344). pub(crate) const SOLUTION_SIZE: usize = 1344; -/// Equihash Solution. +/// Equihash Solution in compressed format. /// /// A wrapper around [u8; 1344] because Rust doesn't implement common /// traits like `Debug`, `Clone`, etc for collections like array @@ -53,6 +61,8 @@ impl Solution { .zcash_serialize(&mut input) .expect("serialization into a vec can't fail"); + // The part of the header before the nonce and solution. + // This data is kept constant during solver runs, so the verifier API takes it separately. let input = &input[0..Solution::INPUT_LENGTH]; equihash::is_valid_solution(n, k, input, nonce.as_ref(), solution)?; @@ -60,11 +70,129 @@ impl Solution { Ok(()) } - #[cfg(feature = "getblocktemplate-rpcs")] + /// Returns a [`Solution`] containing the bytes from `solution`. + /// Returns an error if `solution` is the wrong length. + pub fn from_bytes(solution: &[u8]) -> Result { + if solution.len() != SOLUTION_SIZE { + return Err(SerializationError::Parse( + "incorrect equihash solution size", + )); + } + + let mut bytes = [0; SOLUTION_SIZE]; + // Won't panic, because we just checked the length. + bytes.copy_from_slice(solution); + + Ok(Self(bytes)) + } + /// Returns a [`Solution`] of `[0; SOLUTION_SIZE]` to be used in block proposals. + #[cfg(feature = "getblocktemplate-rpcs")] pub fn for_proposal() -> Self { Self([0; SOLUTION_SIZE]) } + + /// Mines and returns one or more [`Solution`]s based on a template `header`. + /// The returned header contains a valid `nonce` and `solution`. + /// + /// If `cancel_fn()` returns an error, returns early with `Err(SolverCancelled)`. + /// + /// The `nonce` in the header template is taken as the starting nonce. If you are running multiple + /// solvers at the same time, start them with different nonces. + /// The `solution` in the header template is ignored. + /// + /// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core while running. + /// It can run for minutes or hours if the network difficulty is high. + #[cfg(feature = "internal-miner")] + #[allow(clippy::unwrap_in_result)] + pub fn solve( + mut header: Header, + mut cancel_fn: F, + ) -> Result, SolverCancelled> + where + F: FnMut() -> Result<(), SolverCancelled>, + { + use crate::shutdown::is_shutting_down; + + let mut input = Vec::new(); + header + .zcash_serialize(&mut input) + .expect("serialization into a vec can't fail"); + // Take the part of the header before the nonce and solution. + // This data is kept constant for this solver run. + let input = &input[0..Solution::INPUT_LENGTH]; + + while !is_shutting_down() { + // Don't run the solver if we'd just cancel it anyway. + cancel_fn()?; + + let solutions = equihash_solver::tromp::solve_200_9_compressed(input, || { + // Cancel the solver if we have a new template. + if cancel_fn().is_err() { + return None; + } + + // This skips the first nonce, which doesn't matter in practice. + Self::next_nonce(&mut header.nonce); + Some(*header.nonce) + }); + + let mut valid_solutions = Vec::new(); + + // If we got any solutions, try submitting them, because the new template might just + // contain some extra transactions. Mining extra transactions is optional. + for solution in &solutions { + header.solution = Self::from_bytes(solution) + .expect("unexpected invalid solution: incorrect length"); + + // TODO: work out why we sometimes get invalid solutions here + if let Err(error) = header.solution.check(&header) { + info!(?error, "found invalid solution for header"); + continue; + } + + if Self::difficulty_is_valid(&header) { + valid_solutions.push(header); + } + } + + match valid_solutions.try_into() { + Ok(at_least_one_solution) => return Ok(at_least_one_solution), + Err(_is_empty_error) => debug!( + solutions = ?solutions.len(), + "found valid solutions which did not pass the validity or difficulty checks" + ), + } + } + + Err(SolverCancelled) + } + + /// Modifies `nonce` to be the next integer in big-endian order. + /// Wraps to zero if the next nonce would overflow. + #[cfg(feature = "internal-miner")] + fn next_nonce(nonce: &mut [u8; 32]) { + let _ignore_overflow = crate::primitives::byte_array::increment_big_endian(&mut nonce[..]); + } + + /// Returns `true` if the `nonce` and `solution` in `header` meet the difficulty threshold. + /// + /// Assumes that the difficulty threshold in the header is valid. + #[cfg(feature = "internal-miner")] + fn difficulty_is_valid(header: &Header) -> bool { + // Simplified from zebra_consensus::block::check::difficulty_is_valid(). + let difficulty_threshold = header + .difficulty_threshold + .to_expanded() + .expect("unexpected invalid header template: invalid difficulty threshold"); + + // TODO: avoid calculating this hash multiple times + let hash = header.hash(); + + // Note: this comparison is a u256 integer comparison, like zcashd and bitcoin. Greater + // values represent *less* work. + hash <= difficulty_threshold + } } impl PartialEq for Solution { @@ -109,17 +237,6 @@ impl ZcashSerialize for Solution { impl ZcashDeserialize for Solution { fn zcash_deserialize(mut reader: R) -> Result { let solution: Vec = (&mut reader).zcash_deserialize_into()?; - - if solution.len() != SOLUTION_SIZE { - return Err(SerializationError::Parse( - "incorrect equihash solution size", - )); - } - - let mut bytes = [0; SOLUTION_SIZE]; - // Won't panic, because we just checked the length. - bytes.copy_from_slice(&solution); - - Ok(Self(bytes)) + Self::from_bytes(&solution) } } diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index c230921c34e..9782b35e896 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -24,7 +24,7 @@ progress-bar = [ "zebra-state/progress-bar", ] -# Experimental mining RPC support +# Mining RPC support getblocktemplate-rpcs = [ "zebra-state/getblocktemplate-rpcs", "zebra-node-services/getblocktemplate-rpcs", diff --git a/zebra-node-services/Cargo.toml b/zebra-node-services/Cargo.toml index d4fafd956fb..a32fc55a6a2 100644 --- a/zebra-node-services/Cargo.toml +++ b/zebra-node-services/Cargo.toml @@ -19,7 +19,7 @@ default = [] # Production features that activate extra dependencies, or extra features in dependencies -# Experimental mining RPC support +# Mining RPC support getblocktemplate-rpcs = [ "zebra-chain/getblocktemplate-rpcs", ] diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index 95405ec29d3..cff2f724c0e 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -19,7 +19,7 @@ default = [] # Production features that activate extra dependencies, or extra features in dependencies -# Experimental mining RPC support +# Mining RPC support getblocktemplate-rpcs = [ "rand", "zcash_address", @@ -29,6 +29,9 @@ getblocktemplate-rpcs = [ "zebra-chain/getblocktemplate-rpcs", ] +# Experimental internal miner support +internal-miner = [] + # Test-only features proptest-impl = [ "proptest", @@ -48,7 +51,6 @@ hyper = { version = "0.14.28", features = ["http1", "server"] } jsonrpc-core = "18.0.0" jsonrpc-derive = "18.0.0" jsonrpc-http-server = "18.0.0" -num_cpus = "1.16.0" # zebra-rpc needs the preserve_order feature in serde_json, which is a dependency of jsonrpc-core serde_json = { version = "1.0.108", features = ["preserve_order"] } diff --git a/zebra-rpc/src/config.rs b/zebra-rpc/src/config.rs index b6bbf5196ef..3f74ead07db 100644 --- a/zebra-rpc/src/config.rs +++ b/zebra-rpc/src/config.rs @@ -36,17 +36,17 @@ pub struct Config { /// State queries are run concurrently using the shared thread pool controlled by /// the [`SyncSection.parallel_cpu_threads`](https://docs.rs/zebrad/latest/zebrad/components/sync/struct.Config.html#structfield.parallel_cpu_threads) config. /// - /// We recommend setting both configs to `0` (automatic scaling) for the best performance. - /// This uses one thread per available CPU core. + /// If the number of threads is not configured or zero, Zebra uses the number of logical cores. + /// If the number of logical cores can't be detected, Zebra uses one thread. /// - /// Set to `1` by default, which runs all RPC queries on a single thread, and detects RPC - /// port conflicts from multiple Zebra or `zcashd` instances. + /// Set to `1` to run all RPC queries on a single thread, and detect RPC port conflicts from + /// multiple Zebra or `zcashd` instances. /// /// For details, see [the `jsonrpc_http_server` documentation](https://docs.rs/jsonrpc-http-server/latest/jsonrpc_http_server/struct.ServerBuilder.html#method.threads). /// /// ## Warning /// - /// Changing this config disables RPC port conflict detection. + /// The default config uses multiple threads, which disables RPC port conflict detection. /// This can allow multiple Zebra instances to share the same RPC port. /// /// If some of those instances are outdated or failed, RPC queries can be slow or inconsistent. diff --git a/zebra-rpc/src/config/mining.rs b/zebra-rpc/src/config/mining.rs index 1a27baa3646..65cea3bbda7 100644 --- a/zebra-rpc/src/config/mining.rs +++ b/zebra-rpc/src/config/mining.rs @@ -15,6 +15,28 @@ pub struct Config { /// `getblocktemplate` RPC coinbase transaction. pub miner_address: Option, + /// Mine blocks using Zebra's internal miner, without an external mining pool or equihash solver. + /// + /// This experimental feature is only supported on testnet. + /// Mainnet miners should use a mining pool with GPUs or ASICs designed for efficient mining. + /// + /// The internal miner is off by default. + #[cfg(feature = "internal-miner")] + pub internal_miner: bool, + + /// The number of internal miner threads used by Zebra. + /// These threads are scheduled at low priority. + /// + /// The number of threads is limited by the available parallelism reported by the OS. + /// If the number of threads isn't configured, or can't be detected, Zebra uses one thread. + /// This is different from Zebra's other parallelism configs, because mining runs constantly and + /// uses a large amount of memory. (144 MB of RAM and 100% of a core per thread.) + /// + /// If the number of threads is set to zero, Zebra disables mining. + /// This matches `zcashd`'s behaviour, but is different from Zebra's other parallelism configs. + #[cfg(feature = "internal-miner")] + pub internal_miner_threads: usize, + /// Extra data to include in coinbase transaction inputs. /// Limited to around 95 bytes by the consensus rules. /// @@ -36,6 +58,12 @@ impl Default for Config { // TODO: do we want to default to v5 transactions and Zebra coinbase data? extra_coinbase_data: None, debug_like_zcashd: true, + // TODO: ignore and warn rather than panicking if these fields are in the config, + // but the feature isn't enabled. + #[cfg(feature = "internal-miner")] + internal_miner: false, + #[cfg(feature = "internal-miner")] + internal_miner_threads: 1, } } } @@ -48,4 +76,10 @@ impl Config { pub fn skip_getblocktemplate(&self) -> bool { !cfg!(feature = "getblocktemplate-rpcs") } + + /// Is the internal miner enabled using at least one thread? + #[cfg(feature = "internal-miner")] + pub fn is_internal_miner_enabled(&self) -> bool { + self.internal_miner && self.internal_miner_threads > 0 + } } diff --git a/zebra-rpc/src/methods.rs b/zebra-rpc/src/methods.rs index ea44310f3ce..9fc190a9a64 100644 --- a/zebra-rpc/src/methods.rs +++ b/zebra-rpc/src/methods.rs @@ -6,7 +6,7 @@ //! Some parts of the `zcashd` RPC documentation are outdated. //! So this implementation follows the `zcashd` server and `lightwalletd` client implementations. -use std::{collections::HashSet, sync::Arc}; +use std::{collections::HashSet, fmt::Debug, sync::Arc}; use chrono::Utc; use futures::{FutureExt, TryFutureExt}; @@ -15,7 +15,7 @@ use indexmap::IndexMap; use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result}; use jsonrpc_derive::rpc; use tokio::{sync::broadcast, task::JoinHandle}; -use tower::{buffer::Buffer, Service, ServiceExt}; +use tower::{Service, ServiceExt}; use tracing::Instrument; use zebra_chain::{ @@ -268,19 +268,28 @@ pub trait Rpc { } /// RPC method implementations. +#[derive(Clone)] pub struct RpcImpl where Mempool: Service< - mempool::Request, - Response = mempool::Response, - Error = zebra_node_services::BoxError, - >, + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, State: Service< - zebra_state::ReadRequest, - Response = zebra_state::ReadResponse, - Error = zebra_state::BoxError, - >, - Tip: ChainTip, + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, { // Configuration // @@ -304,7 +313,7 @@ where // Services // /// A handle to the mempool service. - mempool: Buffer, + mempool: Mempool, /// A handle to the state service. state: State, @@ -318,13 +327,51 @@ where queue_sender: broadcast::Sender, } +impl Debug for RpcImpl +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + State::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Skip fields without Debug impls, and skip channels + f.debug_struct("RpcImpl") + .field("build_version", &self.build_version) + .field("user_agent", &self.user_agent) + .field("network", &self.network) + .field("debug_force_finished_sync", &self.debug_force_finished_sync) + .field("debug_like_zcashd", &self.debug_like_zcashd) + .finish() + } +} + impl RpcImpl where Mempool: Service< mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, - > + 'static, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, State: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, @@ -333,6 +380,7 @@ where + Send + Sync + 'static, + State::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, { /// Create a new instance of the RPC handler. @@ -346,15 +394,13 @@ where network: Network, debug_force_finished_sync: bool, debug_like_zcashd: bool, - mempool: Buffer, + mempool: Mempool, state: State, latest_chain_tip: Tip, ) -> (Self, JoinHandle<()>) where VersionString: ToString + Clone + Send + 'static, UserAgentString: ToString + Clone + Send + 'static, - >::Future: Send, - >::Future: Send, { let (runner, queue_sender) = Queue::start(); @@ -391,11 +437,14 @@ where impl Rpc for RpcImpl where - Mempool: tower::Service< + Mempool: Service< mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, - > + 'static, + > + Clone + + Send + + Sync + + 'static, Mempool::Future: Send, State: Service< zebra_state::ReadRequest, diff --git a/zebra-rpc/src/methods/get_block_template_rpcs.rs b/zebra-rpc/src/methods/get_block_template_rpcs.rs index e99258ad78f..7d0550c0598 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs.rs @@ -1,11 +1,11 @@ //! RPC methods related to mining only available with `getblocktemplate-rpcs` rust feature. -use std::{sync::Arc, time::Duration}; +use std::{fmt::Debug, sync::Arc, time::Duration}; use futures::{future::OptionFuture, FutureExt, TryFutureExt}; use jsonrpc_core::{self, BoxFuture, Error, ErrorCode, Result}; use jsonrpc_derive::rpc; -use tower::{buffer::Buffer, Service, ServiceExt}; +use tower::{Service, ServiceExt}; use zcash_address::{self, unified::Encoding, TryFromAddress}; @@ -223,6 +223,7 @@ pub trait GetBlockTemplateRpc { } /// RPC method implementations. +#[derive(Clone)] pub struct GetBlockTemplateRpcImpl< Mempool, State, @@ -232,22 +233,32 @@ pub struct GetBlockTemplateRpcImpl< AddressBook, > where Mempool: Service< - mempool::Request, - Response = mempool::Response, - Error = zebra_node_services::BoxError, - >, + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, State: Service< - zebra_state::ReadRequest, - Response = zebra_state::ReadResponse, - Error = zebra_state::BoxError, - >, + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, BlockVerifierRouter: Service + Clone + Send + Sync + 'static, + >::Future: Send, SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, - AddressBook: AddressBookPeers, + AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { // Configuration // @@ -270,7 +281,7 @@ pub struct GetBlockTemplateRpcImpl< // Services // /// A handle to the mempool service. - mempool: Buffer, + mempool: Mempool, /// A handle to the state service. state: State, @@ -288,6 +299,48 @@ pub struct GetBlockTemplateRpcImpl< address_book: AddressBook, } +impl Debug + for GetBlockTemplateRpcImpl +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, + BlockVerifierRouter: Service + + Clone + + Send + + Sync + + 'static, + >::Future: Send, + SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, + AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // Skip fields without debug impls + f.debug_struct("GetBlockTemplateRpcImpl") + .field("network", &self.network) + .field("miner_address", &self.miner_address) + .field("extra_coinbase_data", &self.extra_coinbase_data) + .field("debug_like_zcashd", &self.debug_like_zcashd) + .finish() + } +} + impl GetBlockTemplateRpcImpl where @@ -295,7 +348,11 @@ where mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, - > + 'static, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, State: Service< zebra_state::ReadRequest, Response = zebra_state::ReadResponse, @@ -304,12 +361,14 @@ where + Send + Sync + 'static, + >::Future: Send, Tip: ChainTip + Clone + Send + Sync + 'static, BlockVerifierRouter: Service + Clone + Send + Sync + 'static, + >::Future: Send, SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, { @@ -322,13 +381,24 @@ where pub fn new( network: Network, mining_config: crate::config::mining::Config, - mempool: Buffer, + mempool: Mempool, state: State, latest_chain_tip: Tip, block_verifier_router: BlockVerifierRouter, sync_status: SyncStatus, address_book: AddressBook, ) -> Self { + // Prevent loss of miner funds due to an unsupported or incorrect address type. + if let Some(miner_address) = mining_config.miner_address { + assert_eq!( + miner_address.network(), + network, + "incorrect miner address config: {miner_address} \ + network.network {network} and miner address network {} must match", + miner_address.network(), + ); + } + // A limit on the configured extra coinbase data, regardless of the current block height. // This is different from the consensus rule, which limits the total height + data. const EXTRA_COINBASE_DATA_LIMIT: usize = @@ -378,7 +448,10 @@ where mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, - > + 'static, + > + Clone + + Send + + Sync + + 'static, Mempool::Future: Send, State: Service< zebra_state::ReadRequest, @@ -473,9 +546,7 @@ where async move { get_block_template::check_parameters(¶meters)?; - let client_long_poll_id = parameters - .as_ref() - .and_then(|params| params.long_poll_id.clone()); + let client_long_poll_id = parameters.as_ref().and_then(|params| params.long_poll_id); // - One-off checks @@ -498,6 +569,10 @@ where // - add `async changed()` method to ChainSyncStatus (like `ChainTip`) check_synced_to_tip(network, latest_chain_tip.clone(), sync_status.clone())?; + // TODO: return an error if we have no peers, like `zcashd` does, + // and add a developer config that mines regardless of how many peers we have. + // https://github.com/zcash/zcash/blob/6fdd9f1b81d3b228326c9826fa10696fc516444b/src/miner.cpp#L865-L880 + // We're just about to fetch state data, then maybe wait for any changes. // Mark all the changes before the fetch as seen. // Changes are also ignored in any clones made after the mark. @@ -585,7 +660,9 @@ where )); // Return immediately if the chain tip has changed. - let wait_for_best_tip_change = latest_chain_tip.best_tip_changed(); + // The clone preserves the seen status of the chain tip. + let mut wait_for_best_tip_change = latest_chain_tip.clone(); + let wait_for_best_tip_change = wait_for_best_tip_change.best_tip_changed(); // Wait for the maximum block time to elapse. This can change the block header // on testnet. (On mainnet it can happen due to a network disconnection, or a @@ -612,7 +689,6 @@ where // But the coinbase value depends on the selected transactions, so this needs // further analysis to check if it actually saves us any time. - // TODO: change logging to debug after testing tokio::select! { // Poll the futures in the listed order, for efficiency. // We put the most frequent conditions first. @@ -620,7 +696,7 @@ where // This timer elapses every few seconds _elapsed = wait_for_mempool_request => { - tracing::info!( + tracing::debug!( ?max_time, ?cur_time, ?server_long_poll_id, @@ -634,7 +710,32 @@ where tip_changed_result = wait_for_best_tip_change => { match tip_changed_result { Ok(()) => { - tracing::info!( + // Spurious updates shouldn't happen in the state, because the + // difficulty and hash ordering is a stable total order. But + // since they could cause a busy-loop, guard against them here. + latest_chain_tip.mark_best_tip_seen(); + + let new_tip_hash = latest_chain_tip.best_tip_hash(); + if new_tip_hash == Some(tip_hash) { + tracing::debug!( + ?max_time, + ?cur_time, + ?server_long_poll_id, + ?client_long_poll_id, + ?tip_hash, + ?tip_height, + "ignoring spurious state change notification" + ); + + // Wait for the mempool interval, then check for any changes. + tokio::time::sleep(Duration::from_secs( + GET_BLOCK_TEMPLATE_MEMPOOL_LONG_POLL_INTERVAL, + )).await; + + continue; + } + + tracing::debug!( ?max_time, ?cur_time, ?server_long_poll_id, @@ -644,8 +745,7 @@ where } Err(recv_error) => { - // This log should stay at info when the others go to debug, - // it will help with debugging. + // This log is rare and helps with debugging, so it's ok to be info. tracing::info!( ?recv_error, ?max_time, @@ -668,8 +768,7 @@ where // The max time does not elapse during normal operation on mainnet, // and it rarely elapses on testnet. Some(_elapsed) = wait_for_max_time => { - // This log should stay at info when the others go to debug, - // it's very rare. + // This log is very rare so it's ok to be info. tracing::info!( ?max_time, ?cur_time, diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs index 617b80080c2..92ab9d00593 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types/get_block_template.rs @@ -346,3 +346,21 @@ pub enum Response { /// `getblocktemplate` RPC request in proposal mode. ProposalMode(ProposalResponse), } + +impl Response { + /// Returns the inner template, if the response is in template mode. + pub fn try_into_template(self) -> Option { + match self { + Response::TemplateMode(template) => Some(*template), + Response::ProposalMode(_) => None, + } + } + + /// Returns the inner proposal, if the response is in proposal mode. + pub fn try_into_proposal(self) -> Option { + match self { + Response::TemplateMode(_) => None, + Response::ProposalMode(proposal) => Some(proposal), + } + } +} diff --git a/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs b/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs index 73ea1c015af..9f9a316166a 100644 --- a/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs +++ b/zebra-rpc/src/methods/get_block_template_rpcs/types/long_poll.rs @@ -113,7 +113,7 @@ impl LongPollInput { /// /// `zcashd` IDs are currently 69 hex/decimal digits long. /// Since Zebra's IDs are only 46 hex/decimal digits, mining pools should be able to handle them. -#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(try_from = "String", into = "String")] pub struct LongPollId { // Fields that invalidate old work: diff --git a/zebra-rpc/src/methods/tests/prop.rs b/zebra-rpc/src/methods/tests/prop.rs index cf2cc7c1bd5..c6c4fa69693 100644 --- a/zebra-rpc/src/methods/tests/prop.rs +++ b/zebra-rpc/src/methods/tests/prop.rs @@ -45,7 +45,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -100,7 +100,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -160,7 +160,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -228,7 +228,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -285,7 +285,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -340,7 +340,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -441,7 +441,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -500,7 +500,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -548,7 +548,7 @@ proptest! { network, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -599,7 +599,7 @@ proptest! { network, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), chain_tip, ); @@ -686,7 +686,7 @@ proptest! { network, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), chain_tip, ); @@ -750,7 +750,7 @@ proptest! { network, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), chain_tip, ); @@ -802,7 +802,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); @@ -892,7 +892,7 @@ proptest! { Mainnet, false, true, - Buffer::new(mempool.clone(), 1), + mempool.clone(), Buffer::new(state.clone(), 1), NoChainTip, ); diff --git a/zebra-rpc/src/methods/tests/snapshot.rs b/zebra-rpc/src/methods/tests/snapshot.rs index 8b74f393e83..2a8e9149f56 100644 --- a/zebra-rpc/src/methods/tests/snapshot.rs +++ b/zebra-rpc/src/methods/tests/snapshot.rs @@ -8,6 +8,7 @@ use std::{collections::BTreeMap, sync::Arc}; use insta::dynamic_redaction; +use tower::buffer::Buffer; use zebra_chain::{ block::Block, @@ -338,7 +339,7 @@ async fn test_mocked_rpc_response_data_for_network(network: Network) { network, false, true, - Buffer::new(mempool, 1), + mempool, state.clone(), latest_chain_tip, ); diff --git a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs index 29dd8c577d6..73e7f43db58 100644 --- a/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs +++ b/zebra-rpc/src/methods/tests/snapshot/get_block_template_rpcs.rs @@ -92,10 +92,13 @@ pub async fn test_responses( let mut mock_sync_status = MockSyncStatus::default(); mock_sync_status.set_is_close_to_tip(true); + #[allow(clippy::unnecessary_struct_initialization)] let mining_config = crate::config::mining::Config { miner_address: Some(transparent::Address::from_script_hash(network, [0xad; 20])), extra_coinbase_data: None, debug_like_zcashd: true, + // Use default field values when optional features are enabled in tests + ..Default::default() }; // nu5 block height diff --git a/zebra-rpc/src/methods/tests/vectors.rs b/zebra-rpc/src/methods/tests/vectors.rs index 63e53386d5c..bfcc2affbef 100644 --- a/zebra-rpc/src/methods/tests/vectors.rs +++ b/zebra-rpc/src/methods/tests/vectors.rs @@ -1099,7 +1099,7 @@ async fn rpc_getmininginfo() { let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( Mainnet, Default::default(), - Buffer::new(MockService::build().for_unit_tests(), 1), + MockService::build().for_unit_tests(), read_state, latest_chain_tip.clone(), MockService::build().for_unit_tests(), @@ -1135,7 +1135,7 @@ async fn rpc_getnetworksolps() { let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( Mainnet, Default::default(), - Buffer::new(MockService::build().for_unit_tests(), 1), + MockService::build().for_unit_tests(), read_state, latest_chain_tip.clone(), MockService::build().for_unit_tests(), @@ -1231,10 +1231,13 @@ async fn rpc_getblocktemplate_mining_address(use_p2pkh: bool) { true => Some(transparent::Address::from_pub_key_hash(Mainnet, [0x7e; 20])), }; + #[allow(clippy::unnecessary_struct_initialization)] let mining_config = crate::config::mining::Config { miner_address, extra_coinbase_data: None, debug_like_zcashd: true, + // Use default field values when optional features are enabled in tests + ..Default::default() }; // nu5 block height @@ -1575,7 +1578,7 @@ async fn rpc_validateaddress() { let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( Mainnet, Default::default(), - Buffer::new(MockService::build().for_unit_tests(), 1), + MockService::build().for_unit_tests(), MockService::build().for_unit_tests(), mock_chain_tip, MockService::build().for_unit_tests(), @@ -1620,7 +1623,7 @@ async fn rpc_z_validateaddress() { let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( Mainnet, Default::default(), - Buffer::new(MockService::build().for_unit_tests(), 1), + MockService::build().for_unit_tests(), MockService::build().for_unit_tests(), mock_chain_tip, MockService::build().for_unit_tests(), @@ -1677,10 +1680,13 @@ async fn rpc_getdifficulty() { let mut mock_sync_status = MockSyncStatus::default(); mock_sync_status.set_is_close_to_tip(true); + #[allow(clippy::unnecessary_struct_initialization)] let mining_config = Config { miner_address: None, extra_coinbase_data: None, debug_like_zcashd: true, + // Use default field values when optional features are enabled in tests + ..Default::default() }; // nu5 block height @@ -1825,7 +1831,7 @@ async fn rpc_z_listunifiedreceivers() { let get_block_template_rpc = get_block_template_rpcs::GetBlockTemplateRpcImpl::new( Mainnet, Default::default(), - Buffer::new(MockService::build().for_unit_tests(), 1), + MockService::build().for_unit_tests(), MockService::build().for_unit_tests(), mock_chain_tip, MockService::build().for_unit_tests(), diff --git a/zebra-rpc/src/server.rs b/zebra-rpc/src/server.rs index 44e6afe84f5..c8e13a9a62f 100644 --- a/zebra-rpc/src/server.rs +++ b/zebra-rpc/src/server.rs @@ -7,13 +7,12 @@ //! See the full list of //! [Differences between JSON-RPC 1.0 and 2.0.](https://www.simple-is-better.org/rpc/#differences-between-1-0-and-2-0) -use std::{fmt, panic}; +use std::{fmt, panic, thread::available_parallelism}; use jsonrpc_core::{Compatibility, MetaIoHandler}; use jsonrpc_http_server::{CloseHandle, ServerBuilder}; use tokio::task::JoinHandle; -use tower::{buffer::Buffer, Service}; - +use tower::Service; use tracing::{Instrument, *}; use zebra_chain::{ @@ -99,7 +98,7 @@ impl RpcServer { mining_config: crate::config::mining::Config, build_version: VersionString, user_agent: UserAgentString, - mempool: Buffer, + mempool: Mempool, state: State, #[cfg_attr(not(feature = "getblocktemplate-rpcs"), allow(unused_variables))] block_verifier_router: BlockVerifierRouter, @@ -117,7 +116,10 @@ impl RpcServer { mempool::Request, Response = mempool::Response, Error = zebra_node_services::BoxError, - > + 'static, + > + Clone + + Send + + Sync + + 'static, Mempool::Future: Send, State: Service< zebra_state::ReadRequest, @@ -150,17 +152,6 @@ impl RpcServer { #[cfg(feature = "getblocktemplate-rpcs")] { - // Prevent loss of miner funds due to an unsupported or incorrect address type. - if let Some(miner_address) = mining_config.miner_address { - assert_eq!( - miner_address.network(), - network, - "incorrect miner address config: {miner_address} \ - network.network {network} and miner address network {} must match", - miner_address.network(), - ); - } - // Initialize the getblocktemplate rpc method handler let get_block_template_rpc_impl = GetBlockTemplateRpcImpl::new( network, @@ -196,7 +187,7 @@ impl RpcServer { // If zero, automatically scale threads to the number of CPU cores let mut parallel_cpu_threads = config.parallel_cpu_threads; if parallel_cpu_threads == 0 { - parallel_cpu_threads = num_cpus::get(); + parallel_cpu_threads = available_parallelism().map(usize::from).unwrap_or(1); } // The server is a blocking task, which blocks on executor shutdown. diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index f19b427f5dc..3439e05b5b1 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -22,7 +22,7 @@ progress-bar = [ "howudoin", ] -# Experimental mining RPC support +# Mining RPC support getblocktemplate-rpcs = [ "zebra-chain/getblocktemplate-rpcs", ] diff --git a/zebra-state/src/service/finalized_state/disk_db.rs b/zebra-state/src/service/finalized_state/disk_db.rs index 6bcf15d569d..825edbc6657 100644 --- a/zebra-state/src/service/finalized_state/disk_db.rs +++ b/zebra-state/src/service/finalized_state/disk_db.rs @@ -23,7 +23,7 @@ use rlimit::increase_nofile_limit; use rocksdb::ReadOptions; use semver::Version; -use zebra_chain::parameters::Network; +use zebra_chain::{parameters::Network, primitives::byte_array::increment_big_endian}; use crate::{ service::finalized_state::disk_format::{FromDisk, IntoDisk}, @@ -640,11 +640,8 @@ impl DiskDb { Included(mut bound) => { // Increment the last byte in the upper bound that is less than u8::MAX, and // clear any bytes after it to increment the next key in lexicographic order - // (next big-endian number) this Vec represents to RocksDB. - let is_wrapped_overflow = bound.iter_mut().rev().all(|v| { - *v = v.wrapping_add(1); - v == &0 - }); + // (next big-endian number). RocksDB uses lexicographic order for keys. + let is_wrapped_overflow = increment_big_endian(&mut bound); if is_wrapped_overflow { bound.insert(0, 0x01) diff --git a/zebra-state/src/service/watch_receiver.rs b/zebra-state/src/service/watch_receiver.rs index 6540ccf98d4..6c4aba0b564 100644 --- a/zebra-state/src/service/watch_receiver.rs +++ b/zebra-state/src/service/watch_receiver.rs @@ -99,15 +99,30 @@ where self.receiver.borrow().clone() } - /// Calls [`watch::Receiver::changed`] and returns the result. + /// Calls [`watch::Receiver::changed()`] and returns the result. + /// Returns when the inner value has been updated, even if the old and new values are equal. /// /// Marks the watched data as seen. pub async fn changed(&mut self) -> Result<(), watch::error::RecvError> { self.receiver.changed().await } + /// Calls [`watch::Receiver::has_changed()`] and returns the result. + /// Returns `true` when the inner value has been updated, even if the old and new values are equal. + /// + /// Does not mark the watched data as seen. + pub fn has_changed(&self) -> Result { + self.receiver.has_changed() + } + /// Marks the watched data as seen. pub fn mark_as_seen(&mut self) { self.receiver.borrow_and_update(); } + + /// Marks the watched data as unseen. + /// Calls [`watch::Receiver::mark_changed()`]. + pub fn mark_changed(&mut self) { + self.receiver.mark_changed(); + } } diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 9df4307ad80..a2ef9d82876 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -68,6 +68,15 @@ getblocktemplate-rpcs = [ "zebra-chain/getblocktemplate-rpcs", ] +# Experimental internal miner support +internal-miner = [ + "thread-priority", + "zebra-chain/internal-miner", + # TODO: move common code into zebra-chain or zebra-node-services and remove the RPC dependency + "zebra-rpc/internal-miner", + "zebra-rpc/getblocktemplate-rpcs", +] + # Experimental shielded blockchain scanning shielded-scan = ["zebra-scan"] @@ -200,6 +209,9 @@ atty = "0.2.14" num-integer = "0.1.45" rand = "0.8.5" +# prod feature internal-miner +thread-priority = { version = "0.15.1", optional = true } + # prod feature sentry sentry = { version = "0.32.1", default-features = false, features = ["backtrace", "contexts", "reqwest", "rustls", "tracing"], optional = true } diff --git a/zebrad/src/application.rs b/zebrad/src/application.rs index 24c8ac9b784..96264f2f051 100644 --- a/zebrad/src/application.rs +++ b/zebrad/src/application.rs @@ -78,9 +78,7 @@ fn vergen_build_version() -> Option { // - optional pre-release: `-`tag[`.`tag ...] // - optional build: `+`tag[`.`tag ...] // change the git describe format to the semver 2.0 format - let Some(vergen_git_describe) = VERGEN_GIT_DESCRIBE else { - return None; - }; + let vergen_git_describe = VERGEN_GIT_DESCRIBE?; // `git describe` uses "dirty" for uncommitted changes, // but users won't understand what that means. @@ -90,10 +88,7 @@ fn vergen_build_version() -> Option { let mut vergen_git_describe = vergen_git_describe.split('-').peekable(); // Check the "version core" part. - let version = vergen_git_describe.next(); - let Some(mut version) = version else { - return None; - }; + let mut version = vergen_git_describe.next()?; // strip the leading "v", if present. version = version.strip_prefix('v').unwrap_or(version); diff --git a/zebrad/src/commands/start.rs b/zebrad/src/commands/start.rs index ec59f409c5e..e9b98cc523d 100644 --- a/zebrad/src/commands/start.rs +++ b/zebrad/src/commands/start.rs @@ -45,6 +45,16 @@ //! * Progress Task //! * logs progress towards the chain tip //! +//! Shielded Scanning: +//! * Shielded Scanner Task +//! * if the user has configured Zebra with their shielded viewing keys, scans new and existing +//! blocks for transactions that use those keys +//! +//! Block Mining: +//! * Internal Miner Task +//! * if the user has configured Zebra to mine blocks, spawns tasks to generate new blocks, +//! and submits them for verification. This automatically shares these new blocks with peers. +//! //! Mempool Transactions: //! * Mempool Service //! * activates when the syncer is near the chain tip @@ -220,10 +230,10 @@ impl StartCmd { build_version(), user_agent(), mempool.clone(), - read_only_state_service, - block_verifier_router, + read_only_state_service.clone(), + block_verifier_router.clone(), sync_status.clone(), - address_book, + address_book.clone(), latest_chain_tip.clone(), config.network.network, ); @@ -267,7 +277,8 @@ impl StartCmd { // Spawn never ending end of support task. info!("spawning end of support checking task"); let end_of_support_task_handle = tokio::spawn( - sync::end_of_support::start(config.network.network, latest_chain_tip).in_current_span(), + sync::end_of_support::start(config.network.network, latest_chain_tip.clone()) + .in_current_span(), ); // Give the inbound service more time to clear its queue, @@ -281,7 +292,7 @@ impl StartCmd { &config.mempool, peer_set, mempool.clone(), - sync_status, + sync_status.clone(), chain_tip_change.clone(), ); @@ -308,6 +319,33 @@ impl StartCmd { let scan_task_handle: tokio::task::JoinHandle> = tokio::spawn(std::future::pending().in_current_span()); + // And finally, spawn the internal Zcash miner, if it is enabled. + // + // TODO: add a config to enable the miner rather than a feature. + #[cfg(feature = "internal-miner")] + let miner_task_handle = if config.mining.is_internal_miner_enabled() { + info!("spawning Zcash miner"); + let rpc = zebra_rpc::methods::get_block_template_rpcs::GetBlockTemplateRpcImpl::new( + config.network.network, + config.mining.clone(), + mempool, + read_only_state_service, + latest_chain_tip, + block_verifier_router, + sync_status, + address_book, + ); + + crate::components::miner::spawn_init(&config.mining, rpc) + } else { + tokio::spawn(std::future::pending().in_current_span()) + }; + + #[cfg(not(feature = "internal-miner"))] + // Spawn a dummy miner task which doesn't do anything and never finishes. + let miner_task_handle: tokio::task::JoinHandle> = + tokio::spawn(std::future::pending().in_current_span()); + info!("spawned initial Zebra tasks"); // TODO: put tasks into an ongoing FuturesUnordered and a startup FuturesUnordered? @@ -323,6 +361,7 @@ impl StartCmd { pin!(progress_task_handle); pin!(end_of_support_task_handle); pin!(scan_task_handle); + pin!(miner_task_handle); // startup tasks let BackgroundTaskHandles { @@ -413,6 +452,10 @@ impl StartCmd { scan_result = &mut scan_task_handle => scan_result .expect("unexpected panic in the scan task") .map(|_| info!("scan task exited")), + + miner_result = &mut miner_task_handle => miner_result + .expect("unexpected panic in the miner task") + .map(|_| info!("miner task exited")), }; // Stop Zebra if a task finished and returned an error, @@ -439,6 +482,7 @@ impl StartCmd { progress_task_handle.abort(); end_of_support_task_handle.abort(); scan_task_handle.abort(); + miner_task_handle.abort(); // startup tasks state_checkpoint_verify_handle.abort(); diff --git a/zebrad/src/components.rs b/zebrad/src/components.rs index 5daf65a4a79..43b051f1209 100644 --- a/zebrad/src/components.rs +++ b/zebrad/src/components.rs @@ -16,5 +16,8 @@ pub mod tokio; #[allow(missing_docs)] pub mod tracing; +#[cfg(feature = "internal-miner")] +pub mod miner; + pub use inbound::Inbound; pub use sync::ChainSync; diff --git a/zebrad/src/components/miner.rs b/zebrad/src/components/miner.rs new file mode 100644 index 00000000000..b337c7669ee --- /dev/null +++ b/zebrad/src/components/miner.rs @@ -0,0 +1,526 @@ +//! Internal mining in Zebra. +//! +//! # TODO +//! - pause mining if we have no peers, like `zcashd` does, +//! and add a developer config that mines regardless of how many peers we have. +//! +//! - move common code into zebra-chain or zebra-node-services and remove the RPC dependency. + +use std::{cmp::min, sync::Arc, thread::available_parallelism, time::Duration}; + +use color_eyre::Report; +use futures::{stream::FuturesUnordered, StreamExt}; +use thread_priority::{ThreadBuilder, ThreadPriority}; +use tokio::{select, sync::watch, task::JoinHandle, time::sleep}; +use tower::Service; +use tracing::{Instrument, Span}; + +use zebra_chain::{ + block::{self, Block}, + chain_sync_status::ChainSyncStatus, + chain_tip::ChainTip, + diagnostic::task::WaitForPanics, + serialization::{AtLeastOne, ZcashSerialize}, + shutdown::is_shutting_down, + work::equihash::{Solution, SolverCancelled}, +}; +use zebra_network::AddressBookPeers; +use zebra_node_services::mempool; +use zebra_rpc::{ + config::mining::Config, + methods::{ + get_block_template_rpcs::{ + constants::GET_BLOCK_TEMPLATE_MEMPOOL_LONG_POLL_INTERVAL, + get_block_template::{ + self, proposal::TimeSource, proposal_block_from_template, + GetBlockTemplateCapability::*, GetBlockTemplateRequestMode::*, + }, + types::hex_data::HexData, + }, + GetBlockTemplateRpc, GetBlockTemplateRpcImpl, + }, +}; +use zebra_state::WatchReceiver; + +/// The amount of time we wait between block template retries. +pub const BLOCK_TEMPLATE_WAIT_TIME: Duration = Duration::from_secs(20); + +/// Initialize the miner based on its config, and spawn a task for it. +/// +/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured +/// mining thread. +/// +/// See [`run_mining_solver()`] for more details. +pub fn spawn_init( + config: &Config, + rpc: GetBlockTemplateRpcImpl, +) -> JoinHandle> +// TODO: simplify or avoid repeating these generics (how?) +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, + BlockVerifierRouter: Service + + Clone + + Send + + Sync + + 'static, + >::Future: Send, + SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, + AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, +{ + let config = config.clone(); + + // TODO: spawn an entirely new executor here, so mining is isolated from higher priority tasks. + tokio::spawn(init(config, rpc).in_current_span()) +} + +/// Initialize the miner based on its config. +/// +/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core per configured +/// mining thread. +/// +/// See [`run_mining_solver()`] for more details. +pub async fn init( + config: Config, + rpc: GetBlockTemplateRpcImpl, +) -> Result<(), Report> +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, + BlockVerifierRouter: Service + + Clone + + Send + + Sync + + 'static, + >::Future: Send, + SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, + AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, +{ + let configured_threads = config.internal_miner_threads; + // If we can't detect the number of cores, use the configured number. + let available_threads = available_parallelism() + .map(usize::from) + .unwrap_or(configured_threads); + + // Use the minimum of the configured and available threads. + let solver_count = min(configured_threads, available_threads); + + info!( + ?solver_count, + "launching mining tasks with parallel solvers" + ); + + let (template_sender, template_receiver) = watch::channel(None); + let template_receiver = WatchReceiver::new(template_receiver); + + // Spawn these tasks, to avoid blocked cooperative futures, and improve shutdown responsiveness. + // This is particularly important when there are a large number of solver threads. + let mut abort_handles = Vec::new(); + + let template_generator = tokio::task::spawn( + generate_block_templates(rpc.clone(), template_sender).in_current_span(), + ); + abort_handles.push(template_generator.abort_handle()); + let template_generator = template_generator.wait_for_panics(); + + let mut mining_solvers = FuturesUnordered::new(); + for solver_id in 0..solver_count { + // Assume there are less than 256 cores. If there are more, only run 256 tasks. + let solver_id = min(solver_id, usize::from(u8::MAX)) + .try_into() + .expect("just limited to u8::MAX"); + + let solver = tokio::task::spawn( + run_mining_solver(solver_id, template_receiver.clone(), rpc.clone()).in_current_span(), + ); + abort_handles.push(solver.abort_handle()); + + mining_solvers.push(solver.wait_for_panics()); + } + + // These tasks run forever unless there is a fatal error or shutdown. + // When that happens, the first task to error returns, and the other JoinHandle futures are + // cancelled. + let first_result; + select! { + result = template_generator => { first_result = result; } + result = mining_solvers.next() => { + first_result = result + .expect("stream never terminates because there is at least one solver task"); + } + } + + // But the spawned async tasks keep running, so we need to abort them here. + for abort_handle in abort_handles { + abort_handle.abort(); + } + + // Any spawned blocking threads will keep running. When this task returns and drops the + // `template_sender`, it cancels all the spawned miner threads. This works because we've + // aborted the `template_generator` task, which owns the `template_sender`. (And it doesn't + // spawn any blocking threads.) + first_result +} + +/// Generates block templates using `rpc`, and sends them to mining threads using `template_sender`. +#[instrument(skip(rpc, template_sender))] +pub async fn generate_block_templates< + Mempool, + State, + Tip, + BlockVerifierRouter, + SyncStatus, + AddressBook, +>( + rpc: GetBlockTemplateRpcImpl, + template_sender: watch::Sender>>, +) -> Result<(), Report> +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, + BlockVerifierRouter: Service + + Clone + + Send + + Sync + + 'static, + >::Future: Send, + SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, + AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, +{ + // Pass the correct arguments, even if Zebra currently ignores them. + let mut parameters = get_block_template::JsonParameters { + mode: Template, + data: None, + capabilities: vec![LongPoll, CoinbaseTxn], + long_poll_id: None, + _work_id: None, + }; + + // Shut down the task when all the template receivers are dropped, or Zebra shuts down. + while !template_sender.is_closed() && !is_shutting_down() { + let template = rpc.get_block_template(Some(parameters.clone())).await; + + // Wait for the chain to sync so we get a valid template. + let Ok(template) = template else { + debug!( + ?BLOCK_TEMPLATE_WAIT_TIME, + "waiting for a valid block template", + ); + + // Skip the wait if we got an error because we are shutting down. + if !is_shutting_down() { + sleep(BLOCK_TEMPLATE_WAIT_TIME).await; + } + + continue; + }; + + // Convert from RPC GetBlockTemplate to Block + let template = template + .try_into_template() + .expect("invalid RPC response: proposal in response to a template request"); + + info!( + height = ?template.height, + transactions = ?template.transactions.len(), + "mining with an updated block template", + ); + + // Tell the next get_block_template() call to wait until the template has changed. + parameters.long_poll_id = Some(template.long_poll_id); + + let block = proposal_block_from_template(&template, TimeSource::CurTime) + .expect("unexpected invalid block template"); + + // If the template has actually changed, send an updated template. + template_sender.send_if_modified(|old_block| { + if old_block.as_ref().map(|b| *b.header) == Some(*block.header) { + return false; + } + *old_block = Some(Arc::new(block)); + true + }); + + // If the blockchain is changing rapidly, limit how often we'll update the template. + // But if we're shutting down, do that immediately. + if !template_sender.is_closed() && !is_shutting_down() { + sleep(Duration::from_secs( + GET_BLOCK_TEMPLATE_MEMPOOL_LONG_POLL_INTERVAL, + )) + .await; + } + } + + Ok(()) +} + +/// Runs a single mining thread that gets blocks from the `template_receiver`, calculates equihash +/// solutions with nonces based on `solver_id`, and submits valid blocks to Zebra's block validator. +/// +/// This method is CPU and memory-intensive. It uses 144 MB of RAM and one CPU core while running. +/// It can run for minutes or hours if the network difficulty is high. Mining uses a thread with +/// low CPU priority. +#[instrument(skip(template_receiver, rpc))] +pub async fn run_mining_solver( + solver_id: u8, + mut template_receiver: WatchReceiver>>, + rpc: GetBlockTemplateRpcImpl, +) -> Result<(), Report> +where + Mempool: Service< + mempool::Request, + Response = mempool::Response, + Error = zebra_node_services::BoxError, + > + Clone + + Send + + Sync + + 'static, + Mempool::Future: Send, + State: Service< + zebra_state::ReadRequest, + Response = zebra_state::ReadResponse, + Error = zebra_state::BoxError, + > + Clone + + Send + + Sync + + 'static, + >::Future: Send, + Tip: ChainTip + Clone + Send + Sync + 'static, + BlockVerifierRouter: Service + + Clone + + Send + + Sync + + 'static, + >::Future: Send, + SyncStatus: ChainSyncStatus + Clone + Send + Sync + 'static, + AddressBook: AddressBookPeers + Clone + Send + Sync + 'static, +{ + // Shut down the task when the template sender is dropped, or Zebra shuts down. + while template_receiver.has_changed().is_ok() && !is_shutting_down() { + // Get the latest block template, and mark the current value as seen. + // We mark the value first to avoid missed updates. + template_receiver.mark_as_seen(); + let template = template_receiver.cloned_watch_data(); + + let Some(template) = template else { + if solver_id == 0 { + info!( + ?solver_id, + ?BLOCK_TEMPLATE_WAIT_TIME, + "solver waiting for initial block template" + ); + } else { + debug!( + ?solver_id, + ?BLOCK_TEMPLATE_WAIT_TIME, + "solver waiting for initial block template" + ); + } + + // Skip the wait if we didn't get a template because we are shutting down. + if !is_shutting_down() { + sleep(BLOCK_TEMPLATE_WAIT_TIME).await; + } + + continue; + }; + + let height = template.coinbase_height().expect("template is valid"); + + // Set up the cancellation conditions for the miner. + let mut cancel_receiver = template_receiver.clone(); + let old_header = *template.header; + let cancel_fn = move || match cancel_receiver.has_changed() { + // Guard against get_block_template() providing an identical header. This could happen + // if something irrelevant to the block data changes, the time was within 1 second, or + // there is a spurious channel change. + Ok(has_changed) => { + cancel_receiver.mark_as_seen(); + + // We only need to check header equality, because the block data is bound to the + // header. + if has_changed + && Some(old_header) != cancel_receiver.cloned_watch_data().map(|b| *b.header) + { + Err(SolverCancelled) + } else { + Ok(()) + } + } + // If the sender was dropped, we're likely shutting down, so cancel the solver. + Err(_sender_dropped) => Err(SolverCancelled), + }; + + // Mine at least one block using the equihash solver. + let Ok(blocks) = mine_a_block(solver_id, template, cancel_fn).await else { + // If the solver was cancelled, we're either shutting down, or we have a new template. + if solver_id == 0 { + info!( + ?height, + ?solver_id, + new_template = ?template_receiver.has_changed(), + shutting_down = ?is_shutting_down(), + "solver cancelled: getting a new block template or shutting down" + ); + } else { + debug!( + ?height, + ?solver_id, + new_template = ?template_receiver.has_changed(), + shutting_down = ?is_shutting_down(), + "solver cancelled: getting a new block template or shutting down" + ); + } + + // If the blockchain is changing rapidly, limit how often we'll update the template. + // But if we're shutting down, do that immediately. + if template_receiver.has_changed().is_ok() && !is_shutting_down() { + sleep(Duration::from_secs(1)).await; + } + + continue; + }; + + // Submit the newly mined blocks to the verifiers. + // + // TODO: if there is a new template (`cancel_fn().is_err()`), and + // GetBlockTemplate.submit_old is false, return immediately, and skip submitting the + // blocks. + for block in blocks { + let data = block + .zcash_serialize_to_vec() + .expect("serializing to Vec never fails"); + + match rpc.submit_block(HexData(data), None).await { + Ok(success) => info!( + ?height, + hash = ?block.hash(), + ?solver_id, + ?success, + "successfully mined a new block", + ), + Err(error) => info!( + ?height, + hash = ?block.hash(), + ?solver_id, + ?error, + "validating a newly mined block failed, trying again", + ), + } + } + } + + Ok(()) +} + +/// Mines one or more blocks based on `template`. Calculates equihash solutions, checks difficulty, +/// and returns as soon as it has at least one block. Uses a different nonce range for each +/// `solver_id`. +/// +/// If `cancel_fn()` returns an error, returns early with `Err(SolverCancelled)`. +/// +/// See [`run_mining_solver()`] for more details. +pub async fn mine_a_block( + solver_id: u8, + template: Arc, + cancel_fn: F, +) -> Result, SolverCancelled> +where + F: FnMut() -> Result<(), SolverCancelled> + Send + Sync + 'static, +{ + // TODO: Replace with Arc::unwrap_or_clone() when it stabilises: + // https://github.com/rust-lang/rust/issues/93610 + let mut header = *template.header; + + // Use a different nonce for each solver thread. + // Change both the first and last bytes, so we don't have to care if the nonces are incremented in + // big-endian or little-endian order. And we can see the thread that mined a block from the nonce. + *header.nonce.first_mut().unwrap() = solver_id; + *header.nonce.last_mut().unwrap() = solver_id; + + // Mine one or more blocks using the solver, in a low-priority blocking thread. + let span = Span::current(); + let solved_headers = + tokio::task::spawn_blocking(move || span.in_scope(move || { + let miner_thread_handle = ThreadBuilder::default().name("zebra-miner").priority(ThreadPriority::Min).spawn(move |priority_result| { + if let Err(error) = priority_result { + info!(?error, "could not set miner to run at a low priority: running at default priority"); + } + + Solution::solve(header, cancel_fn) + }).expect("unable to spawn miner thread"); + + miner_thread_handle.wait_for_panics() + })) + .wait_for_panics() + .await?; + + // Modify the template into solved blocks. + + // TODO: Replace with Arc::unwrap_or_clone() when it stabilises + let block = (*template).clone(); + + let solved_blocks: Vec = solved_headers + .into_iter() + .map(|header| { + let mut block = block.clone(); + block.header = Arc::new(header); + block + }) + .collect(); + + Ok(solved_blocks + .try_into() + .expect("a 1:1 mapping of AtLeastOne produces at least one block")) +} diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 160e27777f3..651dacc1f8d 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -261,7 +261,8 @@ pub struct Config { /// The number of threads used to verify signatures, proofs, and other CPU-intensive code. /// - /// Set to `0` by default, which uses one thread per available CPU core. + /// If the number of threads is not configured or zero, Zebra uses the number of logical cores. + /// If the number of logical cores can't be detected, Zebra uses one thread. /// For details, see [the `rayon` documentation](https://docs.rs/rayon/latest/rayon/struct.ThreadPoolBuilder.html#method.num_threads). pub parallel_cpu_threads: usize, } diff --git a/zebrad/src/lib.rs b/zebrad/src/lib.rs index ab4c790e45a..2043c082521 100644 --- a/zebrad/src/lib.rs +++ b/zebrad/src/lib.rs @@ -111,9 +111,12 @@ //! ### Experimental //! //! * `elasticsearch`: save block data into elasticsearch database. Read the [elasticsearch](https://zebra.zfnd.org/user/elasticsearch.html) -//! section of the book for more details. +//! section of the book for more details. //! * `shielded-scan`: enable experimental support for scanning shielded transactions. Read the [shielded-scan](https://zebra.zfnd.org/user/shielded-scan.html) -//! section of the book for more details. +//! section of the book for more details. +//! * `internal-miner`: enable experimental support for mining inside Zebra, without an external +//! mining pool. This feature is only supported on testnet. Use a GPU or ASIC on mainnet for +//! efficient mining. #![doc(html_favicon_url = "https://zfnd.org/wp-content/uploads/2022/03/zebra-favicon-128.png")] #![doc(html_logo_url = "https://zfnd.org/wp-content/uploads/2022/03/zebra-icon.png")] diff --git a/zebrad/tests/common/get_block_template_rpcs/get_block_template.rs b/zebrad/tests/common/get_block_template_rpcs/get_block_template.rs index e6bd3d3d9c4..56a62ff4346 100644 --- a/zebrad/tests/common/get_block_template_rpcs/get_block_template.rs +++ b/zebrad/tests/common/get_block_template_rpcs/get_block_template.rs @@ -169,7 +169,7 @@ async fn try_validate_block_template(client: &RpcRequestClient) -> Result<()> { { let client = client.clone(); - let mut long_poll_id = response_json_result.long_poll_id.clone(); + let mut long_poll_id = response_json_result.long_poll_id; tokio::spawn(async move { loop { @@ -196,7 +196,7 @@ async fn try_validate_block_template(client: &RpcRequestClient) -> Result<()> { } long_poll_result = long_poll_request => { - long_poll_id = long_poll_result.long_poll_id.clone(); + long_poll_id = long_poll_result.long_poll_id; if let Some(false) = long_poll_result.submit_old { let _ = long_poll_result_tx.send(long_poll_result);