Skip to content

Commit

Permalink
Move Batcher::seal to Builder (#546)
Browse files Browse the repository at this point in the history
* Reorganize the boundary between batchers and builders

* Graspan tidy
  • Loading branch information
frankmcsherry authored Dec 6, 2024
1 parent cf97c1a commit dafe288
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 140 deletions.
2 changes: 1 addition & 1 deletion experiments/src/bin/graspan2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ fn unoptimized() {

let value_flow_next =
value_flow_next
.arrange::<ValBatcher<_,_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
.arrange::<KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>()
// .distinct_total_core::<Diff>()
.threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None })
;
Expand Down
57 changes: 2 additions & 55 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ where
T: Timestamp,
{
type Input = Input;
type Output = M::Output;
type Time = T;
type Output = M::Chunk;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Self {
Expand Down Expand Up @@ -109,7 +109,7 @@ where

self.stash.clear();

let seal = M::seal::<B>(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
let seal = B::seal(&mut readied, self.lower.borrow(), upper.borrow(), Antichain::from_elem(T::minimum()).borrow());
self.lower = upper;
seal
}
Expand Down Expand Up @@ -204,10 +204,6 @@ where
pub trait Merger: Default {
/// The internal representation of chunks of data.
type Chunk: Container;
/// The output type
/// TODO: This should be replaced by `Chunk` or another container once the builder understands
/// building from a complete chain.
type Output;
/// The type of time in frontiers to extract updates.
type Time;
/// Merge chains into an output chain.
Expand All @@ -223,15 +219,6 @@ pub trait Merger: Default {
stash: &mut Vec<Self::Chunk>,
);

/// Build from a chain
/// TODO: We can move this entirely to `MergeBatcher` once builders can accepts chains.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output;

/// Account size and allocation changes. Returns a tuple of (records, size, capacity, allocations).
fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize);
}
Expand Down Expand Up @@ -286,7 +273,6 @@ where
{
type Time = T;
type Chunk = Vec<((K, V), T, R)>;
type Output = Vec<((K, V), T, R)>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
Expand Down Expand Up @@ -401,45 +387,6 @@ where
readied.push(ready);
}
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for buffer in chain.iter() {
for ((key, val), time, _) in buffer.iter() {
if !upper.less_equal(time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
}
let mut builder = B::with_capacity(keys, vals, upds);

for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}

fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
(chunk.len(), 0, 0, 0)
}
Expand Down
39 changes: 0 additions & 39 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use timely::{Container, Data, PartialOrder};

use crate::difference::Semigroup;
use crate::trace::implementations::merge_batcher::Merger;
use crate::trace::Builder;

/// A merger for timely stacks
pub struct ColumnationMerger<T> {
Expand Down Expand Up @@ -62,7 +61,6 @@ where
{
type Time = T;
type Chunk = TimelyStack<((K, V), T, R)>;
type Output = TimelyStack<((K, V), T, R)>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
Expand Down Expand Up @@ -183,43 +181,6 @@ where
}
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for buffer in chain.iter() {
for ((key, val), time, _) in buffer.iter() {
if !upper.less_equal(time) {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
}
let mut builder = B::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}

fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
let cb = |siz, cap| {
Expand Down
45 changes: 1 addition & 44 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
use std::cmp::Ordering;
use std::marker::PhantomData;
use timely::progress::frontier::{Antichain, AntichainRef};
use timely::{Container, Data, PartialOrder};
use timely::{Data, PartialOrder};
use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};

use crate::difference::{IsZero, Semigroup};
use crate::trace::implementations::merge_batcher::Merger;
use crate::trace::Builder;
use crate::trace::cursor::IntoOwned;

/// A merger for flat stacks.
Expand Down Expand Up @@ -110,7 +109,6 @@ where
{
type Time = MC::TimeOwned;
type Chunk = FlatStack<MC>;
type Output = FlatStack<MC>;

fn merge(&mut self, list1: Vec<Self::Chunk>, list2: Vec<Self::Chunk>, output: &mut Vec<Self::Chunk>, stash: &mut Vec<Self::Chunk>) {
let mut list1 = list1.into_iter();
Expand Down Expand Up @@ -242,47 +240,6 @@ where
}
}

fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(
chain: &mut Vec<Self::Chunk>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> B::Output {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
{
let mut prev_keyval = None;
for buffer in chain.iter() {
for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) {
if !upper.less_equal(&time) {
if let Some((p_key, p_val)) = prev_keyval {
debug_assert!(p_key <= key);
debug_assert!(p_key != key || p_val <= val);
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
}
}
let mut builder = B::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}

fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) {
let (mut size, mut capacity, mut allocations) = (0, 0, 0);
let cb = |siz, cap| {
Expand Down
78 changes: 78 additions & 0 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {

/// Test that the value equals a key in the layout's value container.
fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;

/// Count the number of distinct keys, (key, val) pairs, and total updates.
fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
}

impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
Expand Down Expand Up @@ -372,6 +375,31 @@ where
fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == this
}

fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for link in chain.iter() {
for ((key, val), _, _) in link.iter() {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
(keys, vals, upds)
}
}

impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
Expand Down Expand Up @@ -401,6 +429,31 @@ where
fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
V::reborrow(other) == *this
}

fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for link in chain.iter() {
for ((key, val), _, _) in link.iter() {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
(keys, vals, upds)
}
}

mod flatcontainer {
Expand Down Expand Up @@ -483,6 +536,31 @@ mod flatcontainer {
fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool {
VBC::reborrow(other) == V::reborrow(*this)
}

fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
let mut keys = 0;
let mut vals = 0;
let mut upds = 0;
let mut prev_keyval = None;
for link in chain.iter() {
for ((key, val), _, _) in link.iter() {
if let Some((p_key, p_val)) = prev_keyval {
if p_key != key {
keys += 1;
vals += 1;
} else if p_val != val {
vals += 1;
}
} else {
keys += 1;
vals += 1;
}
upds += 1;
prev_keyval = Some((key, val));
}
}
(keys, vals, upds)
}
}
}

Expand Down
31 changes: 30 additions & 1 deletion src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,22 @@ mod val_batch {
description: Description::new(lower, upper, since),
}
}
}

fn seal(
chain: &mut Vec<Self::Input>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}
}
}

mod key_batch {
Expand Down Expand Up @@ -1167,6 +1181,21 @@ mod key_batch {
description: Description::new(lower, upper, since),
}
}

fn seal(
chain: &mut Vec<Self::Input>,
lower: AntichainRef<Self::Time>,
upper: AntichainRef<Self::Time>,
since: AntichainRef<Self::Time>,
) -> Self::Output {
let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
let mut builder = Self::with_capacity(keys, vals, upds);
for mut chunk in chain.drain(..) {
builder.push(&mut chunk);
}

builder.done(lower.to_owned(), upper.to_owned(), since.to_owned())
}
}

}
Loading

0 comments on commit dafe288

Please sign in to comment.