From a68da0d8c43b825c38a4b0719c7d68fd8ae9fccb Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 23 Jan 2025 11:48:46 +0100 Subject: [PATCH] Update to latest Timely Point Materialize at latest Timely. We need to incorporate some changes around reachability logging, which is now typed, and event iterators that return cow'ed data. Some of the complexity stems from the fact that event links are single-writer, so we need separate event links for each reachability log variant. Signed-off-by: Moritz Hoffmann --- Cargo.lock | 12 +- src/adapter-types/Cargo.toml | 2 +- src/adapter/Cargo.toml | 4 +- src/catalog/Cargo.toml | 4 +- src/catalog/src/builtin.rs | 19 +++- src/cluster/Cargo.toml | 4 +- src/compute-client/Cargo.toml | 4 +- src/compute-client/src/logging.rs | 12 +- src/compute-types/Cargo.toml | 4 +- src/compute/Cargo.toml | 6 +- src/compute/src/logging.rs | 12 +- src/compute/src/logging/compute.rs | 6 +- src/compute/src/logging/differential.rs | 25 +++-- src/compute/src/logging/initialize.rs | 104 +++++++++++------- src/compute/src/logging/reachability.rs | 23 ++-- src/compute/src/logging/timely.rs | 12 +- src/controller/Cargo.toml | 2 +- src/durable-cache/Cargo.toml | 4 +- src/environmentd/Cargo.toml | 2 +- src/expr/Cargo.toml | 2 +- src/interchange/Cargo.toml | 4 +- src/persist-cli/Cargo.toml | 4 +- src/persist-client/Cargo.toml | 4 +- src/persist-types/Cargo.toml | 2 +- src/persist/Cargo.toml | 4 +- src/repr/Cargo.toml | 4 +- src/service/Cargo.toml | 2 +- src/storage-client/Cargo.toml | 4 +- src/storage-controller/Cargo.toml | 4 +- src/storage-operators/Cargo.toml | 4 +- src/storage-types/Cargo.toml | 4 +- src/storage/Cargo.toml | 4 +- src/timely-util/Cargo.toml | 4 +- src/timely-util/src/replay.rs | 23 ++-- src/transform/Cargo.toml | 2 +- src/txn-wal/Cargo.toml | 4 +- test/sqllogictest/cluster.slt | 11 +- .../mz_catalog_server_index_accounting.slt | 5 +- .../testdrive-old-kafka-src-syntax/indexes.td | 2 +- test/testdrive/indexes.td | 2 +- test/testdrive/logging.td | 6 +- 41 files changed, 203 insertions(+), 163 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 21a1db322eb3f..6797f59041cc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2369,9 +2369,9 @@ dependencies = [ [[package]] name = "differential-dataflow" -version = "0.13.3" +version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86f47ab549a056e3959ce2036d6b20ba369a75e2a0605ccf64256e153626b352" +checksum = "75b93af605b7f82fbf6e671a5bb0f940b385e57254a0af59ce6dfb98b8c4b302" dependencies = [ "columnar", "fnv", @@ -2381,9 +2381,9 @@ dependencies = [ [[package]] name = "differential-dogs3" -version = "0.1.3" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8ece5ac3c62f1cb6e21560b53102963aa91513d3f72911d99b6620f2d3ac6b1" +checksum = "17ba29145a1df1bdc3da1142eeb991f0866620c79ce9d85e83a3837f29112ba0" dependencies = [ "differential-dataflow", "serde", @@ -10406,9 +10406,9 @@ dependencies = [ [[package]] name = "timely" -version = "0.16.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ace21eb2a22c1b80b0b9b5be0627eb0f95a128a5751e866aefc90a85e3007d3" +checksum = "0506b6506ef04c371ea6151942df60e309e7f5e710458a6b533e364ee0b3cab3" dependencies = [ "bincode", "byteorder", diff --git a/src/adapter-types/Cargo.toml b/src/adapter-types/Cargo.toml index 5383238d65749..d1cdc0ae56507 100644 --- a/src/adapter-types/Cargo.toml +++ b/src/adapter-types/Cargo.toml @@ -15,7 +15,7 @@ mz-ore = { path = "../ore" } mz-repr = { path = "../repr" } mz-storage-types = { path = "../storage-types" } serde = "1.0.152" -timely = "0.16.0" +timely = "0.17.0" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } [package.metadata.cargo-udeps.ignore] diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index a4298b2fe1703..92c489f39a4e5 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" enum-kinds = "0.5.1" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" @@ -78,7 +78,7 @@ serde_plain = "1.0.1" sha2 = "0.10.6" smallvec = { version = "1.10.0", features = ["union"] } static_assertions = "1.1" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["rt", "time"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index 2751f63aec9c3..6d1e030684df6 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -18,7 +18,7 @@ bytesize = "1.1.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } clap = { version = "4.5.23", features = ["derive"] } derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" ipnet = "2.5.0" itertools = "0.12.1" @@ -60,7 +60,7 @@ serde_plain = "1.0.1" static_assertions = "1.1" sha2 = "0.10.6" thiserror = "1.0.37" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0" } tracing = "0.1.37" uuid = "1.2.2" diff --git a/src/catalog/src/builtin.rs b/src/catalog/src/builtin.rs index f14e5c808b7a9..a8b218c9fc2b3 100644 --- a/src/catalog/src/builtin.rs +++ b/src/catalog/src/builtin.rs @@ -5567,15 +5567,22 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER: LazyLock = oid: oid::VIEW_MZ_DATAFLOW_OPERATOR_REACHABILITY_PER_WORKER_OID, column_defs: None, sql: "SELECT - address, + addr2.id, + reachability.worker_id, port, - worker_id, update_type, time, pg_catalog.count(*) as count FROM - mz_introspection.mz_dataflow_operator_reachability_raw -GROUP BY address, port, worker_id, update_type, time", + mz_introspection.mz_dataflow_operator_reachability_raw reachability, + mz_introspection.mz_dataflow_addresses_per_worker addr1, + mz_introspection.mz_dataflow_addresses_per_worker addr2 +WHERE + addr2.address = addr1.address || reachability.source + AND addr1.id = reachability.id + AND addr1.worker_id = reachability.worker_id + AND addr2.worker_id = reachability.worker_id +GROUP BY addr2.id, reachability.worker_id, port, update_type, time", access: vec![PUBLIC_SELECT], }); @@ -5587,13 +5594,13 @@ pub static MZ_DATAFLOW_OPERATOR_REACHABILITY: LazyLock = column_defs: None, sql: " SELECT - address, + id, port, update_type, time, pg_catalog.sum(count) as count FROM mz_introspection.mz_dataflow_operator_reachability_per_worker -GROUP BY address, port, update_type, time", +GROUP BY id, port, update_type, time", access: vec![PUBLIC_SELECT], }); diff --git a/src/cluster/Cargo.toml b/src/cluster/Cargo.toml index d40e4100b2481..43ca3a040c8c2 100644 --- a/src/cluster/Cargo.toml +++ b/src/cluster/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.66" async-trait = "0.1.83" crossbeam-channel = "0.5.8" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" mz-cluster-client = { path = "../cluster-client" } mz-ore = { path = "../ore", features = ["async", "process", "tracing_"] } @@ -21,7 +21,7 @@ mz-persist-client = { path = "../persist-client" } mz-service = { path = "../service" } mz-txn-wal = { path = "../txn-wal" } regex = "1.7.0" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute-client/Cargo.toml b/src/compute-client/Cargo.toml index dd09ba95f42ee..c7ed7aa902e90 100644 --- a/src/compute-client/Cargo.toml +++ b/src/compute-client/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytesize = "1.1.0" crossbeam-channel = "0.5.8" derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" http = "1.1.0" mz-build-info = { path = "../build-info" } @@ -43,7 +43,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.125" thiserror = "1.0.37" -timely = "0.16.0" +timely = "0.17.0" tokio = "1.38.0" tokio-stream = "0.1.11" tonic = "0.12.1" diff --git a/src/compute-client/src/logging.rs b/src/compute-client/src/logging.rs index c6f56ae1a8982..c0a617d8af4bb 100644 --- a/src/compute-client/src/logging.rs +++ b/src/compute-client/src/logging.rs @@ -448,16 +448,10 @@ impl LogVariant { .finish(), LogVariant::Timely(TimelyLog::Reachability) => RelationDesc::builder() - .with_column( - "address", - ScalarType::List { - element_type: Box::new(ScalarType::UInt64), - custom_id: None, - } - .nullable(false), - ) - .with_column("port", ScalarType::UInt64.nullable(false)) + .with_column("id", ScalarType::UInt64.nullable(false)) .with_column("worker_id", ScalarType::UInt64.nullable(false)) + .with_column("source", ScalarType::UInt64.nullable(false)) + .with_column("port", ScalarType::UInt64.nullable(false)) .with_column("update_type", ScalarType::String.nullable(false)) .with_column("time", ScalarType::MzTimestamp.nullable(true)) .finish(), diff --git a/src/compute-types/Cargo.toml b/src/compute-types/Cargo.toml index a82db2a67b378..4e368867b8edd 100644 --- a/src/compute-types/Cargo.toml +++ b/src/compute-types/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] columnar = "0.2.2" columnation = "0.1.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" itertools = "0.12.1" mz-dyncfg = { path = "../dyncfg" } mz-expr = { path = "../expr" } @@ -24,7 +24,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.16.0" +timely = "0.17.0" tracing = "0.1.37" workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/compute/Cargo.toml b/src/compute/Cargo.toml index 3f9ba4d140a3d..e972dbb2f1782 100644 --- a/src/compute/Cargo.toml +++ b/src/compute/Cargo.toml @@ -16,8 +16,8 @@ bytesize = "1.1.0" columnar = "0.2.2" crossbeam-channel = "0.5.8" dec = { version = "0.4.8", features = ["serde"] } -differential-dataflow = "0.13.3" -differential-dogs3 = "0.1.3" +differential-dataflow = "0.13.4" +differential-dogs3 = "0.1.4" futures = "0.3.25" itertools = "0.12.1" lgalloc = "0.4" @@ -39,7 +39,7 @@ prometheus = { version = "0.13.3", default-features = false } scopeguard = "1.1.0" serde = { version = "1.0.152", features = ["derive"] } smallvec = { version = "1.10.0", features = ["serde", "union"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "net"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde", "v4"] } diff --git a/src/compute/src/logging.rs b/src/compute/src/logging.rs index 23a9a18f02f6f..a024dbcf9dc8f 100644 --- a/src/compute/src/logging.rs +++ b/src/compute/src/logging.rs @@ -115,18 +115,22 @@ where /// /// This is just a bundle-type intended to make passing around its contents in the logging /// initialization code more convenient. +/// +/// The `N` type parameter specifies the number of links to create for the event queue. We need +/// separate links for queues that feed from multiple loggers because the `EventLink` type is not +/// multi-producer safe. #[derive(Clone)] -struct EventQueue { - link: Rc>, +struct EventQueue { + links: [Rc>; N], activator: RcActivator, } -impl EventQueue { +impl EventQueue { fn new(name: &str) -> Self { let activator_name = format!("{name}_activator"); let activate_after = 128; Self { - link: Rc::new(EventLink::new()), + links: [(); N].map(|_| Rc::new(EventLink::new())), activator: RcActivator::new(activator_name, activate_after), } } diff --git a/src/compute/src/logging/compute.rs b/src/compute/src/logging/compute.rs index 049f952b2f50a..8c0610d3b055a 100644 --- a/src/compute/src/logging/compute.rs +++ b/src/compute/src/logging/compute.rs @@ -308,16 +308,16 @@ pub(super) fn construct( worker.dataflow_named("Dataflow: compute logging", move |scope| { let enable_logging = config.enable_logging; - let (logs, token) = Some(event_queue.link).mz_replay::<_, ProvidedBuilder<_>, _>( + let (logs, token) = event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>( scope, "compute logs", config.interval, event_queue.activator, - move |mut session, data| { + move |mut session, mut data| { // If logging is disabled, we still need to install the indexes, but we can leave them // empty. We do so by immediately filtering all logs events. if enable_logging { - session.give_container(&mut data.clone()) + session.give_container(data.to_mut()) } }, ); diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs index 42ae6e508bdec..22364045d0522 100644 --- a/src/compute/src/logging/differential.rs +++ b/src/compute/src/logging/differential.rs @@ -20,15 +20,15 @@ use differential_dataflow::logging::{ }; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; -use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder}; +use mz_timely_util::containers::{ + columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder, +}; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; use timely::dataflow::operators::generic::builder_rc::OperatorBuilder; -use timely::dataflow::operators::Filter; use timely::dataflow::Stream; use crate::extensions::arrange::MzArrangeCore; @@ -57,21 +57,22 @@ pub(super) fn construct( let dataflow_index = worker.next_dataflow_index(); worker.dataflow_named("Dataflow: differential logging", move |scope| { - let (mut logs, token) = Some(event_queue.link) - .mz_replay::<_, CapacityContainerBuilder<_>, _>( + let enable_logging = config.enable_logging; + let (logs, token) = event_queue.links.clone() + .mz_replay::<_, ProvidedBuilder<_>, _>( scope, "differential logs", config.interval, event_queue.activator, - |mut session, data| session.give_iterator(data.iter()), + move |mut session, mut data|{ + // If logging is disabled, we still need to install the indexes, but we can leave them + // empty. We do so by immediately filtering all logs events. + if enable_logging { + session.give_container(data.to_mut()) + } + } ); - // If logging is disabled, we still need to install the indexes, but we can leave them - // empty. We do so by immediately filtering all logs events. - if !config.enable_logging { - logs = logs.filter(|_| false); - } - // Build a demux operator that splits the replayed event stream up into the separate // logging streams. let mut demux = diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 9c4a51fc7adf0..1878f75f1cff5 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -21,8 +21,8 @@ use mz_timely_util::containers::{Column, ColumnBuilder}; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::{ContainerBuilder, PushInto}; -use timely::logging::{ProgressEventTimestamp, TimelyEvent, TimelyEventBuilder}; -use timely::logging_core::Logger; +use timely::logging::{TimelyEvent, TimelyEventBuilder}; +use timely::logging_core::{Logger, Registry}; use timely::order::Product; use timely::progress::reachability::logging::{TrackerEvent, TrackerEventBuilder}; @@ -81,10 +81,7 @@ pub fn initialize( (logger, traces) } -pub(super) type ReachabilityEvent = ( - Vec, - Vec<(usize, usize, bool, Option, Diff)>, -); +pub(super) type ReachabilityEvent = (usize, Vec<(usize, usize, bool, Timestamp, Diff)>); struct LoggingContext<'a, A: Allocate> { worker: &'a mut timely::worker::Worker, @@ -93,7 +90,7 @@ struct LoggingContext<'a, A: Allocate> { now: Instant, start_offset: Duration, t_event_queue: EventQueue>, - r_event_queue: EventQueue>, + r_event_queue: EventQueue, 3>, d_event_queue: EventQueue>, c_event_queue: EventQueue>, shared_state: Rc>, @@ -149,15 +146,35 @@ impl LoggingContext<'_, A> { .collect() } + /// Construct a new reachability logger for timestamp type `T`. + /// + /// Inserts a logger with the name `timely/reachability/{type_name::()}`, following + /// Timely naming convention. + fn register_reachability_logger( + &self, + registry: &mut Registry, + index: usize, + ) { + let logger = self.reachability_logger::(index); + let type_name = std::any::type_name::(); + registry.insert_logger(&format!("timely/reachability/{type_name}"), logger); + } + + /// Register all loggers with the timely worker. + /// + /// Registers the timely, differential, compute, and reachability loggers. fn register_loggers(&self) { let t_logger = self.simple_logger::(self.t_event_queue.clone()); - let r_logger = self.reachability_logger(); let d_logger = self.simple_logger::(self.d_event_queue.clone()); let c_logger = self.simple_logger::(self.c_event_queue.clone()); let mut register = self.worker.log_register(); register.insert_logger("timely", t_logger); - register.insert_logger("timely/reachability", r_logger); + // Note that each reachability logger has a unique index, this is crucial to avoid dropping + // data. + self.register_reachability_logger::(&mut register, 0); + self.register_reachability_logger::>>(&mut register, 1); + self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2); register.insert_logger("differential/arrange", d_logger); register.insert_logger("materialize/compute", c_logger.clone()); @@ -168,7 +185,9 @@ impl LoggingContext<'_, A> { &self, event_queue: EventQueue, ) -> Logger { - let mut logger = BatchLogger::new(event_queue.link, self.interval_ms); + let [link] = event_queue.links; + let mut logger = BatchLogger::new(link, self.interval_ms); + let activator = event_queue.activator.clone(); Logger::new( self.now, self.start_offset, @@ -176,18 +195,23 @@ impl LoggingContext<'_, A> { if let Some(data) = data.take() { logger.publish_batch(data); } else if logger.report_progress(*time) { - event_queue.activator.activate(); + activator.activate(); } }, ) } - fn reachability_logger(&self) -> Logger { - let event_queue = self.r_event_queue.clone(); - - let mut logger = BatchLogger::new(event_queue.link, self.interval_ms); + /// Construct a reachability logger for timestamp type `T`. The index must + /// refer to a unique link in the reachability event queue. + fn reachability_logger(&self, index: usize) -> Logger> + where + T: ExtractTimestamp, + { + let link = Rc::clone(&self.r_event_queue.links[index]); + let mut logger = BatchLogger::new(link, self.interval_ms); let mut massaged = Vec::new(); let mut builder = ColumnBuilder::default(); + let activator = self.r_event_queue.activator.clone(); let action = move |batch_time: &Duration, data: &mut Option>| { if let Some(data) = data { @@ -197,25 +221,23 @@ impl LoggingContext<'_, A> { TrackerEvent::SourceUpdate(update) => { massaged.extend(update.updates.iter().map( |(node, port, time, diff)| { - let ts = try_downcast_timestamp(time); let is_source = true; - (*node, *port, is_source, ts, *diff) + (*node, *port, is_source, T::extract(time), *diff) }, )); - builder.push_into((time, (&update.tracker_id, &massaged))); + builder.push_into((time, (update.tracker_id, &massaged))); massaged.clear(); } TrackerEvent::TargetUpdate(update) => { massaged.extend(update.updates.iter().map( |(node, port, time, diff)| { - let ts = try_downcast_timestamp(time); let is_source = false; - (*node, *port, is_source, ts, *diff) + (*node, *port, is_source, time.extract(), *diff) }, )); - builder.push_into((time, (&update.tracker_id, &massaged))); + builder.push_into((time, (update.tracker_id, &massaged))); massaged.clear(); } } @@ -230,7 +252,7 @@ impl LoggingContext<'_, A> { } if logger.report_progress(*batch_time) { - event_queue.activator.activate(); + activator.activate(); } } }; @@ -239,20 +261,26 @@ impl LoggingContext<'_, A> { } } -/// Extracts a `Timestamp` from a `dyn ProgressEventTimestamp`. -/// -/// For nested timestamps, only extracts the outermost one. The rest of the timestamps are -/// ignored for now. -#[inline] -fn try_downcast_timestamp(time: &dyn ProgressEventTimestamp) -> Option { - let time_any = time.as_any(); - time_any - .downcast_ref::() - .copied() - .or_else(|| { - time_any - .downcast_ref::>>() - .map(|t| t.outer) - }) - .or_else(|| time_any.downcast_ref::<(Timestamp, Subtime)>().map(|t| t.0)) +/// Helper trait to extract a timestamp from various types of timestamp used in rendering. +trait ExtractTimestamp: Clone + 'static { + /// Extracts the timestamp from the type. + fn extract(&self) -> Timestamp; +} + +impl ExtractTimestamp for Timestamp { + fn extract(&self) -> Timestamp { + *self + } +} + +impl ExtractTimestamp for Product> { + fn extract(&self) -> Timestamp { + self.outer + } +} + +impl ExtractTimestamp for (Timestamp, Subtime) { + fn extract(&self) -> Timestamp { + self.0 + } } diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 079ea5972e332..463a30f21430b 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -39,7 +39,7 @@ use crate::typedefs::RowRowSpine; pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, - event_queue: EventQueue>, + event_queue: EventQueue, 3>, ) -> BTreeMap { let interval_ms = std::cmp::max(1, config.interval.as_millis()); let worker_index = worker.index(); @@ -48,10 +48,10 @@ pub(super) fn construct( // A dataflow for multiple log-derived arrangements. let traces = worker.dataflow_named("Dataflow: timely reachability logging", move |scope| { let enable_logging = config.enable_logging; - type UpdatesKey = (bool, Vec, usize, usize, Option); + type UpdatesKey = (bool, usize, usize, usize, Timestamp); type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>; - let (updates, token) = Some(event_queue.link).mz_replay::<_, CB, _>( + let (updates, token) = event_queue.links.mz_replay::<_, CB, _>( scope, "reachability logs", config.interval, @@ -62,11 +62,11 @@ pub(super) fn construct( if !enable_logging { return; } - for (time, (addr, massaged)) in data.iter() { + for (time, (operator_id, massaged)) in data.iter() { let time_ms = ((time.as_millis() / interval_ms) + 1) * interval_ms; let time_ms: Timestamp = time_ms.try_into().expect("must fit"); for (source, port, update_type, ts, diff) in massaged.into_iter() { - let datum = (update_type, addr, source, port, ts); + let datum = (update_type, operator_id, source, port, ts); session.give(((datum, ()), time_ms, diff)); } } @@ -76,22 +76,17 @@ pub(super) fn construct( // Restrict results by those logs that are meant to be active. let logs_active = [LogVariant::Timely(TimelyLog::Reachability)]; - let mut addr_row = Row::default(); let updates = consolidate_and_pack::<_, Col2ValBatcher, ColumnBuilder<_>, _, _>( &updates, TimelyLog::Reachability, move |((datum, ()), time, diff), packer, session| { - let (update_type, addr, source, port, ts) = datum; + let (update_type, operator_id, source, port, ts) = datum; let update_type = if *update_type { "source" } else { "target" }; - addr_row.packer().push_list( - IntoIterator::into_iter(addr) - .chain(std::iter::once(source)) - .map(|id| Datum::UInt64(u64::cast_from(*id))), - ); let data = packer.pack_slice(&[ - addr_row.iter().next().unwrap(), - Datum::UInt64(u64::cast_from(*port)), + Datum::UInt64(u64::cast_from(*operator_id)), Datum::UInt64(u64::cast_from(worker_index)), + Datum::UInt64(u64::cast_from(*source)), + Datum::UInt64(u64::cast_from(*port)), Datum::String(update_type), Datum::from(*ts), ]); diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index f98f493af8fda..d8e0776e85a43 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -18,11 +18,12 @@ use differential_dataflow::consolidation::ConsolidatingContainerBuilder; use mz_compute_client::logging::LoggingConfig; use mz_ore::cast::CastFrom; use mz_repr::{Datum, Diff, Timestamp}; -use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, ColumnBuilder}; +use mz_timely_util::containers::{ + columnar_exchange, Col2ValBatcher, ColumnBuilder, ProvidedBuilder, +}; use mz_timely_util::replay::MzReplay; use timely::communication::Allocate; use timely::container::columnation::{Columnation, CopyRegion}; -use timely::container::CapacityContainerBuilder; use timely::dataflow::channels::pact::{ExchangeCore, Pipeline}; use timely::dataflow::channels::pushers::buffer::Session; use timely::dataflow::channels::pushers::{Counter, Tee}; @@ -61,20 +62,19 @@ pub(super) fn construct( worker.dataflow_named("Dataflow: timely logging", move |scope| { let enable_logging = config.enable_logging; let (logs, token) = - Some(event_queue.link).mz_replay::<_, CapacityContainerBuilder<_>, _>( + event_queue.links.mz_replay::<_, ProvidedBuilder<_>, _>( scope, "timely logs", config.interval, event_queue.activator, - move |mut session, data| { + move |mut session, mut data| { // If logging is disabled, we still need to install the indexes, but we can leave them // empty. We do so by immediately filtering all logs events. if enable_logging { - session.give_iterator(data.iter()) + session.give_container(data.to_mut()) } }, ); - let logs = logs.container::>(); // Build a demux operator that splits the replayed event stream up into the separate // logging streams. diff --git a/src/controller/Cargo.toml b/src/controller/Cargo.toml index b31042d9424e7..d73475b4d1d47 100644 --- a/src/controller/Cargo.toml +++ b/src/controller/Cargo.toml @@ -32,7 +32,7 @@ mz-txn-wal = { path = "../txn-wal" } regex = "1.7.0" serde = { version = "1.0.152", features = ["derive"] } serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" tokio = "1.38.0" tracing = "0.1.37" uuid = { version = "1.7.0" } diff --git a/src/durable-cache/Cargo.toml b/src/durable-cache/Cargo.toml index 7904408060a62..336e15e10234c 100644 --- a/src/durable-cache/Cargo.toml +++ b/src/durable-cache/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] async-trait = "0.1.83" bytes = { version = "1.3.0" } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" itertools = { version = "0.12.1" } mz-ore = { path = "../ore", features = ["process"] } @@ -23,7 +23,7 @@ mz-timely-util = { path = "../timely-util" } prometheus = { version = "0.13.3", default-features = false } prost = { version = "0.13.1", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive", "rc"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["rt", "rt-multi-thread"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["v4"] } diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml index 0e1225c397095..7aa66f877e1a6 100644 --- a/src/environmentd/Cargo.toml +++ b/src/environmentd/Cargo.toml @@ -145,7 +145,7 @@ reqwest = { version = "0.11.13", features = ["blocking"] } serde_json = "1.0.125" serde_urlencoded = "0.7.1" similar-asserts = "1.4" -timely = "0.16.0" +timely = "0.17.0" tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] } [build-dependencies] diff --git a/src/expr/Cargo.toml b/src/expr/Cargo.toml index 4bcbea5da90de..48de6ee02ab19 100644 --- a/src/expr/Cargo.toml +++ b/src/expr/Cargo.toml @@ -59,7 +59,7 @@ serde_json = "1.0.125" sha1 = "0.10.5" sha2 = "0.10.6" subtle = "2.4.1" -timely = "0.16.0" +timely = "0.17.0" tracing = "0.1.37" uncased = "0.9.7" uuid = { version = "1.7.0", features = ["v5"] } diff --git a/src/interchange/Cargo.toml b/src/interchange/Cargo.toml index f2adb9ac94d18..f9b9580f10e55 100644 --- a/src/interchange/Cargo.toml +++ b/src/interchange/Cargo.toml @@ -20,7 +20,7 @@ byteorder = "1.4.3" bytes = "1.3.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } clap = { version = "4.5.23", features = ["derive"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" itertools = "0.12.1" maplit = "1.0.2" mz-avro = { path = "../avro", features = ["snappy"] } @@ -33,7 +33,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } prost-reflect = "0.14.5" seahash = "4" serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["macros", "net", "rt", "rt-multi-thread", "time"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde"] } diff --git a/src/persist-cli/Cargo.toml b/src/persist-cli/Cargo.toml index 648b829feb944..6e73b01894abd 100644 --- a/src/persist-cli/Cargo.toml +++ b/src/persist-cli/Cargo.toml @@ -23,7 +23,7 @@ async-trait = "0.1.83" axum = "0.7.5" bytes = { version = "1.3.0", features = ["serde"] } clap = { version = "4.5.23", features = ["derive", "env"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" humantime = "2.1.0" mz-http-util = { path = "../http-util" } @@ -40,7 +40,7 @@ num_enum = "0.5.7" prometheus = { version = "0.13.3", default-features = false } serde = { version = "1.0.152", features = ["derive", "rc"] } serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] } tracing = "0.1.37" url = "2.3.1" diff --git a/src/persist-client/Cargo.toml b/src/persist-client/Cargo.toml index 66574c15c148c..eaf6e557c503b 100644 --- a/src/persist-client/Cargo.toml +++ b/src/persist-client/Cargo.toml @@ -35,7 +35,7 @@ async-stream = "0.3.3" async-trait = "0.1.83" bytes = { version = "1.3.0", features = ["serde"] } clap = { version = "4.5.23", features = ["derive"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" futures-util = "0.3" h2 = "0.3.13" @@ -59,7 +59,7 @@ sentry-tracing = "0.29.1" semver = { version = "1.0.16", features = ["serde"] } serde = { version = "1.0.152", features = ["derive", "rc"] } serde_json = "1.0.125" -timely = "0.16.0" +timely = "0.17.0" thiserror = "1.0.37" tokio = { version = "1.38.0", default-features = false, features = ["macros", "sync", "rt", "rt-multi-thread", "time"] } tokio-metrics = "0.3.0" diff --git a/src/persist-types/Cargo.toml b/src/persist-types/Cargo.toml index 026d084ae9869..fb268f82042b8 100644 --- a/src/persist-types/Cargo.toml +++ b/src/persist-types/Cargo.toml @@ -26,7 +26,7 @@ proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } -timely = "0.16.0" +timely = "0.17.0" tracing = "0.1.37" uuid = { version = "1.7.0", features = ["v4"] } workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true } diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml index 68aca52c1ef49..39e8d8d4e20d3 100644 --- a/src/persist/Cargo.toml +++ b/src/persist/Cargo.toml @@ -37,7 +37,7 @@ azure_core = "0.21.0" base64 = "0.13.1" bytes = "1.3.0" deadpool-postgres = "0.10.3" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" fail = { version = "0.5.1", features = ["failpoints"] } futures-util = "0.3.25" md-5 = "0.10.5" @@ -58,7 +58,7 @@ proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } rand = { version = "0.8.5", features = ["small_rng"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] } tokio-postgres = { version = "0.7.8" } tracing = "0.1.37" diff --git a/src/repr/Cargo.toml b/src/repr/Cargo.toml index 4a4f4ff354640..440808c72e692 100644 --- a/src/repr/Cargo.toml +++ b/src/repr/Cargo.toml @@ -36,7 +36,7 @@ columnation = "0.1.0" chrono = { version = "0.4.35", default-features = false, features = ["serde", "std"] } compact_bytes = "0.1.2" dec = "0.4.8" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" enum-kinds = "0.5.1" flatcontainer = "0.5.0" hex = "0.4.3" @@ -68,7 +68,7 @@ serde_json = { version = "1.0.125", features = ["arbitrary_precision", "preserve smallvec = { version = "1.10.0", features = ["serde", "union"] } static_assertions = "1.1" strsim = "0.11.1" -timely = "0.16.0" +timely = "0.17.0" tokio-postgres = { version = "0.7.8" } tracing-core = "0.1.30" url = { version = "2.3.1", features = ["serde"] } diff --git a/src/service/Cargo.toml b/src/service/Cargo.toml index d35313ead9677..095502c93f3d8 100644 --- a/src/service/Cargo.toml +++ b/src/service/Cargo.toml @@ -35,7 +35,7 @@ prost = { version = "0.13.2", features = ["no-recursion-limit"] } semver = "1.0.16" serde = { version = "1.0.152", features = ["derive"] } sysinfo = "0.27.2" -timely = "0.16.0" +timely = "0.17.0" tokio = "1.38.0" tokio-stream = "0.1.11" tonic = "0.12.1" diff --git a/src/storage-client/Cargo.toml b/src/storage-client/Cargo.toml index ce9e33156ffbb..25d7a30777e3a 100644 --- a/src/storage-client/Cargo.toml +++ b/src/storage-client/Cargo.toml @@ -13,7 +13,7 @@ workspace = true anyhow = "1.0.66" async-trait = "0.1.83" chrono = { version = "0.4.35", default-features = false, features = ["std"] } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" http = "1.1.0" itertools = { version = "0.12.1" } @@ -46,7 +46,7 @@ serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } smallvec = { version = "1.10.0", features = ["serde", "union"] } static_assertions = "1.1" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = [ "fs", "rt", diff --git a/src/storage-controller/Cargo.toml b/src/storage-controller/Cargo.toml index 31a898e4838b9..73a5e83ae0e2a 100644 --- a/src/storage-controller/Cargo.toml +++ b/src/storage-controller/Cargo.toml @@ -15,7 +15,7 @@ async-trait = "0.1.83" bytes = "1.3.0" chrono = { version = "0.4.35", default-features = false, features = ["std"] } derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" itertools = { version = "0.12.1" } mz-build-info = { path = "../build-info" } @@ -38,7 +38,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tokio-stream = "0.1.11" diff --git a/src/storage-operators/Cargo.toml b/src/storage-operators/Cargo.toml index 1bf2cd8c0b811..9dcc08ab9c819 100644 --- a/src/storage-operators/Cargo.toml +++ b/src/storage-operators/Cargo.toml @@ -17,7 +17,7 @@ async-stream = "0.3.3" aws-types = "1.1.1" bytes = "1.3.0" bytesize = "1.1.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" csv-async = { version = "1.3.0", features = ["tokio"] } futures = "0.3.25" http = "1.1.0" @@ -41,7 +41,7 @@ reqwest = { version = "0.11.13", features = ["stream"] } sentry = { version = "0.29.1" } serde = { version = "1.0.152", features = ["derive"] } smallvec = { version = "1.10.0", features = ["union"] } -timely = "0.16.0" +timely = "0.17.0" thiserror = "1.0.37" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-stream = "0.1.11" diff --git a/src/storage-types/Cargo.toml b/src/storage-types/Cargo.toml index d04a4341742af..ab09f4d1be935 100644 --- a/src/storage-types/Cargo.toml +++ b/src/storage-types/Cargo.toml @@ -25,7 +25,7 @@ bytes = "1.3.0" columnation = "0.1.0" dec = "0.4.8" derivative = "2.2.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" hex = "0.4.3" http = "1.1.0" itertools = { version = "0.12.1" } @@ -62,7 +62,7 @@ regex = "1.7.0" serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125", features = ["preserve_order"] } thiserror = "1.0.37" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util", "time"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tracing = "0.1.37" diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index a586bd888889a..0bd2af8dacca5 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -26,7 +26,7 @@ columnation = "0.1.0" crossbeam-channel = "0.5.8" csv-core = { version = "0.1.10" } dec = "0.4.8" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" fail = { version = "0.5.1", features = ["failpoints"] } futures = "0.3.25" indexmap = { version = "2.0.0", default-features = false, features = ["std"] } @@ -77,7 +77,7 @@ serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.125" } serde_bytes = { version = "0.11.14" } sha2 = "0.10.6" -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["fs", "rt", "sync", "test-util"] } tokio-postgres = { version = "0.7.8", features = ["serde"] } tokio-stream = "0.1.11" diff --git a/src/timely-util/Cargo.toml b/src/timely-util/Cargo.toml index 9d895e52c755b..88901d8931402 100644 --- a/src/timely-util/Cargo.toml +++ b/src/timely-util/Cargo.toml @@ -15,7 +15,7 @@ bincode = "1.3.3" bytemuck = "1.21.0" columnar = "0.2.2" columnation = "0.1.0" -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" either = "1" futures-util = "0.3.25" lgalloc = "0.4" @@ -23,7 +23,7 @@ mz-ore = { path = "../ore", features = ["async", "process", "tracing_", "test"] num-traits = "0.2" proptest = { version = "1.6.0", default-features = false, features = ["std"] } serde = { version = "1.0.152", features = ["derive"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread", "time"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["serde", "v4"] } diff --git a/src/timely-util/src/replay.rs b/src/timely-util/src/replay.rs index d7cb296d1f568..f0fe73ff4ba4e 100644 --- a/src/timely-util/src/replay.rs +++ b/src/timely-util/src/replay.rs @@ -13,6 +13,7 @@ //! provides the protocol and semantics of the [MzReplay] operator. use std::any::Any; +use std::borrow::Cow; use std::rc::Rc; use std::time::{Duration, Instant}; use timely::container::ContainerBuilder; @@ -56,14 +57,14 @@ where ) -> (StreamCore, Rc) where CB: ContainerBuilder, - L: FnMut(Session>>, &C) + L: FnMut(Session>>, Cow) + 'static; } impl MzReplay for I where T: Timestamp, - C: Container, + C: Container + Clone, I: IntoIterator, I::Item: EventIterator + 'static, A: ActivatorTrait + 'static, @@ -78,7 +79,7 @@ where ) -> (StreamCore, Rc) where for<'a> CB: ContainerBuilder, - L: FnMut(Session>>, &C) + L: FnMut(Session>>, Cow) + 'static, { let name = format!("Replay {}", name); @@ -135,13 +136,21 @@ where if weak_token.upgrade().is_some() { for event_stream in event_streams.iter_mut() { while let Some(event) = event_stream.next() { - match &event { - Event::Progress(vec) => { + use Cow::*; + match event { + Owned(Event::Progress(vec)) => { + progress.internals[0].extend(vec.iter().cloned()); + progress_sofar.extend(vec.into_iter()); + } + Owned(Event::Messages(time, data)) => { + logic(output.session_with_builder(&time), Owned(data)); + } + Borrowed(Event::Progress(vec)) => { progress.internals[0].extend(vec.iter().cloned()); progress_sofar.extend(vec.iter().cloned()); } - Event::Messages(time, data) => { - logic(output.session_with_builder(time), data); + Borrowed(Event::Messages(time, data)) => { + logic(output.session_with_builder(time), Borrowed(data)); } } } diff --git a/src/transform/Cargo.toml b/src/transform/Cargo.toml index 980a526f9e0ba..2718a2b83b3c8 100644 --- a/src/transform/Cargo.toml +++ b/src/transform/Cargo.toml @@ -10,7 +10,7 @@ publish = false workspace = true [dependencies] -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" enum-kinds = "0.5.1" itertools = "0.12.1" mz-compute-types = { path = "../compute-types" } diff --git a/src/txn-wal/Cargo.toml b/src/txn-wal/Cargo.toml index e1fce89097ebe..6e7e2bdff4a10 100644 --- a/src/txn-wal/Cargo.toml +++ b/src/txn-wal/Cargo.toml @@ -12,7 +12,7 @@ workspace = true [dependencies] async-trait = "0.1.83" bytes = { version = "1.3.0" } -differential-dataflow = "0.13.3" +differential-dataflow = "0.13.4" futures = "0.3.25" itertools = { version = "0.12.1" } mz-ore = { path = "../ore", features = ["process"] } @@ -23,7 +23,7 @@ mz-timely-util = { path = "../timely-util" } prometheus = { version = "0.13.3", default-features = false } prost = { version = "0.13.2", features = ["no-recursion-limit"] } serde = { version = "1.0.152", features = ["derive", "rc"] } -timely = "0.16.0" +timely = "0.17.0" tokio = { version = "1.38.0", default-features = false, features = ["rt", "rt-multi-thread"] } tracing = "0.1.37" uuid = { version = "1.7.0", features = ["v4"] } diff --git a/test/sqllogictest/cluster.slt b/test/sqllogictest/cluster.slt index c0976f74e90e8..ca0cb75c4e692 100644 --- a/test/sqllogictest/cluster.slt +++ b/test/sqllogictest/cluster.slt @@ -233,11 +233,12 @@ bar mz_dataflow_addresses_per_worker mz_dataflow_addresses_per_worker_u7_prima bar mz_dataflow_addresses_per_worker mz_dataflow_addresses_per_worker_u7_primary_idx 2 worker_id NULL false bar mz_dataflow_channels_per_worker mz_dataflow_channels_per_worker_u7_primary_idx 1 id NULL false bar mz_dataflow_channels_per_worker mz_dataflow_channels_per_worker_u7_primary_idx 2 worker_id NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 1 address NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 2 port NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 3 worker_id NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 4 update_type NULL false -bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 5 time NULL true +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 1 id NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 2 worker_id NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 3 source NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 4 port NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 5 update_type NULL false +bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 6 time NULL true bar mz_dataflow_operators_per_worker mz_dataflow_operators_per_worker_u7_primary_idx 1 id NULL false bar mz_dataflow_operators_per_worker mz_dataflow_operators_per_worker_u7_primary_idx 2 worker_id NULL false bar mz_dataflow_shutdown_durations_histogram_raw mz_dataflow_shutdown_durations_histogram_raw_u7_primary_idx 1 worker_id NULL false diff --git a/test/sqllogictest/mz_catalog_server_index_accounting.slt b/test/sqllogictest/mz_catalog_server_index_accounting.slt index 5202fc259d718..74bc88616e793 100644 --- a/test/sqllogictest/mz_catalog_server_index_accounting.slt +++ b/test/sqllogictest/mz_catalog_server_index_accounting.slt @@ -64,7 +64,7 @@ mz_continual_tasks_ind CREATE␠INDEX␠"mz_continual_tasks_ind"␠IN␠CLUSTER mz_databases_ind CREATE␠INDEX␠"mz_databases_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s457␠AS␠"mz_catalog"."mz_databases"]␠("name") mz_dataflow_addresses_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_addresses_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_addresses_per_worker"␠("id",␠"worker_id") mz_dataflow_channels_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_channels_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_channels_per_worker"␠("id",␠"worker_id") -mz_dataflow_operator_reachability_raw_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_operator_reachability_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_operator_reachability_raw"␠("address",␠"port",␠"worker_id",␠"update_type",␠"time") +mz_dataflow_operator_reachability_raw_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_operator_reachability_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_operator_reachability_raw"␠("id",␠"worker_id",␠"source",␠"port",␠"update_type",␠"time") mz_dataflow_operators_per_worker_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_operators_per_worker_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_operators_per_worker"␠("id",␠"worker_id") mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx CREATE␠INDEX␠"mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx"␠IN␠CLUSTER␠[s2]␠ON␠"mz_introspection"."mz_dataflow_shutdown_durations_histogram_raw"␠("worker_id",␠"duration_ns") mz_frontiers_ind CREATE␠INDEX␠"mz_frontiers_ind"␠IN␠CLUSTER␠[s2]␠ON␠[s696␠AS␠"mz_internal"."mz_frontiers"]␠("object_id") @@ -369,8 +369,9 @@ mz_dataflow_channels_per_worker id mz_dataflow_channels_per_worker to_index mz_dataflow_channels_per_worker to_port mz_dataflow_channels_per_worker worker_id -mz_dataflow_operator_reachability_raw address +mz_dataflow_operator_reachability_raw id mz_dataflow_operator_reachability_raw port +mz_dataflow_operator_reachability_raw source mz_dataflow_operator_reachability_raw time mz_dataflow_operator_reachability_raw update_type mz_dataflow_operator_reachability_raw worker_id diff --git a/test/testdrive-old-kafka-src-syntax/indexes.td b/test/testdrive-old-kafka-src-syntax/indexes.td index bcd037d2b552b..afe535df1b2a9 100644 --- a/test/testdrive-old-kafka-src-syntax/indexes.td +++ b/test/testdrive-old-kafka-src-syntax/indexes.td @@ -318,7 +318,7 @@ mz_continual_tasks_ind mz_continual_tasks mz_databases_ind mz_databases mz_catalog_server {name} "" mz_dataflow_addresses_per_worker_s2_primary_idx mz_dataflow_addresses_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_channels_per_worker_s2_primary_idx mz_dataflow_channels_per_worker mz_catalog_server {id,worker_id} "" -mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {address,port,worker_id,update_type,time} "" +mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {id,worker_id,source,port,update_type,time} "" mz_dataflow_operators_per_worker_s2_primary_idx mz_dataflow_operators_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx mz_dataflow_shutdown_durations_histogram_raw mz_catalog_server {worker_id,duration_ns} "" mz_frontiers_ind mz_frontiers mz_catalog_server {object_id} "" diff --git a/test/testdrive/indexes.td b/test/testdrive/indexes.td index 45eb7f7eb6959..a2c3af8e7d24f 100644 --- a/test/testdrive/indexes.td +++ b/test/testdrive/indexes.td @@ -335,7 +335,7 @@ mz_continual_tasks_ind mz_continual_tasks mz_databases_ind mz_databases mz_catalog_server {name} "" mz_dataflow_addresses_per_worker_s2_primary_idx mz_dataflow_addresses_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_channels_per_worker_s2_primary_idx mz_dataflow_channels_per_worker mz_catalog_server {id,worker_id} "" -mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {address,port,worker_id,update_type,time} "" +mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_catalog_server {id,worker_id,source,port,update_type,time} "" mz_dataflow_operators_per_worker_s2_primary_idx mz_dataflow_operators_per_worker mz_catalog_server {id,worker_id} "" mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx mz_dataflow_shutdown_durations_histogram_raw mz_catalog_server {worker_id,duration_ns} "" mz_frontiers_ind mz_frontiers mz_catalog_server {object_id} "" diff --git a/test/testdrive/logging.td b/test/testdrive/logging.td index 7512451095722..b3d9f2949b213 100644 --- a/test/testdrive/logging.td +++ b/test/testdrive/logging.td @@ -156,9 +156,9 @@ SID batch_received 7 bigint ORDER BY position id name position type -------------------------------------- -SID address 1 list -SID port 2 uint8 -SID worker_id 3 uint8 +SID id 1 uint8 +SID worker_id 2 uint8 +SID port 3 uint8 SID update_type 4 text SID time 5 mz_timestamp SID count 6 bigint