
Replace Chunker with container builder
Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Jun 13, 2024
1 parent f52db45 commit e279663
Showing 4 changed files with 108 additions and 85 deletions.
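This commit retires the crate-local `Chunker` trait and moves the chunkers (`VecChunker`, `ColumnationChunker`, `ContainerChunker`) onto timely's `ContainerBuilder` and `PushInto` traits, so a chunker presents the same interface as any other container builder. For orientation, here is a minimal sketch of the two contracts. The `Chunker` side is copied from the removed code below; the `ContainerBuilder`/`PushInto` side is abbreviated from the signatures this diff relies on, with supertrait and associated-type bounds omitted (see timely's `container` module for the authoritative definitions).

// Before: a bespoke trait that hands chunks back by value.
pub trait Chunker {
    type Input;
    type Output;
    fn push_container(&mut self, container: RefOrMut<Self::Input>);
    fn extract(&mut self) -> Option<Self::Output>;
    fn finish(&mut self) -> Option<Self::Output>;
}

// After: timely's traits; chunks are lent back as `&mut` to a container the
// builder retains, and callers take ownership explicitly if they need it.
pub trait PushInto<T> {
    fn push_into(&mut self, item: T);
}
pub trait ContainerBuilder /* supertraits omitted */ {
    type Container; // bounds omitted
    fn extract(&mut self) -> Option<&mut Self::Container>;
    fn finish(&mut self) -> Option<&mut Self::Container>;
}

The practical consequence shows up in every impl below: each chunker gains an `empty` slot to park the most recently extracted container so that a `&mut` reference can be returned, and the `MergeBatcher` takes chunks out of that slot with `std::mem::take`.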
149 changes: 83 additions & 66 deletions src/trace/implementations/chunker.rs
@@ -5,45 +5,23 @@ use std::marker::PhantomData;
use timely::communication::message::RefOrMut;
use timely::Container;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::{PushInto, SizableContainer};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use crate::consolidation::{consolidate_updates, ConsolidateContainer};
use crate::difference::Semigroup;

/// Behavior to transform streams of data into sorted chunks of regular size.
pub trait Chunker {
/// Input type.
type Input;
/// Output type.
type Output;

/// Accept a container and absorb its contents. The caller must
/// call [`extract`] or [`finish`] soon after pushing a container.
fn push_container(&mut self, container: RefOrMut<Self::Input>);

/// Extract ready data, leaving unfinished data behind.
///
/// Should be called repeatedly until it returns `None`, which indicates that there is no
/// more ready data.
fn extract(&mut self) -> Option<Self::Output>;

/// Unconditionally extract all data, leaving no unfinished data behind.
///
/// Should be called repeatedly until it returns `None`, which indicates that there is no
/// more data.
fn finish(&mut self) -> Option<Self::Output>;
}

/// Chunk a stream of vectors into chains of vectors.
pub struct VecChunker<T> {
pending: Vec<T>,
ready: VecDeque<Vec<T>>,
empty: Option<Vec<T>>,
}

impl<T> Default for VecChunker<T> {
fn default() -> Self {
Self {
pending: Vec::default(),
ready: VecDeque::default(),
empty: None,
}
}
}
@@ -87,17 +65,14 @@ where
}
}

impl<K, V, T, R> Chunker for VecChunker<((K, V), T, R)>
impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for VecChunker<((K, V), T, R)>
where
K: Ord + Clone,
V: Ord + Clone,
T: Ord + Clone,
R: Semigroup + Clone,
{
type Input = Vec<((K, V), T, R)>;
type Output = Self::Input;

fn push_container(&mut self, container: RefOrMut<Self::Input>) {
fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
// Important: Consolidation requires `pending` to have twice the chunk capacity to
@@ -130,12 +105,27 @@ where
}
}
}
}

impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
where
K: Ord + Clone + 'static,
V: Ord + Clone + 'static,
T: Ord + Clone + 'static,
R: Semigroup + Clone + 'static,
{
type Container = Vec<((K, V), T, R)>;

fn extract(&mut self) -> Option<Self::Output> {
self.ready.pop_front()
fn extract(&mut self) -> Option<&mut Self::Container> {
if let Some(ready) = self.ready.pop_front() {
self.empty = Some(ready);
self.empty.as_mut()
} else {
None
}
}

fn finish(&mut self) -> Option<Self::Output> {
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.pending.is_empty() {
consolidate_updates(&mut self.pending);
while !self.pending.is_empty() {
@@ -144,21 +134,24 @@ where
self.ready.push_back(chunk);
}
}
self.ready.pop_front()
self.empty = self.ready.pop_front();
self.empty.as_mut()
}
}

/// Chunk a stream of vectors into chains of vectors.
pub struct ColumnationChunker<T: Columnation> {
pending: Vec<T>,
ready: VecDeque<TimelyStack<T>>,
empty: Option<TimelyStack<T>>,
}

impl<T: Columnation> Default for ColumnationChunker<T> {
fn default() -> Self {
Self {
pending: Vec::default(),
ready: VecDeque::default(),
empty: None,
}
}
}
@@ -204,17 +197,14 @@ where
}
}

impl<K, V, T, R> Chunker for ColumnationChunker<((K, V), T, R)>
impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for ColumnationChunker<((K, V), T, R)>
where
K: Columnation + Ord + Clone,
V: Columnation + Ord + Clone,
T: Columnation + Ord + Clone,
R: Columnation + Semigroup + Clone,
{
type Input = Vec<((K, V), T, R)>;
type Output = TimelyStack<((K,V),T,R)>;

fn push_container(&mut self, container: RefOrMut<Self::Input>) {
fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
if self.pending.capacity() < Self::chunk_capacity() * 2 {
@@ -245,12 +235,27 @@ where
}
}
}
}

fn extract(&mut self) -> Option<Self::Output> {
self.ready.pop_front()
impl<K, V, T, R> ContainerBuilder for ColumnationChunker<((K, V), T, R)>
where
K: Columnation + Ord + Clone + 'static,
V: Columnation + Ord + Clone + 'static,
T: Columnation + Ord + Clone + 'static,
R: Columnation + Semigroup + Clone + 'static,
{
type Container = TimelyStack<((K,V),T,R)>;

fn extract(&mut self) -> Option<&mut Self::Container> {
if let Some(ready) = self.ready.pop_front() {
self.empty = Some(ready);
self.empty.as_mut()
} else {
None
}
}

fn finish(&mut self) -> Option<Self::Output> {
fn finish(&mut self) -> Option<&mut Self::Container> {
consolidate_updates(&mut self.pending);
while !self.pending.is_empty() {
let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
@@ -259,51 +264,44 @@ where
}
self.ready.push_back(chunk);
}
self.ready.pop_front()
self.empty = self.ready.pop_front();
self.empty.as_mut()
}
}

/// Chunk a stream of vectors into chains of vectors.
pub struct ContainerChunker<Input, Output, Consolidator>
where
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::ItemRef<'a>>,
Consolidator: ConsolidateContainer<Output>,
{
pub struct ContainerChunker<Input, Output, Consolidator> {
pending: Output,
empty: Output,
ready: Vec<Output>,
ready: VecDeque<Output>,
consolidator: Consolidator,
_marker: PhantomData<(Input, Consolidator)>,
}

impl<Input, Output, Consolidator> Default for ContainerChunker<Input, Output, Consolidator>
where
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::ItemRef<'a>>,
Consolidator: ConsolidateContainer<Output> + Default,
Input: Default,
Output: Default,
Consolidator: Default,
{
fn default() -> Self {
Self {
pending: Output::default(),
empty: Output::default(),
ready: Vec::default(),
ready: VecDeque::default(),
consolidator: Consolidator::default(),
_marker: PhantomData,
}
}
}

impl<Input, Output, Consolidator> Chunker for ContainerChunker<Input, Output, Consolidator>
impl<'a, Input, Output, Consolidator> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Input, Output, Consolidator>
where
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::Item<'a>> + PushInto<Input::ItemRef<'a>>,
Output: SizableContainer + PushInto<Input::Item<'a>> + PushInto<Input::ItemRef<'a>>,
Consolidator: ConsolidateContainer<Output>,
{
type Input = Input;
type Output = Output;

fn push_container(&mut self, container: RefOrMut<Self::Input>) {
fn push_into(&mut self, container: RefOrMut<'a, Input>) {
if self.pending.capacity() < Output::preferred_capacity() {
self.pending.reserve(Output::preferred_capacity() - self.pending.len());
}
@@ -316,7 +314,7 @@ where
// Note that we're pushing non-full containers, which is a deviation from
// other implementations. The reason for this is that we cannot extract
// partial data from `this.pending`. We should revisit this in the future.
this.ready.push(std::mem::take(&mut this.pending));
this.ready.push_back(std::mem::take(&mut this.pending));
}
}
};
@@ -335,20 +333,39 @@
}
}
}
}

fn extract(&mut self) -> Option<Self::Output> {
self.ready.pop()
impl<Input, Output, Consolidator> ContainerBuilder for ContainerChunker<Input, Output, Consolidator>
where
Input: Container,
for<'a> Output: SizableContainer + PushInto<Input::Item<'a>> + PushInto<Input::ItemRef<'a>>,
Consolidator: ConsolidateContainer<Output> + Default + 'static,
{
type Container = Output;

fn extract(&mut self) -> Option<&mut Self::Container> {
if let Some(ready) = self.ready.pop_front() {
self.empty = ready;
Some(&mut self.empty)
} else {
None
}
}

fn finish(&mut self) -> Option<Self::Output> {
fn finish(&mut self) -> Option<&mut Self::Container> {
if !self.pending.is_empty() {
self.consolidator.consolidate(&mut self.pending, &mut self.empty);
std::mem::swap(&mut self.pending, &mut self.empty);
self.empty.clear();
if !self.pending.is_empty() {
self.ready.push(std::mem::take(&mut self.pending));
self.ready.push_back(std::mem::take(&mut self.pending));
}
}
self.ready.pop()
if let Some(ready) = self.ready.pop_front() {
self.empty = ready;
Some(&mut self.empty)
} else {
None
}
}
}
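Before the batcher changes below, note the caller-side pattern the new signatures imply: `extract` and `finish` now lend a `&mut` container rather than returning one by value, so a caller that wants to keep a chunk must swap it out of the borrowed slot. A minimal sketch of that drain loop, with the helper name `drain_into_chains` invented here for illustration (it is not part of this commit):

use timely::container::ContainerBuilder;

/// Drain every ready chunk out of a builder, taking ownership of each one.
fn drain_into_chains<B>(builder: &mut B, chains: &mut Vec<Vec<B::Container>>)
where
    B: ContainerBuilder,
    B::Container: Default, // lets us swap an empty container into the borrowed slot
{
    while let Some(chunk) = builder.extract() {
        // `take` leaves a default (empty) container behind in the builder's
        // `empty` slot, which it can reuse on the next extraction.
        let chunk = std::mem::take(chunk);
        chains.push(vec![chunk]);
    }
}

This is exactly the shape of `MergeBatcher::push_container` and `seal` in merge_batcher.rs below.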
28 changes: 16 additions & 12 deletions src/trace/implementations/merge_batcher.rs
@@ -9,17 +9,17 @@ use timely::logging_core::Logger;
use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::{Container, PartialOrder};
use timely::container::{ContainerBuilder, PushInto};

use crate::difference::Semigroup;
use crate::logging::{BatcherEvent, DifferentialEvent};
use crate::trace::{Batcher, Builder};
use crate::Data;
use crate::trace::implementations::chunker::Chunker;

/// Creates batches from unordered tuples.
pub struct MergeBatcher<C, M, T>
pub struct MergeBatcher<Input, C, M, T>
where
C: Chunker<Output=M::Chunk> + Default,
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
{
/// each power-of-two length list of allocations.
@@ -40,15 +40,16 @@ where
lower: Antichain<T>,
/// The lower-bound frontier of the data, after the last call to seal.
frontier: Antichain<T>,
_marker: PhantomData<Input>,
}

impl<C, M, T> Batcher for MergeBatcher<C, M, T>
impl<Input, C, M, T> Batcher for MergeBatcher<Input, C, M, T>
where
C: Chunker<Output=M::Chunk> + Default,
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<RefOrMut<'a, Input>>,
M: Merger<Time = T>,
T: Timestamp,
{
type Input = C::Input;
type Input = Input;
type Output = M::Output;
type Time = T;

@@ -62,14 +63,16 @@ where
stash: Vec::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(T::minimum()),
_marker: PhantomData,
}
}

/// Push a container of data into this merge batcher. Updates the internal chain structure if
/// needed.
fn push_container(&mut self, container: RefOrMut<C::Input>) {
self.chunker.push_container(container);
fn push_container(&mut self, container: RefOrMut<Input>) {
self.chunker.push_into(container);
while let Some(chunk) = self.chunker.extract() {
let chunk = std::mem::take(chunk);
self.insert_chain(vec![chunk]);
}
}
@@ -81,6 +84,7 @@ where
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
// Finish
while let Some(chunk) = self.chunker.finish() {
let chunk = std::mem::take(chunk);
self.insert_chain(vec![chunk]);
}

@@ -118,9 +122,9 @@ where
}
}

impl<C, M, T> MergeBatcher<C, M, T>
impl<Input, C, M, T> MergeBatcher<Input, C, M, T>
where
C: Chunker<Output=M::Chunk> + Default,
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
{
/// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
@@ -186,9 +190,9 @@ where
}
}

impl<C, M, T> Drop for MergeBatcher<C, M, T>
impl<Input, C, M, T> Drop for MergeBatcher<Input, C, M, T>
where
C: Chunker<Output=M::Chunk> + Default,
C: ContainerBuilder<Container=M::Chunk> + Default,
M: Merger<Time = T>,
{
fn drop(&mut self) {
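One side effect of dropping `Chunker` is that the batcher can no longer learn its input type from `C::Input`, so `MergeBatcher` now names the input container explicitly as a leading `Input` type parameter and anchors it in the struct with `_marker: PhantomData<Input>`. The marker is required by plain Rust rules rather than anything specific to this crate; a generic sketch, with placeholder names:

use std::marker::PhantomData;

// A type parameter that is only mentioned in impl bounds and method signatures
// must still appear somewhere in the struct definition, otherwise the compiler
// rejects it with error E0392 ("parameter `Input` is never used"). A zero-sized
// PhantomData field satisfies that requirement without storing anything.
struct Batcher<Input, M> {
    merger: M,
    _marker: PhantomData<Input>,
}

impl<Input, M> Batcher<Input, M> {
    fn push_container(&mut self, _container: &mut Input) {
        // absorb the input container here...
    }
}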
