From ae4bb2e91156d460e882991239b69842fe9624b8 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Wed, 8 Jan 2025 15:38:02 +0100 Subject: [PATCH] Incorporate breaking changes from Timely's logging update https://github.com/TimelyDataflow/timely-dataflow/pull/615 Signed-off-by: Moritz Hoffmann --- examples/bfs.rs | 4 +- examples/dynamic.rs | 4 +- src/logging.rs | 7 +++- src/operators/arrange/agent.rs | 19 +++++---- src/operators/arrange/arrangement.rs | 2 +- src/operators/arrange/upsert.rs | 2 +- src/operators/reduce.rs | 2 +- src/trace/implementations/merge_batcher.rs | 11 +++-- src/trace/implementations/spine_fueled.rs | 48 +++++++++++----------- src/trace/mod.rs | 5 +-- 10 files changed, 54 insertions(+), 50 deletions(-) diff --git a/examples/bfs.rs b/examples/bfs.rs index 26d58fe24..fa6bad616 100644 --- a/examples/bfs.rs +++ b/examples/bfs.rs @@ -7,7 +7,7 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; -use differential_dataflow::logging::DifferentialEvent; +use differential_dataflow::logging::DifferentialEventBuilder; type Node = u32; type Edge = (Node, Node); @@ -30,7 +30,7 @@ fn main() { if let Ok(stream) = ::std::net::TcpStream::connect(&addr) { let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream); let mut logger = ::timely::logging::BatchLogger::new(writer); - worker.log_register().insert::("differential/arrange", move |time, data| + worker.log_register().insert::("differential/arrange", move |time, data| logger.publish_batch(time, data) ); } diff --git a/examples/dynamic.rs b/examples/dynamic.rs index 01422ec57..6a2daf58a 100644 --- a/examples/dynamic.rs +++ b/examples/dynamic.rs @@ -7,7 +7,7 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; -use differential_dataflow::logging::DifferentialEvent; +use differential_dataflow::logging::DifferentialEventBuilder; type Node = u32; type Edge = (Node, Node); @@ -30,7 +30,7 @@ fn main() { if let Ok(stream) = ::std::net::TcpStream::connect(&addr) { let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream); let mut logger = ::timely::logging::BatchLogger::new(writer); - worker.log_register().insert::("differential/arrange", move |time, data| + worker.log_register().insert::("differential/arrange", move |time, data| logger.publish_batch(time, data) ); } diff --git a/src/logging.rs b/src/logging.rs index 628f283af..215e7f767 100644 --- a/src/logging.rs +++ b/src/logging.rs @@ -3,8 +3,11 @@ use columnar::Columnar; use serde::{Deserialize, Serialize}; +/// Container builder for differential log events. +pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder>; + /// Logger for differential dataflow events. -pub type Logger = ::timely::logging::Logger; +pub type Logger = ::timely::logging_core::Logger; /// Enables logging of differential dataflow events. pub fn enable(worker: &mut timely::worker::Worker, writer: W) -> Option> @@ -16,7 +19,7 @@ where let mut logger = ::timely::logging::BatchLogger::new(writer); worker .log_register() - .insert::("differential/arrange", move |time, data| logger.publish_batch(time, data)) + .insert::("differential/arrange", move |time, data| logger.publish_batch(time, data)) } /// Possible different differential events. diff --git a/src/operators/arrange/agent.rs b/src/operators/arrange/agent.rs index bc74e82cd..92c00f144 100644 --- a/src/operators/arrange/agent.rs +++ b/src/operators/arrange/agent.rs @@ -15,6 +15,7 @@ use crate::trace::wrappers::rc::TraceBox; use timely::scheduling::Activator; +use crate::logging::DifferentialEvent; use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged}; use super::TraceReplayInstruction; @@ -93,9 +94,9 @@ impl TraceAgent { let queues = Rc::new(RefCell::new(Vec::new())); if let Some(logging) = &logging { - logging.log( - crate::logging::TraceShare { operator: operator.global_id, diff: 1 } - ); + logging.log(DifferentialEvent::from(crate::logging::TraceShare { + operator: operator.global_id, diff: 1 + })); } let reader = TraceAgent { @@ -532,9 +533,9 @@ where fn clone(&self) -> Self { if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 } - ); + logging.log(DifferentialEvent::from(crate::logging::TraceShare { + operator: self.operator.global_id, diff: 1 + })); } // increase counts for wrapped `TraceBox`. @@ -561,9 +562,9 @@ where fn drop(&mut self) { if let Some(logging) = &self.logging { - logging.log( - crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 } - ); + logging.log(DifferentialEvent::from(crate::logging::TraceShare { + operator: self.operator.global_id, diff: -1 + })); } // decrement borrow counts to remove all holds diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 16d4c16be..9e820e7d7 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -437,7 +437,7 @@ where // Acquire a logger for arrange events. let logger = { let register = scope.log_register(); - register.get::("differential/arrange") + register.get::("differential/arrange") }; // Where we will deposit received updates, and from which we extract batches. diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index cfabfd472..64db110b5 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -156,7 +156,7 @@ where let logger = { let scope = stream.scope(); let register = scope.log_register(); - register.get::("differential/arrange") + register.get::("differential/arrange") }; // Tracks the lower envelope of times in `priority_queue`. diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 886c13d67..e7f8fcf24 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -336,7 +336,7 @@ where let logger = { let scope = trace.stream.scope(); let register = scope.log_register(); - register.get::("differential/arrange") + register.get::("differential/arrange") }; let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone())); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 7923ba3e7..2e24fca97 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -12,13 +12,12 @@ use std::marker::PhantomData; -use timely::logging_core::Logger; use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; use timely::Container; use timely::container::{ContainerBuilder, PushInto}; -use crate::logging::{BatcherEvent, DifferentialEvent}; +use crate::logging::{BatcherEvent, DifferentialEvent, Logger}; use crate::trace::{Batcher, Builder, Description}; /// Creates batches from containers of unordered tuples. @@ -41,7 +40,7 @@ pub struct MergeBatcher { /// The lower-bound frontier of the data, after the last call to seal. frontier: Antichain, /// Logger for size accounting. - logger: Option>, + logger: Option, /// Timely operator ID. operator_id: usize, /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. @@ -58,7 +57,7 @@ where type Time = M::Time; type Output = M::Chunk; - fn new(logger: Option>, operator_id: usize) -> Self { + fn new(logger: Option, operator_id: usize) -> Self { Self { logger, operator_id, @@ -184,13 +183,13 @@ where capacity = capacity.saturating_add_unsigned(capacity_); allocations = allocations.saturating_add_unsigned(allocations_); } - logger.log(BatcherEvent { + logger.log(DifferentialEvent::from(BatcherEvent { operator: self.operator_id, records_diff: records * diff, size_diff: size * diff, capacity_diff: capacity * diff, allocations_diff: allocations * diff, - }) + })); } } } diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 6b1c02d5c..2337a4b8d 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -69,7 +69,7 @@ //! have paid back any "debt" to higher layers by continuing to provide fuel as updates arrive. -use crate::logging::Logger; +use crate::logging::{DifferentialEvent, Logger}; use crate::trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic}; use crate::trace::cursor::CursorList; use crate::trace::Merger; @@ -293,10 +293,12 @@ where fn insert(&mut self, batch: Self::Batch) { // Log the introduction of a batch. - self.logger.as_ref().map(|l| l.log(crate::logging::BatchEvent { - operator: self.operator.global_id, - length: batch.len() - })); + if let Some(l) = self.logger.as_ref() { + l.log(DifferentialEvent::from(crate::logging::BatchEvent { + operator: self.operator.global_id, + length: batch.len() + })) + }; assert!(batch.lower() != batch.upper()); assert_eq!(batch.lower(), &self.upper); @@ -331,35 +333,35 @@ impl Spine { for batch in self.merging.drain(..) { match batch { MergeState::Single(Some(batch)) => { - logger.log(crate::logging::DropEvent { + logger.log(DifferentialEvent::from(crate::logging::DropEvent { operator: self.operator.global_id, length: batch.len(), - }); + })); }, MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { - logger.log(crate::logging::DropEvent { + logger.log(DifferentialEvent::from(crate::logging::DropEvent { operator: self.operator.global_id, length: batch1.len(), - }); - logger.log(crate::logging::DropEvent { + })); + logger.log(DifferentialEvent::from(crate::logging::DropEvent { operator: self.operator.global_id, length: batch2.len(), - }); + })); }, MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { - logger.log(crate::logging::DropEvent { + logger.log(DifferentialEvent::from(crate::logging::DropEvent { operator: self.operator.global_id, length: batch.len(), - }); + })); } _ => { }, } } for batch in self.pending.drain(..) { - logger.log(crate::logging::DropEvent { + logger.log(DifferentialEvent::from(crate::logging::DropEvent { operator: self.operator.global_id, length: batch.len(), - }); + })); } } } @@ -643,15 +645,15 @@ impl Spine { } MergeState::Single(old) => { // Log the initiation of a merge. - self.logger.as_ref().map(|l| l.log( - crate::logging::MergeEvent { + if let Some(l) = self.logger.as_ref() { + l.log(DifferentialEvent::from(crate::logging::MergeEvent { operator: self.operator.global_id, scale: index, length1: old.as_ref().map(|b| b.len()).unwrap_or(0), length2: batch.as_ref().map(|b| b.len()).unwrap_or(0), complete: None, - } - )); + })); + } let compaction_frontier = self.logical_frontier.borrow(); self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier); } @@ -666,15 +668,15 @@ impl Spine { if let Some((merged, inputs)) = self.merging[index].complete() { if let Some((input1, input2)) = inputs { // Log the completion of a merge from existing parts. - self.logger.as_ref().map(|l| l.log( - crate::logging::MergeEvent { + if let Some(l) = self.logger.as_ref() { + l.log(DifferentialEvent::from(crate::logging::MergeEvent { operator: self.operator.global_id, scale: index, length1: input1.len(), length2: input2.len(), complete: Some(merged.len()), - } - )); + })); + } } Some(merged) } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index e06183faf..e8c28a891 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -12,11 +12,10 @@ pub mod description; pub mod implementations; pub mod wrappers; -use timely::logging_core::Logger; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; -use crate::logging::DifferentialEvent; +use crate::logging::Logger; use crate::trace::cursor::IntoOwned; use crate::difference::Semigroup; use crate::lattice::Lattice; @@ -309,7 +308,7 @@ pub trait Batcher { /// Times at which batches are formed. type Time: Timestamp; /// Allocates a new empty batcher. - fn new(logger: Option>, operator_id: usize) -> Self; + fn new(logger: Option, operator_id: usize) -> Self; /// Adds an unordered container of elements to the batcher. fn push_container(&mut self, batch: &mut Self::Input); /// Returns all updates not greater or equal to an element of `upper`.