From 344e034fb0c036161ce7d4b66946b03c887f79ae Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 11 Nov 2023 09:37:43 -0500 Subject: [PATCH] Key layouts --- src/trace/implementations/ord.rs | 179 ++++++++++--------------------- 1 file changed, 59 insertions(+), 120 deletions(-) diff --git a/src/trace/implementations/ord.rs b/src/trace/implementations/ord.rs index 678782ccf..c29f3df52 100644 --- a/src/trace/implementations/ord.rs +++ b/src/trace/implementations/ord.rs @@ -118,21 +118,21 @@ where /// A trace implementation using a spine of ordered lists. -pub type OrdValSpine = Spine>>>; +pub type OrdValSpine = Spine>>>; /// A trace implementation using a spine of abomonated ordered lists. -pub type OrdValSpineAbom = Spine>, Vec>>>; +pub type OrdValSpineAbom = Spine>, Vec>>>; /// A trace implementation for empty values using a spine of ordered lists. -pub type OrdKeySpine = Spine>>; +pub type OrdKeySpine = Spine>>>; /// A trace implementation for empty values using a spine of abomonated ordered lists. -pub type OrdKeySpineAbom = Spine, Vec>>>; +pub type OrdKeySpineAbom = Spine>, Vec>>>; /// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine>>>; +pub type ColValSpine = Spine>>>; /// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; +pub type ColKeySpine = Spine>>>; /// A container that can retain/discard from some offset onward. @@ -177,9 +177,11 @@ pub struct OrdValBatch { // Type aliases to make certain types readable. type TDLayer = OrderedLeaf<<::Target as Update>::Time, <::Target as Update>::Diff>; type VTDLayer = OrderedLayer<<::Target as Update>::Val, TDLayer, ::ValOffset, ::ValContainer>; +type KTDLayer = OrderedLayer<<::Target as Update>::Key, TDLayer, ::KeyOffset, ::KeyContainer>; type KVTDLayer = OrderedLayer<<::Target as Update>::Key, VTDLayer, ::KeyOffset, ::KeyContainer>; type TDBuilder = OrderedLeafBuilder<<::Target as Update>::Time, <::Target as Update>::Diff>; type VTDBuilder = OrderedBuilder<<::Target as Update>::Val, TDBuilder, ::ValOffset, ::ValContainer>; +type KTDBuilder = OrderedBuilder<<::Target as Update>::Key, TDBuilder, ::KeyOffset, ::KeyContainer>; type KVTDBuilder = OrderedBuilder<<::Target as Update>::Key, VTDBuilder, ::KeyOffset, ::KeyContainer>; impl BatchReader for OrdValBatch { @@ -459,35 +461,21 @@ impl Builder> for OrdValBuilder { /// An immutable collection of update tuples, from a contiguous interval of logical times. -#[derive(Debug, Abomonation)] -pub struct OrdKeyBatch> -where - K: Ord+Clone, - T: Clone+Lattice, - R: Clone, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +#[derive(Abomonation)] +pub struct OrdKeyBatch { /// Where all the dataz is. - pub layer: OrderedLayer, O, CK>, + pub layer: KTDLayer, /// Description of the update times this layer represents. - pub desc: Description, + pub desc: Description<::Time>, } -impl BatchReader for OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - type Key = K; +impl BatchReader for OrdKeyBatch { + type Key = ::Key; type Val = (); - type Time = T; - type R = R; + type Time = ::Time; + type R = ::Diff; - type Cursor = OrdKeyCursor; + type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { OrdKeyCursor { valid: true, @@ -495,39 +483,25 @@ where phantom: PhantomData } } - fn len(&self) -> usize { , O, CK> as Trie>::tuples(&self.layer) } - fn description(&self) -> &Description { &self.desc } + fn len(&self) -> usize { as Trie>::tuples(&self.layer) } + fn description(&self) -> &Description<::Time> { &self.desc } } -impl Batch for OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +impl Batch for OrdKeyBatch { type Batcher = MergeBatcher; - type Builder = OrdKeyBuilder; - type Merger = OrdKeyMerger; + type Builder = OrdKeyBuilder; + type Merger = OrdKeyMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: Option>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: Option::Time>>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } } -impl OrdKeyBatch -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - fn advance_builder_from(layer: &mut OrderedBuilder, O, CK>, frontier: AntichainRef, key_pos: usize) { +impl OrdKeyBatch { + fn advance_builder_from(layer: &mut KTDBuilder, frontier: AntichainRef<::Time>, key_pos: usize) { let key_start = key_pos; - let time_start: usize = layer.offs[key_pos].try_into().unwrap(); + let time_start: usize = layer.offs[key_pos].try_into().ok().unwrap(); // We will zip through the time leaves, calling advance on each, // then zip through the value layer, sorting and collapsing each, @@ -550,10 +524,10 @@ where // we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s // initial value. - let lower: usize = layer.offs[i].try_into().unwrap(); - let upper: usize = layer.offs[i+1].try_into().unwrap(); + let lower: usize = layer.offs[i].try_into().ok().unwrap(); + let upper: usize = layer.offs[i+1].try_into().ok().unwrap(); - layer.offs[i] = O::try_from(write_position).unwrap(); + layer.offs[i] = L::KeyOffset::try_from(write_position).ok().unwrap(); let updates = &mut layer.vals.vals[..]; @@ -566,14 +540,14 @@ where } } layer.vals.vals.truncate(write_position); - layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap(); + layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap(); // 4. Remove empty keys. let offs = &mut layer.offs; let mut write_position = key_start; layer.keys.retain_from(key_start, |index, _item| { - let lower = offs[index].try_into().unwrap(); - let upper = offs[index+1].try_into().unwrap(); + let lower = offs[index].try_into().ok().unwrap(); + let upper = offs[index+1].try_into().ok().unwrap(); if lower < upper { offs[write_position+1] = offs[index+1]; write_position += 1; @@ -587,14 +561,7 @@ where } /// State for an in-progress merge. -pub struct OrdKeyMerger> -where - K: Ord+Clone+'static, - T: Lattice+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +pub struct OrdKeyMerger { // first batch, and position therein. lower1: usize, upper1: usize, @@ -602,20 +569,13 @@ where lower2: usize, upper2: usize, // result that we are currently assembling. - result: , O, CK> as Trie>::MergeBuilder, - description: Description, + result: as Trie>::MergeBuilder, + description: Description<::Time>, should_compact: bool, } -impl Merger> for OrdKeyMerger -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option>) -> Self { +impl Merger> for OrdKeyMerger { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: Option::Time>>) -> Self { assert!(batch1.upper() == batch2.lower()); @@ -631,12 +591,12 @@ where upper1: batch1.layer.keys(), lower2: 0, upper2: batch2.layer.keys(), - result: <, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), + result: < as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer), description: description, should_compact: compaction_frontier.is_some(), } } - fn done(self) -> OrdKeyBatch { + fn done(self) -> OrdKeyBatch { assert!(self.lower1 == self.upper1); assert!(self.lower2 == self.upper2); @@ -646,7 +606,7 @@ where desc: self.description, } } - fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { + fn work(&mut self, source1: &OrdKeyBatch, source2: &OrdKeyBatch, fuel: &mut isize) { let starting_updates = self.result.vals.vals.len(); let mut effort = 0isize; @@ -691,7 +651,7 @@ where // if we are supplied a frontier, we should compact. if self.should_compact { - OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); + OrdKeyBatch::::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos); } *fuel -= effort; @@ -705,30 +665,23 @@ where /// A cursor for navigating a single layer. #[derive(Debug)] -pub struct OrdKeyCursor> { +pub struct OrdKeyCursor { valid: bool, - cursor: OrderedCursor>, - phantom: PhantomData<(K, O, CK)>, + cursor: OrderedCursor::Time, ::Diff>>, + phantom: PhantomData, } -impl Cursor for OrdKeyCursor -where - K: Ord+Clone, - T: Lattice+Ord+Clone, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - type Key = K; +impl Cursor for OrdKeyCursor { + type Key = ::Key; type Val = (); - type Time = T; - type R = R; + type Time = ::Time; + type R = ::Diff; - type Storage = OrdKeyBatch; + type Storage = OrdKeyBatch; - fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) } + fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) } fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() } - fn map_times(&mut self, storage: &Self::Storage, mut logic: L) { + fn map_times(&mut self, storage: &Self::Storage, mut logic: L2) { self.cursor.child.rewind(&storage.layer.vals); while self.cursor.child.valid(&storage.layer.vals) { logic(&self.cursor.child.key(&storage.layer.vals).0, &self.cursor.child.key(&storage.layer.vals).1); @@ -738,7 +691,7 @@ where fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) } fn val_valid(&self, _storage: &Self::Storage) -> bool { self.valid } fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); self.valid = true; } - fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); self.valid = true; } + fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; } fn step_val(&mut self, _storage: &Self::Storage) { self.valid = false; } fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { } fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); self.valid = true; } @@ -747,45 +700,31 @@ where /// A builder for creating layers from unsorted update tuples. -pub struct OrdKeyBuilder> -where - K: Ord+Clone, - T: Ord+Clone+Lattice, - R: Clone+Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ - builder: OrderedBuilder, O, CK>, +pub struct OrdKeyBuilder { + builder: KTDBuilder, } -impl Builder> for OrdKeyBuilder -where - K: Ord+Clone+'static, - T: Lattice+timely::progress::Timestamp+Ord+Clone+'static, - R: Semigroup, - O: OrdOffset, >::Error: Debug, >::Error: Debug, - CK: BatchContainer+Deref+RetainFrom, -{ +impl Builder> for OrdKeyBuilder { fn new() -> Self { OrdKeyBuilder { - builder: OrderedBuilder::, O, CK>::new() + builder: >::new() } } fn with_capacity(cap: usize) -> Self { OrdKeyBuilder { - builder: , O, CK> as TupleBuilder>::with_capacity(cap) + builder: as TupleBuilder>::with_capacity(cap) } } #[inline] - fn push(&mut self, (key, _, time, diff): (K, (), T, R)) { + fn push(&mut self, (key, _, time, diff): (::Key, (), ::Time, ::Diff)) { self.builder.push_tuple((key, (time, diff))); } #[inline(never)] - fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { + fn done(self, lower: Antichain<::Time>, upper: Antichain<::Time>, since: Antichain<::Time>) -> OrdKeyBatch { OrdKeyBatch { layer: self.builder.done(), desc: Description::new(lower, upper, since)