Skip to content

Commit

Permalink
Improve trait bounds
Browse files Browse the repository at this point in the history
Make the BuilderInput trait generic over the key and value containers
instead of the layout, which makes trait bounds much simpler and allows
for reusing the `BuilerInput` implementation for generic key and value
containers, instead tying it to specific layouts.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Jun 4, 2024
1 parent 4d350c7 commit 21d4e21
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 159 deletions.
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ fn main() {
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
let data = data.arrange::<FlatKeySpine<_,_,_>>();
let keys = keys.arrange::<FlatKeySpine<_,_,_>>();
let data = data.arrange::<FlatKeySpine<String,_,isize,_>>();
let keys = keys.arrange::<FlatKeySpine<String,_,isize,_>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
50 changes: 25 additions & 25 deletions src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,51 +245,51 @@ where
}

/// Chunk a stream of vectors into chains of vectors.
pub struct ContainerChunker<I, O, Sorter, Consolidator>
pub struct ContainerChunker<Input, Output, Sorter, Consolidator>
where
I: Container,
for<'a> O: SizableContainer + PushInto<I::ItemRef<'a>>,
Sorter: ContainerSorter<O>,
Consolidator: ConsolidateContainer<O> + ?Sized,
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::ItemRef<'a>>,
Sorter: ContainerSorter<Output>,
Consolidator: ConsolidateContainer<Output> + ?Sized,
{
pending: O,
empty: O,
ready: Vec<O>,
pending: Output,
empty: Output,
ready: Vec<Output>,
sorter: Sorter,
_marker: PhantomData<(I, Consolidator)>,
_marker: PhantomData<(Input, Consolidator)>,
}

impl<I, O, Sorter, Consolidator> Default for ContainerChunker<I, O, Sorter, Consolidator>
impl<Input, Output, Sorter, Consolidator> Default for ContainerChunker<Input, Output, Sorter, Consolidator>
where
I: Container,
for<'a> O: SizableContainer + PushInto<I::ItemRef<'a>>,
Sorter: ContainerSorter<O> + Default,
Consolidator: ConsolidateContainer<O> + ?Sized,
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::ItemRef<'a>>,
Sorter: ContainerSorter<Output> + Default,
Consolidator: ConsolidateContainer<Output> + ?Sized,
{
fn default() -> Self {
Self {
pending: O::default(),
empty: O::default(),
pending: Output::default(),
empty: Output::default(),
ready: Vec::default(),
sorter: Sorter::default(),
_marker: PhantomData,
}
}
}

impl<I, O, Sorter, Consolidator> Chunker for ContainerChunker<I, O, Sorter, Consolidator>
impl<Input, Output, Sorter, Consolidator> Chunker for ContainerChunker<Input, Output, Sorter, Consolidator>
where
I: Container,
for<'a> O: SizableContainer + PushInto<I::ItemRef<'a>>,
Sorter: ContainerSorter<O>,
Consolidator: ConsolidateContainer<O>,
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::ItemRef<'a>>,
Sorter: ContainerSorter<Output>,
Consolidator: ConsolidateContainer<Output>,
{
type Input = I;
type Output = O;
type Input = Input;
type Output = Output;

fn push_container(&mut self, container: RefOrMut<Self::Input>) {
if self.pending.capacity() < O::preferred_capacity() {
self.pending.reserve(O::preferred_capacity() - self.pending.len());
if self.pending.capacity() < Output::preferred_capacity() {
self.pending.reserve(Output::preferred_capacity() - self.pending.len());
}
// TODO: This uses `IterRef`, which isn't optimal for containers that can move.
for item in container.iter() {
Expand Down
20 changes: 9 additions & 11 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,18 @@ use crate::trace::implementations::merge_batcher::Merger;
use crate::trace::Builder;
use crate::trace::cursor::IntoOwned;

/// A merger for flat stacks
pub struct FlatcontainerMerger<T, R> {
_marker: PhantomData<(T, R)>,
/// A merger for flat stacks. `T` describes the
pub struct FlatcontainerMerger<T, R, MC> {
_marker: PhantomData<(T, R, MC)>,
}

impl<T, R> Default for FlatcontainerMerger<T, R> {
impl<T, R, MC> Default for FlatcontainerMerger<T, R, MC> {
fn default() -> Self {
Self { _marker: PhantomData, }
}
}

impl<T, R: Region> FlatcontainerMerger<T, R> {
impl<T, R, MC: Region> FlatcontainerMerger<T, R, MC> {
const BUFFER_SIZE_BYTES: usize = 64 << 10;
fn chunk_capacity(&self) -> usize {
let size = ::std::mem::size_of::<T>();
Expand All @@ -38,13 +38,13 @@ impl<T, R: Region> FlatcontainerMerger<T, R> {

/// Helper to get pre-sized vector from the stash.
#[inline]
fn empty(&self, stash: &mut Vec<FlatStack<R>>) -> FlatStack<R> {
fn empty(&self, stash: &mut Vec<FlatStack<MC>>) -> FlatStack<MC> {
stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity()))
}

/// Helper to return a chunk to the stash.
#[inline]
fn recycle(&self, mut chunk: FlatStack<R>, stash: &mut Vec<FlatStack<R>>) {
fn recycle(&self, mut chunk: FlatStack<MC>, stash: &mut Vec<FlatStack<MC>>) {
// TODO: Should we limit the size of `stash`?
if chunk.capacity() == self.chunk_capacity() {
chunk.clear();
Expand Down Expand Up @@ -88,13 +88,11 @@ where
}
}

impl<K, V, T, R, FR> Merger for FlatcontainerMerger<((K, V), T, R), FR>
impl<T, R, FR> Merger for FlatcontainerMerger<T, R, FR>
where
K: Ord + Clone,
V: Ord + Clone,
for<'a> T: Ord + PartialOrder + PartialOrder<FR::Time<'a>> + Data,
for<'a> R: Default + Semigroup + Semigroup<FR::Diff<'a>> + Data,
for<'a> FR: MergerChunk + Push<((K, V), T, R)> + Clone + 'static
for<'a> FR: MergerChunk + Clone + 'static
+ ReserveItems<<FR as Region>::ReadItem<'a>>
+ Push<<FR as Region>::ReadItem<'a>>
+ Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, &'a R)>
Expand Down
160 changes: 48 additions & 112 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,6 @@ where

use std::convert::TryInto;
use abomonation_derive::Abomonation;
use crate::trace::cursor::IntoOwned;

/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)]
Expand Down Expand Up @@ -322,7 +321,7 @@ impl BatchContainer for OffsetList {
}

/// Behavior to split an update into principal components.
pub trait BuilderInput<L: Layout>: Container {
pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
/// Key portion
type Key<'a>: Ord;
/// Value portion
Expand All @@ -336,16 +335,20 @@ pub trait BuilderInput<L: Layout>: Container {
fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);

/// Test that the key equals a key in the layout's key container.
fn key_eq(this: &Self::Key<'_>, other: <L::KeyContainer as BatchContainer>::ReadItem<'_>) -> bool;
fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;

/// Test that the value equals a key in the layout's value container.
fn val_eq(this: &Self::Val<'_>, other: <L::ValContainer as BatchContainer>::ReadItem<'_>) -> bool;
fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
}

impl<K,V,T,R> BuilderInput<Vector<((K, V), T,R)>> for Vec<((K, V), T, R)>
impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
where
K: Ord + Clone + 'static,
KBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<&'a K>,
V: Ord + Clone + 'static,
VBC: BatchContainer,
for<'a> VBC::ReadItem<'a>: PartialEq<&'a V>,
T: Timestamp + Lattice + Clone + 'static,
R: Ord + Semigroup + 'static,
{
Expand All @@ -358,47 +361,48 @@ where
(key, val, time, diff)
}

fn key_eq(this: &K, other: &K) -> bool {
this == other
fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
KBC::reborrow(other) == this
}

fn val_eq(this: &V, other: &V) -> bool {
this == other
fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == this
}
}

impl<K,V,T,R> BuilderInput<TStack<((K, V), T, R)>> for TimelyStack<((K, V), T, R)>
impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
where
K: Ord + Columnation + Clone + 'static,
V: Ord + Columnation + Clone + 'static,
K: BatchContainer,
for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>,
K::Owned: Ord + Columnation + Clone + 'static,
V: BatchContainer,
for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>,
V::Owned: Ord + Columnation + Clone + 'static,
T: Timestamp + Lattice + Columnation + Clone + 'static,
R: Ord + Clone + Semigroup + Columnation + 'static,
{
type Key<'a> = &'a K;
type Val<'a> = &'a V;
type Key<'a> = &'a K::Owned;
type Val<'a> = &'a V::Owned;
type Time = T;
type Diff = R;

fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
(key, val, time.clone(), diff.clone())
}

fn key_eq(this: &&K, other: &K) -> bool {
*this == other
fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
K::reborrow(other) == *this
}

fn val_eq(this: &&V, other: &V) -> bool {
*this == other
fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
V::reborrow(other) == *this
}
}

mod flatcontainer {
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::flatcontainer::{Containerized, FlatStack, IntoOwned, Push, Region};
use timely::progress::Timestamp;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::implementations::{BuilderInput, FlatLayout, Layout, OffsetList, Update};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update};

impl<U: Update> Layout for FlatLayout<U>
where
Expand All @@ -425,108 +429,40 @@ mod flatcontainer {
type OffsetContainer = OffsetList;
}

impl<K,V,T,R> BuilderInput<FlatLayout<((K, V), T, R)>> for TimelyStack<((K, V), T, R)>
where
K: Ord + Columnation + Containerized + Clone + 'static,
for<'a> K::Region: Push<K> + Push<<K::Region as Region>::ReadItem<'a>>,
for<'a> <K::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> K: PartialEq<<K::Region as Region>::ReadItem<'a>>,
V: Ord + Columnation + Containerized + Clone + 'static,
for<'a> V::Region: Push<V> + Push<<V::Region as Region>::ReadItem<'a>>,
for<'a> <V::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> V: PartialEq<<V::Region as Region>::ReadItem<'a>>,
T: Timestamp + Lattice + Columnation + Containerized + Clone + 'static,
for<'a> T::Region: Region<Owned=T> + Push<T> + Push<<T::Region as Region>::ReadItem<'a>>,
for<'a> <T::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> T: PartialEq<<T::Region as Region>::ReadItem<'a>>,
R: Ord + Clone + Semigroup + Columnation + Containerized + 'static,
for<'a> R::Region: Region<Owned=R> + Push<R> + Push<<R::Region as Region>::ReadItem<'a>>,
for<'a> <R::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> R: PartialEq<<R::Region as Region>::ReadItem<'a>>,
{
type Key<'a> = &'a K;
type Val<'a> = &'a V;
type Time = T;
type Diff = R;

fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
(key, val, time.clone(), diff.clone())
}

fn key_eq(this: &&K, other: <<K as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
**this == <K as Containerized>::Region::reborrow(other)
}

fn val_eq(this: &&V, other: <<V as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
**this == <V as Containerized>::Region::reborrow(other)
}
}

impl<K,V,T,R> BuilderInput<FlatLayout<((K, V), T, R)>> for FlatStack<<((K, V), T, R) as Containerized>::Region>
impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for FlatStack<TupleABCRegion<TupleABRegion<K,V>,T,R>>
where
K: Ord + Containerized + Clone + 'static,
for<'a> K::Region: Push<K> + Push<<K::Region as Region>::ReadItem<'a>> + Clone,
for<'a> <K::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> K: PartialEq<<K::Region as Region>::ReadItem<'a>>,
V: Ord + Containerized + Clone + 'static,
for<'a> V::Region: Push<V> + Push<<V::Region as Region>::ReadItem<'a>> + Clone,
for<'a> <V::Region as Region>::ReadItem<'a>: Copy + Ord,
for<'a> V: PartialEq<<V::Region as Region>::ReadItem<'a>>,
T: Timestamp + Lattice + Containerized + Clone + 'static,
for<'a> T::Region: Region<Owned=T> + Push<T> + Push<<T::Region as Region>::ReadItem<'a>> + Clone,
for<'a> <T::Region as Region>::ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned=T> + PartialEq<<T::Region as Region>::ReadItem<'a>>,
R: Ord + Clone + Semigroup + Containerized + 'static,
for<'a> R::Region: Region<Owned=R> + Push<R> + Push<<R::Region as Region>::ReadItem<'a>> + Clone,
for<'a> <R::Region as Region>::ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned=R> + PartialEq<<R::Region as Region>::ReadItem<'a>>,
K: Region + Clone + 'static,
for<'a> K::ReadItem<'a>: Copy + Ord,
KBC: BatchContainer,
for<'a> KBC::ReadItem<'a>: PartialEq<K::ReadItem<'a>>,
for<'a> V: Region + Clone + 'static,
for<'a> V::ReadItem<'a>: Copy + Ord,
VBC: BatchContainer,
for<'a> VBC::ReadItem<'a>: PartialEq<V::ReadItem<'a>>,
for<'a> T: Region + Clone + 'static,
for<'a> T::ReadItem<'a>: Copy + Ord,
for<'a> R: Region + Clone + 'static,
for<'a> R::ReadItem<'a>: Copy + Ord,
{
type Key<'a> = <K::Region as Region>::ReadItem<'a>;
type Val<'a> = <V::Region as Region>::ReadItem<'a>;
type Time = T;
type Diff = R;
type Key<'a> = K::ReadItem<'a>;
type Val<'a> = V::ReadItem<'a>;
type Time = T::Owned;
type Diff = R::Owned;

fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
(key, val, time.into_owned(), diff.into_owned())
}

fn key_eq(this: &Self::Key<'_>, other: <<K as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
<K as Containerized>::Region::reborrow(*this) == <K as Containerized>::Region::reborrow(other)
fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool {
KBC::reborrow(other) == K::reborrow(*this)
}

fn val_eq(this: &Self::Val<'_>, other: <<V as Containerized>::Region as Region>::ReadItem<'_>) -> bool {
<V as Containerized>::Region::reborrow(*this) == <V as Containerized>::Region::reborrow(other)
fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == V::reborrow(*this)
}
}
}

impl<K,V,T,R> BuilderInput<Preferred<K, V, T, R>> for TimelyStack<((<K as ToOwned>::Owned, <V as ToOwned>::Owned), T, R)>
where
K: Ord+ToOwned+PreferredContainer + ?Sized,
K::Owned: Columnation + Ord+Clone+'static,
for<'a> <<K as PreferredContainer>::Container as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = K::Owned>,
V: Ord+ToOwned+PreferredContainer + ?Sized,
V::Owned: Columnation + Ord+Clone+'static,
for<'a> <<V as PreferredContainer>::Container as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = V::Owned>,
T: Columnation + Ord+Clone+Lattice+Timestamp,
R: Columnation + Ord+Clone+Semigroup+'static,
{
type Key<'a> = &'a K::Owned;
type Val<'a> = &'a V::Owned;
type Time = T;
type Diff = R;

fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
(key, val, time.clone(), diff.clone())
}

fn key_eq(this: &&K::Owned, other: <<K as PreferredContainer>::Container as BatchContainer>::ReadItem<'_>) -> bool {
<<K as PreferredContainer>::Container as BatchContainer>::reborrow(other).eq(&<<K as PreferredContainer>::Container as BatchContainer>::reborrow(<<<K as PreferredContainer>::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)))
}

fn val_eq(this: &&V::Owned, other: <<V as PreferredContainer>::Container as BatchContainer>::ReadItem<'_>) -> bool {
<<V as PreferredContainer>::Container as BatchContainer>::reborrow(other).eq(&<<V as PreferredContainer>::Container as BatchContainer>::reborrow(<<<V as PreferredContainer>::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)))
}
}

pub use self::containers::{BatchContainer, SliceContainer};

/// Containers for data that resemble `Vec<T>`, with leaner implementations.
Expand Down
Loading

0 comments on commit 21d4e21

Please sign in to comment.