From 2520e6fc4ba40ab8411e38f68a1593cf43a3d359 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 21 May 2024 22:05:00 -0400 Subject: [PATCH] Merge batcher for flatcontainer Signed-off-by: Moritz Hoffmann --- .../implementations/merge_batcher_flat.rs | 406 ++++++++++++++++++ src/trace/implementations/mod.rs | 68 ++- src/trace/implementations/ord_neu.rs | 12 +- 3 files changed, 465 insertions(+), 21 deletions(-) create mode 100644 src/trace/implementations/merge_batcher_flat.rs diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs new file mode 100644 index 000000000..0f7423f1a --- /dev/null +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -0,0 +1,406 @@ +//! A general purpose `Batcher` implementation for FlatStack. + +use crate::consolidation::consolidate_updates; +use std::cmp::Ordering; +use std::marker::PhantomData; +use timely::communication::message::RefOrMut; +use timely::progress::frontier::{Antichain, AntichainRef}; +use timely::{Container, Data, PartialOrder}; +use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems}; +use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; + +use crate::difference::Semigroup; +use crate::trace::implementations::merge_batcher::Merger; +use crate::trace::Builder; +use crate::trace::cursor::IntoOwned; + +/// A merger for flat stacks +pub struct FlatcontainerMerger { + pending: Vec, + _marker: PhantomData, +} + +impl Default for FlatcontainerMerger { + fn default() -> Self { + Self { pending: Vec::default(), _marker: PhantomData, } + } +} + +impl FlatcontainerMerger { + const BUFFER_SIZE_BYTES: usize = 64 << 10; + fn chunk_capacity(&self) -> usize { + let size = ::std::mem::size_of::(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + /// Buffer size for pending updates, currently 2 * [`Self::chunk_capacity`]. + fn pending_capacity(&self) -> usize { + self.chunk_capacity() * 2 + } + + /// Helper to get pre-sized vector from the stash. + #[inline] + fn empty(&self, stash: &mut Vec>) -> FlatStack { + stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity())) + } + + /// Helper to return a chunk to the stash. + #[inline] + fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { + // TODO: Should we limit the size of `stash`? + if chunk.capacity() == self.chunk_capacity() { + chunk.clear(); + stash.push(chunk); + } + } +} + +/// Behavior to dissect items of chunks in the merge batcher +pub trait MergerChunk: Region { + /// The key of the update + type Key<'a>: Ord where Self: 'a; + /// The value of the update + type Val<'a>: Ord where Self: 'a; + /// The time of the update + type Time<'a>: Ord where Self: 'a; + /// The diff of the update + type Diff<'a> where Self: 'a; + + /// Split a read item into its constituents. + fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>); +} + +impl MergerChunk for TupleABCRegion, T, R> +where + K: Region, + for<'a> K::ReadItem<'a>: Ord, + V: Region, + for<'a> V::ReadItem<'a>: Ord, + T: Region, + for<'a> T::ReadItem<'a>: Ord, + R: Region, +{ + type Key<'a> = K::ReadItem<'a> where TupleABCRegion, T, R>: 'a; + type Val<'a> = V::ReadItem<'a> where TupleABCRegion, T, R>: 'a; + type Time<'a> = T::ReadItem<'a> where TupleABCRegion, T, R>: 'a; + type Diff<'a> = R::ReadItem<'a> where TupleABCRegion, T, R>: 'a; + + fn into_parts<'a>(((key, val), time, diff): , T, R> as Region>::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { + (key, val, time, diff) + } +} + +impl Merger for FlatcontainerMerger<((K, V), T, R), FR> +where + K: Ord + Clone, + V: Ord + Clone, + for<'a> T: Ord + PartialOrder + PartialOrder> + Data, + for<'a> R: Default + Semigroup + Semigroup> + Data, + for<'a> FR: MergerChunk + Push<((K, V), T, R)> + Clone + 'static + + ReserveItems<::ReadItem<'a>> + + Push<::ReadItem<'a>> + + Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, &'a R)> + + Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, FR::Diff<'a>)>, + for<'a> FR::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=T>, + for<'a> FR::Diff<'a>: IntoOwned<'a, Owned=R>, +{ + type Time = T; + type Input = Vec<((K, V), T, R)>; + type Chunk = FlatStack; + type Output = FlatStack; + + fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { + // Ensure `self.pending` has the desired capacity. We should never have a larger capacity + // because we don't write more than capacity elements into the buffer. + if self.pending.capacity() < self.pending_capacity() { + self.pending.reserve(self.pending_capacity() - self.pending.len()); + } + + // Form a chain from what's in pending. + // This closure does the following: + // * If pending is full, consolidate. + // * If after consolidation it's more than half full, peel off a chain of full blocks, + // leaving behind any partial block in pending. + // * Merge the new chain with `final_chain` and return it in-place. + let form_chain = |this: &mut Self, final_chain: &mut Vec, stash: &mut _| { + if this.pending.len() == this.pending.capacity() { + consolidate_updates(&mut this.pending); + if this.pending.len() >= this.chunk_capacity() { + let mut chain = Vec::default(); + while this.pending.len() > this.chunk_capacity() { + let mut chunk = this.empty(stash); + for datum in this.pending.drain(..chunk.capacity()) { + chunk.copy(datum); + } + chain.push(chunk); + } + if final_chain.is_empty() { + *final_chain = chain; + } else if !chain.is_empty() { + let mut output = Vec::default(); + this.merge(std::mem::take(final_chain), chain, &mut output, stash); + *final_chain = output; + } + } + } + }; + + let mut final_chain = Vec::default(); + // `container` is either a shared reference or an owned allocations. + match container { + RefOrMut::Ref(vec) => { + let mut slice = &vec[..]; + while !slice.is_empty() { + let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len())); + slice = tail; + self.pending.extend_from_slice(head); + form_chain(self, &mut final_chain, stash); + } + } + RefOrMut::Mut(vec) => { + while !vec.is_empty() { + self.pending.extend(vec.drain(..std::cmp::min(self.pending.capacity() - self.pending.len(), vec.len()))); + form_chain(self, &mut final_chain, stash); + } + } + } + final_chain + } + + fn finish(&mut self, stash: &mut Vec) -> Vec { + // Extract all data from `pending`. + consolidate_updates(&mut self.pending); + let mut chain = Vec::default(); + while !self.pending.is_empty() { + let mut chunk = self.empty(stash); + for datum in self.pending.drain(..std::cmp::min(chunk.capacity(), self.pending.len())) { + chunk.copy(datum); + } + chain.push(chunk); + } + chain + } + + fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { + let mut list1 = list1.into_iter(); + let mut list2 = list2.into_iter(); + + let mut head1 = >::from(list1.next().unwrap_or_default()); + let mut head2 = >::from(list2.next().unwrap_or_default()); + + let mut result = self.empty(stash); + + let mut diff = R::default(); + + // while we have valid data in each input, merge. + while !head1.is_empty() && !head2.is_empty() { + while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { + let cmp = { + let x: FR::ReadItem<'_> = head1.peek(); + let (key1, val1, time1, _diff) = FR::into_parts(x); + let y: FR::ReadItem<'_> = head2.peek(); + let (key2, val2, time2, _diff) = FR::into_parts(y); + ((key1, val1), time1).cmp(&((key2, val2), time2)) + }; + match cmp { + Ordering::Less => { + result.copy(head1.pop()); + } + Ordering::Greater => { + result.copy(head2.pop()); + } + Ordering::Equal => { + let element1 = head1.pop(); + let (key, val, time1, diff1) = FR::into_parts(element1); + let element2 = head2.pop(); + let (_key, _val, _time2, diff2) = FR::into_parts(element2); + diff1.clone_onto(&mut diff); + diff.plus_equals(&diff2); + if !diff.is_zero() { + result.copy(((key, val), time1, &diff)); + } + } + } + } + + if result.capacity() == result.len() { + output.push(result); + result = self.empty(stash); + } + + if head1.is_empty() { + self.recycle(head1.done(), stash); + head1 = FlatStackQueue::from(list1.next().unwrap_or_default()); + } + if head2.is_empty() { + self.recycle(head2.done(), stash); + head2 = FlatStackQueue::from(list2.next().unwrap_or_default()); + } + } + + if result.len() > 0 { + output.push(result); + } else { + self.recycle(result, stash); + } + + if !head1.is_empty() { + let mut result = self.empty(stash); + result.reserve_items(head1.iter()); + for item in head1.iter() { + result.copy(item); + } + output.push(result); + } + output.extend(list1); + + if !head2.is_empty() { + let mut result = self.empty(stash); + result.reserve_items(head2.iter()); + for item in head2.iter() { + result.copy(item); + } + output.push(result); + } + output.extend(list2); + } + + fn extract( + &mut self, + merged: Vec, + upper: AntichainRef, + frontier: &mut Antichain, + readied: &mut Vec, + kept: &mut Vec, + stash: &mut Vec, + ) { + let mut keep = self.empty(stash); + let mut ready = self.empty(stash); + + for buffer in merged { + for element in buffer.iter() { + let (key, val, time, diff) = FR::into_parts(element); + // let time_owned = time.flat_to_owned(); + if upper.less_equal(&time) { + frontier.insert_with(&time, |time| (*time).into_owned()); + if keep.len() == keep.capacity() && !keep.is_empty() { + kept.push(keep); + keep = self.empty(stash); + } + keep.copy(((key, val), time, diff)); + } else { + if ready.len() == ready.capacity() && !ready.is_empty() { + readied.push(ready); + ready = self.empty(stash); + } + ready.copy(((key, val), time, diff)); + } + } + // Recycling buffer. + self.recycle(buffer, stash); + } + // Finish the kept data. + if !keep.is_empty() { + kept.push(keep); + } + if !ready.is_empty() { + readied.push(ready); + } + } + + fn seal>( + chain: &mut Vec, + lower: AntichainRef, + upper: AntichainRef, + since: AntichainRef, + ) -> B::Output { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + { + let mut prev_keyval = None; + for buffer in chain.iter() { + for element in buffer.iter() { + let (key, val, time, _) = FR::into_parts(element); + if !upper.less_equal(&time) { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; + } + } else { + keys += 1; + vals += 1; + } + upds += 1; + prev_keyval = Some((key, val)); + } + } + } + } + let mut builder = B::with_capacity(keys, vals, upds); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); + } + + builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) + } + + fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { + let (mut size, mut capacity, mut allocations) = (0, 0, 0); + let cb = |siz, cap| { + size += siz; + capacity += cap; + allocations += 1; + }; + chunk.heap_size(cb); + (chunk.len(), size, capacity, allocations) + } +} + +struct FlatStackQueue { + list: FlatStack, + head: usize, +} + +impl Default for FlatStackQueue { + fn default() -> Self { + Self::from(Default::default()) + } +} + +impl FlatStackQueue { + fn pop(&mut self) -> R::ReadItem<'_> { + self.head += 1; + self.list.get(self.head - 1) + } + + fn peek(&self) -> R::ReadItem<'_> { + self.list.get(self.head) + } + + fn from(list: FlatStack) -> Self { + FlatStackQueue { list, head: 0 } + } + + fn done(self) -> FlatStack { + self.list + } + + fn is_empty(&self) -> bool { + self.head == self.list.len() + } + + /// Return an iterator over the remaining elements. + fn iter(&self) -> impl Iterator> + Clone { + self.list.iter().skip(self.head) + } +} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index e8d068387..28c9066c0 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -42,6 +42,7 @@ pub mod spine_fueled; pub mod merge_batcher; pub mod merge_batcher_col; +pub mod merge_batcher_flat; pub mod ord_neu; pub mod rhh; pub mod huffman_container; @@ -392,28 +393,28 @@ where mod flatcontainer { use timely::container::columnation::{Columnation, TimelyStack}; - use timely::container::flatcontainer::{Containerized, FlatStack, Push, Region}; + use timely::container::flatcontainer::{Containerized, FlatStack, IntoOwned, Push, Region}; use timely::progress::Timestamp; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::implementations::{BuilderInput, FlatLayout, Layout, OffsetList, Update}; impl Layout for FlatLayout - where - U::Key: Containerized, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, - U::Val: Containerized, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, - U::Time: Containerized, - ::Region: Region, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, - U::Diff: Containerized, - ::Region: Region, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + where + U::Key: Containerized, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + U::Val: Containerized, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + U::Time: Containerized, + ::Region: Region, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + U::Diff: Containerized, + ::Region: Region, + for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, + for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, { type Target = U; type KeyContainer = FlatStack<::Region>; @@ -459,6 +460,41 @@ mod flatcontainer { **this == ::Region::reborrow(other) } } + + impl BuilderInput> for FlatStack<<((K, V), T, R) as Containerized>::Region> + where + K: Ord + Containerized + Clone + 'static, + for<'a> K::Region: Push + Push<::ReadItem<'a>> + Clone, + for<'a> ::ReadItem<'a>: Copy + Ord, + for<'a> K: PartialEq<::ReadItem<'a>>, + V: Ord + Containerized + Clone + 'static, + for<'a> V::Region: Push + Push<::ReadItem<'a>> + Clone, + for<'a> ::ReadItem<'a>: Copy + Ord, + for<'a> V: PartialEq<::ReadItem<'a>>, + T: Timestamp + Lattice + Containerized + Clone + 'static, + for<'a> T::Region: Region + Push + Push<::ReadItem<'a>> + Clone, + for<'a> ::ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned=T> + PartialEq<::ReadItem<'a>>, + R: Ord + Clone + Semigroup + Containerized + 'static, + for<'a> R::Region: Region + Push + Push<::ReadItem<'a>> + Clone, + for<'a> ::ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned=R> + PartialEq<::ReadItem<'a>>, + { + type Key<'a> = ::ReadItem<'a>; + type Val<'a> = ::ReadItem<'a>; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.into_owned(), diff.into_owned()) + } + + fn key_eq(this: &Self::Key<'_>, other: <::Region as Region>::ReadItem<'_>) -> bool { + ::Region::reborrow(*this) == ::Region::reborrow(other) + } + + fn val_eq(this: &Self::Val<'_>, other: <::Region as Region>::ReadItem<'_>) -> bool { + ::Region::reborrow(*this) == ::Region::reborrow(other) + } + } } impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index e475a0b3d..f9fe4ed49 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -10,10 +10,12 @@ use std::rc::Rc; use timely::container::columnation::{TimelyStack}; +use timely::container::flatcontainer::{Containerized, FlatStack}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::implementations::merge_batcher_col::ColumnationMerger; +use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; @@ -40,8 +42,8 @@ pub type ColValSpine = Spine< /// A trace implementation backed by flatcontainer storage. pub type FlatValSpine = Spine< Rc>>, - MergeBatcher, T>, - RcBuilder, TimelyStack<((K,V),T,R)>>>, + MergeBatcher::Region>, T>, + RcBuilder, FlatStack<<((K,V),T,R) as Containerized>::Region>>>, >; /// A trace implementation using a spine of ordered lists. @@ -62,9 +64,9 @@ pub type ColKeySpine = Spine< /// A trace implementation backed by flatcontainer storage. pub type FlatKeySpine = Spine< - Rc>>, - MergeBatcher, T>, - RcBuilder, TimelyStack<((K,()),T,R)>>>, + Rc>>, + MergeBatcher::Region>, T>, + RcBuilder, FlatStack<<((K,()),T,R) as Containerized>::Region>>>, >; /// A trace implementation backed by columnar storage.