Skip to content

Commit

Permalink
Changes to track timely master (#542)
Browse files Browse the repository at this point in the history
* Update bounds to track timely master

* Update around container traits

* Point at timely on crates

Signed-off-by: Moritz Hoffmann <[email protected]>

---------

Signed-off-by: Moritz Hoffmann <[email protected]>
Co-authored-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
frankmcsherry and antiguru authored Dec 6, 2024
1 parent a3bf1db commit c90b92e
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ fnv="1.0.2"
timely = {workspace = true}

[workspace.dependencies]
timely = { version = "0.14", default-features = false }
timely = { version = "0.15", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[features]
Expand Down
4 changes: 2 additions & 2 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl<G: Scope, D, R, C> Collection<G, D, R, C> {
Collection { inner: stream, phantom: std::marker::PhantomData }
}
}
impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
Expand Down Expand Up @@ -654,7 +654,7 @@ where
G: Scope,
D: Data,
R: Semigroup+'static,
C: Container,
C: Container + Clone + 'static,
I: IntoIterator<Item=Collection<G, D, R, C>>,
{
scope
Expand Down
6 changes: 3 additions & 3 deletions src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
use timely::container::{ContainerBuilder, PushInto};
use timely::container::flatcontainer::{FlatStack, Push, Region};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use crate::Data;
Expand Down Expand Up @@ -156,7 +156,7 @@ where
// TODO: Can we replace `multiple` by a bool?
#[cold]
fn consolidate_and_flush_through(&mut self, multiple: usize) {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
consolidate_updates(&mut self.current);
let mut drain = self.current.drain(..(self.current.len()/multiple)*multiple).peekable();
while drain.peek().is_some() {
Expand All @@ -180,7 +180,7 @@ where
/// Precondition: `current` is not allocated or has space for at least one element.
#[inline]
fn push_into(&mut self, item: P) {
let preferred_capacity = <Vec<(D,T,R)>>::preferred_capacity();
let preferred_capacity = timely::container::buffer::default_capacity::<(D, T, R)>();
if self.current.capacity() < preferred_capacity * 2 {
self.current.reserve(preferred_capacity * 2 - self.current.capacity());
}
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ where
G::Timestamp: Lattice,
P: ParallelizationContract<G::Timestamp, Ba::Input>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Ba::Input: Container,
Ba::Input: Container + Clone + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Expand Down
12 changes: 6 additions & 6 deletions src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,15 @@ where
+ PushInto<Input::ItemRef<'a>>,
{
fn push_into(&mut self, container: &'a mut Input) {
if self.pending.capacity() < Output::preferred_capacity() {
self.pending.reserve(Output::preferred_capacity() - self.pending.len());
}
self.pending.ensure_capacity(&mut None);

let form_batch = |this: &mut Self| {
if this.pending.len() == this.pending.capacity() {
if this.pending.at_capacity() {
let starting_len = this.pending.len();
consolidate_container(&mut this.pending, &mut this.empty);
std::mem::swap(&mut this.pending, &mut this.empty);
this.empty.clear();
if this.pending.len() > this.pending.capacity() / 2 {
if this.pending.len() > starting_len / 2 {
// Note that we're pushing non-full containers, which is a deviation from
// other implementation. The reason for this is that we cannot extract
// partial data from `this.pending`. We should revisit this in the future.
Expand All @@ -289,7 +289,7 @@ where

impl<Output> ContainerBuilder for ContainerChunker<Output>
where
Output: SizableContainer + ConsolidateLayout,
Output: SizableContainer + ConsolidateLayout + Clone + 'static,
{
type Container = Output;

Expand Down

0 comments on commit c90b92e

Please sign in to comment.