diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index 2aa08dd00..042e9f486 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -86,7 +86,7 @@ fn unoptimized() { let value_flow_next = value_flow_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 4e5c6cb16..935ba7a5b 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -49,8 +49,8 @@ where T: Timestamp, { type Input = Input; - type Output = M::Output; type Time = T; + type Output = M::Chunk; fn new(logger: Option>, operator_id: usize) -> Self { Self { @@ -109,7 +109,7 @@ where self.stash.clear(); - let seal = M::seal::(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow()); + let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow()); self.lower = upper; seal } @@ -204,10 +204,6 @@ where pub trait Merger: Default { /// The internal representation of chunks of data. type Chunk: Container; - /// The output type - /// TODO: This should be replaced by `Chunk` or another container once the builder understands - /// building from a complete chain. - type Output; /// The type of time in frontiers to extract updates. type Time; /// Merge chains into an output chain. @@ -223,15 +219,6 @@ pub trait Merger: Default { stash: &mut Vec, ); - /// Build from a chain - /// TODO: We can move this entirely to `MergeBatcher` once builders can accepts chains. - fn seal>( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> B::Output; - /// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations). fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); } @@ -286,7 +273,6 @@ where { type Time = T; type Chunk = Vec<((K, V), T, R)>; - type Output = Vec<((K, V), T, R)>; fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); @@ -401,45 +387,6 @@ where readied.push(ready); } } - - fn seal>( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> B::Output { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in chain.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } else if p_val != val { - vals += 1; - } - } else { - keys += 1; - vals += 1; - } - upds += 1; - prev_keyval = Some((key, val)); - } - } - } - let mut builder = B::with_capacity(keys, vals, upds); - - for mut chunk in chain.drain(..) { - builder.push(&mut chunk); - } - - builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) - } - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { (chunk.len(), 0, 0, 0) } diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index 677347015..39a9e53ce 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -8,7 +8,6 @@ use timely::{Container, Data, PartialOrder}; use crate::difference::Semigroup; use crate::trace::implementations::merge_batcher::Merger; -use crate::trace::Builder; /// A merger for timely stacks pub struct ColumnationMerger { @@ -62,7 +61,6 @@ where { type Time = T; type Chunk = TimelyStack<((K, V), T, R)>; - type Output = TimelyStack<((K, V), T, R)>; fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); @@ -183,43 +181,6 @@ where } } - fn seal>( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> B::Output { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for buffer in chain.iter() { - for ((key, val), time, _) in buffer.iter() { - if !upper.less_equal(time) { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } else if p_val != val { - vals += 1; - } - } else { - keys += 1; - vals += 1; - } - upds += 1; - prev_keyval = Some((key, val)); - } - } - } - let mut builder = B::with_capacity(keys, vals, upds); - for mut chunk in chain.drain(..) { - builder.push(&mut chunk); - } - - builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) - } - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { let (mut size, mut capacity, mut allocations) = (0, 0, 0); let cb = |siz, cap| { diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index ae8d2894f..e6087712f 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -3,13 +3,12 @@ use std::cmp::Ordering; use std::marker::PhantomData; use timely::progress::frontier::{Antichain, AntichainRef}; -use timely::{Container, Data, PartialOrder}; +use timely::{Data, PartialOrder}; use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems}; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::difference::{IsZero, Semigroup}; use crate::trace::implementations::merge_batcher::Merger; -use crate::trace::Builder; use crate::trace::cursor::IntoOwned; /// A merger for flat stacks. @@ -110,7 +109,6 @@ where { type Time = MC::TimeOwned; type Chunk = FlatStack; - type Output = FlatStack; fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); @@ -242,47 +240,6 @@ where } } - fn seal>( - chain: &mut Vec, - lower: AntichainRef, - upper: AntichainRef, - since: AntichainRef, - ) -> B::Output { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - { - let mut prev_keyval = None; - for buffer in chain.iter() { - for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) { - if !upper.less_equal(&time) { - if let Some((p_key, p_val)) = prev_keyval { - debug_assert!(p_key <= key); - debug_assert!(p_key != key || p_val <= val); - if p_key != key { - keys += 1; - vals += 1; - } else if p_val != val { - vals += 1; - } - } else { - keys += 1; - vals += 1; - } - upds += 1; - prev_keyval = Some((key, val)); - } - } - } - } - let mut builder = B::with_capacity(keys, vals, upds); - for mut chunk in chain.drain(..) { - builder.push(&mut chunk); - } - - builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) - } - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { let (mut size, mut capacity, mut allocations) = (0, 0, 0); let cb = |siz, cap| { diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index eeabc312a..31bd4c628 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -343,6 +343,9 @@ pub trait BuilderInput: Container { /// Test that the value equals a key in the layout's value container. fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool; + + /// Count the number of distinct keys, (key, val) pairs, and total updates. + fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize); } impl BuilderInput for Vec<((K, V), T, R)> @@ -372,6 +375,31 @@ where fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool { VBC::reborrow(other) == this } + + fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for link in chain.iter() { + for ((key, val), _, _) in link.iter() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; + } + } else { + keys += 1; + vals += 1; + } + upds += 1; + prev_keyval = Some((key, val)); + } + } + (keys, vals, upds) + } } impl BuilderInput for TimelyStack<((K::Owned, V::Owned), T, R)> @@ -401,6 +429,31 @@ where fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool { V::reborrow(other) == *this } + + fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for link in chain.iter() { + for ((key, val), _, _) in link.iter() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; + } + } else { + keys += 1; + vals += 1; + } + upds += 1; + prev_keyval = Some((key, val)); + } + } + (keys, vals, upds) + } } mod flatcontainer { @@ -483,6 +536,31 @@ mod flatcontainer { fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool { VBC::reborrow(other) == V::reborrow(*this) } + + fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for link in chain.iter() { + for ((key, val), _, _) in link.iter() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; + } + } else { + keys += 1; + vals += 1; + } + upds += 1; + prev_keyval = Some((key, val)); + } + } + (keys, vals, upds) + } } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index bb1485cb1..8228294b1 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -694,8 +694,22 @@ mod val_batch { description: Description::new(lower, upper, since), } } - } + fn seal( + chain: &mut Vec, + lower: AntichainRef, + upper: AntichainRef, + since: AntichainRef, + ) -> Self::Output { + let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); + let mut builder = Self::with_capacity(keys, vals, upds); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); + } + + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + } + } } mod key_batch { @@ -1167,6 +1181,21 @@ mod key_batch { description: Description::new(lower, upper, since), } } + + fn seal( + chain: &mut Vec, + lower: AntichainRef, + upper: AntichainRef, + since: AntichainRef, + ) -> Self::Output { + let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); + let mut builder = Self::with_capacity(keys, vals, upds); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); + } + + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 866d60678..08f26ade5 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -867,6 +867,21 @@ mod val_batch { description: Description::new(lower, upper, since), } } + + fn seal( + chain: &mut Vec, + lower: AntichainRef, + upper: AntichainRef, + since: AntichainRef, + ) -> Self::Output { + let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]); + let mut builder = Self::with_capacity(keys, vals, upds); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); + } + + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + } } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index ccf8ff090..4a85747c2 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -343,6 +343,17 @@ pub trait Builder: Sized { fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; + + /// Builds a batch from a chain of updates corresponding to the indicated lower and upper bounds. + /// + /// This method relies on the chain only containing updates greater or equal to the lower frontier, + /// and not greater or equal to the upper frontier. Chains must also be sorted and consolidated. + fn seal( + chain: &mut Vec, + lower: AntichainRef, + upper: AntichainRef, + since: AntichainRef, + ) -> Self::Output; } /// Represents a merge in progress. @@ -457,6 +468,14 @@ pub mod rc_blanket_impls { fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } + fn seal( + chain: &mut Vec, + lower: AntichainRef, + upper: AntichainRef, + since: AntichainRef, + ) -> Self::Output { + Rc::new(B::seal(chain, lower, upper, since)) + } } /// Wrapper type for merging reference counted batches.