diff --git a/Cargo.lock b/Cargo.lock index 21a1db322eb3f..6797f59041cc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2369,9 +2369,9 @@ dependencies = [ [[package]] name = "differential-dataflow" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f47ab549a056e3959ce2036d6b20ba369a75e2a0605ccf64256e153626b352" +checksum = "75b93af605b7f82fbf6e671a5bb0f940b385e57254a0af59ce6dfb98b8c4b302" dependencies = [ "columnar", "fnv", @@ -2381,9 +2381,9 @@ dependencies = [ [[package]] name = "differential-dogs3" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ece5ac3c62f1cb6e21560b53102963aa91513d3f72911d99b6620f2d3ac6b1" +checksum = "17ba29145a1df1bdc3da1142eeb991f0866620c79ce9d85e83a3837f29112ba0" dependencies = [ "differential-dataflow", "serde", @@ -10406,9 +10406,9 @@ dependencies = [ [[package]] name = "timely" -version = "0.16.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ace21eb2a22c1b80b0b9b5be0627eb0f95a128a5751e866aefc90a85e3007d3" +checksum = "0506b6506ef04c371ea6151942df60e309e7f5e710458a6b533e364ee0b3cab3" dependencies = [ "bincode", "byteorder", diff --git a/src/adapter-types/Cargo.toml b/src/adapter-types/Cargo.toml index 5383238d65749..d1cdc0ae56507 100644 --- a/src/adapter-types/Cargo.toml +++ b/src/adapter-types/Cargo.toml @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" } mz-repr = { path = "../repr" } mz-storage-types = { path = "../storage-types" } serde = "1.0.152" -timely = "0.16.0" +timely = "0.17.0" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } [package.metadata.cargo-udeps.ignore] diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index a4298b2fe1703..92c489f39a4e5 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" enum-kinds = "0.5.1" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" @@ -78,7 +78,7 @@ serde_plain = "1.0.1" sha2 = "0.10.6" smallvec = { version = "1.10.0", features = ["union"] } static_assertions = "1.1" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["rt", "time"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 2751f63aec9c3..6d1e030684df6 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } clap = { version = "4.5.23", features = ["derive"] } derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" ipnet = "2.5.0" itertools = "0.12.1" @@ -60,7 +60,7 @@ serde_plain = "1.0.1" static_assertions = "1.1" sha2 = "0.10.6" thiserror = "1.0.37" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0" } tracing = "0.1.37" uuid = "1.2.2" diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index f14e5c808b7a9..a8b218c9fc2b3 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -5567,15 +5567,22 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER: LazyLock = oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER_OID, column_defs: None, sql: "SELECT - address, + addr2.id, + reachability.worker_id, port, - worker_id, update_type, time, pg_catalog.count(*) as count FROM - mz_introspection.mz_dataflow_operator_reachability_raw -GROUP BY address, port, worker_id, update_type, time", + mz_introspection.mz_dataflow_operator_reachability_raw reachability, + mz_introspection.mz_dataflow_addresses_per_worker addr1, + mz_introspection.mz_dataflow_addresses_per_worker addr2 +WHERE + addr2.address = addr1.address || reachability.source + AND addr1.id = reachability.id + AND addr1.worker_id = reachability.worker_id + AND addr2.worker_id = reachability.worker_id +GROUP BY addr2.id, reachability.worker_id, port, update_type, time", access: vec![PUBLIC_SELECT], }); @@ -5587,13 +5594,13 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY: LazyLock = column_defs: None, sql: " SELECT - address, + id, port, update_type, time, pg_catalog.sum(count) as count FROM mz_introspection.mz_dataflow_operator_reachability_per_worker -GROUP BY address, port, update_type, time", +GROUP BY id, port, update_type, time", access: vec![PUBLIC_SELECT], }); diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index d40e4100b2481..43ca3a040c8c2 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.66" async-trait = "0.1.83" crossbeam-channel = "0.5.8" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" mz-cluster-client = { path = "../cluster-client" } mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] } @@ -21,7 +21,7 @@ mz-persist-client = { path = "../persist-client" } mz-service = { path = "../service" } mz-txn-wal = { path = "../txn-wal" } regex = "1.7.0" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute-client/Cargo.toml b/src/compute-client/Cargo.toml index dd09ba95f42ee..c7ed7aa902e90 100644 --- a/src/compute-client/Cargo.toml +++ b/src/compute-client/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytesize = "1.1.0" crossbeam-channel = "0.5.8" derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" http = "1.1.0" mz-build-info = { path = "../build-info" } @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.125" thiserror = "1.0.37" -timely = "0.16.0" +timely = "0.17.0" tokio = "1.38.0" tokio-stream = "0.1.11" tonic = "0.12.1" diff --git a/src/compute-client/src/logging.rs b/src/compute-client/src/logging.rs index c6f56ae1a8982..c0a617d8af4bb 100644 --- a/src/compute-client/src/logging.rs +++ b/src/compute-client/src/logging.rs @@ -448,16 +448,10 @@ impl LogVariant { .finish(), LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder() - .with_column( - "address", - ScalarType::List { - element_type: Box::new(ScalarType::UInt64), - custom_id: None, - } - .nullable(false), - ) - .with_column("port", ScalarType::UInt64.nullable(false)) + .with_column("id", ScalarType::UInt64.nullable(false)) .with_column("worker_id", ScalarType::UInt64.nullable(false)) + .with_column("source", ScalarType::UInt64.nullable(false)) + .with_column("port", ScalarType::UInt64.nullable(false)) .with_column("update_type", ScalarType::String.nullable(false)) .with_column("time", ScalarType::MzTimestamp.nullable(true)) .finish(), diff --git a/src/compute-types/Cargo.toml b/src/compute-types/Cargo.toml index a82db2a67b378..4e368867b8edd 100644 --- a/src/compute-types/Cargo.toml +++ b/src/compute-types/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] columnar = "0.2.2" columnation = "0.1.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" itertools = "0.12.1" mz-dyncfg = { path = "../dyncfg" } mz-expr = { path = "../expr" } @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.16.0" +timely = "0.17.0" tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 3f9ba4d140a3d..e972dbb2f1782 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -16,8 +16,8 @@ bytesize = "1.1.0" columnar = "0.2.2" crossbeam-channel = "0.5.8" dec = { version = "0.4.8", features = ["serde"] } -differential-dataflow = "0.13.3" -differential-dogs3 = "0.1.3" +differential-dataflow = "0.13.4" +differential-dogs3 = "0.1.4" futures = "0.3.25" itertools = "0.12.1" lgalloc = "0.4" @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false } scopeguard = "1.1.0" serde = { version = "1.0.152", features = ["derive"] } smallvec = { version = "1.10.0", features = ["serde", "union"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde", "v4"] } diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index 23a9a18f02f6f..a024dbcf9dc8f 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -115,18 +115,22 @@ where /// /// This is just a bundle-type intended to make passing around its contents in the logging /// initialization code more convenient. +/// +/// The `N` type parameter specifies the number of links to create for the event queue. We need +/// separate links for queues that feed from multiple loggers because the `EventLink` type is not +/// multi-producer safe. #[derive(Clone)] -struct EventQueue { - link: Rc>, +struct EventQueue { + links: [Rc>; N], activator: RcActivator, } -impl EventQueue { +impl EventQueue { fn new(name: &str) -> Self { let activator_name = format!("{name}_activator"); let activate_after = 128; Self { - link: Rc::new(EventLink::new()), + links: [(); N].map(|_| Rc::new(EventLink::new())), activator: RcActivator::new(activator_name, activate_after), } } diff --git a/src/compute/src/logging/compute.rs b/src/compute/src/logging/compute.rs index 049f952b2f50a..8c0610d3b055a 100644 --- a/src/compute/src/logging/compute.rs +++ b/src/compute/src/logging/compute.rs @@ -308,16 +308,16 @@ pub(super) fn construct( worker.dataflow_named("Dataflow: compute logging", move |scope| { let enable_logging = config.enable_logging; - let (logs, token) = Some(event_queue.link).mz_replay::<_, ProvidedBuilder<_>, _>( + let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>( scope, "compute logs", config.interval, event_queue.activator, - move |mut session, data| { + move |mut session, mut data| { // If logging is disabled, we still need to install the indexes, but we can leave them // empty. We do so by immediately filtering all logs events. if enable_logging { - session.give_container(&mut data.clone()) + session.give_container(data.to_mut()) } }, ); diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index 42ae6e508bdec..22364045d0522 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -20,15 +20,15 @@ use differential_dataflow::logging::{ }; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; -use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder}; +use mz_timely_util::containers::{ + columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder, +}; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::operators::Filter; use timely::dataflow::Stream; use crate::extensions::arrange::MzArrangeCore; @@ -57,21 +57,22 @@ pub(super) fn construct( let dataflow_index = worker.next_dataflow_index(); worker.dataflow_named("Dataflow: differential logging", move |scope| { - let (mut logs, token) = Some(event_queue.link) - .mz_replay::<_, CapacityContainerBuilder<_>, _>( + let enable_logging = config.enable_logging; + let (logs, token) = event_queue.links.clone() + .mz_replay::<_, ProvidedBuilder<_>, _>( scope, "differential logs", config.interval, event_queue.activator, - |mut session, data| session.give_iterator(data.iter()), + move |mut session, mut data|{ + // If logging is disabled, we still need to install the indexes, but we can leave them + // empty. We do so by immediately filtering all logs events. + if enable_logging { + session.give_container(data.to_mut()) + } + } ); - // If logging is disabled, we still need to install the indexes, but we can leave them - // empty. We do so by immediately filtering all logs events. - if !config.enable_logging { - logs = logs.filter(|_| false); - } - // Build a demux operator that splits the replayed event stream up into the separate // logging streams. let mut demux = diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 9c4a51fc7adf0..1878f75f1cff5 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -21,8 +21,8 @@ use mz_timely_util::containers::{Column, ColumnBuilder}; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::{ContainerBuilder, PushInto}; -use timely::logging::{ProgressEventTimestamp, TimelyEvent, TimelyEventBuilder}; -use timely::logging_core::Logger; +use timely::logging::{TimelyEvent, TimelyEventBuilder}; +use timely::logging_core::{Logger, Registry}; use timely::order::Product; use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder}; @@ -81,10 +81,7 @@ pub fn initialize( (logger, traces) } -pub(super) type ReachabilityEvent = ( - Vec, - Vec<(usize, usize, bool, Option, Diff)>, -); +pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>); struct LoggingContext<'a, A: Allocate> { worker: &'a mut timely::worker::Worker, @@ -93,7 +90,7 @@ struct LoggingContext<'a, A: Allocate> { now: Instant, start_offset: Duration, t_event_queue: EventQueue>, - r_event_queue: EventQueue>, + r_event_queue: EventQueue, 3>, d_event_queue: EventQueue>, c_event_queue: EventQueue>, shared_state: Rc>, @@ -149,15 +146,35 @@ impl LoggingContext<'_, A> { .collect() } + /// Construct a new reachability logger for timestamp type `T`. + /// + /// Inserts a logger with the name `timely/reachability/{type_name::()}`, following + /// Timely naming convention. + fn register_reachability_logger( + &self, + registry: &mut Registry, + index: usize, + ) { + let logger = self.reachability_logger::(index); + let type_name = std::any::type_name::(); + registry.insert_logger(&format!("timely/reachability/{type_name}"), logger); + } + + /// Register all loggers with the timely worker. + /// + /// Registers the timely, differential, compute, and reachability loggers. fn register_loggers(&self) { let t_logger = self.simple_logger::(self.t_event_queue.clone()); - let r_logger = self.reachability_logger(); let d_logger = self.simple_logger::(self.d_event_queue.clone()); let c_logger = self.simple_logger::(self.c_event_queue.clone()); let mut register = self.worker.log_register(); register.insert_logger("timely", t_logger); - register.insert_logger("timely/reachability", r_logger); + // Note that each reachability logger has a unique index, this is crucial to avoid dropping + // data. + self.register_reachability_logger::(&mut register, 0); + self.register_reachability_logger::>>(&mut register, 1); + self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2); register.insert_logger("differential/arrange", d_logger); register.insert_logger("materialize/compute", c_logger.clone()); @@ -168,7 +185,9 @@ impl LoggingContext<'_, A> { &self, event_queue: EventQueue, ) -> Logger { - let mut logger = BatchLogger::new(event_queue.link, self.interval_ms); + let [link] = event_queue.links; + let mut logger = BatchLogger::new(link, self.interval_ms); + let activator = event_queue.activator.clone(); Logger::new( self.now, self.start_offset, @@ -176,18 +195,23 @@ impl LoggingContext<'_, A> { if let Some(data) = data.take() { logger.publish_batch(data); } else if logger.report_progress(*time) { - event_queue.activator.activate(); + activator.activate(); } }, ) } - fn reachability_logger(&self) -> Logger { - let event_queue = self.r_event_queue.clone(); - - let mut logger = BatchLogger::new(event_queue.link, self.interval_ms); + /// Construct a reachability logger for timestamp type `T`. The index must + /// refer to a unique link in the reachability event queue. + fn reachability_logger(&self, index: usize) -> Logger> + where + T: ExtractTimestamp, + { + let link = Rc::clone(&self.r_event_queue.links[index]); + let mut logger = BatchLogger::new(link, self.interval_ms); let mut massaged = Vec::new(); let mut builder = ColumnBuilder::default(); + let activator = self.r_event_queue.activator.clone(); let action = move |batch_time: &Duration, data: &mut Option>| { if let Some(data) = data { @@ -197,25 +221,23 @@ impl LoggingContext<'_, A> { TrackerEvent::SourceUpdate(update) => { massaged.extend(update.updates.iter().map( |(node, port, time, diff)| { - let ts = try_downcast_timestamp(time); let is_source = true; - (*node, *port, is_source, ts, *diff) + (*node, *port, is_source, T::extract(time), *diff) }, )); - builder.push_into((time, (&update.tracker_id, &massaged))); + builder.push_into((time, (update.tracker_id, &massaged))); massaged.clear(); } TrackerEvent::TargetUpdate(update) => { massaged.extend(update.updates.iter().map( |(node, port, time, diff)| { - let ts = try_downcast_timestamp(time); let is_source = false; - (*node, *port, is_source, ts, *diff) + (*node, *port, is_source, time.extract(), *diff) }, )); - builder.push_into((time, (&update.tracker_id, &massaged))); + builder.push_into((time, (update.tracker_id, &massaged))); massaged.clear(); } } @@ -230,7 +252,7 @@ impl LoggingContext<'_, A> { } if logger.report_progress(*batch_time) { - event_queue.activator.activate(); + activator.activate(); } } }; @@ -239,20 +261,26 @@ impl LoggingContext<'_, A> { } } -/// Extracts a `Timestamp` from a `dyn ProgressEventTimestamp`. -/// -/// For nested timestamps, only extracts the outermost one. The rest of the timestamps are -/// ignored for now. -#[inline] -fn try_downcast_timestamp(time: &dyn ProgressEventTimestamp) -> Option { - let time_any = time.as_any(); - time_any - .downcast_ref::() - .copied() - .or_else(|| { - time_any - .downcast_ref::>>() - .map(|t| t.outer) - }) - .or_else(|| time_any.downcast_ref::<(Timestamp, Subtime)>().map(|t| t.0)) +/// Helper trait to extract a timestamp from various types of timestamp used in rendering. +trait ExtractTimestamp: Clone + 'static { + /// Extracts the timestamp from the type. + fn extract(&self) -> Timestamp; +} + +impl ExtractTimestamp for Timestamp { + fn extract(&self) -> Timestamp { + *self + } +} + +impl ExtractTimestamp for Product> { + fn extract(&self) -> Timestamp { + self.outer + } +} + +impl ExtractTimestamp for (Timestamp, Subtime) { + fn extract(&self) -> Timestamp { + self.0 + } } diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 079ea5972e332..463a30f21430b 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -39,7 +39,7 @@ use crate::typedefs::RowRowSpine; pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, - event_queue: EventQueue>, + event_queue: EventQueue, 3>, ) -> BTreeMap { let interval_ms = std::cmp::max(1, config.interval.as_millis()); let worker_index = worker.index(); @@ -48,10 +48,10 @@ pub(super) fn construct( // A dataflow for multiple log-derived arrangements. let traces = worker.dataflow_named("Dataflow: timely reachability logging", move |scope| { let enable_logging = config.enable_logging; - type UpdatesKey = (bool, Vec, usize, usize, Option); + type UpdatesKey = (bool, usize, usize, usize, Timestamp); type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>; - let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>( + let (updates, token) = event_queue.links.mz_replay::<_, CB, _>( scope, "reachability logs", config.interval, @@ -62,11 +62,11 @@ pub(super) fn construct( if !enable_logging { return; } - for (time, (addr, massaged)) in data.iter() { + for (time, (operator_id, massaged)) in data.iter() { let time_ms = ((time.as_millis() / interval_ms) + 1) * interval_ms; let time_ms: Timestamp = time_ms.try_into().expect("must fit"); for (source, port, update_type, ts, diff) in massaged.into_iter() { - let datum = (update_type, addr, source, port, ts); + let datum = (update_type, operator_id, source, port, ts); session.give(((datum, ()), time_ms, diff)); } } @@ -76,22 +76,17 @@ pub(super) fn construct( // Restrict results by those logs that are meant to be active. let logs_active = [LogVariant::Timely(TimelyLog::Reachability)]; - let mut addr_row = Row::default(); let updates = consolidate_and_pack::<_, Col2ValBatcher, ColumnBuilder<_>, _, _>( &updates, TimelyLog::Reachability, move |((datum, ()), time, diff), packer, session| { - let (update_type, addr, source, port, ts) = datum; + let (update_type, operator_id, source, port, ts) = datum; let update_type = if *update_type { "source" } else { "target" }; - addr_row.packer().push_list( - IntoIterator::into_iter(addr) - .chain(std::iter::once(source)) - .map(|id| Datum::UInt64(u64::cast_from(*id))), - ); let data = packer.pack_slice(&[ - addr_row.iter().next().unwrap(), - Datum::UInt64(u64::cast_from(*port)), + Datum::UInt64(u64::cast_from(*operator_id)), Datum::UInt64(u64::cast_from(worker_index)), + Datum::UInt64(u64::cast_from(*source)), + Datum::UInt64(u64::cast_from(*port)), Datum::String(update_type), Datum::from(*ts), ]); diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index f98f493af8fda..d8e0776e85a43 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -18,11 +18,12 @@ use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; -use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder}; +use mz_timely_util::containers::{ + columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder, +}; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; use timely::container::columnation::{Columnation, CopyRegion}; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; @@ -61,20 +62,19 @@ pub(super) fn construct( worker.dataflow_named("Dataflow: timely logging", move |scope| { let enable_logging = config.enable_logging; let (logs, token) = - Some(event_queue.link).mz_replay::<_, CapacityContainerBuilder<_>, _>( + event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>( scope, "timely logs", config.interval, event_queue.activator, - move |mut session, data| { + move |mut session, mut data| { // If logging is disabled, we still need to install the indexes, but we can leave them // empty. We do so by immediately filtering all logs events. if enable_logging { - session.give_iterator(data.iter()) + session.give_container(data.to_mut()) } }, ); - let logs = logs.container::>(); // Build a demux operator that splits the replayed event stream up into the separate // logging streams. diff --git a/src/controller/Cargo.toml b/src/controller/Cargo.toml index b31042d9424e7..d73475b4d1d47 100644 --- a/src/controller/Cargo.toml +++ b/src/controller/Cargo.toml @@ -32,7 +32,7 @@ mz-txn-wal = { path = "../txn-wal" } regex = "1.7.0" serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" tokio = "1.38.0" tracing = "0.1.37" uuid = { version = "1.7.0" } diff --git a/src/durable-cache/Cargo.toml b/src/durable-cache/Cargo.toml index 7904408060a62..336e15e10234c 100644 --- a/src/durable-cache/Cargo.toml +++ b/src/durable-cache/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] async-trait = "0.1.83" bytes = { version = "1.3.0" } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" itertools = { version = "0.12.1" } mz-ore = { path = "../ore", features = ["process"] } @@ -23,7 +23,7 @@ mz-timely-util = { path = "../timely-util" } prometheus = { version = "0.13.3", default-features = false } prost = { version = "0.13.1", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive", "rc"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["rt", "rt-multi-thread"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["v4"] } diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml index 0e1225c397095..7aa66f877e1a6 100644 --- a/src/environmentd/Cargo.toml +++ b/src/environmentd/Cargo.toml @@ -145,7 +145,7 @@ reqwest = { version = "0.11.13", features = ["blocking"] } serde_json = "1.0.125" serde_urlencoded = "0.7.1" similar-asserts = "1.4" -timely = "0.16.0" +timely = "0.17.0" tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] } [build-dependencies] diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index 4bcbea5da90de..48de6ee02ab19 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -59,7 +59,7 @@ serde_json = "1.0.125" sha1 = "0.10.5" sha2 = "0.10.6" subtle = "2.4.1" -timely = "0.16.0" +timely = "0.17.0" tracing = "0.1.37" uncased = "0.9.7" uuid = { version = "1.7.0", features = ["v5"] } diff --git a/src/interchange/Cargo.toml b/src/interchange/Cargo.toml index f2adb9ac94d18..f9b9580f10e55 100644 --- a/src/interchange/Cargo.toml +++ b/src/interchange/Cargo.toml @@ -20,7 +20,7 @@ byteorder = "1.4.3" bytes = "1.3.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } clap = { version = "4.5.23", features = ["derive"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" itertools = "0.12.1" maplit = "1.0.2" mz-avro = { path = "../avro", features = ["snappy"] } @@ -33,7 +33,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } prost-reflect = "0.14.5" seahash = "4" serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["macros", "net", "rt", "rt-multi-thread", "time"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde"] } diff --git a/src/persist-cli/Cargo.toml b/src/persist-cli/Cargo.toml index 648b829feb944..6e73b01894abd 100644 --- a/src/persist-cli/Cargo.toml +++ b/src/persist-cli/Cargo.toml @@ -23,7 +23,7 @@ async-trait = "0.1.83" axum = "0.7.5" bytes = { version = "1.3.0", features = ["serde"] } clap = { version = "4.5.23", features = ["derive", "env"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" humantime = "2.1.0" mz-http-util = { path = "../http-util" } @@ -40,7 +40,7 @@ num_enum = "0.5.7" prometheus = { version = "0.13.3", default-features = false } serde = { version = "1.0.152", features = ["derive", "rc"] } serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] } tracing = "0.1.37" url = "2.3.1" diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 66574c15c148c..eaf6e557c503b 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -35,7 +35,7 @@ async-stream = "0.3.3" async-trait = "0.1.83" bytes = { version = "1.3.0", features = ["serde"] } clap = { version = "4.5.23", features = ["derive"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" futures-util = "0.3" h2 = "0.3.13" @@ -59,7 +59,7 @@ sentry-tracing = "0.29.1" semver = { version = "1.0.16", features = ["serde"] } serde = { version = "1.0.152", features = ["derive", "rc"] } serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" thiserror = "1.0.37" tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] } tokio-metrics = "0.3.0" diff --git a/src/persist-types/Cargo.toml b/src/persist-types/Cargo.toml index 026d084ae9869..fb268f82042b8 100644 --- a/src/persist-types/Cargo.toml +++ b/src/persist-types/Cargo.toml @@ -26,7 +26,7 @@ proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } -timely = "0.16.0" +timely = "0.17.0" tracing = "0.1.37" uuid = { version = "1.7.0", features = ["v4"] } workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml index 68aca52c1ef49..39e8d8d4e20d3 100644 --- a/src/persist/Cargo.toml +++ b/src/persist/Cargo.toml @@ -37,7 +37,7 @@ azure_core = "0.21.0" base64 = "0.13.1" bytes = "1.3.0" deadpool-postgres = "0.10.3" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" fail = { version = "0.5.1", features = ["failpoints"] } futures-util = "0.3.25" md-5 = "0.10.5" @@ -58,7 +58,7 @@ proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } rand = { version = "0.8.5", features = ["small_rng"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git a/src/repr/Cargo.toml b/src/repr/Cargo.toml index 4a4f4ff354640..440808c72e692 100644 --- a/src/repr/Cargo.toml +++ b/src/repr/Cargo.toml @@ -36,7 +36,7 @@ columnation = "0.1.0" chrono = { version = "0.4.35", default-features = false, features = ["serde", "std"] } compact_bytes = "0.1.2" dec = "0.4.8" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" enum-kinds = "0.5.1" flatcontainer = "0.5.0" hex = "0.4.3" @@ -68,7 +68,7 @@ serde_json = { version = "1.0.125", features = ["arbitrary_precision", "preserve smallvec = { version = "1.10.0", features = ["serde", "union"] } static_assertions = "1.1" strsim = "0.11.1" -timely = "0.16.0" +timely = "0.17.0" tokio-postgres = { version = "0.7.8" } tracing-core = "0.1.30" url = { version = "2.3.1", features = ["serde"] } diff --git a/src/service/Cargo.toml b/src/service/Cargo.toml index d35313ead9677..095502c93f3d8 100644 --- a/src/service/Cargo.toml +++ b/src/service/Cargo.toml @@ -35,7 +35,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } semver = "1.0.16" serde = { version = "1.0.152", features = ["derive"] } sysinfo = "0.27.2" -timely = "0.16.0" +timely = "0.17.0" tokio = "1.38.0" tokio-stream = "0.1.11" tonic = "0.12.1" diff --git a/src/storage-client/Cargo.toml b/src/storage-client/Cargo.toml index ce9e33156ffbb..25d7a30777e3a 100644 --- a/src/storage-client/Cargo.toml +++ b/src/storage-client/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.66" async-trait = "0.1.83" chrono = { version = "0.4.35", default-features = false, features = ["std"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" http = "1.1.0" itertools = { version = "0.12.1" } @@ -46,7 +46,7 @@ serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } smallvec = { version = "1.10.0", features = ["serde", "union"] } static_assertions = "1.1" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = [ "fs", "rt", diff --git a/src/storage-controller/Cargo.toml b/src/storage-controller/Cargo.toml index 31a898e4838b9..73a5e83ae0e2a 100644 --- a/src/storage-controller/Cargo.toml +++ b/src/storage-controller/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytes = "1.3.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" itertools = { version = "0.12.1" } mz-build-info = { path = "../build-info" } @@ -38,7 +38,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tokio-stream = "0.1.11" diff --git a/src/storage-operators/Cargo.toml b/src/storage-operators/Cargo.toml index 1bf2cd8c0b811..9dcc08ab9c819 100644 --- a/src/storage-operators/Cargo.toml +++ b/src/storage-operators/Cargo.toml @@ -17,7 +17,7 @@ async-stream = "0.3.3" aws-types = "1.1.1" bytes = "1.3.0" bytesize = "1.1.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" csv-async = { version = "1.3.0", features = ["tokio"] } futures = "0.3.25" http = "1.1.0" @@ -41,7 +41,7 @@ reqwest = { version = "0.11.13", features = ["stream"] } sentry = { version = "0.29.1" } serde = { version = "1.0.152", features = ["derive"] } smallvec = { version = "1.10.0", features = ["union"] } -timely = "0.16.0" +timely = "0.17.0" thiserror = "1.0.37" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-stream = "0.1.11" diff --git a/src/storage-types/Cargo.toml b/src/storage-types/Cargo.toml index d04a4341742af..ab09f4d1be935 100644 --- a/src/storage-types/Cargo.toml +++ b/src/storage-types/Cargo.toml @@ -25,7 +25,7 @@ bytes = "1.3.0" columnation = "0.1.0" dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" hex = "0.4.3" http = "1.1.0" itertools = { version = "0.12.1" } @@ -62,7 +62,7 @@ regex = "1.7.0" serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125", features = ["preserve_order"] } thiserror = "1.0.37" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tracing = "0.1.37" diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index a586bd888889a..0bd2af8dacca5 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -26,7 +26,7 @@ columnation = "0.1.0" crossbeam-channel = "0.5.8" csv-core = { version = "0.1.10" } dec = "0.4.8" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" indexmap = { version = "2.0.0", default-features = false, features = ["std"] } @@ -77,7 +77,7 @@ serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } serde_bytes = { version = "0.11.14" } sha2 = "0.10.6" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tokio-stream = "0.1.11" diff --git a/src/timely-util/Cargo.toml b/src/timely-util/Cargo.toml index 9d895e52c755b..88901d8931402 100644 --- a/src/timely-util/Cargo.toml +++ b/src/timely-util/Cargo.toml @@ -15,7 +15,7 @@ bincode = "1.3.3" bytemuck = "1.21.0" columnar = "0.2.2" columnation = "0.1.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" either = "1" futures-util = "0.3.25" lgalloc = "0.4" @@ -23,7 +23,7 @@ mz-ore = { path = "../ore", features = ["async", "process", "tracing_", "test"] num-traits = "0.2" proptest = { version = "1.6.0", default-features = false, features = ["std"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "time"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde", "v4"] } diff --git a/src/timely-util/src/replay.rs b/src/timely-util/src/replay.rs index d7cb296d1f568..f0fe73ff4ba4e 100644 --- a/src/timely-util/src/replay.rs +++ b/src/timely-util/src/replay.rs @@ -13,6 +13,7 @@ //! provides the protocol and semantics of the [MzReplay] operator. use std::any::Any; +use std::borrow::Cow; use std::rc::Rc; use std::time::{Duration, Instant}; use timely::container::ContainerBuilder; @@ -56,14 +57,14 @@ where ) -> (StreamCore, Rc) where CB: ContainerBuilder, - L: FnMut(Session>>, &C) + L: FnMut(Session>>, Cow) + 'static; } impl MzReplay for I where T: Timestamp, - C: Container, + C: Container + Clone, I: IntoIterator, I::Item: EventIterator + 'static, A: ActivatorTrait + 'static, @@ -78,7 +79,7 @@ where ) -> (StreamCore, Rc) where for<'a> CB: ContainerBuilder, - L: FnMut(Session>>, &C) + L: FnMut(Session>>, Cow) + 'static, { let name = format!("Replay {}", name); @@ -135,13 +136,21 @@ where if weak_token.upgrade().is_some() { for event_stream in event_streams.iter_mut() { while let Some(event) = event_stream.next() { - match &event { - Event::Progress(vec) => { + use Cow::*; + match event { + Owned(Event::Progress(vec)) => { + progress.internals[0].extend(vec.iter().cloned()); + progress_sofar.extend(vec.into_iter()); + } + Owned(Event::Messages(time, data)) => { + logic(output.session_with_builder(&time), Owned(data)); + } + Borrowed(Event::Progress(vec)) => { progress.internals[0].extend(vec.iter().cloned()); progress_sofar.extend(vec.iter().cloned()); } - Event::Messages(time, data) => { - logic(output.session_with_builder(time), data); + Borrowed(Event::Messages(time, data)) => { + logic(output.session_with_builder(time), Borrowed(data)); } } } diff --git a/src/transform/Cargo.toml b/src/transform/Cargo.toml index 980a526f9e0ba..2718a2b83b3c8 100644 --- a/src/transform/Cargo.toml +++ b/src/transform/Cargo.toml @@ -10,7 +10,7 @@ publish = false workspace = true [dependencies] -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" enum-kinds = "0.5.1" itertools = "0.12.1" mz-compute-types = { path = "../compute-types" } diff --git a/src/txn-wal/Cargo.toml b/src/txn-wal/Cargo.toml index e1fce89097ebe..6e7e2bdff4a10 100644 --- a/src/txn-wal/Cargo.toml +++ b/src/txn-wal/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] async-trait = "0.1.83" bytes = { version = "1.3.0" } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" itertools = { version = "0.12.1" } mz-ore = { path = "../ore", features = ["process"] } @@ -23,7 +23,7 @@ mz-timely-util = { path = "../timely-util" } prometheus = { version = "0.13.3", default-features = false } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive", "rc"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["rt", "rt-multi-thread"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["v4"] } diff --git a/test/sqllogictest/cluster.slt b/test/sqllogictest/cluster.slt index c0976f74e90e8..ca0cb75c4e692 100644 --- a/test/sqllogictest/cluster.slt +++ b/test/sqllogictest/cluster.slt @@ -233,11 +233,12 @@ bar mz_dataflow_addresses_per_worker mz_dataflow_addresses_per_worker_u7_prima bar mz_dataflow_addresses_per_worker mz_dataflow_addresses_per_worker_u7_primary_idx 2 worker_id NULL false bar mz_dataflow_channels_per_worker mz_dataflow_channels_per_worker_u7_primary_idx 1 id NULL false bar mz_dataflow_channels_per_worker mz_dataflow_channels_per_worker_u7_primary_idx 2 worker_id NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 1 address NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 2 port NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 3 worker_id NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 4 update_type NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 5 time NULL true +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 1 id NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 2 worker_id NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 3 source NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 4 port NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 5 update_type NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 6 time NULL true bar mz_dataflow_operators_per_worker mz_dataflow_operators_per_worker_u7_primary_idx 1 id NULL false bar mz_dataflow_operators_per_worker mz_dataflow_operators_per_worker_u7_primary_idx 2 worker_id NULL false bar mz_dataflow_shutdown_durations_histogram_raw mz_dataflow_shutdown_durations_histogram_raw_u7_primary_idx 1 worker_id NULL false diff --git a/test/sqllogictest/mz_catalog_server_index_accounting.slt b/test/sqllogictest/mz_catalog_server_index_accounting.slt index 5202fc259d718..74bc88616e793 100644 --- a/test/sqllogictest/mz_catalog_server_index_accounting.slt +++ b/test/sqllogictest/mz_catalog_server_index_accounting.slt @@ -64,7 +64,7 @@ mz_continual_tasks_ind CREATE␠INDEX␠"mz_continual_tasks_ind"␠IN␠CLUSTER mz_databases_ind CREATE␠INDEX␠"mz_databases_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s457␠AS␠"mz_catalog"."mz_databases"]␠("name") mz_dataflow_addresses_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_addresses_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_addresses_per_worker"␠("id",␠"worker_id") mz_dataflow_channels_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_channels_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_channels_per_worker"␠("id",␠"worker_id") -mz_dataflow_operator_reachability_raw_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_operator_reachability_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_operator_reachability_raw"␠("address",␠"port",␠"worker_id",␠"update_type",␠"time") +mz_dataflow_operator_reachability_raw_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_operator_reachability_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_operator_reachability_raw"␠("id",␠"worker_id",␠"source",␠"port",␠"update_type",␠"time") mz_dataflow_operators_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_operators_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_operators_per_worker"␠("id",␠"worker_id") mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_shutdown_durations_histogram_raw"␠("worker_id",␠"duration_ns") mz_frontiers_ind CREATE␠INDEX␠"mz_frontiers_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s696␠AS␠"mz_internal"."mz_frontiers"]␠("object_id") @@ -369,8 +369,9 @@ mz_dataflow_channels_per_worker id mz_dataflow_channels_per_worker to_index mz_dataflow_channels_per_worker to_port mz_dataflow_channels_per_worker worker_id -mz_dataflow_operator_reachability_raw address +mz_dataflow_operator_reachability_raw id mz_dataflow_operator_reachability_raw port +mz_dataflow_operator_reachability_raw source mz_dataflow_operator_reachability_raw time mz_dataflow_operator_reachability_raw update_type mz_dataflow_operator_reachability_raw worker_id diff --git a/test/testdrive-old-kafka-src-syntax/indexes.td b/test/testdrive-old-kafka-src-syntax/indexes.td index bcd037d2b552b..afe535df1b2a9 100644 --- a/test/testdrive-old-kafka-src-syntax/indexes.td +++ b/test/testdrive-old-kafka-src-syntax/indexes.td @@ -318,7 +318,7 @@ mz_continual_tasks_ind mz_continual_tasks mz_databases_ind mz_databases mz_catalog_server {name} "" mz_dataflow_addresses_per_worker_s2_primary_idx mz_dataflow_addresses_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_channels_per_worker_s2_primary_idx mz_dataflow_channels_per_worker mz_catalog_server {id,worker_id} "" -mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {address,port,worker_id,update_type,time} "" +mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {id,worker_id,source,port,update_type,time} "" mz_dataflow_operators_per_worker_s2_primary_idx mz_dataflow_operators_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx mz_dataflow_shutdown_durations_histogram_raw mz_catalog_server {worker_id,duration_ns} "" mz_frontiers_ind mz_frontiers mz_catalog_server {object_id} "" diff --git a/test/testdrive/indexes.td b/test/testdrive/indexes.td index 45eb7f7eb6959..a2c3af8e7d24f 100644 --- a/test/testdrive/indexes.td +++ b/test/testdrive/indexes.td @@ -335,7 +335,7 @@ mz_continual_tasks_ind mz_continual_tasks mz_databases_ind mz_databases mz_catalog_server {name} "" mz_dataflow_addresses_per_worker_s2_primary_idx mz_dataflow_addresses_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_channels_per_worker_s2_primary_idx mz_dataflow_channels_per_worker mz_catalog_server {id,worker_id} "" -mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {address,port,worker_id,update_type,time} "" +mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {id,worker_id,source,port,update_type,time} "" mz_dataflow_operators_per_worker_s2_primary_idx mz_dataflow_operators_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx mz_dataflow_shutdown_durations_histogram_raw mz_catalog_server {worker_id,duration_ns} "" mz_frontiers_ind mz_frontiers mz_catalog_server {object_id} "" diff --git a/test/testdrive/logging.td b/test/testdrive/logging.td index 7512451095722..b3d9f2949b213 100644 --- a/test/testdrive/logging.td +++ b/test/testdrive/logging.td @@ -156,9 +156,9 @@ SID batch_received 7 bigint ORDER BY position id name position type -------------------------------------- -SID address 1 list -SID port 2 uint8 -SID worker_id 3 uint8 +SID id 1 uint8 +SID worker_id 2 uint8 +SID port 3 uint8 SID update_type 4 text SID time 5 mz_timestamp SID count 6 bigint