Skip to content

Commit

Permalink
storage: Simplify dataflow as_of computation
Browse files Browse the repository at this point in the history
Previously, async storage workers would calculate the as_of of a
dataflow from the since of the dataflow's output's remap shards. If
there was more than one distinct remap shard among the outputs, then
the storage worker would panic. It's expected that the only collection
that will ever have a remap shard is the ingestion collection itself.

Furthermore, we are planning to remove the ingestion collection from
the outputs of the dataflow (in fact there's already a feature flag
that does this). If the ingestion is removed from the outputs, then no
output will have a remap shard, and the as_of will always be empty.

This commit simplifies the existing as_of calculation and fixes the
as_of calculation when the ingestion collection is removed from the
outputs. It does this by calculating the as_of directly from the
ingestion's remap shard. Additionally, it asserts that if any of the
outputs have a remap shard, then it must be equal to the ingestion's
remap shard.

Works towards resolving #MaterializeInc/database-issues/issues/8620
  • Loading branch information
jkosh44 committed Jan 8, 2025
1 parent 42af451 commit a8ad4ba
Showing 1 changed file with 52 additions and 58 deletions.
110 changes: 52 additions & 58 deletions src/storage/src/storage_state/async_storage_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,55 @@ impl<T: Timestamp + Lattice + Codec64 + Display + Sync> AsyncStorageWorker<T> {
// arbitrarily hold back collections to perform historical queries and when
// the storage command protocol is updated such that these calculations are
// performed by the controller and not here.
let mut as_of = Antichain::new();
let mut resume_uppers = BTreeMap::new();
let mut seen_remap_shard = None;

// TODO(petrosagg): The as_of of the ingestion should normally be based
// on the since frontiers of its outputs. Even though the storage
// controller makes sure to make downgrade decisions in an organized
// and ordered fashion, it then proceeds to persist them in an
// asynchronous and disorganized fashion to persist. The net effect is
// that upon restart, or upon observing the persist state like this
// function, one can see non-sensical results like the since of A be in
// advance of B even when B depends on A! This can happen because the
// downgrade of B gets reordered and lost. Here is our best attempt at
// playing detective of what the controller meant to do by blindly
// assuming that the since of the remap shard is a suitable since
// frontier without consulting the since frontier of the outputs. One
// day we will enforce order to chaos and this comment will be deleted.
let remap_shard = ingestion_description
.ingestion_metadata
.remap_shard
.expect("ingestions must have a remap shard");
let client = persist_clients
.open(
ingestion_description
.ingestion_metadata
.persist_location
.clone(),
)
.await
.expect("error creating persist client");
let read_handle = client
.open_leased_reader::<SourceData, (), T, Diff>(
remap_shard,
Arc::new(ingestion_description.desc.connection.timestamp_desc()),
Arc::new(UnitSchema),
Diagnostics {
shard_name: ingestion_description
.remap_collection_id
.to_string(),
handle_purpose: format!("resumption data for {}", id),
},
false,
)
.await
.unwrap();
let as_of = read_handle.since().clone();
mz_ore::task::spawn(move || "deferred_expire", async move {
tokio::time::sleep(std::time::Duration::from_secs(300)).await;
read_handle.expire().await;
});
let seen_remap_shard = remap_shard.clone();

for (id, export) in ingestion_description.source_exports.iter() {
// Explicit destructuring to force a compile error when the metadata change
Expand Down Expand Up @@ -263,63 +309,11 @@ impl<T: Timestamp + Lattice + Codec64 + Display + Sync> AsyncStorageWorker<T> {
resume_uppers.insert(*id, upper);
write_handle.expire().await;

// TODO(petrosagg): The as_of of the ingestion should normally be based
// on the since frontiers of its outputs. Even though the storage
// controller makes sure to make downgrade decisions in an organized
// and ordered fashion, it then proceeds to persist them in an
// asynchronous and disorganized fashion to persist. The net effect is
// that upon restart, or upon observing the persist state like this
// function, one can see non-sensical results like the since of A be in
// advance of B even when B depends on A! This can happen because the
// downgrade of B gets reordered and lost. Here is our best attempt at
// playing detective of what the controller meant to do by blindly
// assuming that the since of the remap shard is a suitable since
// frontier without consulting the since frontier of the outputs. One
// day we will enforce order to chaos and this comment will be deleted.
if let Some(remap_shard) = remap_shard {
match seen_remap_shard.as_ref() {
None => {
let read_handle = client
.open_leased_reader::<SourceData, (), T, Diff>(
*remap_shard,
Arc::new(
ingestion_description
.desc
.connection
.timestamp_desc(),
),
Arc::new(UnitSchema),
Diagnostics {
shard_name: ingestion_description
.remap_collection_id
.to_string(),
handle_purpose: format!(
"resumption data for {}",
id
),
},
false,
)
.await
.unwrap();
as_of.clone_from(read_handle.since());
mz_ore::task::spawn(
move || "deferred_expire",
async move {
tokio::time::sleep(std::time::Duration::from_secs(
300,
))
.await;
read_handle.expire().await;
},
);
seen_remap_shard = Some(remap_shard.clone());
}
Some(shard) => assert_eq!(
shard, remap_shard,
"ingestion with multiple remap shards"
),
}
assert_eq!(
seen_remap_shard, *remap_shard,
"ingestion with multiple remap shards"
);
}
}

Expand Down

0 comments on commit a8ad4ba

Please sign in to comment.