diff --git a/src/storage/src/source/reclock/compat.rs b/src/storage/src/source/reclock/compat.rs index 1e1a4d0255c56..91c52a35b9c1e 100644 --- a/src/storage/src/source/reclock/compat.rs +++ b/src/storage/src/source/reclock/compat.rs @@ -117,6 +117,30 @@ where // Allow manually simulating the scenario where the since of the remap // shard has advanced too far. fail_point!("invalid_remap_as_of"); + + if since.is_empty() { + // This can happen when, say, a source is being dropped but we on + // the cluster are busy and notice that only later. In those cases + // it can happen that we still try to render an ingestion that is + // not valid anymore and where the shards it uses are not valid to + // use anymore. + // + // This is a rare race condition and something that is expected to + // happen every now and then. It's not a bug given the way things + // currently work. + tracing::info!( + source_id = %id, + %worker_id, + "since of remap shard is the empty antichain, suspending..."); + + // Suspending will make it so we don't make progress and downstream + // operators don't think we're making progress. And this dataflow + // will eventually be removed once the news about the removal makes it + // from the controller to the cluster. + std::future::pending::<()>().await; + unreachable!("pending future never returns"); + } + assert!( PartialOrder::less_equal(since, &as_of), "invalid as_of: as_of({as_of:?}) < since({since:?}), \