Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix inherents race condition #1974

Merged
merged 8 commits into from
Oct 26, 2023
38 changes: 16 additions & 22 deletions crates/sc-consensus-subspace/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,11 @@ impl<Block: BlockT> SubspaceLink<Block> {
self.archived_segment_notification_stream.clone()
}

/// Get stream with notifications about each imported block.
/// Get stream with notifications about each imported block right BEFORE import actually
/// happens.
///
/// NOTE: all Subspace checks have already happened for this block, but block can still
/// potentially fail to import in Substrate's internals.
pub fn block_importing_notification_stream(
&self,
) -> SubspaceNotificationStream<BlockImportingNotification<Block>> {
Expand Down Expand Up @@ -599,8 +603,6 @@ where
{
inner: I,
client: Arc<Client>,
block_importing_notification_sender:
SubspaceNotificationSender<BlockImportingNotification<Block>>,
subspace_link: SubspaceLink<Block>,
create_inherent_data_providers: CIDP,
chain_constants: ChainConstants,
Expand All @@ -620,7 +622,6 @@ where
SubspaceBlockImport {
inner: self.inner.clone(),
client: self.client.clone(),
block_importing_notification_sender: self.block_importing_notification_sender.clone(),
subspace_link: self.subspace_link.clone(),
create_inherent_data_providers: self.create_inherent_data_providers.clone(),
chain_constants: self.chain_constants,
Expand All @@ -641,14 +642,9 @@ where
AS: AuxStore + Send + Sync + 'static,
BlockNumber: From<<<Block as BlockT>::Header as HeaderT>::Number>,
{
// TODO: Create a struct for these parameters
#[allow(clippy::too_many_arguments)]
fn new(
client: Arc<Client>,
block_import: I,
block_importing_notification_sender: SubspaceNotificationSender<
BlockImportingNotification<Block>,
>,
subspace_link: SubspaceLink<Block>,
create_inherent_data_providers: CIDP,
chain_constants: ChainConstants,
Expand All @@ -658,7 +654,6 @@ where
Self {
client,
inner: block_import,
block_importing_notification_sender,
subspace_link,
create_inherent_data_providers,
chain_constants,
Expand Down Expand Up @@ -1000,8 +995,6 @@ where
let added_weight = calculate_block_weight(subspace_digest_items.solution_range);
let total_weight = parent_weight + added_weight;

let info = self.client.info();

aux_schema::write_block_weight(block_hash, total_weight, |values| {
block
.auxiliary
Expand Down Expand Up @@ -1034,12 +1027,14 @@ where
}
}

// The fork choice rule is that we pick the heaviest chain (i.e. smallest solution
// range), if there's a tie we go with the longest chain.
// The fork choice rule is that we pick the heaviest chain (i.e. smallest solution range),
// if there's a tie we go with the longest chain
let fork_choice = {
let info = self.client.info();

let last_best_weight = if &info.best_hash == block.header.parent_hash() {
// the parent=genesis case is already covered for loading parent weight,
// so we don't need to cover again here.
// the parent=genesis case is already covered for loading parent weight, so we don't
// need to cover again here
parent_weight
} else {
aux_schema::load_block_weight(&*self.client, info.best_hash)
Expand All @@ -1055,20 +1050,20 @@ where
};
block.fork_choice = Some(fork_choice);

let import_result = self.inner.import_block(block).await?;
let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);

self.block_importing_notification_sender
self.subspace_link
.block_importing_notification_sender
.notify(move || BlockImportingNotification {
block_number,
acknowledgement_sender,
});

while (acknowledgement_receiver.next().await).is_some() {
while acknowledgement_receiver.next().await.is_some() {
// Wait for all the acknowledgements to finish.
}

Ok(import_result)
self.inner.import_block(block).await
rahulksnv marked this conversation as resolved.
Show resolved Hide resolved
}

async fn check_block(
Expand Down Expand Up @@ -1129,7 +1124,7 @@ where
reward_signing_notification_stream,
archived_segment_notification_sender,
archived_segment_notification_stream,
block_importing_notification_sender: block_importing_notification_sender.clone(),
block_importing_notification_sender,
block_importing_notification_stream,
// TODO: Consider making `confirmation_depth_k` non-zero
segment_headers: Arc::new(Mutex::new(LruCache::new(
Expand All @@ -1145,7 +1140,6 @@ where
let import = SubspaceBlockImport::new(
client,
block_import_inner,
block_importing_notification_sender,
link.clone(),
create_inherent_data_providers,
chain_constants,
Expand Down
24 changes: 1 addition & 23 deletions crates/sc-consensus-subspace/src/slot_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use crate::archiver::SegmentHeadersStore;
use crate::{
BlockImportingNotification, NewSlotInfo, NewSlotNotification, RewardSigningNotification,
SubspaceLink,
};
use crate::{NewSlotInfo, NewSlotNotification, RewardSigningNotification, SubspaceLink};
use futures::channel::mpsc;
use futures::{StreamExt, TryFutureExt};
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -555,25 +552,6 @@ where
}
}

// TODO: This is a workaround for potential root cause of
// https://github.com/subspace/subspace/issues/871, also being discussed in
// https://substrate.stackexchange.com/questions/7886/is-block-creation-guaranteed-to-be-running-after-parent-block-is-fully-imported
if maybe_pre_digest.is_some() {
let block_number = *parent_header.number() + One::one();
let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);

self.subspace_link
.block_importing_notification_sender
.notify(move || BlockImportingNotification {
block_number,
acknowledgement_sender,
});

while (acknowledgement_receiver.next().await).is_some() {
// Wait for all the acknowledgements to finish.
}
}

maybe_pre_digest.map(|pre_digest| (pre_digest, pot_justification))
}

Expand Down
1 change: 1 addition & 0 deletions domains/client/domain-operator/src/domain_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ where
let _ = block_info_sender.feed(Some(block_info)).await;
}
maybe_block_importing = blocks_importing.next() => {
// TODO: remove the `block_number` from the notification since it is not used
let (_block_number, mut acknowledgement_sender) =
match maybe_block_importing {
Some(block_importing) => block_importing,
Expand Down
67 changes: 30 additions & 37 deletions test/subspace-test-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -469,17 +469,32 @@ impl MockConsensusNode {
}

// Wait for all the acknowledgements to progress and proactively drop closed subscribers.
loop {
select! {
res = acknowledgement_receiver.next() => if res.is_none() {
break;
},
// TODO: Workaround for https://github.com/smol-rs/async-channel/issues/23, remove once fix is released
_ = futures_timer::Delay::new(time::Duration::from_millis(500)).fuse() => {
self.acknowledgement_sender_subscribers.retain(|subscriber| !subscriber.is_closed());
}
}
while acknowledgement_receiver.next().await.is_some() {
// Wait for all the acknowledgements to finish.
}
}

/// Wait for the operator to finish processing the consensus block before returning.
///
/// Sends one extra block importing notification to every subscriber, waits until all
/// acknowledgement senders handed out with it are closed, and then calls
/// `confirm_acknowledgement`.
pub async fn confirm_block_import_processed(&mut self) {
// Send one more notification to ensure the previous consensus block import notification
// has been received by the operator.
let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);
{
// `value` owns `acknowledgement_sender` and must be dropped at the end of this scope,
// right after the notification is sent; otherwise the receiver below would block forever
// because there would still be a sender that is not closed.
// NOTE: it is okay to use the default block number since it is ignored on the consumer side.
let value = (NumberFor::<Block>::default(), acknowledgement_sender);
// Subscribers whose channel rejects the send are removed from the list.
self.block_import
.block_importing_notification_subscribers
.lock()
.retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
}
while acknowledgement_receiver.next().await.is_some() {
// Wait for all the acknowledgements to finish.
}

// Ensure the operator finishes processing the consensus block.
self.confirm_acknowledgement().await;
}

/// Subscribe the block importing notification
Expand Down Expand Up @@ -704,7 +719,7 @@ impl MockConsensusNode {
}
err => err,
};
self.confirm_acknowledgement().await;
self.confirm_block_import_processed().await;
res
}

Expand Down Expand Up @@ -854,7 +869,6 @@ where
let block_number = *block.header.number();
block.fork_choice = Some(ForkChoiceStrategy::LongestChain);

let import_result = self.inner.import_block(block).await?;
let (acknowledgement_sender, mut acknowledgement_receiver) = mpsc::channel(0);

// Must drop `block_import_acknowledgement_sender` after the notification otherwise the receiver
Expand All @@ -863,35 +877,14 @@ where
let value = (block_number, acknowledgement_sender);
self.block_importing_notification_subscribers
.lock()
.retain(|subscriber| {
// It is necessary to notify the subscriber twice for each importing block in the test to ensure
// the imported block must be fully processed by the executor when all acknowledgements responded.
// This is because the `futures::channel::mpsc::channel` used in the executor have 1 slot even the
// `consensus_block_import_throttling_buffer_size` is set to 0 in the test, notify one more time can
// ensure the previously sent `block_imported` notification must be fully processed by the executor
// when the second acknowledgements responded.
// Please see https://github.com/subspace/subspace/pull/1363#discussion_r1162571291 for more details.
subscriber
.unbounded_send(value.clone())
.and_then(|_| subscriber.unbounded_send(value.clone()))
.is_ok()
});
.retain(|subscriber| subscriber.unbounded_send(value.clone()).is_ok());
}

// Wait for all the acknowledgements to progress and proactively drop closed subscribers.
loop {
select! {
res = acknowledgement_receiver.next() => if res.is_none() {
break;
},
// TODO: Workaround for https://github.com/smol-rs/async-channel/issues/23, remove once fix is released
_ = futures_timer::Delay::new(time::Duration::from_millis(500)).fuse() => {
self.block_importing_notification_subscribers.lock().retain(|subscriber| !subscriber.is_closed());
}
}
while acknowledgement_receiver.next().await.is_some() {
// Wait for all the acknowledgements to finish.
}

Ok(import_result)
self.inner.import_block(block).await
}

async fn check_block(
Expand Down