storage: Always initialize ingestion statistics
Previously, collection statistics were only initialized for ingestion
dataflow outputs. When the `force_source_table_syntax` flag is enabled,
the ingestion collection is excluded from the ingestion dataflow
outputs. As a result, statistics are never created for the ingestion
collection. This causes later parts of the code to panic, because they
assume that every ingestion collection has initialized statistics.

This commit fixes the issue by ensuring that statistics are always
initialized for the ingestion collection, even when it is not included
in the dataflow outputs.

Works towards resolving #MaterializeInc/database-issues/issues/8620
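
To make the shape of the fix concrete, here is a minimal, self-contained sketch of the control flow it introduces. All types here (`GlobalId`, `AggregatedStatistics`, the integer uppers) are simplified, hypothetical stand-ins for the real `mz_storage` types, not the actual implementation:

```rust
use std::collections::BTreeMap;

// Hypothetical stand-ins for the real storage-state types.
type GlobalId = u64;
type Upper = u64;

#[derive(Default)]
struct AggregatedStatistics {
    sources: BTreeMap<GlobalId, Upper>,
}

impl AggregatedStatistics {
    /// Initialize statistics for `id` only if none exist yet; the closure
    /// mirrors the lazy construction used in `storage_state.rs`.
    fn initialize_source(&mut self, id: GlobalId, stats: impl FnOnce() -> Upper) {
        self.sources.entry(id).or_insert_with(stats);
    }
}

fn main() {
    let ingestion_id: GlobalId = 1;
    // With `force_source_table_syntax`, the ingestion collection itself is
    // absent from the dataflow outputs; only explicit exports remain.
    let source_exports: Vec<GlobalId> = vec![2, 3];
    let ingestion_upper: Upper = 42;

    let mut stats = AggregatedStatistics::default();

    // Pre-fix behavior: statistics are created only for dataflow outputs.
    let mut ingestion_stats_initialized = false;
    for export_id in &source_exports {
        stats.initialize_source(*export_id, || 0);
        if *export_id == ingestion_id {
            ingestion_stats_initialized = true;
        }
    }

    // The fix: fall back to initializing the ingestion collection's
    // statistics even when it is not a dataflow output.
    if !ingestion_stats_initialized {
        stats.initialize_source(ingestion_id, || ingestion_upper);
    }

    assert!(stats.sources.contains_key(&ingestion_id));
}
```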
jkosh44 committed Jan 9, 2025
1 parent dff8199 commit 9ade587
Showing 3 changed files with 62 additions and 4 deletions.
2 changes: 2 additions & 0 deletions src/storage/src/internal_control.rs
@@ -81,6 +81,8 @@ pub enum InternalStorageCommand {
/// A frontier in the source time domain with the property that all updates not beyond it
/// have already been durably ingested.
source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
/// The upper of the ingestion collection's shard.
ingestion_upper: Antichain<mz_repr::Timestamp>,
},
/// Render a sink dataflow.
RunSinkDataflow(
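
The two added lines above thread the new `ingestion_upper` field through the ingestion command. As a rough sketch of the resulting shape, with simplified hypothetical types and an illustrative variant name (only fields visible in this commit's diffs are shown; the real enum carries more):

```rust
use std::collections::BTreeMap;

// Simplified hypothetical stand-ins for mz_repr / timely types.
type GlobalId = u64;
type Timestamp = u64;
type Antichain = Vec<Timestamp>;
type Row = Vec<String>;

// Illustrative variant name; the field set mirrors the diffs in this commit.
enum InternalStorageCommand {
    RunIngestionDataflow {
        as_of: Antichain,
        resume_uppers: BTreeMap<GlobalId, Antichain>,
        source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
        // New in this commit: the upper of the ingestion collection's
        // shard, fetched by the async storage worker.
        ingestion_upper: Antichain,
    },
}

fn main() {
    // Construct the command as the async worker would once it has
    // fetched the ingestion shard's upper.
    let _cmd = InternalStorageCommand::RunIngestionDataflow {
        as_of: vec![0],
        resume_uppers: BTreeMap::new(),
        source_resume_uppers: BTreeMap::new(),
        ingestion_upper: vec![42],
    };
}
```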
34 changes: 33 additions & 1 deletion src/storage/src/storage_state.rs
@@ -96,7 +96,8 @@ use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
use mz_storage_types::sources::IngestionDescription;
use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope};
use mz_storage_types::sources::{IngestionDescription, SourceEnvelope};
use mz_storage_types::AlterCompatible;
use mz_timely_util::builder_async::PressOnDropButton;
use mz_txn_wal::operator::TxnsContext;
@@ -504,6 +505,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
as_of,
resume_uppers,
source_resume_uppers,
ingestion_upper,
} => {
// NOTE: If we want to share the load of async processing we
// have to change `handle_storage_command` and change this
@@ -520,6 +522,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
as_of,
resume_uppers,
source_resume_uppers,
ingestion_upper,
},
);
}
@@ -623,6 +626,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
as_of,
resume_uppers,
source_resume_uppers,
ingestion_upper,
} => {
info!(
?as_of,
@@ -632,6 +636,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
self.timely_worker.peers(),
);

let mut ingestion_stats_initialized = false;
for (export_id, export) in ingestion_description.source_exports.iter() {
let resume_upper = resume_uppers[export_id].clone();
self.storage_state.aggregated_statistics.initialize_source(
@@ -649,6 +654,33 @@ impl<'w, A: Allocate> Worker<'w, A> {
)
},
);
if *export_id == ingestion_id {
ingestion_stats_initialized = true;
}
}
// TODO(database-issues#8620): Unconditionally create ingestion statistics once
// sources no longer export to primary collections and only export to explicit
// SourceExports (tables).
if !ingestion_stats_initialized {
self.storage_state.aggregated_statistics.initialize_source(
ingestion_id,
ingestion_upper.clone(),
|| {
SourceStatistics::new(
ingestion_id,
self.storage_state.timely_worker_index,
&self.storage_state.metrics.source_statistics,
ingestion_id,
&ingestion_description.ingestion_metadata.data_shard,
// TODO(jkosh44) Does this envelope make sense?
SourceEnvelope::None(NoneEnvelope {
key_envelope: KeyEnvelope::None,
key_arity: 0,
}),
ingestion_upper,
)
},
);
}

for id in ingestion_description.collection_ids() {
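
One subtlety in the fallback above: `SourceStatistics::new` needs a `SourceEnvelope`, but the ingestion collection is not actually decoded through one here, so the commit passes a neutral `SourceEnvelope::None` with no key columns (hence the `TODO(jkosh44)` question). A minimal sketch of that placeholder, using stripped-down hypothetical stand-ins for the `mz_storage_types::sources::envelope` types:

```rust
#![allow(dead_code)]

// Hypothetical, stripped-down stand-ins; the real enums have more variants.
enum KeyEnvelope {
    None,
}

struct NoneEnvelope {
    key_envelope: KeyEnvelope,
    key_arity: usize,
}

enum SourceEnvelope {
    None(NoneEnvelope),
}

fn main() {
    // The neutral placeholder used above: no key envelope and zero key
    // columns. It exists only so the statistics entry has an envelope to
    // record; nothing is decoded through it.
    let _envelope = SourceEnvelope::None(NoneEnvelope {
        key_envelope: KeyEnvelope::None,
        key_arity: 0,
    });
}
```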
30 changes: 27 additions & 3 deletions src/storage/src/storage_state/async_storage_worker.rs
@@ -72,6 +72,8 @@ pub enum AsyncStorageWorkerResponse<T: Timestamp + Lattice + Codec64> {
/// A frontier in the source time domain with the property that all updates not beyond it
/// have already been durably ingested.
source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
/// The upper of the ingestion collection's shard.
ingestion_upper: Antichain<T>,
},
}

@@ -245,7 +247,7 @@ impl<T: Timestamp + Lattice + Codec64 + Display + Sync> AsyncStorageWorker<T> {
)
.await
.expect("error creating persist client");
let read_handle = client
let ingestion_read_handle = client
.open_leased_reader::<SourceData, (), T, Diff>(
remap_shard,
Arc::new(ingestion_description.desc.connection.timestamp_desc()),
@@ -260,13 +262,34 @@ impl<T: Timestamp + Lattice + Codec64 + Display + Sync> AsyncStorageWorker<T> {
)
.await
.unwrap();
let as_of = read_handle.since().clone();
let as_of = ingestion_read_handle.since().clone();
mz_ore::task::spawn(move || "deferred_expire", async move {
tokio::time::sleep(std::time::Duration::from_secs(300)).await;
read_handle.expire().await;
ingestion_read_handle.expire().await;
});
let seen_remap_shard = remap_shard.clone();

let mut ingestion_write_handle = client
.open_writer::<SourceData, (), T, Diff>(
ingestion_description.ingestion_metadata.data_shard,
Arc::new(
ingestion_description
.ingestion_metadata
.relation_desc
.clone(),
),
Arc::new(UnitSchema),
Diagnostics {
shard_name: id.to_string(),
handle_purpose: format!("resumption data {}", id),
},
)
.await
.unwrap();
let ingestion_upper =
ingestion_write_handle.fetch_recent_upper().await.clone();
ingestion_write_handle.expire().await;

for (id, export) in ingestion_description.source_exports.iter() {
// Explicit destructuring to force a compile error when the metadata change
let CollectionMetadata {
@@ -385,6 +408,7 @@ impl<T: Timestamp + Lattice + Codec64 + Display + Sync> AsyncStorageWorker<T> {
as_of,
resume_uppers,
source_resume_uppers,
ingestion_upper,
});

if let Err(_err) = res {
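
The async-worker change above opens a persist write handle solely to learn the ingestion shard's current upper, then expires it right away. A minimal sketch of that "peek the upper, then release" pattern, with a hypothetical `ShardWriter` trait and mock standing in for the real persist `WriteHandle` (only the two calls used above are modeled):

```rust
// Hypothetical stand-ins for persist's WriteHandle and Antichain; only
// fetch_recent_upper and expire, the two calls used above, are modeled.
#[derive(Clone, Debug, PartialEq)]
struct Antichain(Vec<u64>);

trait ShardWriter {
    // Like WriteHandle::fetch_recent_upper in the diff above.
    fn fetch_recent_upper(&mut self) -> &Antichain;
    // Like WriteHandle::expire: gracefully release the handle so it does
    // not linger as a registered writer on the shard.
    fn expire(self);
}

// Fetch the shard's upper and release the handle in one step, mirroring
// how ingestion_write_handle is used above.
fn read_upper<W: ShardWriter>(mut writer: W) -> Antichain {
    let upper = writer.fetch_recent_upper().clone();
    writer.expire();
    upper
}

// A toy in-memory writer so the sketch runs end to end.
struct MockWriter {
    upper: Antichain,
}

impl ShardWriter for MockWriter {
    fn fetch_recent_upper(&mut self) -> &Antichain {
        &self.upper
    }
    fn expire(self) {}
}

fn main() {
    let writer = MockWriter { upper: Antichain(vec![42]) };
    assert_eq!(read_upper(writer), Antichain(vec![42]));
}
```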
