diff --git a/src/storage/src/internal_control.rs b/src/storage/src/internal_control.rs
index 904cf68bd5ee1..1e28b0e96f309 100644
--- a/src/storage/src/internal_control.rs
+++ b/src/storage/src/internal_control.rs
@@ -81,6 +81,8 @@ pub enum InternalStorageCommand {
         /// A frontier in the source time domain with the property that all updates not beyond it
         /// have already been durably ingested.
         source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
+        /// The upper of the ingestion collection's shard.
+        ingestion_upper: Antichain<mz_repr::Timestamp>,
     },
     /// Render a sink dataflow.
     RunSinkDataflow(
diff --git a/src/storage/src/storage_state.rs b/src/storage/src/storage_state.rs
index 19049058903a7..262e6b18c8f26 100644
--- a/src/storage/src/storage_state.rs
+++ b/src/storage/src/storage_state.rs
@@ -96,7 +96,8 @@ use mz_storage_types::configuration::StorageConfiguration;
 use mz_storage_types::connections::ConnectionContext;
 use mz_storage_types::controller::CollectionMetadata;
 use mz_storage_types::sinks::{MetadataFilled, StorageSinkDesc};
-use mz_storage_types::sources::IngestionDescription;
+use mz_storage_types::sources::envelope::{KeyEnvelope, NoneEnvelope};
+use mz_storage_types::sources::{IngestionDescription, SourceEnvelope};
 use mz_storage_types::AlterCompatible;
 use mz_timely_util::builder_async::PressOnDropButton;
 use mz_txn_wal::operator::TxnsContext;
@@ -504,6 +505,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
                 as_of,
                 resume_uppers,
                 source_resume_uppers,
+                ingestion_upper,
             } => {
                 // NOTE: If we want to share the load of async processing we
                 // have to change `handle_storage_command` and change this
@@ -520,6 +522,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
                         as_of,
                         resume_uppers,
                         source_resume_uppers,
+                        ingestion_upper,
                     },
                 );
             }
@@ -623,6 +626,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
                 as_of,
                 resume_uppers,
                 source_resume_uppers,
+                ingestion_upper,
             } => {
                 info!(
                     ?as_of,
@@ -632,6 +636,7 @@ impl<'w, A: Allocate> Worker<'w, A> {
                     self.timely_worker.peers(),
                 );
 
+                let mut ingestion_stats_initialized = false;
                 for (export_id, export) in ingestion_description.source_exports.iter() {
                     let resume_upper = resume_uppers[export_id].clone();
                     self.storage_state.aggregated_statistics.initialize_source(
@@ -649,6 +654,33 @@ impl<'w, A: Allocate> Worker<'w, A> {
                             )
                         },
                     );
+                    if *export_id == ingestion_id {
+                        ingestion_stats_initialized = true;
+                    }
+                }
+                // TODO(database-issues#8620): Unconditionally create ingestion statistics once
+                // sources no longer export to primary collections and only export to explicit
+                // SourceExports (tables).
+                if !ingestion_stats_initialized {
+                    self.storage_state.aggregated_statistics.initialize_source(
+                        ingestion_id,
+                        ingestion_upper.clone(),
+                        || {
+                            SourceStatistics::new(
+                                ingestion_id,
+                                self.storage_state.timely_worker_index,
+                                &self.storage_state.metrics.source_statistics,
+                                ingestion_id,
+                                &ingestion_description.ingestion_metadata.data_shard,
+                                // TODO(jkosh44) Does this envelope make sense?
+                                SourceEnvelope::None(NoneEnvelope {
+                                    key_envelope: KeyEnvelope::None,
+                                    key_arity: 0,
+                                }),
+                                ingestion_upper,
+                            )
+                        },
+                    );
                 }
 
                 for id in ingestion_description.collection_ids() {
diff --git a/src/storage/src/storage_state/async_storage_worker.rs b/src/storage/src/storage_state/async_storage_worker.rs
index ef6673b418a1f..e17e2b6c17d7e 100644
--- a/src/storage/src/storage_state/async_storage_worker.rs
+++ b/src/storage/src/storage_state/async_storage_worker.rs
@@ -72,6 +72,8 @@ pub enum AsyncStorageWorkerResponse {
         /// A frontier in the source time domain with the property that all updates not beyond it
         /// have already been durably ingested.
         source_resume_uppers: BTreeMap<GlobalId, Vec<Row>>,
+        /// The upper of the ingestion collection's shard.
+        ingestion_upper: Antichain<T>,
     },
 }
 
@@ -245,7 +247,7 @@ impl AsyncStorageWorker {
                     )
                     .await
                     .expect("error creating persist client");
-                let read_handle = client
+                let ingestion_read_handle = client
                     .open_leased_reader::<SourceData, (), T, Diff>(
                         remap_shard,
                         Arc::new(ingestion_description.desc.connection.timestamp_desc()),
@@ -260,13 +262,34 @@ impl AsyncStorageWorker {
                     )
                     .await
                     .unwrap();
-                let as_of = read_handle.since().clone();
+                let as_of = ingestion_read_handle.since().clone();
 
                 mz_ore::task::spawn(move || "deferred_expire", async move {
                     tokio::time::sleep(std::time::Duration::from_secs(300)).await;
-                    read_handle.expire().await;
+                    ingestion_read_handle.expire().await;
                 });
                 let seen_remap_shard = remap_shard.clone();
+                let mut ingestion_write_handle = client
+                    .open_writer::<SourceData, (), T, Diff>(
+                        ingestion_description.ingestion_metadata.data_shard,
+                        Arc::new(
+                            ingestion_description
+                                .ingestion_metadata
+                                .relation_desc
+                                .clone(),
+                        ),
+                        Arc::new(UnitSchema),
+                        Diagnostics {
+                            shard_name: id.to_string(),
+                            handle_purpose: format!("resumption data {}", id),
+                        },
+                    )
+                    .await
+                    .unwrap();
+                let ingestion_upper =
+                    ingestion_write_handle.fetch_recent_upper().await.clone();
+                ingestion_write_handle.expire().await;
+
                 for (id, export) in ingestion_description.source_exports.iter() {
                     // Explicit destructuring to force a compile error when the metadata change
                     let CollectionMetadata {
@@ -385,6 +408,7 @@ impl AsyncStorageWorker {
                         as_of,
                         resume_uppers,
                         source_resume_uppers,
+                        ingestion_upper,
                     });
 
                     if let Err(_err) = res {
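
For reviewers following along, here is a minimal standalone sketch of the pattern the `async_storage_worker.rs` change introduces: open a persist write handle on the ingestion's data shard purely to learn its recent upper, then expire the handle right away. The helper name `fetch_shard_upper` and the concrete `mz_repr::Timestamp` instantiation are illustrative assumptions; `open_writer`, `fetch_recent_upper`, and `expire` are the `mz_persist_client` calls the diff itself uses.

```rust
use std::sync::Arc;

use mz_persist_client::{Diagnostics, PersistClient, ShardId};
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Diff, GlobalId, RelationDesc, Timestamp};
use mz_storage_types::sources::SourceData;
use timely::progress::Antichain;

/// Fetch the current upper of an ingestion's data shard without holding a
/// writer any longer than necessary (hypothetical helper for illustration).
async fn fetch_shard_upper(
    client: &PersistClient,
    id: GlobalId,
    data_shard: ShardId,
    relation_desc: RelationDesc,
) -> Antichain<Timestamp> {
    let mut write = client
        .open_writer::<SourceData, (), Timestamp, Diff>(
            data_shard,
            Arc::new(relation_desc),
            Arc::new(UnitSchema),
            Diagnostics {
                shard_name: id.to_string(),
                handle_purpose: format!("resumption data {}", id),
            },
        )
        .await
        .expect("invalid persist usage");
    // `fetch_recent_upper` consults consensus, so this is a fresh view of
    // the shard's upper rather than a possibly stale cached value.
    let upper = write.fetch_recent_upper().await.clone();
    // Expire eagerly: the handle exists only to read the upper.
    write.expire().await;
    upper
}
```

Note the asymmetry with the existing remap-shard read handle, whose expiration is deferred by five minutes so its since hold outlives dataflow startup; the new write handle holds no since capability and can be expired immediately.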