From 35f518046226c27892b162c5e1b6ff6965e93d6e Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Mon, 11 Nov 2024 10:45:59 -0500
Subject: [PATCH] Changes to track timely's #597 (#538)

* Changes to track timely's #597

* Update dogs3

* Update interactive
---
 Cargo.toml                                 |  2 +-
 dogsdogsdogs/src/operators/half_join.rs    |  4 +-
 dogsdogsdogs/src/operators/lookup_map.rs   |  5 +-
 interactive/src/logging.rs                 | 11 +--
 src/capture.rs                             |  2 +-
 src/dynamic/mod.rs                         |  6 +-
 src/operators/arrange/upsert.rs            |  4 +-
 src/operators/consolidate.rs               |  4 +-
 src/operators/count.rs                     |  4 +-
 src/operators/join.rs                      | 10 +--
 src/operators/reduce.rs                    |  5 +-
 src/operators/threshold.rs                 |  4 +-
 src/trace/implementations/chunker.rs       | 81 +++++----------------
 src/trace/implementations/merge_batcher.rs |  5 +-
 src/trace/mod.rs                           |  3 +-
 tests/trace.rs                             |  5 +-
 16 files changed, 39 insertions(+), 116 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 03e62168d..594f3051c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -46,7 +46,7 @@ fnv="1.0.2"
 timely = {workspace = true}
 
 [workspace.dependencies]
-timely = { version = "0.13", default-features = false }
+timely = { version = "0.14", default-features = false }
 #timely = { path = "../timely-dataflow/timely/", default-features = false }
 
 [features]
diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs
index 71a92696a..56d0fcf07 100644
--- a/dogsdogsdogs/src/operators/half_join.rs
+++ b/dogsdogsdogs/src/operators/half_join.rs
@@ -151,7 +151,6 @@ where
     let arrangement_stream = arrangement.stream;
 
     let mut stash = HashMap::new();
-    let mut buffer = Vec::new();
 
     let exchange = Exchange::new(move |update: &((K, V, G::Timestamp),G::Timestamp,R)| (update.0).0.hashed().into());
 
@@ -167,10 +166,9 @@ where
 
             // drain the first input, stashing requests.
             input1.for_each(|capability, data| {
-                data.swap(&mut buffer);
                 stash.entry(capability.retain())
                      .or_insert(Vec::new())
-                     .extend(buffer.drain(..))
+                     .extend(data.drain(..))
             });
 
             // Drain input batches; although we do not observe them, we want access to the input
diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs
index a805140fe..0440e2bf8 100644
--- a/dogsdogsdogs/src/operators/lookup_map.rs
+++ b/dogsdogsdogs/src/operators/lookup_map.rs
@@ -49,8 +49,6 @@ where
     let mut logic1 = key_selector.clone();
     let mut logic2 = key_selector.clone();
 
-    let mut buffer = Vec::new();
-
    let mut key: K = supplied_key0;
    let exchange = Exchange::new(move |update: &(D,G::Timestamp,R)| {
        logic1(&update.0, &mut key);
@@ -64,10 +62,9 @@ where
 
            // drain the first input, stashing requests.
            input1.for_each(|capability, data| {
-               data.swap(&mut buffer);
                stash.entry(capability.retain())
                     .or_insert(Vec::new())
-                    .extend(buffer.drain(..))
+                    .extend(data.drain(..))
            });
 
            // Drain input batches; although we do not observe them, we want access to the input
diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs
index be07fa4b6..a7dd1205d 100644
--- a/interactive/src/logging.rs
+++ b/interactive/src/logging.rs
@@ -55,8 +55,6 @@ where
        let (mut park_out, park) = demux.new_output();
        let (mut text_out, text) = demux.new_output();
 
-       let mut demux_buffer = Vec::new();
-
        demux.build(move |_capability| {
 
            move |_frontiers| {
@@ -71,8 +69,6 @@ where
 
                input.for_each(|time, data| {
 
-                   data.swap(&mut demux_buffer);
-
                    let mut operates_session = operates.session(&time);
                    let mut channels_session = channels.session(&time);
                    let mut schedule_session = schedule.session(&time);
@@ -81,7 +77,7 @@ where
                    let mut park_session = park.session(&time);
                    let mut text_session = text.session(&time);
 
-                   for (time, _worker, datum) in demux_buffer.drain(..) {
+                   for (time, _worker, datum) in data.drain(..) {
 
                        // Round time up to next multiple of `granularity_ns`.
                        let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
@@ -235,8 +231,6 @@ where
        let (mut batch_out, batch) = demux.new_output();
        let (mut merge_out, merge) = demux.new_output();
 
-       let mut demux_buffer = Vec::new();
-
        demux.build(move |_capability| {
 
            move |_frontiers| {
@@ -246,11 +240,10 @@ where
 
                input.for_each(|time, data| {
 
-                   data.swap(&mut demux_buffer);
                    let mut batch_session = batch.session(&time);
                    let mut merge_session = merge.session(&time);
 
-                   for (time, _worker, datum) in demux_buffer.drain(..) {
+                   for (time, _worker, datum) in data.drain(..) {
 
                        // Round time up to next multiple of `granularity_ns`.
                        let time_ns = (((time.as_nanos() as u64) / granularity_ns) + 1) * granularity_ns;
diff --git a/src/capture.rs b/src/capture.rs
index d546ad4a2..049441950 100644
--- a/src/capture.rs
+++ b/src/capture.rs
@@ -600,7 +600,7 @@ pub mod sink {
                        }
 
                        // Now record the update to the writer.
-                       send_queue.push_back(Message::Updates(updates.replace(Vec::new())));
+                       send_queue.push_back(Message::Updates(std::mem::take(updates)));
 
                        // Transmit timestamp counts downstream.
                        output
diff --git a/src/dynamic/mod.rs b/src/dynamic/mod.rs
index d13eadcd8..e1d4c4474 100644
--- a/src/dynamic/mod.rs
+++ b/src/dynamic/mod.rs
@@ -45,22 +45,20 @@ where
            let (mut output, stream) = builder.new_output();
            let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);
 
-           let mut vector = Default::default();
            builder.build(move |_capability| move |_frontier| {
                let mut output = output.activate();
                input.for_each(|cap, data| {
-                   data.swap(&mut vector);
                    let mut new_time = cap.time().clone();
                    let mut vec = std::mem::take(&mut new_time.inner).into_vec();
                    vec.truncate(level - 1);
                    new_time.inner = PointStamp::new(vec);
                    let new_cap = cap.delayed(&new_time);
-                   for (_data, time, _diff) in vector.iter_mut() {
+                   for (_data, time, _diff) in data.iter_mut() {
                        let mut vec = std::mem::take(&mut time.inner).into_vec();
                        vec.truncate(level - 1);
                        time.inner = PointStamp::new(vec);
                    }
-                   output.session(&new_cap).give_container(&mut vector);
+                   output.session(&new_cap).give_container(data);
                });
            });
 
diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index 5d687856f..867bc055c 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -161,7 +161,6 @@ where
        // Tracks the lower envelope of times in `priority_queue`.
        let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
-       let mut buffer = Vec::new();
        // Form the trace we will both use internally and publish.
        let activator = Some(stream.scope().activator_for(info.address.clone()));
        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
 
@@ -186,8 +185,7 @@ where
            // Stash capabilities and associated data (ordered by time).
            input.for_each(|cap, data| {
                capabilities.insert(cap.retain());
-               data.swap(&mut buffer);
-               for (key, val, time) in buffer.drain(..) {
+               for (key, val, time) in data.drain(..) {
                    priority_queue.push(std::cmp::Reverse((time, key, val)))
                }
            });
diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index 4d89d7a86..41824bd6f 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -97,11 +97,9 @@ where
 
        self.inner
            .unary::<CapacityContainerBuilder<Vec<(D, G::Timestamp, R)>>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
-               let mut vector = Vec::new();
                move |input, output| {
                    input.for_each(|time, data| {
-                       data.swap(&mut vector);
-                       output.session_with_builder(&time).give_iterator(vector.drain(..));
+                       output.session_with_builder(&time).give_iterator(data.drain(..));
                    })
                }
            })
diff --git a/src/operators/count.rs b/src/operators/count.rs
index b39bb8769..af310b664 100644
--- a/src/operators/count.rs
+++ b/src/operators/count.rs
@@ -65,7 +65,6 @@ where
    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, R), R2> {
 
        let mut trace = self.trace.clone();
-       let mut buffer = Vec::new();
 
        self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
 
@@ -87,8 +86,7 @@ where
                    if cap.is_none() {                          // NB: Assumes batches are in-order
                        cap = Some(capability.retain());
                    }
-                   batches.swap(&mut buffer);
-                   for batch in buffer.drain(..) {
+                   for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
diff --git a/src/operators/join.rs b/src/operators/join.rs
index e5772d1c6..f029a4c3d 100644
--- a/src/operators/join.rs
+++ b/src/operators/join.rs
@@ -444,10 +444,6 @@ where
        let mut trace1_option = Some(trace1);
        let mut trace2_option = Some(trace2);
 
-       // Swappable buffers for input extraction.
-       let mut input1_buffer = Vec::new();
-       let mut input2_buffer = Vec::new();
-
        move |input1, input2, output| {
 
            // 1. Consuming input.
@@ -468,8 +464,7 @@ where
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace2) = trace2_option {
                    let capability = capability.retain();
-                   data.swap(&mut input1_buffer);
-                   for batch1 in input1_buffer.drain(..) {
+                   for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            if !batch1.is_empty() {
@@ -496,8 +491,7 @@ where
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace1) = trace1_option {
                    let capability = capability.retain();
-                   data.swap(&mut input2_buffer);
-                   for batch2 in input2_buffer.drain(..) {
+                   for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            if !batch2.is_empty() {
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index a26f0072e..6b1017130 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -373,8 +373,6 @@ where
        let mut output_upper = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
        let mut output_lower = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());
 
-       let mut input_buffer = Vec::new();
-
        let id = trace.stream.scope().index();
 
        move |input, output| {
@@ -409,8 +407,7 @@ where
 
                // times in the batch.
                input.for_each(|capability, batches| {
-                   batches.swap(&mut input_buffer);
-                   for batch in input_buffer.drain(..) {
+                   for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs
index 3825f0e1c..d2add618f 100644
--- a/src/operators/threshold.rs
+++ b/src/operators/threshold.rs
@@ -112,7 +112,6 @@ where
    {
        let mut trace = self.trace.clone();
-       let mut buffer = Vec::new();
 
        self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
 
@@ -134,8 +133,7 @@ where
                    if cap.is_none() {                          // NB: Assumes batches are in-order
                        cap = Some(capability.retain());
                    }
-                   batches.swap(&mut buffer);
-                   for batch in buffer.drain(..) {
+                   for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());  // NB: Assumes batches are in-order
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs
index 527a614d0..478480803 100644
--- a/src/trace/implementations/chunker.rs
+++ b/src/trace/implementations/chunker.rs
@@ -1,7 +1,6 @@
 //! Organize streams of data into sorted chunks.
 
 use std::collections::VecDeque;
-use timely::communication::message::RefOrMut;
 use timely::Container;
 use timely::container::columnation::{Columnation, TimelyStack};
 use timely::container::{ContainerBuilder, PushInto, SizableContainer};
@@ -64,14 +63,14 @@ where
    }
 }
 
-impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for VecChunker<((K, V), T, R)>
+impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
 where
    K: Ord + Clone,
    V: Ord + Clone,
    T: Ord + Clone,
    R: Semigroup + Clone,
 {
-   fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+   fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
        // because we don't write more than capacity elements into the buffer.
        // Important: Consolidation requires `pending` to have twice the chunk capacity to
@@ -80,27 +79,11 @@ where
            self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
        }
 
-       // `container` is either a shared reference or an owned allocations.
-       match container {
-           RefOrMut::Ref(vec) => {
-               let mut slice = &vec[..];
-               while !slice.is_empty() {
-                   let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
-                   slice = tail;
-                   self.pending.extend_from_slice(head);
-                   if self.pending.len() == self.pending.capacity() {
-                       self.form_chunk();
-                   }
-               }
-           }
-           RefOrMut::Mut(vec) => {
-               let mut drain = vec.drain(..).peekable();
-               while drain.peek().is_some() {
-                   self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
-                   if self.pending.len() == self.pending.capacity() {
-                       self.form_chunk();
-                   }
-               }
+       let mut drain = container.drain(..).peekable();
+       while drain.peek().is_some() {
+           self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+           if self.pending.len() == self.pending.capacity() {
+               self.form_chunk();
            }
        }
    }
@@ -196,41 +179,25 @@ where
    }
 }
 
-impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for ColumnationChunker<((K, V), T, R)>
+impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for ColumnationChunker<((K, V), T, R)>
 where
    K: Columnation + Ord + Clone,
    V: Columnation + Ord + Clone,
    T: Columnation + Ord + Clone,
    R: Columnation + Semigroup + Clone,
 {
-   fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+   fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
        // because we don't write more than capacity elements into the buffer.
        if self.pending.capacity() < Self::chunk_capacity() * 2 {
            self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
        }
 
-       // `container` is either a shared reference or an owned allocations.
-       match container {
-           RefOrMut::Ref(vec) => {
-               let mut slice = &vec[..];
-               while !slice.is_empty() {
-                   let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
-                   slice = tail;
-                   self.pending.extend_from_slice(head);
-                   if self.pending.len() == self.pending.capacity() {
-                       self.form_chunk();
-                   }
-               }
-           }
-           RefOrMut::Mut(vec) => {
-               let mut drain = vec.drain(..).peekable();
-               while drain.peek().is_some() {
-                   self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
-                   if self.pending.len() == self.pending.capacity() {
-                       self.form_chunk();
-                   }
-               }
+       let mut drain = container.drain(..).peekable();
+       while drain.peek().is_some() {
+           self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+           if self.pending.len() == self.pending.capacity() {
+               self.form_chunk();
            }
        }
    }
@@ -288,7 +255,7 @@ where
    }
 }
 
-impl<'a, Input, Output> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Output>
+impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
 where
    Input: Container,
    Output: SizableContainer
@@ -296,7 +263,7 @@ where
        + PushInto<Input::Item<'a>>
        + PushInto<Input::ItemRef<'a>>,
 {
-   fn push_into(&mut self, container: RefOrMut<'a, Input>) {
+   fn push_into(&mut self, container: &'a mut Input) {
        if self.pending.capacity() < Output::preferred_capacity() {
            self.pending.reserve(Output::preferred_capacity() - self.pending.len());
        }
@@ -313,19 +280,9 @@ where
                }
            }
        };
-       match container {
-           RefOrMut::Ref(container) => {
-               for item in container.iter() {
-                   self.pending.push(item);
-                   form_batch(self);
-               }
-           }
-           RefOrMut::Mut(container) => {
-               for item in container.drain() {
-                   self.pending.push(item);
-                   form_batch(self);
-               }
-           }
+       for item in container.drain() {
+           self.pending.push(item);
+           form_batch(self);
        }
    }
 }
diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs
index cd4e7e72a..4e5c6cb16 100644
--- a/src/trace/implementations/merge_batcher.rs
+++ b/src/trace/implementations/merge_batcher.rs
@@ -3,7 +3,6 @@
 use std::collections::VecDeque;
 use std::marker::PhantomData;
 
-use timely::communication::message::RefOrMut;
 use timely::logging::WorkerIdentifier;
 use timely::logging_core::Logger;
 use timely::progress::frontier::AntichainRef;
@@ -45,7 +44,7 @@ where
 impl Batcher for MergeBatcher
 where
-   C: ContainerBuilder + Default + for<'a> PushInto<RefOrMut<'a, Input>>,
+   C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>,
    M: Merger
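Editor's note: the change repeated across every hunk above is mechanical. Timely's #597 switched operator input handles from handing out a `RefOrMut` wrapper to handing out `&mut` access to the underlying container, so each operator's long-lived staging buffer and its `data.swap(&mut buffer)` step can be deleted in favor of draining the input in place, and `PushInto` implementations accept `&'a mut C` instead of `RefOrMut<'a, C>`. The following is an illustrative sketch of that idiom, not code from either repository: plain `Vec`s stand in for timely containers, and `stash` stands in for the per-operator state (hash-map entries, priority queues, batch lists) that the hunks drain into.

    /// Old idiom (timely 0.13): the input payload arrived behind a wrapper,
    /// so handlers swapped it into a reusable local buffer before draining.
    fn stash_with_swap(data: &mut Vec<u64>, buffer: &mut Vec<u64>, stash: &mut Vec<u64>) {
        std::mem::swap(data, buffer); // stands in for `data.swap(&mut buffer)`
        stash.extend(buffer.drain(..));
    }

    /// New idiom (timely 0.14 / #597): the handler has `&mut` access to the
    /// container, so it drains in place and the extra buffer disappears.
    fn stash_in_place(data: &mut Vec<u64>, stash: &mut Vec<u64>) {
        stash.extend(data.drain(..));
    }

    fn main() {
        let mut stash = Vec::new();
        let mut buffer = Vec::new();

        let mut batch = vec![1, 2, 3];
        stash_with_swap(&mut batch, &mut buffer, &mut stash);

        let mut batch = vec![4, 5, 6];
        stash_in_place(&mut batch, &mut stash);

        assert_eq!(stash, [1, 2, 3, 4, 5, 6]);
        assert!(batch.is_empty() && buffer.is_empty());
    }

Either way the input allocation ends up empty and reusable; the new form simply avoids shuttling its contents through a second allocation held by the operator.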