Skip to content

Commit

Permalink
Key layouts
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 11, 2023
1 parent a8201fb commit 344e034
Showing 1 changed file with 59 additions and 120 deletions.
179 changes: 59 additions & 120 deletions src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,21 +118,21 @@ where


/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K, V), T, R), O>>>>;
pub type OrdValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R), O>>>>;

/// A trace implementation using a spine of abomonated ordered lists.
pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<Rc<Abomonated<OrdValBatch<Vector<((K, V), T, R), O>>, Vec<u8>>>>;
pub type OrdValSpineAbom<K, V, T, R, O=usize> = Spine<Rc<Abomonated<OrdValBatch<Vector<((K,V),T,R), O>>, Vec<u8>>>>;

/// A trace implementation for empty values using a spine of ordered lists.
pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<K, T, R, O>>>;
pub type OrdKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R), O>>>>;

/// A trace implementation for empty values using a spine of abomonated ordered lists.
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<Rc<Abomonated<OrdKeyBatch<K, T, R, O>, Vec<u8>>>>;
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<Rc<Abomonated<OrdKeyBatch<Vector<((K,()),T,R), O>>, Vec<u8>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K, V), T, R), O>>>>;
pub type ColValSpine<K, V, T, R, O=usize> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R), O>>>>;
/// A trace implementation for empty values backed by columnar storage.
pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<K, T, R, O, TimelyStack<K>>>>;
pub type ColKeySpine<K, T, R, O=usize> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R), O>>>>;


/// A container that can retain/discard from some offset onward.
Expand Down Expand Up @@ -177,9 +177,11 @@ pub struct OrdValBatch<L: Layout> {
// Type aliases to make certain types readable.
type TDLayer<L> = OrderedLeaf<<<L as Layout>::Target as Update>::Time, <<L as Layout>::Target as Update>::Diff>;
type VTDLayer<L> = OrderedLayer<<<L as Layout>::Target as Update>::Val, TDLayer<L>, <L as Layout>::ValOffset, <L as Layout>::ValContainer>;
type KTDLayer<L> = OrderedLayer<<<L as Layout>::Target as Update>::Key, TDLayer<L>, <L as Layout>::KeyOffset, <L as Layout>::KeyContainer>;
type KVTDLayer<L> = OrderedLayer<<<L as Layout>::Target as Update>::Key, VTDLayer<L>, <L as Layout>::KeyOffset, <L as Layout>::KeyContainer>;
type TDBuilder<L> = OrderedLeafBuilder<<<L as Layout>::Target as Update>::Time, <<L as Layout>::Target as Update>::Diff>;
type VTDBuilder<L> = OrderedBuilder<<<L as Layout>::Target as Update>::Val, TDBuilder<L>, <L as Layout>::ValOffset, <L as Layout>::ValContainer>;
type KTDBuilder<L> = OrderedBuilder<<<L as Layout>::Target as Update>::Key, TDBuilder<L>, <L as Layout>::KeyOffset, <L as Layout>::KeyContainer>;
type KVTDBuilder<L> = OrderedBuilder<<<L as Layout>::Target as Update>::Key, VTDBuilder<L>, <L as Layout>::KeyOffset, <L as Layout>::KeyContainer>;

impl<L: Layout> BatchReader for OrdValBatch<L> {
Expand Down Expand Up @@ -459,75 +461,47 @@ impl<L: Layout> Builder<OrdValBatch<L>> for OrdValBuilder<L> {


/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Debug, Abomonation)]
pub struct OrdKeyBatch<K, T, R, O=usize, CK=Vec<K>>
where
K: Ord+Clone,
T: Clone+Lattice,
R: Clone,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
#[derive(Abomonation)]
pub struct OrdKeyBatch<L: Layout> {
/// Where all the data is.
pub layer: OrderedLayer<K, OrderedLeaf<T, R>, O, CK>,
pub layer: KTDLayer<L>,
/// Description of the update times this layer represents.
pub desc: Description<T>,
pub desc: Description<<L::Target as Update>::Time>,
}

impl<K, T, R, O, CK> BatchReader for OrdKeyBatch<K, T, R, O, CK>
where
K: Ord+Clone+'static,
T: Lattice+Ord+Clone+'static,
R: Clone+Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
type Key = K;
impl<L: Layout> BatchReader for OrdKeyBatch<L> {
type Key = <L::Target as Update>::Key;
type Val = ();
type Time = T;
type R = R;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Cursor = OrdKeyCursor<K, T, R, O, CK>;
type Cursor = OrdKeyCursor<L>;
fn cursor(&self) -> Self::Cursor {
OrdKeyCursor {
valid: true,
cursor: self.layer.cursor(),
phantom: PhantomData
}
}
fn len(&self) -> usize { <OrderedLayer<K, OrderedLeaf<T, R>, O, CK> as Trie>::tuples(&self.layer) }
fn description(&self) -> &Description<T> { &self.desc }
fn len(&self) -> usize { <KTDLayer<L> as Trie>::tuples(&self.layer) }
fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.desc }
}

impl<K, T, R, O, CK> Batch for OrdKeyBatch<K, T, R, O, CK>
where
K: Ord+Clone+'static,
T: Lattice+timely::progress::Timestamp+Ord+Clone+'static,
R: Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
impl<L: Layout> Batch for OrdKeyBatch<L> {
type Batcher = MergeBatcher<Self>;
type Builder = OrdKeyBuilder<K, T, R, O, CK>;
type Merger = OrdKeyMerger<K, T, R, O, CK>;
type Builder = OrdKeyBuilder<L>;
type Merger = OrdKeyMerger<L>;

fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<T>>) -> Self::Merger {
fn begin_merge(&self, other: &Self, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self::Merger {
OrdKeyMerger::new(self, other, compaction_frontier)
}
}

impl<K, T, R, O, CK> OrdKeyBatch<K, T, R, O, CK>
where
K: Ord+Clone+'static,
T: Lattice+Ord+Clone+'static,
R: Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
fn advance_builder_from(layer: &mut OrderedBuilder<K, OrderedLeafBuilder<T, R>, O, CK>, frontier: AntichainRef<T>, key_pos: usize) {
impl<L: Layout> OrdKeyBatch<L> {
fn advance_builder_from(layer: &mut KTDBuilder<L>, frontier: AntichainRef<<L::Target as Update>::Time>, key_pos: usize) {

let key_start = key_pos;
let time_start: usize = layer.offs[key_pos].try_into().unwrap();
let time_start: usize = layer.offs[key_pos].try_into().ok().unwrap();

// We will zip through the time leaves, calling advance on each,
// then zip through the value layer, sorting and collapsing each,
Expand All @@ -550,10 +524,10 @@ where
// we will change batch.layer.vals.offs[i] in this iteration, from `write_position`'s
// initial value.

let lower: usize = layer.offs[i].try_into().unwrap();
let upper: usize = layer.offs[i+1].try_into().unwrap();
let lower: usize = layer.offs[i].try_into().ok().unwrap();
let upper: usize = layer.offs[i+1].try_into().ok().unwrap();

layer.offs[i] = O::try_from(write_position).unwrap();
layer.offs[i] = L::KeyOffset::try_from(write_position).ok().unwrap();

let updates = &mut layer.vals.vals[..];

Expand All @@ -566,14 +540,14 @@ where
}
}
layer.vals.vals.truncate(write_position);
layer.offs[layer.keys.len()] = O::try_from(write_position).unwrap();
layer.offs[layer.keys.len()] = L::KeyOffset::try_from(write_position).ok().unwrap();

// 4. Remove empty keys.
let offs = &mut layer.offs;
let mut write_position = key_start;
layer.keys.retain_from(key_start, |index, _item| {
let lower = offs[index].try_into().unwrap();
let upper = offs[index+1].try_into().unwrap();
let lower = offs[index].try_into().ok().unwrap();
let upper = offs[index+1].try_into().ok().unwrap();
if lower < upper {
offs[write_position+1] = offs[index+1];
write_position += 1;
Expand All @@ -587,35 +561,21 @@ where
}

/// State for an in-progress merge.
pub struct OrdKeyMerger<K, T, R, O=usize,CK=Vec<K>>
where
K: Ord+Clone+'static,
T: Lattice+Ord+Clone+'static,
R: Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
pub struct OrdKeyMerger<L: Layout> {
// first batch, and position therein.
lower1: usize,
upper1: usize,
// second batch, and position therein.
lower2: usize,
upper2: usize,
// result that we are currently assembling.
result: <OrderedLayer<K, OrderedLeaf<T, R>, O, CK> as Trie>::MergeBuilder,
description: Description<T>,
result: <KTDLayer<L> as Trie>::MergeBuilder,
description: Description<<L::Target as Update>::Time>,
should_compact: bool,
}

impl<K, T, R, O, CK> Merger<OrdKeyBatch<K, T, R, O, CK>> for OrdKeyMerger<K, T, R, O, CK>
where
K: Ord+Clone+'static,
T: Lattice+timely::progress::Timestamp+Ord+Clone+'static,
R: Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
fn new(batch1: &OrdKeyBatch<K, T, R, O, CK>, batch2: &OrdKeyBatch<K, T, R, O, CK>, compaction_frontier: Option<AntichainRef<T>>) -> Self {
impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L> {
fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: Option<AntichainRef<<L::Target as Update>::Time>>) -> Self {

assert!(batch1.upper() == batch2.lower());

Expand All @@ -631,12 +591,12 @@ where
upper1: batch1.layer.keys(),
lower2: 0,
upper2: batch2.layer.keys(),
result: <<OrderedLayer<K, OrderedLeaf<T, R>, O, CK> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer),
result: <<KTDLayer<L> as Trie>::MergeBuilder as MergeBuilder>::with_capacity(&batch1.layer, &batch2.layer),
description: description,
should_compact: compaction_frontier.is_some(),
}
}
fn done(self) -> OrdKeyBatch<K, T, R, O, CK> {
fn done(self) -> OrdKeyBatch<L> {

assert!(self.lower1 == self.upper1);
assert!(self.lower2 == self.upper2);
Expand All @@ -646,7 +606,7 @@ where
desc: self.description,
}
}
fn work(&mut self, source1: &OrdKeyBatch<K,T,R,O,CK>, source2: &OrdKeyBatch<K,T,R,O,CK>, fuel: &mut isize) {
fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

let starting_updates = self.result.vals.vals.len();
let mut effort = 0isize;
Expand Down Expand Up @@ -691,7 +651,7 @@ where

// if we are supplied a frontier, we should compact.
if self.should_compact {
OrdKeyBatch::<K,T,R,O,CK>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
OrdKeyBatch::<L>::advance_builder_from(&mut self.result, self.description.since().borrow(), initial_key_pos);
}

*fuel -= effort;
Expand All @@ -705,30 +665,23 @@ where

/// A cursor for navigating a single layer.
#[derive(Debug)]
pub struct OrdKeyCursor<K, T: Lattice+Ord+Clone, R: Semigroup, O=usize, CK=Vec<K>> {
pub struct OrdKeyCursor<L: Layout> {
valid: bool,
cursor: OrderedCursor<OrderedLeaf<T, R>>,
phantom: PhantomData<(K, O, CK)>,
cursor: OrderedCursor<OrderedLeaf<<L::Target as Update>::Time, <L::Target as Update>::Diff>>,
phantom: PhantomData<L>,
}

impl<K, T, R, O, CK> Cursor for OrdKeyCursor<K, T, R, O, CK>
where
K: Ord+Clone,
T: Lattice+Ord+Clone,
R: Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
type Key = K;
impl<L: Layout> Cursor for OrdKeyCursor<L> {
type Key = <L::Target as Update>::Key;
type Val = ();
type Time = T;
type R = R;
type Time = <L::Target as Update>::Time;
type R = <L::Target as Update>::Diff;

type Storage = OrdKeyBatch<K, T, R, O, CK>;
type Storage = OrdKeyBatch<L>;

fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { &self.cursor.key(&storage.layer) }
fn key<'a>(&self, storage: &'a Self::Storage) -> &'a Self::Key { &self.cursor.key(&storage.layer) }
fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, mut logic: L) {
fn map_times<L2: FnMut(&Self::Time, &Self::R)>(&mut self, storage: &Self::Storage, mut logic: L2) {
self.cursor.child.rewind(&storage.layer.vals);
while self.cursor.child.valid(&storage.layer.vals) {
logic(&self.cursor.child.key(&storage.layer.vals).0, &self.cursor.child.key(&storage.layer.vals).1);
Expand All @@ -738,7 +691,7 @@ where
fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.valid(&storage.layer) }
fn val_valid(&self, _storage: &Self::Storage) -> bool { self.valid }
fn step_key(&mut self, storage: &Self::Storage){ self.cursor.step(&storage.layer); self.valid = true; }
fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek(&storage.layer, key); self.valid = true; }
fn seek_key(&mut self, storage: &Self::Storage, key: &Self::Key) { self.cursor.seek(&storage.layer, key); self.valid = true; }
fn step_val(&mut self, _storage: &Self::Storage) { self.valid = false; }
fn seek_val(&mut self, _storage: &Self::Storage, _val: &()) { }
fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind(&storage.layer); self.valid = true; }
Expand All @@ -747,45 +700,31 @@ where


/// A builder for creating layers from unsorted update tuples.
pub struct OrdKeyBuilder<K, T, R, O=usize,CK=Vec<K>>
where
K: Ord+Clone,
T: Ord+Clone+Lattice,
R: Clone+Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
builder: OrderedBuilder<K, OrderedLeafBuilder<T, R>, O, CK>,
pub struct OrdKeyBuilder<L: Layout> {
builder: KTDBuilder<L>,
}

impl<K, T, R, O, CK> Builder<OrdKeyBatch<K, T, R, O, CK>> for OrdKeyBuilder<K, T, R, O, CK>
where
K: Ord+Clone+'static,
T: Lattice+timely::progress::Timestamp+Ord+Clone+'static,
R: Semigroup,
O: OrdOffset, <O as TryFrom<usize>>::Error: Debug, <O as TryInto<usize>>::Error: Debug,
CK: BatchContainer<Item=K>+Deref<Target=[K]>+RetainFrom<K>,
{
impl<L: Layout> Builder<OrdKeyBatch<L>> for OrdKeyBuilder<L> {

fn new() -> Self {
OrdKeyBuilder {
builder: OrderedBuilder::<K, OrderedLeafBuilder<T, R>, O, CK>::new()
builder: <KTDBuilder<L>>::new()
}
}

fn with_capacity(cap: usize) -> Self {
OrdKeyBuilder {
builder: <OrderedBuilder<K, OrderedLeafBuilder<T, R>, O, CK> as TupleBuilder>::with_capacity(cap)
builder: <KTDBuilder<L> as TupleBuilder>::with_capacity(cap)
}
}

#[inline]
fn push(&mut self, (key, _, time, diff): (K, (), T, R)) {
fn push(&mut self, (key, _, time, diff): (<L::Target as Update>::Key, (), <L::Target as Update>::Time, <L::Target as Update>::Diff)) {
self.builder.push_tuple((key, (time, diff)));
}

#[inline(never)]
fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> OrdKeyBatch<K, T, R, O, CK> {
fn done(self, lower: Antichain<<L::Target as Update>::Time>, upper: Antichain<<L::Target as Update>::Time>, since: Antichain<<L::Target as Update>::Time>) -> OrdKeyBatch<L> {
OrdKeyBatch {
layer: self.builder.done(),
desc: Description::new(lower, upper, since)
Expand Down

0 comments on commit 344e034

Please sign in to comment.