diff --git a/crates/sc-consensus-subspace/src/lib.rs b/crates/sc-consensus-subspace/src/lib.rs
index d45b8a5def..51e8873f07 100644
--- a/crates/sc-consensus-subspace/src/lib.rs
+++ b/crates/sc-consensus-subspace/src/lib.rs
@@ -570,7 +570,11 @@ impl<Block: BlockT> SubspaceLink<Block> {
         self.archived_segment_notification_stream.clone()
     }
 
-    /// Get stream with notifications about each imported block.
+    /// Get stream with notifications about each imported block right BEFORE import actually
+    /// happens.
+    ///
+    /// NOTE: all Subspace checks have already happened for this block, but the block can still
+    /// potentially fail to import in Substrate's internals.
     pub fn block_importing_notification_stream(
         &self,
     ) -> SubspaceNotificationStream<BlockImportingNotification<Block>> {
@@ -599,8 +603,6 @@ where
 {
     inner: I,
     client: Arc<Client>,
-    block_importing_notification_sender:
-        SubspaceNotificationSender<BlockImportingNotification<Block>>,
     subspace_link: SubspaceLink<Block>,
     create_inherent_data_providers: CIDP,
     chain_constants: ChainConstants,
@@ -620,7 +622,6 @@ where
         SubspaceBlockImport {
             inner: self.inner.clone(),
             client: self.client.clone(),
-            block_importing_notification_sender: self.block_importing_notification_sender.clone(),
             subspace_link: self.subspace_link.clone(),
             create_inherent_data_providers: self.create_inherent_data_providers.clone(),
             chain_constants: self.chain_constants,
@@ -641,14 +642,9 @@ where
     AS: AuxStore + Send + Sync + 'static,
     BlockNumber: From<<<Block as BlockT>::Header as HeaderT>::Number>,
 {
-    // TODO: Create a struct for these parameters
-    #[allow(clippy::too_many_arguments)]
     fn new(
         client: Arc<Client>,
         block_import: I,
-        block_importing_notification_sender: SubspaceNotificationSender<
-            BlockImportingNotification<Block>,
-        >,
         subspace_link: SubspaceLink<Block>,
         create_inherent_data_providers: CIDP,
         chain_constants: ChainConstants,
@@ -658,7 +654,6 @@ where
         Self {
             client,
             inner: block_import,
-            block_importing_notification_sender,
             subspace_link,
             create_inherent_data_providers,
             chain_constants,
@@ -1000,8 +995,6 @@ where
         let added_weight = calculate_block_weight(subspace_digest_items.solution_range);
         let total_weight = parent_weight + added_weight;
 
-        let info = self.client.info();
-
         aux_schema::write_block_weight(block_hash, total_weight, |values| {
             block
                 .auxiliary
@@ -1034,12 +1027,14 @@ where
             }
         }
 
-        // The fork choice rule is that we pick the heaviest chain (i.e. smallest solution
-        // range), if there's a tie we go with the longest chain.
+        // The fork choice rule is that we pick the heaviest chain (i.e. smallest solution range),
+        // if there's a tie we go with the longest chain
         let fork_choice = {
+            let info = self.client.info();
+
             let last_best_weight = if &info.best_hash == block.header.parent_hash() {
-                // the parent=genesis case is already covered for loading parent weight,
-                // so we don't need to cover again here.
+                // the parent=genesis case is already covered for loading parent weight, so we don't
+                // need to cover again here
                 parent_weight
             } else {
                 aux_schema::load_block_weight(&*self.client, info.best_hash)
@@ -1055,20 +1050,20 @@ where
         };
         block.fork_choice = Some(fork_choice);
 
-        let import_result = self.inner.import_block(block).await?;
         let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
 
-        self.block_importing_notification_sender
+        self.subspace_link
+            .block_importing_notification_sender
             .notify(move || BlockImportingNotification {
                 block_number,
                 acknowledgement_sender,
             });
 
-        while (acknowledgement_receiver.next().await).is_some() {
+        while acknowledgement_receiver.next().await.is_some() {
             // Wait for all the acknowledgements to finish.
         }
 
-        Ok(import_result)
+        self.inner.import_block(block).await
     }
 
     async fn check_block(
@@ -1129,7 +1124,7 @@ where
         reward_signing_notification_stream,
         archived_segment_notification_sender,
         archived_segment_notification_stream,
-        block_importing_notification_sender: block_importing_notification_sender.clone(),
+        block_importing_notification_sender,
         block_importing_notification_stream,
         // TODO: Consider making `confirmation_depth_k` non-zero
         segment_headers: Arc::new(Mutex::new(LruCache::new(
@@ -1145,7 +1140,6 @@ where
     let import = SubspaceBlockImport::new(
         client,
         block_import_inner,
-        block_importing_notification_sender,
         link.clone(),
         create_inherent_data_providers,
         chain_constants,
diff --git a/crates/sc-consensus-subspace/src/slot_worker.rs b/crates/sc-consensus-subspace/src/slot_worker.rs
index 7b4f3adf8b..f4c134880e 100644
--- a/crates/sc-consensus-subspace/src/slot_worker.rs
+++ b/crates/sc-consensus-subspace/src/slot_worker.rs
@@ -16,10 +16,7 @@
 // along with this program. If not, see <https://www.gnu.org/licenses/>.
 
 use crate::archiver::SegmentHeadersStore;
-use crate::{
-    BlockImportingNotification, NewSlotInfo, NewSlotNotification, RewardSigningNotification,
-    SubspaceLink,
-};
+use crate::{NewSlotInfo, NewSlotNotification, RewardSigningNotification, SubspaceLink};
 use futures::channel::mpsc;
 use futures::{StreamExt, TryFutureExt};
 use log::{debug, error, info, warn};
@@ -555,25 +552,6 @@ where
             }
         }
 
-        // TODO: This is a workaround for potential root cause of
-        // https://github.com/subspace/subspace/issues/871, also being discussed in
-        // https://substrate.stackexchange.com/questions/7886/is-block-creation-guaranteed-to-be-running-after-parent-block-is-fully-imported
-        if maybe_pre_digest.is_some() {
-            let block_number = *parent_header.number() + One::one();
-            let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
-
-            self.subspace_link
-                .block_importing_notification_sender
-                .notify(move || BlockImportingNotification {
-                    block_number,
-                    acknowledgement_sender,
-                });
-
-            while (acknowledgement_receiver.next().await).is_some() {
-                // Wait for all the acknowledgements to finish.
-            }
-        }
-
         maybe_pre_digest.map(|pre_digest| (pre_digest, pot_justification))
     }
 
diff --git a/domains/client/domain-operator/src/domain_worker.rs b/domains/client/domain-operator/src/domain_worker.rs
index 9d2a2ef000..6fd4165699 100644
--- a/domains/client/domain-operator/src/domain_worker.rs
+++ b/domains/client/domain-operator/src/domain_worker.rs
@@ -97,6 +97,7 @@ where
                 let _ = block_info_sender.feed(Some(block_info)).await;
             }
             maybe_block_importing = blocks_importing.next() => {
+                // TODO: remove the `block_number` from the notification since it is not used
                 let (_block_number, mut acknowledgement_sender) =
                     match maybe_block_importing {
                         Some(block_importing) => block_importing,
diff --git a/test/subspace-test-service/src/lib.rs b/test/subspace-test-service/src/lib.rs
index 28668c2ca6..7365bd8078 100644
--- a/test/subspace-test-service/src/lib.rs
+++ b/test/subspace-test-service/src/lib.rs
@@ -469,17 +469,32 @@ impl MockConsensusNode {
         }
 
         // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
-        loop {
-            select! {
-                res = acknowledgement_receiver.next() => if res.is_none() {
-                    break;
-                },
-                // TODO: Workaround for https://github.com/smol-rs/async-channel/issues/23, remove once fix is released
-                _ = futures_timer::Delay::new(time::Duration::from_millis(500)).fuse() => {
-                    self.acknowledgement_sender_subscribers.retain(|subscriber| !subscriber.is_closed());
-                }
-            }
+        while acknowledgement_receiver.next().await.is_some() {
+            // Wait for all the acknowledgements to finish.
+        }
+    }
+
+    /// Wait for the operator to finish processing the consensus block before returning
+    pub async fn confirm_block_import_processed(&mut self) {
+        // Send one more notification to ensure the previous consensus block import notification
+        // has been received by the operator
+        let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
+        {
+            // Must drop `block_import_acknowledgement_sender` after the notification otherwise
+            // the receiver will block forever as there is still a sender not closed.
+            // NOTE: it is okay to use the default block number since it is ignored on the consumer side.
+            let value = (NumberFor::<Block>::default(), acknowledgement_sender);
+            self.block_import
+                .block_importing_notification_subscribers
+                .lock()
+                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
+        }
+        while acknowledgement_receiver.next().await.is_some() {
+            // Wait for all the acknowledgements to finish.
         }
+
+        // Ensure the operator finishes processing the consensus block
+        self.confirm_acknowledgement().await;
     }
 
     /// Subscribe the block importing notification
@@ -704,7 +719,7 @@ impl MockConsensusNode {
             }
             err => err,
         };
-        self.confirm_acknowledgement().await;
+        self.confirm_block_import_processed().await;
         res
     }
 
@@ -854,7 +869,6 @@ where
         let block_number = *block.header.number();
         block.fork_choice = Some(ForkChoiceStrategy::LongestChain);
 
-        let import_result = self.inner.import_block(block).await?;
         let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
         // Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
         // will block forever as there is still a sender not closed.
         {
             let value = (block_number, acknowledgement_sender);
             self.block_importing_notification_subscribers
                 .lock()
-                .retain(|subscriber| {
-                    // It is necessary to notify the subscriber twice for each importing block in the test to ensure
-                    // the imported block must be fully processed by the executor when all acknowledgements responded.
-                    // This is because the `futures::channel::mpsc::channel` used in the executor have 1 slot even the
-                    // `consensus_block_import_throttling_buffer_size` is set to 0 in the test, notify one more time can
-                    // ensure the previously sent `block_imported` notification must be fully processed by the executor
-                    // when the second acknowledgements responded.
-                    // Please see https://github.com/subspace/subspace/pull/1363#discussion_r1162571291 for more details.
-                    subscriber
-                        .unbounded_send(value.clone())
-                        .and_then(|_| subscriber.unbounded_send(value.clone()))
-                        .is_ok()
-                });
+                .retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
         }
 
-        // Wait for all the acknowledgements to progress and proactively drop closed subscribers.
-        loop {
-            select! {
-                res = acknowledgement_receiver.next() => if res.is_none() {
-                    break;
-                },
-                // TODO: Workaround for https://github.com/smol-rs/async-channel/issues/23, remove once fix is released
-                _ = futures_timer::Delay::new(time::Duration::from_millis(500)).fuse() => {
-                    self.block_importing_notification_subscribers.lock().retain(|subscriber| !subscriber.is_closed());
-                }
-            }
+        while acknowledgement_receiver.next().await.is_some() {
+            // Wait for all the acknowledgements to finish.
         }
 
-        Ok(import_result)
+        self.inner.import_block(block).await
     }
 
     async fn check_block(