diff --git a/src/storage/src/storage_state/async_storage_worker.rs b/src/storage/src/storage_state/async_storage_worker.rs index 0ecbf7d7681ab..ef6673b418a1f 100644 --- a/src/storage/src/storage_state/async_storage_worker.rs +++ b/src/storage/src/storage_state/async_storage_worker.rs @@ -217,9 +217,55 @@ impl AsyncStorageWorker { // arbitrarily hold back collections to perform historical queries and when // the storage command protocol is updated such that these calculations are // performed by the controller and not here. - let mut as_of = Antichain::new(); let mut resume_uppers = BTreeMap::new(); - let mut seen_remap_shard = None; + + // TODO(petrosagg): The as_of of the ingestion should normally be based + // on the since frontiers of its outputs. Even though the storage + // controller makes sure to make downgrade decisions in an organized + // and ordered fashion, it then proceeds to persist them in an + // asynchronous and disorganized fashion to persist. The net effect is + // that upon restart, or upon observing the persist state like this + // function, one can see non-sensical results like the since of A be in + // advance of B even when B depends on A! This can happen because the + // downgrade of B gets reordered and lost. Here is our best attempt at + // playing detective of what the controller meant to do by blindly + // assuming that the since of the remap shard is a suitable since + // frontier without consulting the since frontier of the outputs. One + // day we will enforce order to chaos and this comment will be deleted. + let remap_shard = ingestion_description + .ingestion_metadata + .remap_shard + .expect("ingestions must have a remap shard"); + let client = persist_clients + .open( + ingestion_description + .ingestion_metadata + .persist_location + .clone(), + ) + .await + .expect("error creating persist client"); + let read_handle = client + .open_leased_reader::( + remap_shard, + Arc::new(ingestion_description.desc.connection.timestamp_desc()), + Arc::new(UnitSchema), + Diagnostics { + shard_name: ingestion_description + .remap_collection_id + .to_string(), + handle_purpose: format!("resumption data for {}", id), + }, + false, + ) + .await + .unwrap(); + let as_of = read_handle.since().clone(); + mz_ore::task::spawn(move || "deferred_expire", async move { + tokio::time::sleep(std::time::Duration::from_secs(300)).await; + read_handle.expire().await; + }); + let seen_remap_shard = remap_shard.clone(); for (id, export) in ingestion_description.source_exports.iter() { // Explicit destructuring to force a compile error when the metadata change @@ -263,63 +309,11 @@ impl AsyncStorageWorker { resume_uppers.insert(*id, upper); write_handle.expire().await; - // TODO(petrosagg): The as_of of the ingestion should normally be based - // on the since frontiers of its outputs. Even though the storage - // controller makes sure to make downgrade decisions in an organized - // and ordered fashion, it then proceeds to persist them in an - // asynchronous and disorganized fashion to persist. The net effect is - // that upon restart, or upon observing the persist state like this - // function, one can see non-sensical results like the since of A be in - // advance of B even when B depends on A! This can happen because the - // downgrade of B gets reordered and lost. Here is our best attempt at - // playing detective of what the controller meant to do by blindly - // assuming that the since of the remap shard is a suitable since - // frontier without consulting the since frontier of the outputs. One - // day we will enforce order to chaos and this comment will be deleted. if let Some(remap_shard) = remap_shard { - match seen_remap_shard.as_ref() { - None => { - let read_handle = client - .open_leased_reader::( - *remap_shard, - Arc::new( - ingestion_description - .desc - .connection - .timestamp_desc(), - ), - Arc::new(UnitSchema), - Diagnostics { - shard_name: ingestion_description - .remap_collection_id - .to_string(), - handle_purpose: format!( - "resumption data for {}", - id - ), - }, - false, - ) - .await - .unwrap(); - as_of.clone_from(read_handle.since()); - mz_ore::task::spawn( - move || "deferred_expire", - async move { - tokio::time::sleep(std::time::Duration::from_secs( - 300, - )) - .await; - read_handle.expire().await; - }, - ); - seen_remap_shard = Some(remap_shard.clone()); - } - Some(shard) => assert_eq!( - shard, remap_shard, - "ingestion with multiple remap shards" - ), - } + assert_eq!( + seen_remap_shard, *remap_shard, + "ingestion with multiple remap shards" + ); } }