From f3c37dcfc0e5bb9982980f40f1cd95810eae69d0 Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann <antiguru@gmail.com>
Date: Thu, 5 Dec 2024 15:52:17 -0500
Subject: [PATCH] Remove batcher from Trace

There is no inherent reason a trace must know about a batcher, other
than convenience. This change factors the batcher out of the trace and
moves it to an explicit type parameter where necessary.

Signed-off-by: Moritz Hoffmann <antiguru@gmail.com>
---
 examples/spines.rs                        | 30 ++++++++--------
 experiments/src/bin/deals.rs              | 14 ++++----
 experiments/src/bin/graspan1.rs           |  6 ++--
 experiments/src/bin/graspan2.rs           | 40 ++++++++++-----------
 src/operators/arrange/arrangement.rs      | 42 +++++++++++++----------
 src/operators/consolidate.rs              | 13 +++----
 src/operators/reduce.rs                   |  2 +-
 src/trace/implementations/mod.rs          |  2 ++
 src/trace/implementations/ord_neu.rs      | 36 ++++++++++++-------
 src/trace/implementations/rhh.rs          |  8 +++--
 src/trace/implementations/spine_fueled.rs | 26 +++++---------
 src/trace/mod.rs                          |  4 +--
 tests/bfs.rs                              | 13 +++----
 tests/trace.rs                            |  4 +--
 14 files changed, 127 insertions(+), 113 deletions(-)
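The effect of the change is easiest to see at a call site. Below is a minimal sketch, not part of the patch, assuming the `ValBatcher` and `ValSpine` aliases that this commit re-exports from `src/trace/implementations/mod.rs`:

```rust
// Sketch of the new call-site convention: the batcher is its own type
// parameter, rather than being read off the trace's `Batcher` associated type.
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::{ValBatcher, ValSpine};

fn main() {
    timely::execute_directly(|worker| {
        worker.dataflow::<u32, _, _>(|scope| {
            let (_handle, edges) = scope.new_collection::<(u32, u32), isize>();
            // Before this change: edges.arrange::<ValSpine<_,_,_,_>>()
            // After it: batcher first, spine second.
            let _arranged = edges.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
        });
    });
}
```

Because the trace no longer names a batcher, the batching strategy can vary per call site; compatibility is enforced by the new `Tr::Builder: Builder<Input = Ba::Output>` bound rather than by the trace definition.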
diff --git a/examples/spines.rs b/examples/spines.rs
index 7fc726783..e4cd50bea 100644
--- a/examples/spines.rs
+++ b/examples/spines.rs
@@ -28,46 +28,46 @@ fn main() {
 
         match mode.as_str() {
             "new" => {
-                use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;
-                let data = data.arrange::<ColKeySpine<_,_,_>>();
-                let keys = keys.arrange::<ColKeySpine<_,_,_>>();
+                use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeySpine};
+                let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
+                let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
                 keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
                     .probe_with(&mut probe);
             },
             "old" => {
-                use differential_dataflow::trace::implementations::ord_neu::OrdKeySpine;
-                let data = data.arrange::<OrdKeySpine<_,_,_>>();
-                let keys = keys.arrange::<OrdKeySpine<_,_,_>>();
+                use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, OrdKeySpine};
+                let data = data.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
+                let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
                 keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
                     .probe_with(&mut probe);
             },
             "rhh" => {
-                use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecSpine};
-                let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
-                let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecSpine<_,(),_,_>>();
+                use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecSpine};
+                let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
+                let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
                 keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
                     .probe_with(&mut probe);
             },
             "slc" => {
-                use differential_dataflow::trace::implementations::ord_neu::PreferredSpine;
+                use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredSpine};
 
                 let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
-                    .arrange::<PreferredSpine<[u8],[u8],_,_>>()
+                    .arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
                     .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
                 let keys = keys.map(|x| (x.clone().into_bytes(), 7))
-                    .arrange::<PreferredSpine<[u8],u8,_,_>>()
+                    .arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
                     .reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
 
                 keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
                     .probe_with(&mut probe);
             },
             "flat" => {
-                use differential_dataflow::trace::implementations::ord_neu::FlatKeySpineDefault;
-                let data = data.arrange::<FlatKeySpineDefault<String, usize, isize>>();
-                let keys = keys.arrange::<FlatKeySpineDefault<String, usize, isize>>();
+                use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeySpineDefault};
+                let data = data.arrange::<FlatKeyBatcherDefault<String, usize, isize>, FlatKeySpineDefault<String, usize, isize>>();
+                let keys = keys.arrange::<FlatKeyBatcherDefault<String, usize, isize>, FlatKeySpineDefault<String, usize, isize>>();
                 keys.join_core(&data, |_k, (), ()| Option::<()>::None)
                     .probe_with(&mut probe);
             }
diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs
index d162ca457..ab7871ccb 100644
--- a/experiments/src/bin/deals.rs
+++ b/experiments/src/bin/deals.rs
@@ -6,7 +6,7 @@ use differential_dataflow::input::Input;
 
 use differential_dataflow::Collection;
 use differential_dataflow::operators::*;
-use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
+use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, ValBatcher};
 use differential_dataflow::operators::arrange::TraceAgent;
 use differential_dataflow::operators::arrange::Arranged;
 use differential_dataflow::operators::arrange::Arrange;
@@ -41,7 +41,7 @@ fn main() {
             let (input, graph) = scope.new_collection();
 
             // each edge should exist in both directions.
-            let graph = graph.arrange::<ValSpine<_,_,_,_>>();
+            let graph = graph.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
 
             match program.as_str() {
                 "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(),
@@ -94,10 +94,10 @@ fn tc<G: Scope<Timestamp: Lattice+Ord>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
 
         let result =
         inner
            .map(|(x,y)| (y,x))
-            .arrange::<ValSpine<_,_,_,_>>()
+            .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
            .join_core(&edges, |_y,&x,&z| Some((x, z)))
            .concat(&edges.as_collection(|&k,&v| (k,v)))
-            .arrange::<KeySpine<_,_,_>>()
+            .arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
            .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
            ;
@@ -121,12 +121,12 @@ fn sg<G: Scope<Timestamp: Lattice+Ord>>(edges: &EdgeArranged<G, Node, Node, Present>) -> C
 
         let result =
         inner
-            .arrange::<ValSpine<_,_,_,_>>()
+            .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
            .join_core(&edges, |_,&x,&z| Some((x, z)))
-            .arrange::<ValSpine<_,_,_,_>>()
+            .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
            .join_core(&edges, |_,&x,&z| Some((x, z)))
            .concat(&peers)
-            .arrange::<KeySpine<_,_,_>>()
+            .arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
            .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
            ;
diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs
index baa50bd00..41266c4b4 100644
--- a/experiments/src/bin/graspan1.rs
+++ b/experiments/src/bin/graspan1.rs
@@ -6,7 +6,7 @@ use timely::order::Product;
 
 use differential_dataflow::difference::Present;
 use differential_dataflow::input::Input;
-use differential_dataflow::trace::implementations::ValSpine;
+use differential_dataflow::trace::implementations::{ValBatcher, ValSpine};
 use differential_dataflow::operators::*;
 use differential_dataflow::operators::arrange::Arrange;
 use differential_dataflow::operators::iterate::SemigroupVariable;
@@ -31,7 +31,7 @@ fn main() {
             let (n_handle, nodes) = scope.new_collection();
             let (e_handle, edges) = scope.new_collection();
 
-            let edges = edges.arrange::<ValSpine<_,_,_,_>>();
+            let edges = edges.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
 
             // a N c  <-  a N b && b E c
             // N(a,c) <-  N(a,b), E(b, c)
@@ -46,7 +46,7 @@ fn main() {
                 let next =
                 labels.join_core(&edges, |_b, a, c| Some((*c, *a)))
                       .concat(&nodes)
-                      .arrange::<ValSpine<_,_,_,_>>()
+                      .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                       // .distinct_total_core::<Present>();
                       .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None });
diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs
index 637936aa4..cde55b71c 100644
--- a/experiments/src/bin/graspan2.rs
+++ b/experiments/src/bin/graspan2.rs
@@ -10,7 +10,7 @@ use differential_dataflow::Collection;
 use differential_dataflow::input::Input;
 use differential_dataflow::operators::*;
 use differential_dataflow::operators::arrange::Arrange;
-use differential_dataflow::trace::implementations::{ValSpine, KeySpine};
+use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher};
 use differential_dataflow::difference::Present;
 
 type Node = u32;
@@ -47,7 +47,7 @@ fn unoptimized() {
                 .flat_map(|(a,b)| vec![a,b])
                 .concat(&dereference.flat_map(|(a,b)| vec![a,b]));
 
-            let dereference = dereference.arrange::<ValSpine<_,_,_,_>>();
+            let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
 
             let (value_flow, memory_alias, value_alias) =
             scope
@@ -60,14 +60,14 @@ fn unoptimized() {
                 let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
                 let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
 
-                let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_>>();
-                let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_>>();
+                let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
+                let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
 
                 // VA(a,b) <- VF(x,a),VF(x,b)
                 // VA(a,b) <- VF(x,a),MA(x,y),VF(y,b)
                 let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)));
                 let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
                     .concat(&value_alias_next);
 
@@ -77,16 +77,16 @@ fn unoptimized() {
                 let value_flow_next =
                 assignment
                     .map(|(a,b)| (b,a))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
                     .concat(&assignment.map(|(a,b)| (b,a)))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
                     .concat(&nodes.map(|n| (n,n)));
 
                 let value_flow_next =
                 value_flow_next
-                    .arrange::<KeySpine<_,_,_>>()
+                    .arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
                     // .distinct_total_core::<Present>()
                     .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
                     ;
@@ -95,12 +95,12 @@ fn unoptimized() {
                 let memory_alias_next: Collection<_,_,Present> =
                 value_alias_next
                     .join_core(&dereference, |_x,&y,&a| Some((y,a)))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&dereference, |_y,&a,&b| Some((a,b)));
 
                 let memory_alias_next: Collection<_,_,Present> =
                 memory_alias_next
-                    .arrange::<KeySpine<_,_,_>>()
+                    .arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
                     // .distinct_total_core::<Present>()
                     .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
                     ;
@@ -172,7 +172,7 @@ fn optimized() {
                 .flat_map(|(a,b)| vec![a,b])
                 .concat(&dereference.flat_map(|(a,b)| vec![a,b]));
 
-            let dereference = dereference.arrange::<ValSpine<_,_,_,_>>();
+            let dereference = dereference.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
 
             let (value_flow, memory_alias) =
             scope
@@ -185,8 +185,8 @@ fn optimized() {
                 let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
                 let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1));
 
-                let value_flow_arranged = value_flow.arrange::<ValSpine<_,_,_,_>>();
-                let memory_alias_arranged = memory_alias.arrange::<ValSpine<_,_,_,_>>();
+                let value_flow_arranged = value_flow.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
+                let memory_alias_arranged = memory_alias.arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
 
                 // VF(a,a) <-
                 // VF(a,b) <- A(a,x),VF(x,b)
                 let value_flow_next =
                 assignment
                     .map(|(a,b)| (b,a))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a)))
                     .concat(&assignment.map(|(a,b)| (b,a)))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b)))
                     .concat(&nodes.map(|n| (n,n)))
-                    .arrange::<KeySpine<_,_,_>>()
+                    .arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
                     // .distinct_total_core::<Present>()
                     .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
                     ;
@@ -209,9 +209,9 @@ fn optimized() {
                 let value_flow_deref =
                 value_flow
                     .map(|(a,b)| (b,a))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&dereference, |_x,&a,&b| Some((a,b)))
-                    .arrange::<ValSpine<_,_,_,_>>();
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>();
 
                 // MA(a,b) <- VFD(x,a),VFD(y,b)
                 // MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b)
@@ -222,10 +222,10 @@ fn optimized() {
                 let memory_alias_next =
                 memory_alias_arranged
                     .join_core(&value_flow_deref, |_x,&y,&a| Some((y,a)))
-                    .arrange::<ValSpine<_,_,_,_>>()
+                    .arrange::<ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>()
                     .join_core(&value_flow_deref, |_y,&a,&b| Some((a,b)))
                     .concat(&memory_alias_next)
-                    .arrange::<KeySpine<_,_,_>>()
+                    .arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
                     // .distinct_total_core::<Present>()
                     .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
                     ;
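All of the experiment call sites above follow the same recipe: arrange with an explicit batcher, then deduplicate `Present`-weighted tuples with `threshold_semigroup`. A condensed sketch of that recurring pattern, illustrative only (the helper name `distinct_present` is hypothetical):

```rust
// Sketch of the distinct-via-arrangement idiom used throughout these
// experiments; not code from the patch.
use differential_dataflow::Collection;
use differential_dataflow::difference::Present;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::*;
use differential_dataflow::operators::arrange::Arrange;
use differential_dataflow::trace::implementations::{KeyBatcher, KeySpine};
use timely::dataflow::Scope;
use timely::order::TotalOrder;

fn distinct_present<G>(pairs: &Collection<G, (u32, u32), Present>) -> Collection<G, (u32, u32), Present>
where
    G: Scope,
    G::Timestamp: Lattice + TotalOrder,
{
    pairs
        // The batcher is now named explicitly alongside the spine.
        .arrange::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>()
        // Emit a record only when the key was previously absent.
        .threshold_semigroup(|_key, _new, prior| if prior.is_none() { Some(Present) } else { None })
}
```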
diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs
index fd9e7fd08..c0615d8c0 100644
--- a/src/operators/arrange/arrangement.rs
+++ b/src/operators/arrange/arrangement.rs
@@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
 use crate::difference::Semigroup;
 use crate::lattice::Lattice;
 use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
-use crate::trace::implementations::{KeySpine, ValSpine};
+use crate::trace::implementations::{KeyBatcher, KeySpine, ValBatcher, ValSpine};
 
 use trace::wrappers::enter::{TraceEnter, BatchEnter,};
 use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
@@ -353,21 +353,23 @@ where
     G::Timestamp: Lattice,
 {
     /// Arranges updates into a shared trace.
-    fn arrange<Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
+    fn arrange<Ba, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
     where
+        Ba: Batcher<Time=G::Timestamp> + 'static,
         Tr: Trace<Time=G::Timestamp> + 'static,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher<Time=G::Timestamp>,
+        Tr::Builder: Builder<Input=Ba::Output, Output=Tr::Batch>,
     {
-        self.arrange_named("Arrange")
+        self.arrange_named::<Ba, Tr>("Arrange")
     }
 
     /// Arranges updates into a shared trace, with a supplied name.
-    fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
+    fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
     where
+        Ba: Batcher<Time=G::Timestamp> + 'static,
         Tr: Trace<Time=G::Timestamp> + 'static,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher<Time=G::Timestamp>,
+        Tr::Builder: Builder<Input=Ba::Output, Output=Tr::Batch>,
     ;
 }
 
@@ -379,14 +381,15 @@ where
     V: ExchangeData,
     R: ExchangeData + Semigroup,
 {
-    fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
+    fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
     where
+        Ba: Batcher<Input=Vec<((K,V),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
         Tr: Trace<Time=G::Timestamp> + 'static,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher<Input=Vec<((K,V),G::Timestamp,R)>>,
+        Tr::Builder: Builder<Input=Ba::Output, Output=Tr::Batch>,
     {
         let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
-        arrange_core(&self.inner, exchange, name)
+        arrange_core::<_, _, Ba, _>(&self.inner, exchange, name)
     }
 }
 
@@ -395,14 +398,16 @@ where
 /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
 /// It uses the supplied parallelization contract to distribute the data, which does not need to
 /// be consistently by key (though this is the most common).
-pub fn arrange_core<G, P, Tr>(stream: &StreamCore<G, <Tr::Batcher as Batcher>::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
+pub fn arrange_core<G, P, Ba, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
 where
     G: Scope,
     G::Timestamp: Lattice,
-    P: ParallelizationContract<G::Timestamp, <Tr::Batcher as Batcher>::Input>,
+    P: ParallelizationContract<G::Timestamp, Ba::Input>,
+    Ba: Batcher<Time=G::Timestamp> + 'static,
+    Ba::Input: Container,
     Tr: Trace+'static,
     Tr::Batch: Batch,
-    <Tr::Batcher as Batcher>::Input: timely::Container,
+    Tr::Builder: Builder<Input=Ba::Output, Output=Tr::Batch>,
 {
     // The `Arrange` operator is tasked with reacting to an advancing input
     // frontier by producing the sequence of batches whose lower and upper
@@ -434,7 +439,7 @@ where
         };
 
         // Where we will deposit received updates, and from which we extract batches.
-        let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);
+        let mut batcher = Ba::new(logger.clone(), info.global_id);
 
         // Capabilities for the lower envelope of updates in `batcher`.
         let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
@@ -557,14 +562,15 @@ impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G,
-    fn arrange_named<Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
+    fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
     where
+        Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
         Tr: Trace<Time=G::Timestamp> + 'static,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher<Input=Vec<((K,()),G::Timestamp,R)>>,
+        Tr::Builder: Builder<Input=Ba::Output, Output=Tr::Batch>,
     {
         let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
-        arrange_core(&self.map(|k| (k, ())).inner, exchange, name)
+        arrange_core::<_,_,Ba,_>(&self.map(|k| (k, ())).inner, exchange, name)
     }
 }
 
@@ -595,7 +601,7 @@ where
     }
 
     fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
-        self.arrange_named(name)
+        self.arrange_named::<ValBatcher<K, V, G::Timestamp, R>, _>(name)
     }
 }
 
@@ -630,6 +636,6 @@ where
 
     fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
         self.map(|k| (k, ()))
-            .arrange_named(name)
+            .arrange_named::<KeyBatcher<K, G::Timestamp, R>, _>(name)
     }
 }
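With the batcher hoisted into `arrange_core`'s signature, a caller whose data is already partitioned can invoke the function directly with a non-exchanging pact. A sketch under those assumptions (the operator name string is arbitrary), not part of the patch:

```rust
// Sketch: invoking arrange_core directly, supplying the batcher `Ba`
// explicitly as a type parameter; illustrative only.
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::arrangement::arrange_core;
use differential_dataflow::trace::implementations::{ValBatcher, ValSpine};
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    timely::execute_directly(|worker| {
        worker.dataflow::<u32, _, _>(|scope| {
            let (_handle, pairs) = scope.new_collection::<(u32, u32), isize>();
            // `Pipeline` performs no data exchange: appropriate on a single
            // worker, or when the stream is already partitioned by key.
            let _arranged = arrange_core::<_, _, ValBatcher<_,_,_,_>, ValSpine<_,_,_,_>>(
                &pairs.inner,
                Pipeline,
                "ArrangeLocal",
            );
        });
    });
}
```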
diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index 41824bd6f..e3a551246 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -16,7 +16,7 @@ use crate::difference::Semigroup;
 
 use crate::Data;
 use crate::lattice::Lattice;
-use crate::trace::Batcher;
+use crate::trace::{Batcher, Builder};
 
 /// Methods which require data be arrangeable.
 impl<G, D, R> Collection<G, D, R>
@@ -47,21 +47,22 @@ where
     ///     });
     /// ```
     pub fn consolidate(&self) -> Self {
-        use crate::trace::implementations::KeySpine;
-        self.consolidate_named::<KeySpine<_,_,_>>("Consolidate")
+        use crate::trace::implementations::{KeyBatcher, KeySpine};
+        self.consolidate_named::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>("Consolidate")
     }
 
     /// As `consolidate` but with the ability to name the operator and specify the trace type.
-    pub fn consolidate_named<Tr>(&self, name: &str) -> Self
+    pub fn consolidate_named<Ba, Tr>(&self, name: &str) -> Self
     where
+        Ba: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
         Tr: crate::trace::Trace<Time=G::Timestamp>+'static,
         for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>,
         Tr::Batch: crate::trace::Batch,
-        Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
+        Tr::Builder: Builder<Input=Ba::Output, Output=Tr::Batch>,
     {
         use crate::operators::arrange::arrangement::Arrange;
         self.map(|k| (k, ()))
-            .arrange_named::<Tr>(name)
+            .arrange_named::<Ba, Tr>(name)
            .as_collection(|d, _| d.into_owned())
     }
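Correspondingly, `consolidate_named` now takes the batcher as its first type parameter, while plain `consolidate()` callers are unaffected. A sketch of the explicit form, not part of the patch:

```rust
// Sketch: consolidate with an explicitly named batcher/spine pair;
// illustrative only.
use differential_dataflow::input::Input;
use differential_dataflow::trace::implementations::{KeyBatcher, KeySpine};

fn main() {
    timely::execute_directly(|worker| {
        worker.dataflow::<u32, _, _>(|scope| {
            let (_handle, words) = scope.new_collection::<String, isize>();
            // Identical in behavior to words.consolidate(), but with the
            // operator name and the batcher/trace types spelled out.
            let _out = words.consolidate_named::<KeyBatcher<_,_,_>, KeySpine<_,_,_>>("ConsolidateWords");
        });
    });
}
```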
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index 6b1017130..2818340ec 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -460,7 +460,7 @@ where
             builders.push(T2::Builder::new());
         }
 
-        let mut buffer = <<T2 as Trace>::Batcher as crate::trace::Batcher>::Output::default();
+        let mut buffer = <T2::Builder as crate::trace::Builder>::Input::default();
 
         // cursors for navigating input and output traces.
         let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs
index 168f5427c..b7414b986 100644
--- a/src/trace/implementations/mod.rs
+++ b/src/trace/implementations/mod.rs
@@ -50,7 +50,9 @@ pub mod chunker;
 
 // Opinionated takes on default spines.
 pub use self::ord_neu::OrdValSpine as ValSpine;
+pub use self::ord_neu::OrdValBatcher as ValBatcher;
 pub use self::ord_neu::OrdKeySpine as KeySpine;
+pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
 
 use std::borrow::{ToOwned};
 use std::convert::TryInto;
diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs
index f870a6277..35c2941dc 100644
--- a/src/trace/implementations/ord_neu.rs
+++ b/src/trace/implementations/ord_neu.rs
@@ -28,69 +28,79 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
 
 /// A trace implementation using a spine of ordered lists.
 pub type OrdValSpine<K, V, T, R> = Spine<
     Rc<OrdValBatch<Vector<((K,V),T,R)>>>,
-    MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>,
     RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>,
 >;
+/// A batcher using ordered lists.
+pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>;
 
 // /// A trace implementation for empty values using a spine of ordered lists.
 // pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
 
 /// A trace implementation backed by columnar storage.
 pub type ColValSpine<K, V, T, R> = Spine<
     Rc<OrdValBatch<TStack<((K,V),T,R)>>>,
-    MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>,
     RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>,
 >;
+/// A batcher for columnar storage.
+pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>;
 
 /// A trace implementation backed by flatcontainer storage.
-pub type FlatValSpine<L, R, C> = Spine<
+pub type FlatValSpine<L, R> = Spine<
     Rc<FlatValBatch<L>>,
-    MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as RegionUpdate>::TimeOwned>,
     RcBuilder<FlatValBuilder<L, R>>,
 >;
+/// A batcher for flatcontainer storage.
+pub type FlatValBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as RegionUpdate>::TimeOwned>;
 
 /// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout.
-pub type FlatValSpineDefault<K, V, T, R, C> = FlatValSpine<
+pub type FlatValSpineDefault<K, V, T, R> = FlatValSpine<
     FlatLayout<<K as RegionPreference>::Region, <V as RegionPreference>::Region, <T as RegionPreference>::Region, <R as RegionPreference>::Region>,
     TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <V as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>,
-    C,
 >;
+/// A batcher for flatcontainer storage, using [`FlatLayout`] as the layout.
+pub type FlatValBatcherDefault<K, V, T, R, C> = FlatValBatcher<TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <V as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>, C>;
 
 /// A trace implementation using a spine of ordered lists.
 pub type OrdKeySpine<K, T, R> = Spine<
     Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>,
-    MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>, T>,
     RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>,
 >;
+/// A batcher for ordered lists.
+pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<((K, ()), T, R)>, T>;
 
 // /// A trace implementation for empty values using a spine of ordered lists.
 // pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
 
 /// A trace implementation backed by columnar storage.
 pub type ColKeySpine<K, T, R> = Spine<
     Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>,
-    MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>, T>,
     RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>,
 >;
+/// A batcher for columnar storage.
+pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColumnationMerger<((K,()),T,R)>, T>;
 
 /// A trace implementation backed by flatcontainer storage.
-pub type FlatKeySpine<L, R, C> = Spine<
+pub type FlatKeySpine<L, R> = Spine<
     Rc<FlatKeyBatch<L>>,
-    MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as RegionUpdate>::TimeOwned>,
     RcBuilder<FlatKeyBuilder<L, R>>,
 >;
+/// A batcher for flatcontainer storage.
+pub type FlatKeyBatcher<R, C> = MergeBatcher<C, ContainerChunker<FlatStack<R>>, FlatcontainerMerger<R>, <R as RegionUpdate>::TimeOwned>;
 
 /// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout.
-pub type FlatKeySpineDefault<K, T, R, C> = FlatKeySpine<
+pub type FlatKeySpineDefault<K, T, R> = FlatKeySpine<
     FlatLayout<<K as RegionPreference>::Region, <() as RegionPreference>::Region, <T as RegionPreference>::Region, <R as RegionPreference>::Region>,
     TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <() as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>,
-    C,
 >;
+/// A batcher for flatcontainer storage, using [`FlatLayout`] as the layout.
+pub type FlatKeyBatcherDefault<K, T, R, C> = FlatValBatcher<TupleABCRegion<TupleABRegion<<K as RegionPreference>::Region, <() as RegionPreference>::Region>, <T as RegionPreference>::Region, <R as RegionPreference>::Region>, C>;
 
 /// A trace implementation backed by columnar storage.
 pub type PreferredSpine<K, V, T, R> = Spine<
     Rc<OrdValBatch<Preferred<K,V,T,R>>>,
-    MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>,T>,
     RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>,
 >;
+/// A batcher for columnar storage.
+pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationMerger<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>,T>;
 
 // /// A trace implementation backed by columnar storage.
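Each spine alias above now has a matching batcher alias rather than a baked-in `MergeBatcher` parameter. Downstream code can bundle its own preferred pairing the same way; a sketch with hypothetical alias names, not part of the patch:

```rust
// Sketch: application-level aliases pairing a batcher with a spine, mirroring
// the aliases this patch adds; the `App*` names are hypothetical.
use differential_dataflow::trace::implementations::ord_neu::{ColValBatcher, ColValSpine};

/// Columnar batcher with time `u64` and diff `i64` fixed for one application.
pub type AppBatcher<K, V> = ColValBatcher<K, V, u64, i64>;
/// The matching columnar spine; used together at call sites as
/// `collection.arrange::<AppBatcher<_, _>, AppSpine<_, _>>()`.
pub type AppSpine<K, V> = ColValSpine<K, V, u64, i64>;
```

The pairing is no longer fixed by the trace, but it is not arbitrary either: the `Tr::Builder: Builder<Input = Ba::Output>` bound makes the compiler reject a batcher whose output container the spine's builder cannot consume.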
diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs
index 01f314650..a07b2dc67 100644
--- a/src/trace/implementations/rhh.rs
+++ b/src/trace/implementations/rhh.rs
@@ -25,18 +25,22 @@ use self::val_batch::{RhhValBatch, RhhValBuilder};
 
 /// A trace implementation using a spine of ordered lists.
 pub type VecSpine<K, V, T, R> = Spine<
     Rc<RhhValBatch<Vector<((K,V),T,R)>>>,
-    MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>,
     RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>,
 >;
+/// A batcher for ordered lists.
+pub type VecBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<((K, V), T, R)>, T>;
 
 // /// A trace implementation for empty values using a spine of ordered lists.
 // pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
 
 /// A trace implementation backed by columnar storage.
 pub type ColSpine<K, V, T, R> = Spine<
     Rc<RhhValBatch<TStack<((K,V),T,R)>>>,
-    MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>,
     RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>,
 >;
+/// A batcher for columnar storage.
+pub type ColBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColumnationMerger<((K,V),T,R)>, T>;
 
 // /// A trace implementation backed by columnar storage.
 // pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs
index e6e014aa6..deefbaa61 100644
--- a/src/trace/implementations/spine_fueled.rs
+++ b/src/trace/implementations/spine_fueled.rs
@@ -70,7 +70,7 @@
 
 use crate::logging::Logger;
 
-use crate::trace::{Batch, Batcher, Builder, BatchReader, Trace, TraceReader, ExertionLogic};
+use crate::trace::{Batch, Builder, BatchReader, Trace, TraceReader, ExertionLogic};
 use crate::trace::cursor::CursorList;
 use crate::trace::Merger;
 
@@ -83,12 +83,7 @@ use ::timely::order::PartialOrder;
 /// A spine maintains a small number of immutable collections of update tuples, merging the collections when
 /// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with
 /// other immutable collections.
-pub struct Spine<B, BA, BU>
-where
-    // Intended constraints:
-    // BA: Batcher