diff --git a/crates/subspace-node/src/bin/subspace-node.rs b/crates/subspace-node/src/bin/subspace-node.rs index 4607d2f244..246801a526 100644 --- a/crates/subspace-node/src/bin/subspace-node.rs +++ b/crates/subspace-node/src/bin/subspace-node.rs @@ -632,7 +632,6 @@ fn main() -> Result<(), Error> { .new_slot_notification_stream .clone(), consensus_sync_service: consensus_chain_node.sync_service.clone(), - select_chain: consensus_chain_node.select_chain.clone(), domain_message_receiver, gossip_message_sink: xdm_gossip_worker_builder.gossip_msg_sink(), }; diff --git a/crates/subspace-node/src/domain/domain_instance_starter.rs b/crates/subspace-node/src/domain/domain_instance_starter.rs index c8924b9925..5b3b4f1d9e 100644 --- a/crates/subspace-node/src/domain/domain_instance_starter.rs +++ b/crates/subspace-node/src/domain/domain_instance_starter.rs @@ -19,7 +19,7 @@ use sp_domains::{DomainInstanceData, RuntimeType}; use std::sync::Arc; use subspace_runtime::RuntimeApi as CRuntimeApi; use subspace_runtime_primitives::opaque::Block as CBlock; -use subspace_service::{FullClient as CFullClient, FullSelectChain}; +use subspace_service::FullClient as CFullClient; /// `DomainInstanceStarter` used to start a domain instance node based on the given /// bootstrap result @@ -32,7 +32,6 @@ pub struct DomainInstanceStarter { SubspaceNotificationStream>, pub new_slot_notification_stream: SubspaceNotificationStream, pub consensus_sync_service: Arc>, - pub select_chain: FullSelectChain, pub domain_message_receiver: TracingUnboundedReceiver>, pub gossip_message_sink: TracingUnboundedSender, } @@ -61,7 +60,6 @@ impl DomainInstanceStarter { block_importing_notification_stream, new_slot_notification_stream, consensus_sync_service, - select_chain, domain_message_receiver, gossip_message_sink, } = self; @@ -93,7 +91,6 @@ impl DomainInstanceStarter { ( slot_notification.new_slot_info.slot, slot_notification.new_slot_info.global_randomness, - None::>, ) }) }; @@ -104,6 +101,7 @@ impl DomainInstanceStarter { block_importing_notification_stream: block_importing_notification_stream(), imported_block_notification_stream, new_slot_notification_stream: new_slot_notification_stream(), + acknowledgement_sender_stream: futures::stream::empty(), _phantom: Default::default(), }; @@ -135,7 +133,6 @@ impl DomainInstanceStarter { consensus_client, consensus_offchain_tx_pool_factory, consensus_network_sync_oracle: consensus_sync_service.clone(), - select_chain, operator_streams, gossip_message_sink, domain_message_receiver, diff --git a/domains/client/domain-operator/src/domain_bundle_producer.rs b/domains/client/domain-operator/src/domain_bundle_producer.rs index 943a8ca7fb..4881f2d42c 100644 --- a/domains/client/domain-operator/src/domain_bundle_producer.rs +++ b/domains/client/domain-operator/src/domain_bundle_producer.rs @@ -14,7 +14,7 @@ use sp_domains::{ SealedBundleHeader, }; use sp_keystore::KeystorePtr; -use sp_runtime::traits::{Block as BlockT, One, Saturating, Zero}; +use sp_runtime::traits::{Block as BlockT, Zero}; use sp_runtime::RuntimeAppPublic; use std::convert::{AsRef, Into}; use std::marker::PhantomData; @@ -149,26 +149,7 @@ where global_randomness, } = slot_info; - let best_receipt_is_written = crate::aux_schema::latest_consensus_block_hash_for::< - _, - _, - CBlock::Hash, - >(&*self.client, &self.client.info().best_hash)? - .is_some(); - - // TODO: remove once the receipt generation can be done before the domain block is - // committed to the database, in other words, only when the receipt of block N+1 has - // been generated can the `client.info().best_number` be updated from N to N+1. - // - // This requires: - // 1. Reimplement `runtime_api.intermediate_roots()` on the client side. - // 2. Add a hook before the upstream `client.commit_operation(op)`. - let domain_best_number = if best_receipt_is_written { - self.client.info().best_number - } else { - self.client.info().best_number.saturating_sub(One::one()) - }; - + let domain_best_number = self.client.info().best_number; let parent_chain_best_hash = self.parent_chain.best_hash(); let should_skip_slot = { let head_receipt_number = self diff --git a/domains/client/domain-operator/src/domain_bundle_proposer.rs b/domains/client/domain-operator/src/domain_bundle_proposer.rs index 6b069a1efb..7be3c95009 100644 --- a/domains/client/domain-operator/src/domain_bundle_proposer.rs +++ b/domains/client/domain-operator/src/domain_bundle_proposer.rs @@ -9,7 +9,7 @@ use sp_api::{HeaderT, NumberFor, ProvideRuntimeApi}; use sp_block_builder::BlockBuilder; use sp_blockchain::HeaderBackend; use sp_domains::{BundleHeader, ExecutionReceipt, ProofOfElection}; -use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Hash as HashT, One, Saturating, Zero}; +use sp_runtime::traits::{BlakeTwo256, Block as BlockT, Hash as HashT, One, Zero}; use sp_weights::Weight; use std::marker::PhantomData; use std::sync::Arc; @@ -153,7 +153,7 @@ where sp_core::storage::StateVersion::V1, ); - let receipt = self.load_bundle_receipt(parent_number, parent_hash, parent_chain)?; + let receipt = self.load_bundle_receipt(parent_number, parent_chain)?; let header = BundleHeader { proof_of_election, @@ -170,7 +170,6 @@ where fn load_bundle_receipt( &self, header_number: NumberFor, - header_hash: Block::Hash, parent_chain: ParentChain, ) -> sp_blockchain::Result> where @@ -179,37 +178,15 @@ where { let parent_chain_block_hash = parent_chain.best_hash(); let head_receipt_number = parent_chain.head_receipt_number(parent_chain_block_hash)?; - let max_drift = parent_chain.block_tree_pruning_depth(parent_chain_block_hash)?; + let receipt_number = (head_receipt_number + One::one()).min(header_number); tracing::trace!( ?header_number, ?head_receipt_number, - ?max_drift, + ?receipt_number, "Collecting receipts at {parent_chain_block_hash:?}" ); - let header_block_receipt_is_written = crate::aux_schema::latest_consensus_block_hash_for::< - _, - _, - CBlock::Hash, - >(&*self.client, &header_hash)? - .is_some(); - - // TODO: remove once the receipt generation can be done before the domain block is - // committed to the database, in other words, only when the receipt of block N+1 has - // been generated can the `client.info().best_number` be updated from N to N+1. - // - // This requires: - // 1. Reimplement `runtime_api.intermediate_roots()` on the client side. - // 2. Add a hook before the upstream `client.commit_operation(op)`. - let available_best_receipt_number = if header_block_receipt_is_written { - header_number - } else { - header_number.saturating_sub(One::one()) - }; - - let receipt_number = (head_receipt_number + One::one()).min(available_best_receipt_number); - if receipt_number.is_zero() { let genesis_hash = self.client.info().genesis_hash; let genesis_header = self.client.header(genesis_hash)?.ok_or_else(|| { diff --git a/domains/client/domain-operator/src/domain_worker.rs b/domains/client/domain-operator/src/domain_worker.rs index 56e777a3ae..2dd812c330 100644 --- a/domains/client/domain-operator/src/domain_worker.rs +++ b/domains/client/domain-operator/src/domain_worker.rs @@ -1,6 +1,6 @@ //! Shared domain worker functions. -use crate::utils::{to_number_primitive, BlockInfo, OperatorSlotInfo}; +use crate::utils::{BlockInfo, OperatorSlotInfo}; use futures::channel::mpsc; use futures::{SinkExt, Stream, StreamExt}; use sc_client_api::{BlockBackend, BlockImportNotification, BlockchainEvents}; @@ -9,11 +9,10 @@ use sp_api::{ApiError, ApiExt, BlockT, ProvideRuntimeApi}; use sp_blockchain::{HashAndNumber, HeaderBackend}; use sp_core::traits::SpawnEssentialNamed; use sp_domains::{DomainsApi, OpaqueBundle}; -use sp_runtime::traits::{Header as HeaderT, NumberFor, One, Saturating}; -use std::collections::hash_map::Entry; -use std::collections::HashMap; +use sp_runtime::traits::{Header as HeaderT, NumberFor}; use std::future::Future; use std::pin::Pin; +use std::sync::Arc; use subspace_runtime_primitives::Balance; type OpaqueBundleFor = OpaqueBundle< @@ -24,181 +23,106 @@ type OpaqueBundleFor = OpaqueBundle< Balance, >; -pub(crate) async fn handle_slot_notifications( - consensus_client: &CClient, - consensus_offchain_tx_pool_factory: &OffchainTransactionPoolFactory, - bundler: BundlerFn, - mut slots: impl Stream>)> + Unpin, -) where - Block: BlockT, - CBlock: BlockT, - CClient: HeaderBackend + ProvideRuntimeApi, - CClient::Api: DomainsApi, Block::Hash>, - BundlerFn: Fn( - HashAndNumber, - OperatorSlotInfo, - ) -> Pin>> + Send>> - + Send - + Sync, -{ - while let Some((operator_slot_info, slot_acknowledgement_sender)) = slots.next().await { - let slot = operator_slot_info.slot; - if let Err(error) = on_new_slot::( - consensus_client, - consensus_offchain_tx_pool_factory.clone(), - &bundler, - operator_slot_info, - ) - .await - { - tracing::error!( - ?error, - "Error occurred on producing a bundle at slot {slot}" - ); - break; - } - if let Some(mut sender) = slot_acknowledgement_sender { - let _ = sender.send(()).await; - } - } -} - +/// Throttle the consensus block import notification based on the `consensus_block_import_throttling_buffer_size` +/// to pause the consensus block import in case the consensus chain runs much faster than the domain. +/// +/// Return the throttled block import notification stream #[allow(clippy::too_many_arguments)] -pub(crate) async fn handle_block_import_notifications< +pub(crate) fn throttling_block_import_notifications< Block, CBlock, CClient, - ProcessorFn, BlocksImporting, BlocksImported, >( spawn_essential: Box, - consensus_client: &CClient, - best_domain_number: NumberFor, - processor: ProcessorFn, - mut leaves: Vec<(CBlock::Hash, NumberFor, bool)>, + consensus_client: Arc, mut blocks_importing: BlocksImporting, mut blocks_imported: BlocksImported, consensus_block_import_throttling_buffer_size: u32, -) where +) -> mpsc::Receiver>> +where Block: BlockT, CBlock: BlockT, CClient: HeaderBackend + BlockBackend + ProvideRuntimeApi - + BlockchainEvents, - CClient::Api: DomainsApi, Block::Hash>, - ProcessorFn: Fn( - (CBlock::Hash, NumberFor, bool), - ) -> Pin> + Send>> - + Send - + Sync + + BlockchainEvents + 'static, - BlocksImporting: Stream, mpsc::Sender<()>)> + Unpin, - BlocksImported: Stream> + Unpin, + CClient::Api: DomainsApi, Block::Hash>, + BlocksImporting: Stream, mpsc::Sender<()>)> + Unpin + Send + 'static, + BlocksImported: Stream> + Unpin + Send + 'static, { - let mut active_leaves = HashMap::with_capacity(leaves.len()); - - let best_domain_number = to_number_primitive(best_domain_number); - - // Notify about active leaves on startup before starting the loop - for (hash, number, is_new_best) in std::mem::take(&mut leaves) { - let _ = active_leaves.insert(hash, number); - // Skip the blocks that have been processed by the execution chain. - if number > best_domain_number.into() { - if let Err(error) = processor((hash, number, is_new_best)).await { - tracing::error!(?error, "Failed to process consensus block on startup"); - // Bring down the service as bundles processor is an essential task. - // TODO: more graceful shutdown. - return; - } - } - } - // The consensus chain can be ahead of the domain by up to `block_import_throttling_buffer_size/2` // blocks, for there are two notifications per block sent to this buffer (one will be actually // consumed by the domain processor, the other from `sc-consensus-subspace` is used to discontinue // the consensus block import in case the consensus chain runs much faster than the domain.). - let (mut block_info_sender, mut block_info_receiver) = - mpsc::channel(consensus_block_import_throttling_buffer_size as usize); + let (mut block_info_sender, block_info_receiver) = mpsc::channel::>>( + consensus_block_import_throttling_buffer_size as usize, + ); - // Run the actual processor in a dedicated task, otherwise `tokio::select!` might hang forever - // when the throttling buffer is full. - spawn_essential.spawn_essential_blocking( - "consensus-block-processor", + spawn_essential.spawn_essential( + "consensus-block-import-throttler", None, Box::pin(async move { - while let Some(maybe_block_info) = block_info_receiver.next().await { - if let Some(block_info) = maybe_block_info { - if let Err(error) = - block_imported::(&processor, &mut active_leaves, block_info) - .await - { - tracing::error!(?error, "Failed to process consensus block"); - // Bring down the service as bundles processor is an essential task. - // TODO: more graceful shutdown. - break; + loop { + tokio::select! { + // Ensure the `blocks_imported` branch will be checked before the `blocks_importing` branch. + // Currently this is only necessary for the test to ensure when both `block_imported` notification + // and `blocks_importing` notification are arrived, the `block_imported` notification will be processed + // first, such that we can ensure when the `blocks_importing` acknowledgement is responded, the + // imported block must being processed by the executor. + // Please see https://github.com/subspace/subspace/pull/1363#discussion_r1162571291 for more details. + biased; + + maybe_block_imported = blocks_imported.next() => { + let block_imported = match maybe_block_imported { + Some(block_imported) => block_imported, + None => { + // Can be None on graceful shutdown. + break; + } + }; + let header = match consensus_client.header(block_imported.hash) { + Ok(Some(header)) => header, + res => { + tracing::error!( + result = ?res, + header = ?block_imported.header, + "Imported consensus block header not found", + ); + return; + } + }; + let block_info = BlockInfo { + hash: header.hash(), + number: *header.number(), + is_new_best: block_imported.is_new_best, + }; + let _ = block_info_sender.feed(Some(block_info)).await; + } + maybe_block_importing = blocks_importing.next() => { + let (_block_number, mut acknowledgement_sender) = + match maybe_block_importing { + Some(block_importing) => block_importing, + None => { + // Can be None on graceful shutdown. + break; + } + }; + // Pause the consensus block import when the sink is full. + let _ = block_info_sender.feed(None).await; + let _ = acknowledgement_sender.send(()).await; } } } }), ); - loop { - tokio::select! { - // Ensure the `blocks_imported` branch will be checked before the `blocks_importing` branch. - // Currently this is only necessary for the test to ensure when both `block_imported` notification - // and `blocks_importing` notification are arrived, the `block_imported` notification will be processed - // first, such that we can ensure when the `blocks_importing` acknowledgement is responded, the - // imported block must being processed by the executor. - // Please see https://github.com/subspace/subspace/pull/1363#discussion_r1162571291 for more details. - biased; - - maybe_block_imported = blocks_imported.next() => { - let block_imported = match maybe_block_imported { - Some(block_imported) => block_imported, - None => { - // Can be None on graceful shutdown. - break; - } - }; - let header = match consensus_client.header(block_imported.hash) { - Ok(Some(header)) => header, - res => { - tracing::error!( - result = ?res, - header = ?block_imported.header, - "Imported consensus block header not found", - ); - return; - } - }; - let block_info = BlockInfo { - hash: header.hash(), - parent_hash: *header.parent_hash(), - number: *header.number(), - is_new_best: block_imported.is_new_best, - }; - let _ = block_info_sender.feed(Some(block_info)).await; - } - maybe_block_importing = blocks_importing.next() => { - let (_block_number, mut acknowledgement_sender) = - match maybe_block_importing { - Some(block_importing) => block_importing, - None => { - // Can be None on graceful shutdown. - break; - } - }; - // Pause the consensus block import when the sink is full. - let _ = block_info_sender.feed(None).await; - let _ = acknowledgement_sender.send(()).await; - } - } - } + block_info_receiver } -async fn on_new_slot( +pub(crate) async fn on_new_slot( consensus_client: &CClient, consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, bundler: &BundlerFn, @@ -242,33 +166,3 @@ where Ok(()) } - -async fn block_imported( - processor: &ProcessorFn, - active_leaves: &mut HashMap>, - block_info: BlockInfo, -) -> Result<(), ApiError> -where - CBlock: BlockT, - ProcessorFn: Fn( - (CBlock::Hash, NumberFor, bool), - ) -> Pin> + Send>> - + Send - + Sync, -{ - match active_leaves.entry(block_info.hash) { - Entry::Vacant(entry) => entry.insert(block_info.number), - Entry::Occupied(entry) => { - debug_assert_eq!(*entry.get(), block_info.number); - return Ok(()); - } - }; - - if let Some(number) = active_leaves.remove(&block_info.parent_hash) { - debug_assert_eq!(block_info.number.saturating_sub(One::one()), number); - } - - processor((block_info.hash, block_info.number, block_info.is_new_best)).await?; - - Ok(()) -} diff --git a/domains/client/domain-operator/src/domain_worker_starter.rs b/domains/client/domain-operator/src/domain_worker_starter.rs index 62930c253b..5ef1da3fa3 100644 --- a/domains/client/domain-operator/src/domain_worker_starter.rs +++ b/domains/client/domain-operator/src/domain_worker_starter.rs @@ -16,13 +16,13 @@ use crate::bundle_processor::BundleProcessor; use crate::domain_bundle_producer::DomainBundleProducer; -use crate::domain_worker::{handle_block_import_notifications, handle_slot_notifications}; +use crate::domain_worker::{on_new_slot, throttling_block_import_notifications}; use crate::parent_chain::DomainParentChain; -use crate::utils::{BlockInfo, OperatorSlotInfo}; +use crate::utils::OperatorSlotInfo; use crate::{NewSlotNotification, OperatorStreams}; use domain_runtime_primitives::{DomainCoreApi, InherentExtrinsicApi}; use futures::channel::mpsc; -use futures::{future, FutureExt, Stream, StreamExt, TryFutureExt}; +use futures::{FutureExt, SinkExt, Stream, StreamExt, TryFutureExt}; use sc_client_api::{ AuxStore, BlockBackend, BlockImportNotification, BlockchainEvents, Finalizer, ProofProvider, }; @@ -50,12 +50,12 @@ pub(super) async fn start_worker< IBNS, CIBNS, NSNS, + ASS, E, >( spawn_essential: Box, consensus_client: Arc, consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, - client: Arc, is_authority: bool, bundle_producer: DomainBundleProducer< Block, @@ -67,8 +67,7 @@ pub(super) async fn start_worker< TransactionPool, >, bundle_processor: BundleProcessor, - operator_streams: OperatorStreams, - active_leaves: Vec>, + operator_streams: OperatorStreams, ) where Block: BlockT, Block::Hash: Into, @@ -102,6 +101,7 @@ pub(super) async fn start_worker< IBNS: Stream, mpsc::Sender<()>)> + Send + 'static, CIBNS: Stream> + Send + 'static, NSNS: Stream + Send + 'static, + ASS: Stream> + Send + 'static, E: CodeExecutor, { let span = tracing::Span::current(); @@ -111,77 +111,115 @@ pub(super) async fn start_worker< block_importing_notification_stream, imported_block_notification_stream, new_slot_notification_stream, + acknowledgement_sender_stream, _phantom, } = operator_streams; - let handle_block_import_notifications_fut = - handle_block_import_notifications::( + let mut throttled_block_import_notification_stream = + throttling_block_import_notifications::( spawn_essential, - consensus_client.as_ref(), - client.info().best_number, - { - let span = span.clone(); - - move |consensus_block_info| { - bundle_processor - .clone() - .process_bundles(consensus_block_info) - .instrument(span.clone()) - .boxed() - } - }, - active_leaves - .into_iter() - .map( - |BlockInfo { - hash, - parent_hash: _, - number, - is_new_best, - }| (hash, number, is_new_best), - ) - .collect(), + consensus_client.clone(), Box::pin(block_importing_notification_stream), Box::pin(imported_block_notification_stream), consensus_block_import_throttling_buffer_size, ); - let handle_slot_notifications_fut = handle_slot_notifications::( - consensus_client.as_ref(), - &consensus_offchain_tx_pool_factory, - move |consensus_block_info, slot_info| { - bundle_producer - .clone() - .produce_bundle(consensus_block_info.clone(), slot_info) - .instrument(span.clone()) - .unwrap_or_else(move |error| { - tracing::error!(?consensus_block_info, ?error, "Error at producing bundle."); - None - }) - .boxed() - }, - Box::pin(new_slot_notification_stream.map( - |(slot, global_randomness, acknowledgement_sender)| { - ( - OperatorSlotInfo { - slot, - global_randomness, - }, - acknowledgement_sender, - ) - }, - )), - ); - if is_authority { - info!("🧑‍🌾 Running as Operator..."); - let _ = future::select( - Box::pin(handle_block_import_notifications_fut), - Box::pin(handle_slot_notifications_fut), - ) - .await; - } else { + if !is_authority { info!("🧑‍ Running as Full node..."); - drop(handle_slot_notifications_fut); - handle_block_import_notifications_fut.await + drop(new_slot_notification_stream); + drop(acknowledgement_sender_stream); + while let Some(maybe_block_info) = throttled_block_import_notification_stream.next().await { + if let Some(block_info) = maybe_block_info { + if let Err(error) = bundle_processor + .clone() + .process_bundles((block_info.hash, block_info.number, block_info.is_new_best)) + .instrument(span.clone()) + .await + { + tracing::error!(?error, "Failed to process consensus block"); + // Bring down the service as bundles processor is an essential task. + // TODO: more graceful shutdown. + break; + } + } + } + } else { + info!("🧑‍🌾 Running as Operator..."); + let bundler_fn = { + let span = span.clone(); + move |consensus_block_info: sp_blockchain::HashAndNumber, slot_info| { + bundle_producer + .clone() + .produce_bundle(consensus_block_info.clone(), slot_info) + .instrument(span.clone()) + .unwrap_or_else(move |error| { + tracing::error!( + ?consensus_block_info, + ?error, + "Error at producing bundle." + ); + None + }) + .boxed() + } + }; + let mut new_slot_notification_stream = Box::pin(new_slot_notification_stream); + let mut acknowledgement_sender_stream = Box::pin(acknowledgement_sender_stream); + loop { + tokio::select! { + // Ensure any new slot/block import must handle first before the `acknowledgement_sender_stream` + // NOTE: this is only necessary for the test. + biased; + + Some((slot, global_randomness)) = new_slot_notification_stream.next() => { + if let Err(error) = on_new_slot::( + consensus_client.as_ref(), + consensus_offchain_tx_pool_factory.clone(), + &bundler_fn, + OperatorSlotInfo { + slot, + global_randomness, + }, + ) + .await + { + tracing::error!( + ?error, + "Error occurred on producing a bundle at slot {slot}" + ); + break; + } + } + Some(maybe_block_info) = throttled_block_import_notification_stream.next() => { + if let Some(block_info) = maybe_block_info { + if let Err(error) = bundle_processor + .clone() + .process_bundles(( + block_info.hash, + block_info.number, + block_info.is_new_best, + )) + .instrument(span.clone()) + .await + { + tracing::error!(?error, "Failed to process consensus block"); + // Bring down the service as bundles processor is an essential task. + // TODO: more graceful shutdown. + break; + } + } + } + // In production the `acknowledgement_sender_stream` is an empty stream, it only set to + // real stream in test + Some(mut acknowledgement_sender) = acknowledgement_sender_stream.next() => { + if let Err(err) = acknowledgement_sender.send(()).await { + tracing::error!( + ?err, + "Failed to send acknowledgement" + ); + } + } + } + } } } diff --git a/domains/client/domain-operator/src/lib.rs b/domains/client/domain-operator/src/lib.rs index a698694c4f..717345ab0e 100644 --- a/domains/client/domain-operator/src/lib.rs +++ b/domains/client/domain-operator/src/lib.rs @@ -82,21 +82,19 @@ pub use self::bootstrapper::{BootstrapResult, Bootstrapper}; pub use self::operator::Operator; pub use self::parent_chain::DomainParentChain; pub use self::utils::{DomainBlockImportNotification, DomainImportNotifications}; -use crate::utils::BlockInfo; use futures::channel::mpsc; use futures::Stream; use sc_client_api::{AuxStore, BlockImportNotification}; use sc_consensus::SharedBlockImport; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use sc_utils::mpsc::TracingUnboundedSender; -use sp_api::ProvideRuntimeApi; use sp_blockchain::HeaderBackend; -use sp_consensus::{SelectChain, SyncOracle}; +use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use sp_domain_digests::AsPredigest; use sp_domains::{Bundle, DomainId, ExecutionReceipt}; use sp_keystore::KeystorePtr; -use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor, One, Saturating, Zero}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; use sp_runtime::DigestItem; use std::marker::PhantomData; use std::sync::Arc; @@ -123,7 +121,7 @@ type BundleSender = TracingUnboundedSender< >; /// Notification streams from the consensus chain driving the executor. -pub struct OperatorStreams { +pub struct OperatorStreams { /// Pause the consensus block import when the consensus chain client /// runs much faster than the domain client. pub consensus_block_import_throttling_buffer_size: u32, @@ -137,10 +135,13 @@ pub struct OperatorStreams { pub imported_block_notification_stream: CIBNS, /// New slot arrives. pub new_slot_notification_stream: NSNS, + /// The acknowledgement sender only used in test to ensure all of + /// the operator's previous tasks are finished + pub acknowledgement_sender_stream: ASS, pub _phantom: PhantomData, } -type NewSlotNotification = (Slot, Randomness, Option>); +type NewSlotNotification = (Slot, Randomness); pub struct OperatorParams< Block, @@ -153,12 +154,14 @@ pub struct OperatorParams< IBNS, CIBNS, NSNS, + ASS, > where Block: BlockT, CBlock: BlockT, IBNS: Stream, mpsc::Sender<()>)> + Send + 'static, CIBNS: Stream> + Send + 'static, NSNS: Stream + Send + 'static, + ASS: Stream> + Send + 'static, { pub domain_id: DomainId, pub domain_created_at: NumberFor, @@ -172,72 +175,11 @@ pub struct OperatorParams< pub is_authority: bool, pub keystore: KeystorePtr, pub bundle_sender: Arc>, - pub operator_streams: OperatorStreams, + pub operator_streams: OperatorStreams, pub domain_confirmation_depth: NumberFor, pub block_import: SharedBlockImport, } -/// Returns the active leaves the operator should start with. -/// -/// The longest chain is used as the fork choice for the leaves as the consensus block's fork choice -/// is only available in the imported consensus block notifications. -async fn active_leaves( - client: &CClient, - select_chain: &SC, -) -> Result>, sp_consensus::Error> -where - CBlock: BlockT, - CClient: HeaderBackend + ProvideRuntimeApi + 'static, - SC: SelectChain, -{ - let best_block = select_chain.best_chain().await?; - - // No leaves if starting from the genesis. - if best_block.number().is_zero() { - return Ok(Vec::new()); - } - - let mut leaves = select_chain - .leaves() - .await - .unwrap_or_default() - .into_iter() - .filter_map(|hash| { - let number = client.number(hash).ok()??; - - // Only consider leaves that are in maximum an uncle of the best block. - if number < best_block.number().saturating_sub(One::one()) || hash == best_block.hash() - { - return None; - }; - - let parent_hash = *client.header(hash).ok()??.parent_hash(); - - Some(BlockInfo { - hash, - parent_hash, - number, - is_new_best: false, - }) - }) - .collect::>(); - - // Sort by block number and get the maximum number of leaves - leaves.sort_by_key(|b| b.number); - - leaves.push(BlockInfo { - hash: best_block.hash(), - parent_hash: *best_block.parent_hash(), - number: *best_block.number(), - is_new_best: true, - }); - - /// The maximum number of active leaves we forward to the [`Operator`] on startup. - const MAX_ACTIVE_LEAVES: usize = 4; - - Ok(leaves.into_iter().rev().take(MAX_ACTIVE_LEAVES).collect()) -} - pub(crate) fn load_execution_receipt_by_domain_hash( domain_client: &Client, domain_hash: Block::Hash, diff --git a/domains/client/domain-operator/src/operator.rs b/domains/client/domain-operator/src/operator.rs index 95c56ae8d6..3ff560bd7f 100644 --- a/domains/client/domain-operator/src/operator.rs +++ b/domains/client/domain-operator/src/operator.rs @@ -4,7 +4,7 @@ use crate::domain_bundle_producer::DomainBundleProducer; use crate::domain_bundle_proposer::DomainBundleProposer; use crate::fraud_proof::FraudProofGenerator; use crate::parent_chain::DomainParentChain; -use crate::{active_leaves, DomainImportNotifications, NewSlotNotification, OperatorParams}; +use crate::{DomainImportNotifications, NewSlotNotification, OperatorParams}; use domain_runtime_primitives::{DomainCoreApi, InherentExtrinsicApi}; use futures::channel::mpsc; use futures::{FutureExt, Stream}; @@ -14,7 +14,6 @@ use sc_client_api::{ use sc_utils::mpsc::tracing_unbounded; use sp_api::ProvideRuntimeApi; use sp_blockchain::{HeaderBackend, HeaderMetadata}; -use sp_consensus::SelectChain; use sp_core::traits::{CodeExecutor, SpawnEssentialNamed}; use sp_core::H256; use sp_domains::{BundleProducerElectionApi, DomainsApi}; @@ -94,9 +93,8 @@ where E: CodeExecutor, { /// Create a new instance. - pub async fn new( + pub async fn new( spawn_essential: Box, - select_chain: &SC, params: OperatorParams< Block, CBlock, @@ -108,16 +106,15 @@ where IBNS, CIBNS, NSNS, + ASS, >, ) -> Result where - SC: SelectChain, IBNS: Stream, mpsc::Sender<()>)> + Send + 'static, CIBNS: Stream> + Send + 'static, NSNS: Stream + Send + 'static, + ASS: Stream> + Send + 'static, { - let active_leaves = active_leaves(params.consensus_client.as_ref(), select_chain).await?; - let parent_chain = DomainParentChain::new(params.domain_id, params.consensus_client.clone()); @@ -182,12 +179,10 @@ where spawn_essential.clone(), params.consensus_client.clone(), params.consensus_offchain_tx_pool_factory.clone(), - params.client.clone(), params.is_authority, bundle_producer, bundle_processor.clone(), params.operator_streams, - active_leaves, ) .boxed(), ); diff --git a/domains/client/domain-operator/src/utils.rs b/domains/client/domain-operator/src/utils.rs index 262cbb7bfe..4f3ec650b1 100644 --- a/domains/client/domain-operator/src/utils.rs +++ b/domains/client/domain-operator/src/utils.rs @@ -22,8 +22,6 @@ where { /// hash of the block. pub hash: Block::Hash, - /// hash of the parent block. - pub parent_hash: Block::Hash, /// block's number. pub number: NumberFor, /// Is this the new best block. diff --git a/domains/service/src/domain.rs b/domains/service/src/domain.rs index b851800361..8c6540d8c2 100644 --- a/domains/service/src/domain.rs +++ b/domains/service/src/domain.rs @@ -24,7 +24,7 @@ use serde::de::DeserializeOwned; use sp_api::{ApiExt, BlockT, ConstructRuntimeApi, Metadata, NumberFor, ProvideRuntimeApi}; use sp_block_builder::BlockBuilder; use sp_blockchain::{HeaderBackend, HeaderMetadata}; -use sp_consensus::{SelectChain, SyncOracle}; +use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; use sp_core::traits::SpawnEssentialNamed; use sp_core::{Decode, Encode}; @@ -233,7 +233,7 @@ where Ok(params) } -pub struct DomainParams +pub struct DomainParams where CBlock: BlockT, { @@ -243,8 +243,7 @@ where pub consensus_client: Arc, pub consensus_offchain_tx_pool_factory: OffchainTransactionPoolFactory, pub consensus_network_sync_oracle: Arc, - pub select_chain: SC, - pub operator_streams: OperatorStreams, + pub operator_streams: OperatorStreams, pub gossip_message_sink: GossipMessageSink, pub domain_message_receiver: TracingUnboundedReceiver>, pub provider: Provider, @@ -254,16 +253,16 @@ where pub async fn new_full< CBlock, CClient, - SC, IBNS, CIBNS, NSNS, + ASS, RuntimeApi, ExecutorDispatch, AccountId, Provider, >( - domain_params: DomainParams, + domain_params: DomainParams, ) -> sc_service::error::Result< NewFull< Arc>, @@ -293,10 +292,10 @@ where + RelayerApi> + MessengerApi> + BundleProducerElectionApi, - SC: SelectChain, IBNS: Stream, mpsc::Sender<()>)> + Send + 'static, CIBNS: Stream> + Send + 'static, - NSNS: Stream>)> + Send + 'static, + NSNS: Stream + Send + 'static, + ASS: Stream> + Send + 'static, RuntimeApi: ConstructRuntimeApi> + Send + Sync @@ -346,7 +345,6 @@ where consensus_client, consensus_offchain_tx_pool_factory, consensus_network_sync_oracle, - select_chain, operator_streams, gossip_message_sink, domain_message_receiver, @@ -455,7 +453,6 @@ where let operator = Operator::new( Box::new(spawn_essential.clone()), - &select_chain, OperatorParams { domain_id, domain_created_at, diff --git a/domains/test/service/src/domain.rs b/domains/test/service/src/domain.rs index a4fb71f659..486bc93259 100644 --- a/domains/test/service/src/domain.rs +++ b/domains/test/service/src/domain.rs @@ -211,6 +211,7 @@ where .block_importing_notification_stream(), imported_block_notification_stream, new_slot_notification_stream: mock_consensus_node.new_slot_notification_stream(), + acknowledgement_sender_stream: mock_consensus_node.new_acknowledgement_sender_stream(), _phantom: Default::default(), }; @@ -229,7 +230,6 @@ where mock_consensus_node.transaction_pool.clone(), ), consensus_network_sync_oracle: mock_consensus_node.sync_service.clone(), - select_chain: mock_consensus_node.select_chain.clone(), operator_streams, gossip_message_sink: gossip_msg_sink, domain_message_receiver, diff --git a/test/subspace-test-service/src/lib.rs b/test/subspace-test-service/src/lib.rs index 09e6c742fe..19a29b20de 100644 --- a/test/subspace-test-service/src/lib.rs +++ b/test/subspace-test-service/src/lib.rs @@ -243,8 +243,10 @@ pub struct MockConsensusNode { next_slot: u64, /// The slot notification subscribers #[allow(clippy::type_complexity)] - new_slot_notification_subscribers: - Vec>)>>, + new_slot_notification_subscribers: Vec>, + /// The acknowledgement sender subscribers + #[allow(clippy::type_complexity)] + acknowledgement_sender_subscribers: Vec>>, /// Block import pipeline #[allow(clippy::type_complexity)] block_import: MockBlockImport< @@ -384,6 +386,7 @@ impl MockConsensusNode { network_starter: Some(network_starter), next_slot: 1, new_slot_notification_subscribers: Vec::new(), + acknowledgement_sender_subscribers: Vec::new(), block_import, xdm_gossip_worker_builder: Some(GossipWorkerBuilder::new()), mock_solution, @@ -445,33 +448,11 @@ impl MockConsensusNode { &mut self, slot: Slot, ) -> Option, Hash, DomainNumber, H256, Balance>> { - let (slot_acknowledgement_sender, mut slot_acknowledgement_receiver) = mpsc::channel(0); - - // Must drop `slot_acknowledgement_sender` after the notification otherwise the receiver - // will block forever as there is still a sender not closed. - { - let value = ( - slot, - Randomness::from(Hash::random().to_fixed_bytes()), - Some(slot_acknowledgement_sender), - ); - self.new_slot_notification_subscribers - .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok()); - } - - // Wait for all the acknowledgements to progress and proactively drop closed subscribers. - loop { - select! { - res = slot_acknowledgement_receiver.next() => if res.is_none() { - break; - }, - // TODO: Workaround for https://github.com/smol-rs/async-channel/issues/23, remove once fix is released - _ = futures_timer::Delay::new(time::Duration::from_millis(500)).fuse() => { - self.new_slot_notification_subscribers.retain(|subscriber| !subscriber.is_closed()); - } - } - } + let value = (slot, Randomness::from(Hash::random().to_fixed_bytes())); + self.new_slot_notification_subscribers + .retain(|subscriber| subscriber.unbounded_send(value).is_ok()); + self.confirm_acknowledgement().await; self.get_bundle_from_tx_pool(slot.into()) } @@ -490,14 +471,54 @@ impl MockConsensusNode { } /// Subscribe the new slot notification - pub fn new_slot_notification_stream( - &mut self, - ) -> TracingUnboundedReceiver<(Slot, Randomness, Option>)> { + pub fn new_slot_notification_stream(&mut self) -> TracingUnboundedReceiver<(Slot, Randomness)> { let (tx, rx) = tracing_unbounded("subspace_new_slot_notification_stream", 100); self.new_slot_notification_subscribers.push(tx); rx } + /// Subscribe the acknowledgement sender stream + pub fn new_acknowledgement_sender_stream( + &mut self, + ) -> TracingUnboundedReceiver> { + let (tx, rx) = tracing_unbounded("subspace_acknowledgement_sender_stream", 100); + self.acknowledgement_sender_subscribers.push(tx); + rx + } + + /// Wait for all the acknowledgements before return + /// + /// It is used to wait for the acknowledgement of the domain worker to ensure it have + /// finish all the previous tasks before return + pub async fn confirm_acknowledgement(&mut self) { + let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0); + + // Must drop `acknowledgement_sender` after the notification otherwise the receiver + // will block forever as there is still a sender not closed. + { + self.acknowledgement_sender_subscribers + .retain(|subscriber| { + subscriber + .unbounded_send(acknowledgement_sender.clone()) + .is_ok() + }); + drop(acknowledgement_sender); + } + + // Wait for all the acknowledgements to progress and proactively drop closed subscribers. + loop { + select! { + res = acknowledgement_receiver.next() => if res.is_none() { + break; + }, + // TODO: Workaround for https://github.com/smol-rs/async-channel/issues/23, remove once fix is released + _ = futures_timer::Delay::new(time::Duration::from_millis(500)).fuse() => { + self.acknowledgement_sender_subscribers.retain(|subscriber| !subscriber.is_closed()); + } + } + } + } + /// Subscribe the block importing notification pub fn block_importing_notification_stream( &mut self, @@ -711,7 +732,7 @@ impl MockConsensusNode { log_new_block(&block, block_timer.elapsed().as_millis()); - match self.import_block(block, Some(storage_changes)).await { + let res = match self.import_block(block, Some(storage_changes)).await { Ok(hash) => { // Remove the tx of the imported block from the tx pool incase re-include them // in the future block by accident. @@ -719,7 +740,9 @@ impl MockConsensusNode { Ok(hash) } err => err, - } + }; + self.confirm_acknowledgement().await; + res } /// Produce a new block on top of the current best block, with the extrinsics collected from @@ -929,7 +952,7 @@ macro_rules! produce_blocks { async { let domain_fut = { let mut futs: Vec>>> = Vec::new(); - futs.push(Box::pin($operator_node.wait_for_blocks(1))); + futs.push(Box::pin($operator_node.wait_for_blocks($count))); $( futs.push( Box::pin( $domain_node.wait_for_blocks($count) ) ); )* futures::future::join_all(futs) };