Skip to content

Commit

Permalink
Incorporate breaking changes from Timely's logging update
Browse files Browse the repository at this point in the history
  • Loading branch information
antiguru committed Jan 8, 2025
1 parent c3871fb commit ae4bb2e
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 50 deletions.
4 changes: 2 additions & 2 deletions examples/bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::logging::DifferentialEventBuilder;

type Node = u32;
type Edge = (Node, Node);
Expand All @@ -30,7 +30,7 @@ fn main() {
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
logger.publish_batch(time, data)
);
}
Expand Down
4 changes: 2 additions & 2 deletions examples/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::logging::DifferentialEventBuilder;

type Node = u32;
type Edge = (Node, Node);
Expand All @@ -30,7 +30,7 @@ fn main() {
if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
worker.log_register().insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data|
logger.publish_batch(time, data)
);
}
Expand Down
7 changes: 5 additions & 2 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
use columnar::Columnar;
use serde::{Deserialize, Serialize};

/// Container builder for differential log events.
pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;

/// Logger for differential dataflow events.
pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
pub type Logger = ::timely::logging_core::Logger<DifferentialEventBuilder>;

/// Enables logging of differential dataflow events.
pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
Expand All @@ -16,7 +19,7 @@ where
let mut logger = ::timely::logging::BatchLogger::new(writer);
worker
.log_register()
.insert::<DifferentialEvent,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
.insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
}

/// Possible different differential events.
Expand Down
19 changes: 10 additions & 9 deletions src/operators/arrange/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::trace::wrappers::rc::TraceBox;

use timely::scheduling::Activator;

use crate::logging::DifferentialEvent;
use super::{TraceWriter, TraceAgentQueueWriter, TraceAgentQueueReader, Arranged};
use super::TraceReplayInstruction;

Expand Down Expand Up @@ -93,9 +94,9 @@ impl<Tr: TraceReader> TraceAgent<Tr> {
let queues = Rc::new(RefCell::new(Vec::new()));

if let Some(logging) = &logging {
logging.log(
crate::logging::TraceShare { operator: operator.global_id, diff: 1 }
);
logging.log(DifferentialEvent::from(crate::logging::TraceShare {

Check failure on line 97 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 97 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 97 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: operator.global_id, diff: 1
}));
}

let reader = TraceAgent {
Expand Down Expand Up @@ -532,9 +533,9 @@ where
fn clone(&self) -> Self {

if let Some(logging) = &self.logging {
logging.log(
crate::logging::TraceShare { operator: self.operator.global_id, diff: 1 }
);
logging.log(DifferentialEvent::from(crate::logging::TraceShare {

Check failure on line 536 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 536 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 536 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id, diff: 1
}));
}

// increase counts for wrapped `TraceBox`.
Expand All @@ -561,9 +562,9 @@ where
fn drop(&mut self) {

if let Some(logging) = &self.logging {
logging.log(
crate::logging::TraceShare { operator: self.operator.global_id, diff: -1 }
);
logging.log(DifferentialEvent::from(crate::logging::TraceShare {

Check failure on line 565 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 565 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 565 in src/operators/arrange/agent.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id, diff: -1
}));
}

// decrement borrow counts to remove all holds
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ where
// Acquire a logger for arrange events.
let logger = {
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange")
};

// Where we will deposit received updates, and from which we extract batches.
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ where
let logger = {
let scope = stream.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange")
};

// Tracks the lower envelope of times in `priority_queue`.
Expand Down
2 changes: 1 addition & 1 deletion src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ where
let logger = {
let scope = trace.stream.scope();
let register = scope.log_register();
register.get::<crate::logging::DifferentialEvent>("differential/arrange")
register.get::<crate::logging::DifferentialEventBuilder>("differential/arrange")
};

let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
Expand Down
11 changes: 5 additions & 6 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@
use std::marker::PhantomData;

use timely::logging_core::Logger;
use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};

use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::logging::{BatcherEvent, DifferentialEvent, Logger};
use crate::trace::{Batcher, Builder, Description};

/// Creates batches from containers of unordered tuples.
Expand All @@ -41,7 +40,7 @@ pub struct MergeBatcher<Input, C, M: Merger> {
/// The lower-bound frontier of the data, after the last call to seal.
frontier: Antichain<M::Time>,
/// Logger for size accounting.
logger: Option<Logger<DifferentialEvent>>,
logger: Option<Logger>,
/// Timely operator ID.
operator_id: usize,
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
Expand All @@ -58,7 +57,7 @@ where
type Time = M::Time;
type Output = M::Chunk;

fn new(logger: Option<Logger<DifferentialEvent>>, operator_id: usize) -> Self {
fn new(logger: Option<Logger>, operator_id: usize) -> Self {
Self {
logger,
operator_id,
Expand Down Expand Up @@ -184,13 +183,13 @@ where
capacity = capacity.saturating_add_unsigned(capacity_);
allocations = allocations.saturating_add_unsigned(allocations_);
}
logger.log(BatcherEvent {
logger.log(DifferentialEvent::from(BatcherEvent {
operator: self.operator_id,
records_diff: records * diff,
size_diff: size * diff,
capacity_diff: capacity * diff,
allocations_diff: allocations * diff,
})
}));
}
}
}
Expand Down
48 changes: 25 additions & 23 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
//! have paid back any "debt" to higher layers by continuing to provide fuel as updates arrive.

use crate::logging::Logger;
use crate::logging::{DifferentialEvent, Logger};
use crate::trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic};
use crate::trace::cursor::CursorList;
use crate::trace::Merger;
Expand Down Expand Up @@ -293,10 +293,12 @@ where
fn insert(&mut self, batch: Self::Batch) {

// Log the introduction of a batch.
self.logger.as_ref().map(|l| l.log(crate::logging::BatchEvent {
operator: self.operator.global_id,
length: batch.len()
}));
if let Some(l) = self.logger.as_ref() {
l.log(DifferentialEvent::from(crate::logging::BatchEvent {

Check failure on line 297 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 297 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 297 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id,
length: batch.len()
}))
};

assert!(batch.lower() != batch.upper());
assert_eq!(batch.lower(), &self.upper);
Expand Down Expand Up @@ -331,35 +333,35 @@ impl<B: Batch> Spine<B> {
for batch in self.merging.drain(..) {
match batch {
MergeState::Single(Some(batch)) => {
logger.log(crate::logging::DropEvent {
logger.log(DifferentialEvent::from(crate::logging::DropEvent {

Check failure on line 336 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 336 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 336 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id,
length: batch.len(),
});
}));
},
MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => {
logger.log(crate::logging::DropEvent {
logger.log(DifferentialEvent::from(crate::logging::DropEvent {

Check failure on line 342 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 342 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 342 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id,
length: batch1.len(),
});
logger.log(crate::logging::DropEvent {
}));
logger.log(DifferentialEvent::from(crate::logging::DropEvent {

Check failure on line 346 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 346 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 346 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id,
length: batch2.len(),
});
}));
},
MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => {
logger.log(crate::logging::DropEvent {
logger.log(DifferentialEvent::from(crate::logging::DropEvent {

Check failure on line 352 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 352 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 352 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id,
length: batch.len(),
});
}));
}
_ => { },
}
}
for batch in self.pending.drain(..) {
logger.log(crate::logging::DropEvent {
logger.log(DifferentialEvent::from(crate::logging::DropEvent {

Check failure on line 361 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 361 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 361 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id,
length: batch.len(),
});
}));
}
}
}
Expand Down Expand Up @@ -643,15 +645,15 @@ impl<B: Batch> Spine<B> {
}
MergeState::Single(old) => {
// Log the initiation of a merge.
self.logger.as_ref().map(|l| l.log(
crate::logging::MergeEvent {
if let Some(l) = self.logger.as_ref() {
l.log(DifferentialEvent::from(crate::logging::MergeEvent {

Check failure on line 649 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / test mdBook

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 649 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied

Check failure on line 649 in src/trace/implementations/spine_fueled.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.78

the trait bound `CapacityContainerBuilder<Vec<(Duration, DifferentialEvent)>>: From<DifferentialEvent>` is not satisfied
operator: self.operator.global_id,
scale: index,
length1: old.as_ref().map(|b| b.len()).unwrap_or(0),
length2: batch.as_ref().map(|b| b.len()).unwrap_or(0),
complete: None,
}
));
}));
}
let compaction_frontier = self.logical_frontier.borrow();
self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier);
}
Expand All @@ -666,15 +668,15 @@ impl<B: Batch> Spine<B> {
if let Some((merged, inputs)) = self.merging[index].complete() {
if let Some((input1, input2)) = inputs {
// Log the completion of a merge from existing parts.
self.logger.as_ref().map(|l| l.log(
crate::logging::MergeEvent {
if let Some(l) = self.logger.as_ref() {
l.log(DifferentialEvent::from(crate::logging::MergeEvent {
operator: self.operator.global_id,
scale: index,
length1: input1.len(),
length2: input2.len(),
complete: Some(merged.len()),
}
));
}));
}
}
Some(merged)
}
Expand Down
5 changes: 2 additions & 3 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ pub mod description;
pub mod implementations;
pub mod wrappers;

use timely::logging_core::Logger;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::progress::Timestamp;

use crate::logging::DifferentialEvent;
use crate::logging::Logger;
use crate::trace::cursor::IntoOwned;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
Expand Down Expand Up @@ -309,7 +308,7 @@ pub trait Batcher {
/// Times at which batches are formed.
type Time: Timestamp;
/// Allocates a new empty batcher.
fn new(logger: Option<Logger<DifferentialEvent>>, operator_id: usize) -> Self;
fn new(logger: Option<Logger>, operator_id: usize) -> Self;
/// Adds an unordered container of elements to the batcher.
fn push_container(&mut self, batch: &mut Self::Input);
/// Returns all updates not greater or equal to an element of `upper`.
Expand Down

0 comments on commit ae4bb2e

Please sign in to comment.