diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 6187b8ba9..331a4c305 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -118,15 +118,15 @@ pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<Rc<Abomonated<OrdValBatch<Vector<((K,V),T,R), O>, Vec<((K,V),T,R)>>, Vec<u8>>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>>; +pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>, Vec<((K,()),T,R)>>>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<Rc<Abomonated<OrdKeyBatch<Vector<((K,()),T,R), O>>, Vec<u8>>>>; +pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<Rc<Abomonated<OrdKeyBatch<Vector<((K,()),T,R), O>, Vec<((K,()),T,R)>>, Vec<u8>>>>; /// A trace implementation backed by columnar storage. pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>, TimelyStack<((K,V),T,R)>>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>>; +pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>, TimelyStack<((K,()),T,R)>>>>; /// A container that can retain/discard from some offset onward. @@ -484,20 +484,22 @@ where /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Abomonation)] -pub struct OrdKeyBatch<L: Layout> { +pub struct OrdKeyBatch<L: Layout, C> { /// Where all the dataz is. pub layer: KTDLayer<L>, /// Description of the update times this layer represents. pub desc: Description<<L::Target as Update>::Time>, + /// Phantom data + pub phantom: PhantomData<C>, } -impl<L: Layout> BatchReader for OrdKeyBatch<L> { +impl<L: Layout, C> BatchReader for OrdKeyBatch<L, C> { type Key = <L::Target as Update>::Key; type Val = (); type Time = <L::Target as Update>::Time; type R = <L::Target as Update>::Diff; - type Cursor = OrdKeyCursor<L>; + type Cursor = OrdKeyCursor<L, C>; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -509,7 +511,7 @@ impl<L: Layout> BatchReader for OrdKeyBatch<L> { fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.desc } } -impl<L: Layout> Batch for OrdKeyBatch<L> { +impl<L: Layout> Batch for OrdKeyBatch<L, Vec<L::Target>> { type Batcher = MergeBatcher<Self>; type Builder = OrdKeyBuilder<L>; type Merger = OrdKeyMerger<L>; @@ -519,7 +521,23 @@ impl<L: Layout> Batch for OrdKeyBatch<L> { } } -impl<L: Layout> OrdKeyBatch<L> { +impl<L: Layout> Batch for OrdKeyBatch<L, TimelyStack<L::Target>> +where + <L as Layout>::Target: Columnation + 'static, + Self::Key: Columnation + 'static, + Self::Time: Columnation + 'static, + Self::R: Columnation + 'static, +{ + type Batcher = ColumnatedMergeBatcher<Self>; + type Builder = OrdKeyBuilder<L>; + type Merger = OrdKeyMerger<L>; + + fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger { + OrdKeyMerger::new(self, other, compaction_frontier) + } +} + +impl<L: Layout, C> OrdKeyBatch<L, C> { fn advance_builder_from(layer: &mut KTDBuilder<L>, frontier: AntichainRef<<L::Target as Update>::Time>, key_pos: usize) { let key_start = key_pos; @@ -596,8 +614,11 @@ pub struct OrdKeyMerger<L: Layout> { should_compact: bool, } -impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> { - fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self { +impl<L: Layout, C> Merger<OrdKeyBatch<L, C>> for OrdKeyMerger<L> +where + OrdKeyBatch<L, C>: Batch<Time=<L::Target as Update>::Time> +{ + fn new(batch1: &OrdKeyBatch<L, C>, batch2: &OrdKeyBatch<L, C>, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -618,7 +639,7 @@ impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> { should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdKeyBatch<L> { + fn done(self) -> OrdKeyBatch<L, C> { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -626,9 +647,10 @@ impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> { OrdKeyBatch { layer: self.result.done(), desc: self.description, + phantom: PhantomData, } } - fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch<L, C>, source2: &OrdKeyBatch<L, C>, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -673,7 +695,7 @@ impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> { // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::<L>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::<L, C>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -687,19 +709,19 @@ impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> { /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor<L: Layout> { +pub struct OrdKeyCursor<L: Layout, C> { valid: bool, cursor: OrderedCursor<OrderedLeaf<<L::Target as Update>::Time, <L::Target as Update>::Diff>>, - phantom: PhantomData<L>, + phantom: PhantomData<(L, C)>, } -impl<L: Layout> Cursor for OrdKeyCursor<L> { +impl<L: Layout, C> Cursor for OrdKeyCursor<L, C> { type Key = <L::Target as Update>::Key; type Val = (); type Time = <L::Target as Update>::Time; type R = <L::Target as Update>::Diff; - type Storage = OrdKeyBatch<L>; + type Storage = OrdKeyBatch<L, C>; fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } @@ -726,7 +748,10 @@ pub struct OrdKeyBuilder<L: Layout> { builder: KTDBuilder<L>, } -impl<L: Layout> Builder<OrdKeyBatch<L>> for OrdKeyBuilder<L> { +impl<L: Layout, C> Builder<OrdKeyBatch<L, C>> for OrdKeyBuilder<L> +where + OrdKeyBatch<L, C>: Batch<Key=<L::Target as Update>::Key, Val=(), Time=<L::Target as Update>::Time, R=<L::Target as Update>::Diff> +{ fn new() -> Self { OrdKeyBuilder { @@ -746,10 +771,11 @@ impl<L: Layout> Builder<OrdKeyBatch<L>> for OrdKeyBuilder<L> { } #[inline(never)] - fn done(self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdKeyBatch<L> { + fn done(self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdKeyBatch<L, C> { OrdKeyBatch { layer: self.builder.done(), - desc: Description::new(lower, upper, since) + desc: Description::new(lower, upper, since), + phantom: PhantomData, } } }