From 2a439f2370642d56445b4b133d17061588cf80c2 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Thu, 16 Jan 2025 15:46:40 -0500
Subject: [PATCH 1/4] Slow the rate of growth in ConsolidatingVec

---
 src/compute/src/sink/correction.rs | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/src/compute/src/sink/correction.rs b/src/compute/src/sink/correction.rs
index aff9b8e433e9a..ec9fd9779946f 100644
--- a/src/compute/src/sink/correction.rs
+++ b/src/compute/src/sink/correction.rs
@@ -326,21 +326,24 @@ impl ConsolidatingVec {
     /// Pushes `item` into the vector.
     ///
-    /// If the vector does not have sufficient capacity, we try to consolidate and/or double its
-    /// capacity.
+    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
+    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
     ///
     /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
-    /// but amortizes to O(1).
+    /// but amortizes to O(log n).
     pub fn push(&mut self, item: (D, Diff)) {
         let capacity = self.data.capacity();
         if self.data.len() == capacity {
             // The vector is full. First, consolidate to try to recover some space.
             self.consolidate();

-            // If consolidation didn't free at least half the available capacity, double the
-            // capacity. This ensures we won't consolidate over and over again with small gains.
-            if self.data.len() > capacity / 2 {
-                self.data.reserve(capacity);
+            // If consolidation didn't free enough space (at least a linear amount), increase the capacity.
+            // The `slop` term describes the rate of growth: we scale up by a factor of `1/(slop - 1)`.
+            // Setting `slop` to 2 results in doubling whenever the list is at least half full.
+            // Larger numbers result in more conservative approaches that use more CPU, but less memory.
+            let slop = 10;
+            if self.data.len() > capacity * (slop - 1) / slop {
+                self.data.reserve(capacity / (slop - 1));
             }
         }

@@ -352,7 +355,7 @@ impl ConsolidatingVec {
         consolidate(&mut self.data);

         // We may have the opportunity to reclaim allocated memory.
-        // Given that `push` will double the capacity when the vector is more than half full, and
+        // Given that `push` will at most double the capacity when the vector is more than half full, and
         // we want to avoid entering into a resizing cycle, we choose to only shrink if the
         // vector's length is less than one fourth of its capacity.
         if self.data.len() < self.data.capacity() / 4 {
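To make the new growth rule concrete, here is a minimal, runnable sketch of the arithmetic it implements. The helper `next_capacity` and the concrete numbers are illustrative assumptions, not part of the Materialize codebase:

```rust
/// Hypothetical helper mirroring the rule above: grow only when the
/// consolidated length still exceeds a `(slop - 1)/slop` fraction of the
/// capacity, and then grow by a factor of `1/(slop - 1)`.
fn next_capacity(len: usize, capacity: usize, slop: usize) -> usize {
    if len > capacity * (slop - 1) / slop {
        capacity + capacity / (slop - 1)
    } else {
        capacity
    }
}

fn main() {
    // A full vector of capacity 1024: `slop = 2` doubles, `slop = 10` grows ~11%.
    assert_eq!(next_capacity(1024, 1024, 2), 2048);
    assert_eq!(next_capacity(1024, 1024, 10), 1024 + 1024 / 9);
    // A vector that consolidated down to half full does not grow at all.
    assert_eq!(next_capacity(512, 1024, 10), 1024);
}
```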
From 8061a40ad08e56c02f63e7e9c396c4e7ad309113 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Fri, 17 Jan 2025 10:35:06 -0500
Subject: [PATCH 2/4] Introduce as LaunchDarkly config

---
 src/compute-types/src/dyncfgs.rs             |  8 ++++++
 src/compute/src/render/continual_task.rs     |  2 +-
 src/compute/src/sink/correction.rs           | 30 ++++++++++++++------
 src/compute/src/sink/materialized_view.rs    | 14 +++++++--
 src/compute/src/sink/materialized_view_v2.rs | 19 +++++++++++--
 5 files changed, 59 insertions(+), 14 deletions(-)

diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs
index d862efadcae86..08f45a6cd3a35 100644
--- a/src/compute-types/src/dyncfgs.rs
+++ b/src/compute-types/src/dyncfgs.rs
@@ -99,6 +99,13 @@ pub const LGALLOC_SLOW_CLEAR_BYTES: Config = Config::new(
     "Clear byte size per size class for every invocation",
 );

+/// The denominator `n` in the growth rate `1 + 1/(n-1)` for `ConsolidatingVec` growth.
+pub const CONSOLIDATING_VEC_GROWTH_DENOMINATOR: Config = Config::new(
+    "consolidating_vec_growth_denominator",
+    2,
+    "Denominator in growth rate for consolidating vector size",
+);
+
 /// The number of dataflows that may hydrate concurrently.
 pub const HYDRATION_CONCURRENCY: Config = Config::new(
     "compute_hydration_concurrency",
@@ -179,4 +186,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
         .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
         .add(&COMPUTE_APPLY_COLUMN_DEMANDS)
+        .add(&CONSOLIDATING_VEC_GROWTH_DENOMINATOR)
 }
diff --git a/src/compute/src/render/continual_task.rs b/src/compute/src/render/continual_task.rs
index 16be4f1557024..a8ef6c3d64424 100644
--- a/src/compute/src/render/continual_task.rs
+++ b/src/compute/src/render/continual_task.rs
@@ -670,7 +670,7 @@ impl SinkState {
         SinkState {
             append_times: BTreeSet::new(),
             append_times_progress: Antichain::from_elem(Timestamp::minimum()),
-            to_append: ConsolidatingVec::with_min_capacity(128),
+            to_append: ConsolidatingVec::new(128, 2),
             to_append_progress: Antichain::from_elem(Timestamp::minimum()),
             output_progress: Antichain::from_elem(Timestamp::minimum()),
         }
diff --git a/src/compute/src/sink/correction.rs b/src/compute/src/sink/correction.rs
index ec9fd9779946f..45757c2b034d1 100644
--- a/src/compute/src/sink/correction.rs
+++ b/src/compute/src/sink/correction.rs
@@ -47,17 +47,24 @@ pub(super) struct Correction {
     metrics: SinkMetrics,
     /// Per-worker persist sink metrics.
     worker_metrics: SinkWorkerMetrics,
+    /// Configuration for `ConsolidatingVec` determining the rate of growth (doubling, or less).
+    growth_denominator: usize,
 }

 impl Correction {
     /// Construct a new `Correction` instance.
-    pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
+    pub fn new(
+        metrics: SinkMetrics,
+        worker_metrics: SinkWorkerMetrics,
+        growth_denominator: usize,
+    ) -> Self {
         Self {
             updates: Default::default(),
             since: Antichain::from_elem(Timestamp::MIN),
             total_size: Default::default(),
             metrics,
             worker_metrics,
+            growth_denominator,
         }
     }

@@ -125,7 +132,8 @@ impl Correction {
         use std::collections::btree_map::Entry;
         match self.updates.entry(time) {
             Entry::Vacant(entry) => {
-                let vec: ConsolidatingVec<_> = data.collect();
+                let mut vec: ConsolidatingVec<_> = data.collect();
+                vec.growth_denominator = self.growth_denominator;
                 new_size += (vec.len(), vec.capacity());
                 entry.insert(vec);
             }
@@ -304,13 +312,22 @@ pub(crate) struct ConsolidatingVec {
     /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
     /// might start smaller than this.
     min_capacity: usize,
+    /// Denominator in the growth rate, where 2 corresponds to doubling, and `n` to `1 + 1/(n-1)`.
+    ///
+    /// If consolidation didn't free enough space (at least a linear amount), increase the capacity.
+    /// The `slop` term describes the rate of growth: we scale up by a factor of `1/(slop - 1)`.
+    /// Setting `slop` to 2 results in doubling whenever the list is at least half full.
+    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
+    growth_denominator: usize,
 }

 impl ConsolidatingVec {
-    pub fn with_min_capacity(min_capacity: usize) -> Self {
+    /// Creates a new instance from the necessary configuration arguments.
+    pub fn new(min_capacity: usize, growth_denominator: usize) -> Self {
         ConsolidatingVec {
             data: Vec::new(),
             min_capacity,
+            growth_denominator,
         }
     }

@@ -337,11 +354,7 @@ impl ConsolidatingVec {
             // The vector is full. First, consolidate to try to recover some space.
             self.consolidate();

-            // If consolidation didn't free enough space (at least a linear amount), increase the capacity.
-            // The `slop` term describes the rate of growth: we scale up by a factor of `1/(slop - 1)`.
-            // Setting `slop` to 2 results in doubling whenever the list is at least half full.
-            // Larger numbers result in more conservative approaches that use more CPU, but less memory.
-            let slop = 10;
+            let slop = self.growth_denominator;
             if self.data.len() > capacity * (slop - 1) / slop {
                 self.data.reserve(capacity / (slop - 1));
             }
@@ -391,6 +404,7 @@ impl FromIterator<(D, Diff)> for ConsolidatingVec {
         Self {
             data: Vec::from_iter(iter),
             min_capacity: 0,
+            growth_denominator: 2,
         }
     }
 }
diff --git a/src/compute/src/sink/materialized_view.rs b/src/compute/src/sink/materialized_view.rs
index f4f71e1fc91d2..08c283eaf7067 100644
--- a/src/compute/src/sink/materialized_view.rs
+++ b/src/compute/src/sink/materialized_view.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::{Collection, Hashable};
 use futures::StreamExt;
+use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DENOMINATOR;
 use mz_compute_types::dyncfgs::ENABLE_MATERIALIZED_VIEW_SINK_V2;
 use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
 use mz_ore::cast::CastFrom;
@@ -222,6 +223,8 @@ where
         compute_state,
     );

+    let growth_denominator = CONSOLIDATING_VEC_GROWTH_DENOMINATOR.get(&compute_state.worker_config);
+
     let (written_batches, write_token) = write_batches(
         sink_id.clone(),
         operator_name.clone(),
@@ -233,6 +236,7 @@ where
         &persist_errs,
         Arc::clone(&persist_clients),
         compute_state.read_only_rx.clone(),
+        growth_denominator,
     );

     let append_token = append_batches(
@@ -608,6 +612,7 @@ fn write_batches(
     persist_errs: &Stream,
     persist_clients: Arc,
     mut read_only: watch::Receiver,
+    growth_denominator: usize,
 ) -> (Stream, Rc)
 where
     G: Scope,
@@ -667,8 +672,13 @@ where
         // Contains `desired - persist`, reflecting the updates we would like to commit
         // to `persist` in order to "correct" it to track `desired`. These collections are
         // only modified by updates received from either the `desired` or `persist` inputs.
-        let mut correction_oks = Correction::new(sink_metrics.clone(), sink_worker_metrics.clone());
-        let mut correction_errs = Correction::new(sink_metrics, sink_worker_metrics);
+        let mut correction_oks = Correction::new(
+            sink_metrics.clone(),
+            sink_worker_metrics.clone(),
+            growth_denominator,
+        );
+        let mut correction_errs =
+            Correction::new(sink_metrics, sink_worker_metrics, growth_denominator);

         // Contains descriptions of batches for which we know that we can
         // write data. We got these from the "centralized" operator that
diff --git a/src/compute/src/sink/materialized_view_v2.rs b/src/compute/src/sink/materialized_view_v2.rs
index 6f849d4cda3b6..1b1b7a8a695e0 100644
--- a/src/compute/src/sink/materialized_view_v2.rs
+++ b/src/compute/src/sink/materialized_view_v2.rs
@@ -115,6 +115,7 @@ use std::sync::Arc;
 use differential_dataflow::{Collection, Hashable};
 use futures::StreamExt;
+use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DENOMINATOR;
 use mz_ore::cast::CastFrom;
 use mz_persist_client::batch::{Batch, ProtoBatch};
 use mz_persist_client::cache::PersistClientCache;
@@ -204,6 +205,8 @@ where
         &desired,
     );

+    let growth_denominator = CONSOLIDATING_VEC_GROWTH_DENOMINATOR.get(&compute_state.worker_config);
+
     let (batches, write_token) = write::render(
         sink_id,
         persist_api.clone(),
@@ -211,6 +214,7 @@ where
         &desired,
         &persist,
         &descs,
+        growth_denominator,
     );

     let append_token = append::render(sink_id, persist_api, active_worker_id, &descs, &batches);
@@ -668,6 +672,7 @@ mod write {
         desired: &DesiredStreams,
         persist: &PersistStreams,
         descs: &Stream,
+        growth_denominator: usize,
     ) -> (BatchesStream, PressOnDropButton)
     where
         S: Scope,
@@ -702,7 +707,14 @@ mod write {
                 let writer = persist_api.open_writer().await;
                 let sink_metrics = persist_api.open_metrics().await;

-                let mut state = State::new(sink_id, worker_id, writer, sink_metrics, as_of);
+                let mut state = State::new(
+                    sink_id,
+                    worker_id,
+                    writer,
+                    sink_metrics,
+                    as_of,
+                    growth_denominator,
+                );

                 loop {
                     // Read from the inputs, extract `desired` updates as positive contributions to
@@ -821,6 +833,7 @@ mod write {
             persist_writer: WriteHandle,
             metrics: SinkMetrics,
             as_of: Antichain,
+            growth_denominator: usize,
         ) -> Self {
             let worker_metrics = metrics.for_worker(worker_id);

@@ -833,8 +846,8 @@ mod write {
                 worker_id,
                 persist_writer,
                 corrections: OkErr::new(
-                    Correction::new(metrics.clone(), worker_metrics.clone()),
-                    Correction::new(metrics, worker_metrics),
+                    Correction::new(metrics.clone(), worker_metrics.clone(), growth_denominator),
+                    Correction::new(metrics, worker_metrics, growth_denominator),
                 ),
                 desired_frontiers: OkErr::new_frontiers(),
                 persist_frontiers: OkErr::new_frontiers(),
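The knob this patch introduces trades memory slack against reallocation (and re-consolidation) work. The back-of-the-envelope sketch below is illustrative arithmetic, not code from the patch:

```rust
// For each candidate denominator `n`, show the implied growth factor
// `1 + 1/(n - 1)`, how many growth steps it takes to multiply capacity by
// 1024 (e.g. 1 MiB -> 1 GiB), and the worst-case fraction of capacity that
// sits unused right after a growth step.
fn main() {
    for n in [2u32, 3, 5, 10] {
        let factor = 1.0 + 1.0 / (f64::from(n) - 1.0);
        let steps = (1024f64.ln() / factor.ln()).ceil();
        let slack = 1.0 - 1.0 / factor;
        println!(
            "n = {:2}: growth factor {:.3}, ~{:3} steps, up to {:.0}% slack",
            n,
            factor,
            steps,
            slack * 100.0
        );
    }
}
```

With the default of 2 this reproduces doubling (10 steps, up to 50% slack); at 10 it takes ~66 smaller steps but never over-allocates by more than ~10%.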
From 2e6b956a09c8dd1095e15406946664bd528de1f4 Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Tue, 21 Jan 2025 13:52:42 -0500
Subject: [PATCH 3/4] Reframe as dampener; adjust logic

---
 src/compute-types/src/dyncfgs.rs             | 13 ++++----
 src/compute/src/render/continual_task.rs     |  2 +-
 src/compute/src/sink/correction.rs           | 35 +++++++++++---------
 src/compute/src/sink/materialized_view.rs    | 12 +++----
 src/compute/src/sink/materialized_view_v2.rs | 16 ++++-----
 5 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs
index 08f45a6cd3a35..b952c5528ea9f 100644
--- a/src/compute-types/src/dyncfgs.rs
+++ b/src/compute-types/src/dyncfgs.rs
@@ -99,11 +99,12 @@ pub const LGALLOC_SLOW_CLEAR_BYTES: Config = Config::new(
     "Clear byte size per size class for every invocation",
 );

-/// The denominator `n` in the growth rate `1 + 1/(n-1)` for `ConsolidatingVec` growth.
-pub const CONSOLIDATING_VEC_GROWTH_DENOMINATOR: Config = Config::new(
-    "consolidating_vec_growth_denominator",
-    2,
-    "Denominator in growth rate for consolidating vector size",
+/// The term `n` in the growth rate `1 + 1/(n + 1)` for `ConsolidatingVec`.
+/// The smallest value `0` corresponds to the greatest allowed growth, of doubling.
+pub const CONSOLIDATING_VEC_GROWTH_DAMPENER: Config = Config::new(
+    "consolidating_vec_growth_dampener",
+    0,
+    "Dampener in growth rate for consolidating vector size",
 );

 /// The number of dataflows that may hydrate concurrently.
 pub const HYDRATION_CONCURRENCY: Config = Config::new(
     "compute_hydration_concurrency",
@@ -186,5 +187,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
         .add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
         .add(&COMPUTE_APPLY_COLUMN_DEMANDS)
-        .add(&CONSOLIDATING_VEC_GROWTH_DENOMINATOR)
+        .add(&CONSOLIDATING_VEC_GROWTH_DAMPENER)
 }
diff --git a/src/compute/src/render/continual_task.rs b/src/compute/src/render/continual_task.rs
index a8ef6c3d64424..7c336c0cfd8f9 100644
--- a/src/compute/src/render/continual_task.rs
+++ b/src/compute/src/render/continual_task.rs
@@ -670,7 +670,7 @@ impl SinkState {
         SinkState {
             append_times: BTreeSet::new(),
             append_times_progress: Antichain::from_elem(Timestamp::minimum()),
-            to_append: ConsolidatingVec::new(128, 2),
+            to_append: ConsolidatingVec::new(128, 0),
             to_append_progress: Antichain::from_elem(Timestamp::minimum()),
             output_progress: Antichain::from_elem(Timestamp::minimum()),
         }
diff --git a/src/compute/src/sink/correction.rs b/src/compute/src/sink/correction.rs
index 45757c2b034d1..37acb84cf8a9c 100644
--- a/src/compute/src/sink/correction.rs
+++ b/src/compute/src/sink/correction.rs
@@ -47,8 +47,8 @@ pub(super) struct Correction {
     metrics: SinkMetrics,
     /// Per-worker persist sink metrics.
     worker_metrics: SinkWorkerMetrics,
-    /// Configuration for `ConsolidatingVec` determining the rate of growth (doubling, or less).
-    growth_denominator: usize,
+    /// Configuration for `ConsolidatingVec` driving the growth rate down from doubling.
+    growth_dampener: usize,
 }

 impl Correction {
@@ -56,7 +56,7 @@ impl Correction {
     pub fn new(
         metrics: SinkMetrics,
         worker_metrics: SinkWorkerMetrics,
-        growth_denominator: usize,
+        growth_dampener: usize,
     ) -> Self {
         Self {
             updates: Default::default(),
@@ -64,7 +64,7 @@ impl Correction {
             total_size: Default::default(),
             metrics,
             worker_metrics,
-            growth_denominator,
+            growth_dampener,
         }
     }
@@ -133,7 +133,7 @@ impl Correction {
         match self.updates.entry(time) {
             Entry::Vacant(entry) => {
                 let mut vec: ConsolidatingVec<_> = data.collect();
-                vec.growth_denominator = self.growth_denominator;
+                vec.growth_dampener = self.growth_dampener;
                 new_size += (vec.len(), vec.capacity());
                 entry.insert(vec);
             }
@@ -312,22 +312,21 @@ pub(crate) struct ConsolidatingVec {
     /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
     /// might start smaller than this.
     min_capacity: usize,
-    /// Denominator in the growth rate, where 2 corresponds to doubling, and `n` to `1 + 1/(n-1)`.
+    /// Dampener in the growth rate. 0 corresponds to doubling and in general `n` to `1+1/(n+1)`.
     ///
     /// If consolidation didn't free enough space (at least a linear amount), increase the capacity.
-    /// The `slop` term describes the rate of growth: we scale up by a factor of `1/(slop - 1)`.
-    /// Setting `slop` to 2 results in doubling whenever the list is at least half full.
+    /// Setting this to 0 results in doubling whenever the list is at least half full.
     /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
-    growth_denominator: usize,
+    growth_dampener: usize,
 }

 impl ConsolidatingVec {
     /// Creates a new instance from the necessary configuration arguments.
-    pub fn new(min_capacity: usize, growth_denominator: usize) -> Self {
+    pub fn new(min_capacity: usize, growth_dampener: usize) -> Self {
         ConsolidatingVec {
             data: Vec::new(),
             min_capacity,
-            growth_denominator,
+            growth_dampener,
         }
     }

@@ -354,9 +353,15 @@ impl ConsolidatingVec {
             // The vector is full. First, consolidate to try to recover some space.
             self.consolidate();

-            let slop = self.growth_denominator;
-            if self.data.len() > capacity * (slop - 1) / slop {
-                self.data.reserve(capacity / (slop - 1));
+            // We may need more capacity if our current capacity is within a factor of `1+1/(n+1)` of the length.
+            // This corresponds to `cap < len + len/(n+1)`, which is the logic we use.
+            let length = self.data.len();
+            let dampener = self.growth_dampener;
+            if capacity < length + length / (dampener + 1) {
+                // Increase as a function of capacity rather than length.
+                // When `capacity` is smaller than `dampener+1` this may have no effect, and we rely
+                // instead on standard `Vec` doubling to get the capacity past `dampener+1`.
+                self.data.reserve_exact(capacity / (dampener + 1));
             }
         }
@@ -404,7 +409,7 @@ impl FromIterator<(D, Diff)> for ConsolidatingVec {
         Self {
             data: Vec::from_iter(iter),
             min_capacity: 0,
-            growth_denominator: 2,
+            growth_dampener: 0,
         }
     }
 }
diff --git a/src/compute/src/sink/materialized_view.rs b/src/compute/src/sink/materialized_view.rs
index 08c283eaf7067..a38b9ff4a73c8 100644
--- a/src/compute/src/sink/materialized_view.rs
+++ b/src/compute/src/sink/materialized_view.rs
@@ -17,7 +17,7 @@ use std::sync::Arc;
 use differential_dataflow::lattice::Lattice;
 use differential_dataflow::{Collection, Hashable};
 use futures::StreamExt;
-use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DENOMINATOR;
+use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DAMPENER;
 use mz_compute_types::dyncfgs::ENABLE_MATERIALIZED_VIEW_SINK_V2;
 use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
 use mz_ore::cast::CastFrom;
@@ -223,7 +223,7 @@ where
         compute_state,
     );

-    let growth_denominator = CONSOLIDATING_VEC_GROWTH_DENOMINATOR.get(&compute_state.worker_config);
+    let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(&compute_state.worker_config);

     let (written_batches, write_token) = write_batches(
@@ -236,7 +236,7 @@ where
         &persist_errs,
         Arc::clone(&persist_clients),
         compute_state.read_only_rx.clone(),
-        growth_denominator,
+        growth_dampener,
     );

     let append_token = append_batches(
@@ -612,7 +612,7 @@ fn write_batches(
     persist_errs: &Stream,
     persist_clients: Arc,
     mut read_only: watch::Receiver,
-    growth_denominator: usize,
+    growth_dampener: usize,
 ) -> (Stream, Rc)
 where
     G: Scope,
@@ -675,10 +675,10 @@ where
         let mut correction_oks = Correction::new(
             sink_metrics.clone(),
             sink_worker_metrics.clone(),
-            growth_denominator,
+            growth_dampener,
         );
         let mut correction_errs =
-            Correction::new(sink_metrics, sink_worker_metrics, growth_denominator);
+            Correction::new(sink_metrics, sink_worker_metrics, growth_dampener);

         // Contains descriptions of batches for which we know that we can
         // write data. We got these from the "centralized" operator that
diff --git a/src/compute/src/sink/materialized_view_v2.rs b/src/compute/src/sink/materialized_view_v2.rs
index 1b1b7a8a695e0..6d2e603fa1e8a 100644
--- a/src/compute/src/sink/materialized_view_v2.rs
+++ b/src/compute/src/sink/materialized_view_v2.rs
@@ -115,7 +115,7 @@ use std::sync::Arc;
 use differential_dataflow::{Collection, Hashable};
 use futures::StreamExt;
-use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DENOMINATOR;
+use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DAMPENER;
 use mz_ore::cast::CastFrom;
 use mz_persist_client::batch::{Batch, ProtoBatch};
 use mz_persist_client::cache::PersistClientCache;
@@ -205,7 +205,7 @@ where
         &desired,
     );

-    let growth_denominator = CONSOLIDATING_VEC_GROWTH_DENOMINATOR.get(&compute_state.worker_config);
+    let growth_dampener = CONSOLIDATING_VEC_GROWTH_DAMPENER.get(&compute_state.worker_config);

     let (batches, write_token) = write::render(
         sink_id,
         persist_api.clone(),
@@ -214,7 +214,7 @@ where
         &desired,
         &persist,
         &descs,
-        growth_denominator,
+        growth_dampener,
     );

     let append_token = append::render(sink_id, persist_api, active_worker_id, &descs, &batches);
@@ -672,7 +672,7 @@ mod write {
         desired: &DesiredStreams,
         persist: &PersistStreams,
         descs: &Stream,
-        growth_denominator: usize,
+        growth_dampener: usize,
     ) -> (BatchesStream, PressOnDropButton)
     where
         S: Scope,
@@ -713,7 +713,7 @@ mod write {
                     writer,
                     sink_metrics,
                     as_of,
-                    growth_denominator,
+                    growth_dampener,
                 );

                 loop {
@@ -833,7 +833,7 @@ mod write {
             persist_writer: WriteHandle,
             metrics: SinkMetrics,
             as_of: Antichain,
-            growth_denominator: usize,
+            growth_dampener: usize,
         ) -> Self {
             let worker_metrics = metrics.for_worker(worker_id);

@@ -846,8 +846,8 @@ mod write {
                 worker_id,
                 persist_writer,
                 corrections: OkErr::new(
-                    Correction::new(metrics.clone(), worker_metrics.clone(), growth_denominator),
-                    Correction::new(metrics, worker_metrics, growth_denominator),
+                    Correction::new(metrics.clone(), worker_metrics.clone(), growth_dampener),
+                    Correction::new(metrics, worker_metrics, growth_dampener),
                 ),
                 desired_frontiers: OkErr::new_frontiers(),
                 persist_frontiers: OkErr::new_frontiers(),
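The reframed trigger condition is worth working through once. The sketch below mirrors it with a hypothetical helper (`should_grow` is not in the patch; the numbers are illustrative):

```rust
/// Returns whether the vector should grow: whether `capacity` is within a
/// factor of `1 + 1/(dampener + 1)` of the consolidated length, i.e.
/// `capacity < length + length / (dampener + 1)`.
fn should_grow(length: usize, capacity: usize, dampener: usize) -> bool {
    capacity < length + length / (dampener + 1)
}

fn main() {
    // A completely full vector triggers growth for any dampener.
    assert!(should_grow(1024, 1024, 0));
    assert!(should_grow(1024, 1024, 9));
    // With dampener 9, growth triggers only when the vector is over ~10/11 full.
    assert!(!should_grow(930, 1024, 9)); // 930 + 93 = 1023 < 1024: enough slack
    assert!(should_grow(940, 1024, 9)); // 940 + 94 = 1034 > 1024: grow
    // The caveat from the patch comment: a tiny vector never takes this path,
    // since `length / (dampener + 1)` truncates to zero and `capacity < length`
    // cannot hold -- `Vec`'s own doubling takes over instead.
    assert!(!should_grow(4, 4, 9));
}
```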
From 575d62d4de8a0b9e3fad8d1592cc07f9cae7d1fa Mon Sep 17 00:00:00 2001
From: Frank McSherry
Date: Tue, 21 Jan 2025 14:02:41 -0500
Subject: [PATCH 4/4] Target a specific capacity

---
 src/compute/src/sink/correction.rs | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/src/compute/src/sink/correction.rs b/src/compute/src/sink/correction.rs
index 37acb84cf8a9c..a59a9a68bc32c 100644
--- a/src/compute/src/sink/correction.rs
+++ b/src/compute/src/sink/correction.rs
@@ -358,10 +358,11 @@ impl ConsolidatingVec {
             let length = self.data.len();
             let dampener = self.growth_dampener;
             if capacity < length + length / (dampener + 1) {
-                // Increase as a function of capacity rather than length.
-                // When `capacity` is smaller than `dampener+1` this may have no effect, and we rely
-                // instead on standard `Vec` doubling to get the capacity past `dampener+1`.
-                self.data.reserve_exact(capacity / (dampener + 1));
+                // We would like to increase the capacity by a factor of `1+1/(n+1)`, which involves
+                // determining the target capacity, and then reserving an amount that achieves this
+                // while accounting for the existing length.
+                let new_cap = capacity + capacity / (dampener + 1);
+                self.data.reserve_exact(new_cap - length);
             }
         }
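The final patch's point is easiest to see with concrete numbers. `Vec::reserve_exact(n)` guarantees capacity for at least `len() + n` elements, so reserving an amount measured against the old capacity undershoots whenever consolidation freed some space. The sketch below uses toy values, not the Materialize code:

```rust
fn main() {
    let length = 950usize; // length after consolidation
    let capacity = 1024usize; // current capacity, still over 10/11 full
    let dampener = 9usize;

    // Patch 3 asked for `capacity / (dampener + 1)` elements beyond the
    // *length*, guaranteeing only 950 + 102 = 1052: short of 10% over 1024.
    let patch3_capacity = length + capacity / (dampener + 1);
    assert_eq!(patch3_capacity, 1052);

    // Patch 4 computes the intended target first, then reserves the
    // difference from the length, landing exactly on 1024 + 102 = 1126.
    let new_cap = capacity + capacity / (dampener + 1);
    let patch4_capacity = length + (new_cap - length);
    assert_eq!(patch4_capacity, 1126);

    println!("patch 3 lands on {patch3_capacity}, patch 4 on {patch4_capacity}");
}
```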