From 21d4e216d29b27dedc28e5984e8ef72a4c5fa80f Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 3 Jun 2024 21:12:14 -0400 Subject: [PATCH] Improve trait bounds Make the BuilderInput trait generic over the key and value containers instead of the layout, which makes trait bounds much simpler and allows for reusing the `BuilerInput` implementation for generic key and value containers, instead tying it to specific layouts. Signed-off-by: Moritz Hoffmann --- examples/spines.rs | 4 +- src/trace/implementations/chunker.rs | 50 +++--- .../implementations/merge_batcher_flat.rs | 20 +-- src/trace/implementations/mod.rs | 160 ++++++------------ src/trace/implementations/ord_neu.rs | 16 +- src/trace/implementations/rhh.rs | 2 +- 6 files changed, 93 insertions(+), 159 deletions(-) diff --git a/examples/spines.rs b/examples/spines.rs index 9fa407977..6720575fe 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -66,8 +66,8 @@ fn main() { }, "flat" => { use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine; - let data = data.arrange::>(); - let keys = keys.arrange::>(); + let data = data.arrange::>(); + let keys = keys.arrange::>(); keys.join_core(&data, |_k, (), ()| Option::<()>::None) .probe_with(&mut probe); } diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 55ac911a5..59a8a62ad 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -245,31 +245,31 @@ where } /// Chunk a stream of vectors into chains of vectors. -pub struct ContainerChunker +pub struct ContainerChunker where - I: Container, - for<'a> O: SizableContainer + PushInto>, - Sorter: ContainerSorter, - Consolidator: ConsolidateContainer + ?Sized, + Input: Container, + for<'a> Output: SizableContainer + PushInto>, + Sorter: ContainerSorter, + Consolidator: ConsolidateContainer + ?Sized, { - pending: O, - empty: O, - ready: Vec, + pending: Output, + empty: Output, + ready: Vec, sorter: Sorter, - _marker: PhantomData<(I, Consolidator)>, + _marker: PhantomData<(Input, Consolidator)>, } -impl Default for ContainerChunker +impl Default for ContainerChunker where - I: Container, - for<'a> O: SizableContainer + PushInto>, - Sorter: ContainerSorter + Default, - Consolidator: ConsolidateContainer + ?Sized, + Input: Container, + for<'a> Output: SizableContainer + PushInto>, + Sorter: ContainerSorter + Default, + Consolidator: ConsolidateContainer + ?Sized, { fn default() -> Self { Self { - pending: O::default(), - empty: O::default(), + pending: Output::default(), + empty: Output::default(), ready: Vec::default(), sorter: Sorter::default(), _marker: PhantomData, @@ -277,19 +277,19 @@ where } } -impl Chunker for ContainerChunker +impl Chunker for ContainerChunker where - I: Container, - for<'a> O: SizableContainer + PushInto>, - Sorter: ContainerSorter, - Consolidator: ConsolidateContainer, + Input: Container, + for<'a> Output: SizableContainer + PushInto>, + Sorter: ContainerSorter, + Consolidator: ConsolidateContainer, { - type Input = I; - type Output = O; + type Input = Input; + type Output = Output; fn push_container(&mut self, container: RefOrMut) { - if self.pending.capacity() < O::preferred_capacity() { - self.pending.reserve(O::preferred_capacity() - self.pending.len()); + if self.pending.capacity() < Output::preferred_capacity() { + self.pending.reserve(Output::preferred_capacity() - self.pending.len()); } // TODO: This uses `IterRef`, which isn't optimal for containers that can move. for item in container.iter() { diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index 4309b4ab1..0ed3e4306 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -12,18 +12,18 @@ use crate::trace::implementations::merge_batcher::Merger; use crate::trace::Builder; use crate::trace::cursor::IntoOwned; -/// A merger for flat stacks -pub struct FlatcontainerMerger { - _marker: PhantomData<(T, R)>, +/// A merger for flat stacks. `T` describes the +pub struct FlatcontainerMerger { + _marker: PhantomData<(T, R, MC)>, } -impl Default for FlatcontainerMerger { +impl Default for FlatcontainerMerger { fn default() -> Self { Self { _marker: PhantomData, } } } -impl FlatcontainerMerger { +impl FlatcontainerMerger { const BUFFER_SIZE_BYTES: usize = 64 << 10; fn chunk_capacity(&self) -> usize { let size = ::std::mem::size_of::(); @@ -38,13 +38,13 @@ impl FlatcontainerMerger { /// Helper to get pre-sized vector from the stash. #[inline] - fn empty(&self, stash: &mut Vec>) -> FlatStack { + fn empty(&self, stash: &mut Vec>) -> FlatStack { stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity())) } /// Helper to return a chunk to the stash. #[inline] - fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { + fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { // TODO: Should we limit the size of `stash`? if chunk.capacity() == self.chunk_capacity() { chunk.clear(); @@ -88,13 +88,11 @@ where } } -impl Merger for FlatcontainerMerger<((K, V), T, R), FR> +impl Merger for FlatcontainerMerger where - K: Ord + Clone, - V: Ord + Clone, for<'a> T: Ord + PartialOrder + PartialOrder> + Data, for<'a> R: Default + Semigroup + Semigroup> + Data, - for<'a> FR: MergerChunk + Push<((K, V), T, R)> + Clone + 'static + for<'a> FR: MergerChunk + Clone + 'static + ReserveItems<::ReadItem<'a>> + Push<::ReadItem<'a>> + Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, &'a R)> diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 8045a5e4f..c7972a60b 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -200,7 +200,6 @@ where use std::convert::TryInto; use abomonation_derive::Abomonation; -use crate::trace::cursor::IntoOwned; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] @@ -322,7 +321,7 @@ impl BatchContainer for OffsetList { } /// Behavior to split an update into principal components. -pub trait BuilderInput: Container { +pub trait BuilderInput: Container { /// Key portion type Key<'a>: Ord; /// Value portion @@ -336,16 +335,20 @@ pub trait BuilderInput: Container { fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff); /// Test that the key equals a key in the layout's key container. - fn key_eq(this: &Self::Key<'_>, other: ::ReadItem<'_>) -> bool; + fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool; /// Test that the value equals a key in the layout's value container. - fn val_eq(this: &Self::Val<'_>, other: ::ReadItem<'_>) -> bool; + fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool; } -impl BuilderInput> for Vec<((K, V), T, R)> +impl BuilderInput for Vec<((K, V), T, R)> where K: Ord + Clone + 'static, + KBC: BatchContainer, + for<'a> KBC::ReadItem<'a>: PartialEq<&'a K>, V: Ord + Clone + 'static, + VBC: BatchContainer, + for<'a> VBC::ReadItem<'a>: PartialEq<&'a V>, T: Timestamp + Lattice + Clone + 'static, R: Ord + Semigroup + 'static, { @@ -358,24 +361,28 @@ where (key, val, time, diff) } - fn key_eq(this: &K, other: &K) -> bool { - this == other + fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool { + KBC::reborrow(other) == this } - fn val_eq(this: &V, other: &V) -> bool { - this == other + fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool { + VBC::reborrow(other) == this } } -impl BuilderInput> for TimelyStack<((K, V), T, R)> +impl BuilderInput for TimelyStack<((K::Owned, V::Owned), T, R)> where - K: Ord + Columnation + Clone + 'static, - V: Ord + Columnation + Clone + 'static, + K: BatchContainer, + for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>, + K::Owned: Ord + Columnation + Clone + 'static, + V: BatchContainer, + for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>, + V::Owned: Ord + Columnation + Clone + 'static, T: Timestamp + Lattice + Columnation + Clone + 'static, R: Ord + Clone + Semigroup + Columnation + 'static, { - type Key<'a> = &'a K; - type Val<'a> = &'a V; + type Key<'a> = &'a K::Owned; + type Val<'a> = &'a V::Owned; type Time = T; type Diff = R; @@ -383,22 +390,19 @@ where (key, val, time.clone(), diff.clone()) } - fn key_eq(this: &&K, other: &K) -> bool { - *this == other + fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool { + K::reborrow(other) == *this } - fn val_eq(this: &&V, other: &V) -> bool { - *this == other + fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool { + V::reborrow(other) == *this } } mod flatcontainer { - use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::flatcontainer::{Containerized, FlatStack, IntoOwned, Push, Region}; - use timely::progress::Timestamp; - use crate::difference::Semigroup; - use crate::lattice::Lattice; - use crate::trace::implementations::{BuilderInput, FlatLayout, Layout, OffsetList, Update}; + use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; + use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update}; impl Layout for FlatLayout where @@ -425,108 +429,40 @@ mod flatcontainer { type OffsetContainer = OffsetList; } - impl BuilderInput> for TimelyStack<((K, V), T, R)> - where - K: Ord + Columnation + Containerized + Clone + 'static, - for<'a> K::Region: Push + Push<::ReadItem<'a>>, - for<'a> ::ReadItem<'a>: Copy + Ord, - for<'a> K: PartialEq<::ReadItem<'a>>, - V: Ord + Columnation + Containerized + Clone + 'static, - for<'a> V::Region: Push + Push<::ReadItem<'a>>, - for<'a> ::ReadItem<'a>: Copy + Ord, - for<'a> V: PartialEq<::ReadItem<'a>>, - T: Timestamp + Lattice + Columnation + Containerized + Clone + 'static, - for<'a> T::Region: Region + Push + Push<::ReadItem<'a>>, - for<'a> ::ReadItem<'a>: Copy + Ord, - for<'a> T: PartialEq<::ReadItem<'a>>, - R: Ord + Clone + Semigroup + Columnation + Containerized + 'static, - for<'a> R::Region: Region + Push + Push<::ReadItem<'a>>, - for<'a> ::ReadItem<'a>: Copy + Ord, - for<'a> R: PartialEq<::ReadItem<'a>>, - { - type Key<'a> = &'a K; - type Val<'a> = &'a V; - type Time = T; - type Diff = R; - - fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { - (key, val, time.clone(), diff.clone()) - } - - fn key_eq(this: &&K, other: <::Region as Region>::ReadItem<'_>) -> bool { - **this == ::Region::reborrow(other) - } - - fn val_eq(this: &&V, other: <::Region as Region>::ReadItem<'_>) -> bool { - **this == ::Region::reborrow(other) - } - } - - impl BuilderInput> for FlatStack<<((K, V), T, R) as Containerized>::Region> + impl BuilderInput for FlatStack,T,R>> where - K: Ord + Containerized + Clone + 'static, - for<'a> K::Region: Push + Push<::ReadItem<'a>> + Clone, - for<'a> ::ReadItem<'a>: Copy + Ord, - for<'a> K: PartialEq<::ReadItem<'a>>, - V: Ord + Containerized + Clone + 'static, - for<'a> V::Region: Push + Push<::ReadItem<'a>> + Clone, - for<'a> ::ReadItem<'a>: Copy + Ord, - for<'a> V: PartialEq<::ReadItem<'a>>, - T: Timestamp + Lattice + Containerized + Clone + 'static, - for<'a> T::Region: Region + Push + Push<::ReadItem<'a>> + Clone, - for<'a> ::ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned=T> + PartialEq<::ReadItem<'a>>, - R: Ord + Clone + Semigroup + Containerized + 'static, - for<'a> R::Region: Region + Push + Push<::ReadItem<'a>> + Clone, - for<'a> ::ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned=R> + PartialEq<::ReadItem<'a>>, + K: Region + Clone + 'static, + for<'a> K::ReadItem<'a>: Copy + Ord, + KBC: BatchContainer, + for<'a> KBC::ReadItem<'a>: PartialEq>, + for<'a> V: Region + Clone + 'static, + for<'a> V::ReadItem<'a>: Copy + Ord, + VBC: BatchContainer, + for<'a> VBC::ReadItem<'a>: PartialEq>, + for<'a> T: Region + Clone + 'static, + for<'a> T::ReadItem<'a>: Copy + Ord, + for<'a> R: Region + Clone + 'static, + for<'a> R::ReadItem<'a>: Copy + Ord, { - type Key<'a> = ::ReadItem<'a>; - type Val<'a> = ::ReadItem<'a>; - type Time = T; - type Diff = R; + type Key<'a> = K::ReadItem<'a>; + type Val<'a> = V::ReadItem<'a>; + type Time = T::Owned; + type Diff = R::Owned; fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { (key, val, time.into_owned(), diff.into_owned()) } - fn key_eq(this: &Self::Key<'_>, other: <::Region as Region>::ReadItem<'_>) -> bool { - ::Region::reborrow(*this) == ::Region::reborrow(other) + fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool { + KBC::reborrow(other) == K::reborrow(*this) } - fn val_eq(this: &Self::Val<'_>, other: <::Region as Region>::ReadItem<'_>) -> bool { - ::Region::reborrow(*this) == ::Region::reborrow(other) + fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool { + VBC::reborrow(other) == V::reborrow(*this) } } } -impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> -where - K: Ord+ToOwned+PreferredContainer + ?Sized, - K::Owned: Columnation + Ord+Clone+'static, - for<'a> <::Container as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = K::Owned>, - V: Ord+ToOwned+PreferredContainer + ?Sized, - V::Owned: Columnation + Ord+Clone+'static, - for<'a> <::Container as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = V::Owned>, - T: Columnation + Ord+Clone+Lattice+Timestamp, - R: Columnation + Ord+Clone+Semigroup+'static, -{ - type Key<'a> = &'a K::Owned; - type Val<'a> = &'a V::Owned; - type Time = T; - type Diff = R; - - fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { - (key, val, time.clone(), diff.clone()) - } - - fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { - <::Container as BatchContainer>::reborrow(other).eq(&<::Container as BatchContainer>::reborrow(<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this))) - } - - fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { - <::Container as BatchContainer>::reborrow(other).eq(&<::Container as BatchContainer>::reborrow(<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this))) - } -} - pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 52bf097d5..f78406213 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -42,16 +42,16 @@ pub type ColValSpine = Spine< >; /// A trace implementation backed by flatcontainer storage. -pub type FlatValSpine = Spine< +pub type FlatValSpine = Spine< Rc>>, MergeBatcher< ContainerChunker< - Vec<((K,V),T,R)>, + C, FlatStack<<((K,V),T,R) as Containerized>::Region>, ExternalContainerSorter::Region>>, ContainerConsolidator, >, - FlatcontainerMerger<((K,V),T,R), <((K,V),T,R) as Containerized>::Region>, + FlatcontainerMerger::Region>, T, >, RcBuilder, FlatStack<<((K,V),T,R) as Containerized>::Region>>>, @@ -74,16 +74,16 @@ pub type ColKeySpine = Spine< >; /// A trace implementation backed by flatcontainer storage. -pub type FlatKeySpine = Spine< +pub type FlatKeySpine = Spine< Rc>>, MergeBatcher< ContainerChunker< - Vec<((K,()),T,R)>, + C, FlatStack<<((K,()),T,R) as Containerized>::Region>, ExternalContainerSorter::Region>>, ContainerConsolidator, >, - FlatcontainerMerger<((K,()),T,R), <((K,()),T,R) as Containerized>::Region>, + FlatcontainerMerger::Region>, T, >, RcBuilder, FlatStack<<((K,()),T,R) as Containerized>::Region>>>, @@ -599,7 +599,7 @@ mod val_batch { impl Builder for OrdValBuilder where L: Layout, - CI: for<'a> BuilderInput::Time, Diff=::Diff>, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, for<'a> L::KeyContainer: PushInto>, for<'a> L::ValContainer: PushInto>, for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, @@ -1066,7 +1066,7 @@ mod key_batch { impl Builder for OrdKeyBuilder where L: Layout, - CI: for<'a> BuilderInput::Time, Diff=::Diff>, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, for<'a> L::KeyContainer: PushInto>, for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index e5d64e236..d143afc16 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -760,7 +760,7 @@ mod val_batch { impl Builder for RhhValBuilder where ::Key: Default + HashOrdered, - CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, for<'a> L::ValContainer: PushInto>, for<'a> ::ReadItem<'a>: HashOrdered + IntoOwned<'a, Owned = ::Key>, {