From b846f738d168581296f1e541abaf18d25d37b316 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 6 Dec 2024 11:24:03 -0500 Subject: [PATCH] Merge batcher for flat container without key and value (#547) Signed-off-by: Moritz Hoffmann --- .../implementations/merge_batcher_flat.rs | 47 +++++++++---------- 1 file changed, 21 insertions(+), 26 deletions(-) diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index e6087712f..505e0d7dc 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -5,7 +5,7 @@ use std::marker::PhantomData; use timely::progress::frontier::{Antichain, AntichainRef}; use timely::{Data, PartialOrder}; use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems}; -use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; +use timely::container::flatcontainer::impls::tuple::TupleABCRegion; use crate::difference::{IsZero, Semigroup}; use crate::trace::implementations::merge_batcher::Merger; @@ -56,10 +56,8 @@ impl FlatcontainerMerger { /// Behavior to dissect items of chunks in the merge batcher pub trait MergerChunk: Region { - /// The key of the update - type Key<'a>: Ord where Self: 'a; - /// The value of the update - type Val<'a>: Ord where Self: 'a; + /// The data portion of the update + type Data<'a>: Ord where Self: 'a; /// The time of the update type Time<'a>: Ord where Self: 'a; /// The owned time type. @@ -70,28 +68,25 @@ pub trait MergerChunk: Region { type DiffOwned; /// Split a read item into its constituents. Must be cheap. - fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>); + fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>); } -impl MergerChunk for TupleABCRegion, T, R> +impl MergerChunk for TupleABCRegion where - K: Region, - for<'a> K::ReadItem<'a>: Ord, - V: Region, - for<'a> V::ReadItem<'a>: Ord, + D: Region, + for<'a> D::ReadItem<'a>: Ord, T: Region, for<'a> T::ReadItem<'a>: Ord, R: Region, { - type Key<'a> = K::ReadItem<'a> where Self: 'a; - type Val<'a> = V::ReadItem<'a> where Self: 'a; + type Data<'a> = D::ReadItem<'a> where Self: 'a; type Time<'a> = T::ReadItem<'a> where Self: 'a; type TimeOwned = T::Owned; type Diff<'a> = R::ReadItem<'a> where Self: 'a; type DiffOwned = R::Owned; - fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { - (key, val, time, diff) + fn into_parts<'a>((data, time, diff): Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>) { + (data, time, diff) } } @@ -100,8 +95,8 @@ where for<'a> MC: MergerChunk + Clone + 'static + ReserveItems<::ReadItem<'a>> + Push<::ReadItem<'a>> - + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)> - + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>, + + Push<(MC::Data<'a>, MC::Time<'a>, &'a MC::DiffOwned)> + + Push<(MC::Data<'a>, MC::Time<'a>, MC::Diff<'a>)>, for<'a> MC::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=MC::TimeOwned>, for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, @@ -125,9 +120,9 @@ where while !head1.is_empty() && !head2.is_empty() { while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { let cmp = { - let (key1, val1, time1, _diff) = MC::into_parts(head1.peek()); - let (key2, val2, time2, _diff) = MC::into_parts(head2.peek()); - ((key1, val1), time1).cmp(&((key2, val2), time2)) + let (data1, time1, _diff) = MC::into_parts(head1.peek()); + let (data2, time2, _diff) = MC::into_parts(head2.peek()); + (data1, time1).cmp(&(data2, time2)) }; // TODO: The following less/greater branches could plausibly be a good moment for // `copy_range`, on account of runs of records that might benefit more from a @@ -140,12 +135,12 @@ where result.copy(head2.pop()); } Ordering::Equal => { - let (key, val, time1, diff1) = MC::into_parts(head1.pop()); - let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop()); + let (data, time1, diff1) = MC::into_parts(head1.pop()); + let (_data, _time2, diff2) = MC::into_parts(head2.pop()); diff1.clone_onto(&mut diff); diff.plus_equals(&diff2); if !diff.is_zero() { - result.copy(((key, val), time1, &diff)); + result.copy((data, time1, &diff)); } } } @@ -212,20 +207,20 @@ where let mut ready = self.empty(stash); for buffer in merged { - for (key, val, time, diff) in buffer.iter().map(MC::into_parts) { + for (data, time, diff) in buffer.iter().map(MC::into_parts) { if upper.less_equal(&time) { frontier.insert_with(&time, |time| (*time).into_owned()); if keep.len() == keep.capacity() && !keep.is_empty() { kept.push(keep); keep = self.empty(stash); } - keep.copy(((key, val), time, diff)); + keep.copy((data, time, diff)); } else { if ready.len() == ready.capacity() && !ready.is_empty() { readied.push(ready); ready = self.empty(stash); } - ready.copy(((key, val), time, diff)); + ready.copy((data, time, diff)); } } // Recycling buffer.