Skip to content

Commit

Permalink
Address feedback and correct for WMR
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jan 29, 2025
1 parent a68da0d commit 7d426db
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 4 deletions.
5 changes: 4 additions & 1 deletion src/catalog/src/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5578,7 +5578,10 @@ FROM
mz_introspection.mz_dataflow_addresses_per_worker addr1,
mz_introspection.mz_dataflow_addresses_per_worker addr2
WHERE
addr2.address = addr1.address || reachability.source
CASE
WHEN source = 0 THEN addr2.address = addr1.address
ELSE addr2.address = addr1.address || reachability.source
END
AND addr1.id = reachability.id
AND addr1.worker_id = reachability.worker_id
AND addr2.worker_id = reachability.worker_id
Expand Down
3 changes: 2 additions & 1 deletion src/compute/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ where
///
/// The `N` type parameter specifies the number of links to create for the event queue. We need
/// separate links for queues that feed from multiple loggers because the `EventLink` type is not
/// multi-producer safe.
/// multi-producer safe (it is a linked-list, and multiple writers would blindly append, replacing
/// existing new data, and cutting off other writers).
#[derive(Clone)]
struct EventQueue<C, const N: usize = 1> {
links: [Rc<EventLink<Timestamp, C>>; N],
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/logging/differential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ pub(super) fn construct<A: Allocate>(

worker.dataflow_named("Dataflow: differential logging", move |scope| {
let enable_logging = config.enable_logging;
let (logs, token) = event_queue.links.clone()
let (logs, token) = event_queue.links
.mz_replay::<_, ProvidedBuilder<_>, _>(
scope,
"differential logs",
Expand Down
2 changes: 1 addition & 1 deletion src/compute/src/logging/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
let mut register = self.worker.log_register();
register.insert_logger("timely", t_logger);
// Note that each reachability logger has a unique index, this is crucial to avoid dropping
// data.
// data because the event link structure is not multi-producer safe.
self.register_reachability_logger::<Timestamp>(&mut register, 0);
self.register_reachability_logger::<Product<Timestamp, PointStamp<u64>>>(&mut register, 1);
self.register_reachability_logger::<(Timestamp, Subtime)>(&mut register, 2);
Expand Down

0 comments on commit 7d426db

Please sign in to comment.