Slow the rate of growth in ConsolidatingVec
#31077
base: main
```diff
@@ -47,17 +47,24 @@ pub(super) struct Correction<D> {
     metrics: SinkMetrics,
     /// Per-worker persist sink metrics.
     worker_metrics: SinkWorkerMetrics,
+    /// Configuration for `ConsolidatingVec` determining the rate of growth (doubling, or less).
+    growth_denominator: usize,
 }

 impl<D> Correction<D> {
     /// Construct a new `Correction` instance.
-    pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
+    pub fn new(
+        metrics: SinkMetrics,
+        worker_metrics: SinkWorkerMetrics,
+        growth_denominator: usize,
+    ) -> Self {
         Self {
             updates: Default::default(),
             since: Antichain::from_elem(Timestamp::MIN),
             total_size: Default::default(),
             metrics,
             worker_metrics,
+            growth_denominator,
         }
     }
```

**Review comment on lines +50 to +51:**

> There isn't anything that enforces "doubling or less" here, right? People could set the config to …
```diff
@@ -125,7 +132,8 @@ impl<D: Data> Correction<D> {
         use std::collections::btree_map::Entry;
         match self.updates.entry(time) {
             Entry::Vacant(entry) => {
-                let vec: ConsolidatingVec<_> = data.collect();
+                let mut vec: ConsolidatingVec<_> = data.collect();
+                vec.growth_denominator = self.growth_denominator;
                 new_size += (vec.len(), vec.capacity());
                 entry.insert(vec);
             }
```
```diff
@@ -304,13 +312,22 @@ pub(crate) struct ConsolidatingVec<D> {
     /// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
     /// might start smaller than this.
     min_capacity: usize,
+    /// The `slop` term describes the rate of growth, where we scale up by factors of 1/slop.
+    ///
+    /// Setting `slop` to 2 results in doubling whenever the list is at least half full.
+    /// Larger numbers result in more conservative approaches that use more CPU, but less memory.
+    growth_denominator: usize,
 }

 impl<D: Ord> ConsolidatingVec<D> {
-    pub fn with_min_capacity(min_capacity: usize) -> Self {
+    /// Creates a new instance from the necessary configuration arguments.
+    pub fn new(min_capacity: usize, growth_denominator: usize) -> Self {
         ConsolidatingVec {
             data: Vec::new(),
             min_capacity,
+            growth_denominator,
         }
     }
```

**Review comment, with a suggested change to the doc comment:**

```diff
-    /// The `slop` term describes the rate of growth, where we scale up by factors of 1/slop.
+    /// Denominator in the growth rate, where 2 corresponds to doubling, and `n` to `1 + 1/(n-1)`.
+    ///
+    /// If consolidation didn't free enough space, at least a linear amount, increase the capacity
```

> ... right?
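To make the doc comment concrete, here is a standalone sketch, not code from the PR: `grow_policy` is a hypothetical helper that mirrors the two formulas `push` uses, and its assertion makes explicit the `n >= 2` bound the earlier review comment asks about.

```rust
// Standalone sketch of the growth policy described above (not part of the PR).
// For a growth denominator `n`, the vector grows only when more than (n-1)/n
// of its capacity remains occupied after consolidation, and then reserves an
// extra capacity / (n-1) elements, i.e. a growth factor of 1 + 1/(n-1).

/// Returns `(occupancy_threshold, extra_capacity)` for a full vector of the
/// given capacity under growth denominator `n`.
fn grow_policy(capacity: usize, n: usize) -> (usize, usize) {
    // Nothing in the PR enforces this bound; below 2, `capacity / (n - 1)`
    // would divide by zero.
    assert!(n >= 2, "growth denominator must be at least 2");
    (capacity * (n - 1) / n, capacity / (n - 1))
}

fn main() {
    // n = 2: grow when more than half full, and reserve a full extra
    // capacity (doubling), matching the behavior before this PR.
    assert_eq!(grow_policy(1024, 2), (512, 1024));
    // n = 11: grow only when more than 10/11 full, and add ~10% extra capacity.
    assert_eq!(grow_policy(1024, 11), (930, 102));
}
```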
```diff
@@ -326,21 +343,20 @@ impl<D: Ord> ConsolidatingVec<D> {

     /// Pushes `item` into the vector.
     ///
-    /// If the vector does not have sufficient capacity, we try to consolidate and/or double its
-    /// capacity.
+    /// If the vector does not have sufficient capacity, we'll first consolidate and then increase
+    /// its capacity if the consolidated results still occupy a significant fraction of the vector.
     ///
     /// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
-    /// but amortizes to O(1).
+    /// but amortizes to O(log n).
     pub fn push(&mut self, item: (D, Diff)) {
         let capacity = self.data.capacity();
         if self.data.len() == capacity {
             // The vector is full. First, consolidate to try to recover some space.
             self.consolidate();

-            // If consolidation didn't free at least half the available capacity, double the
-            // capacity. This ensures we won't consolidate over and over again with small gains.
-            if self.data.len() > capacity / 2 {
-                self.data.reserve(capacity);
+            let slop = self.growth_denominator;
+            if self.data.len() > capacity * (slop - 1) / slop {
+                self.data.reserve(capacity / (slop - 1));
             }
         }
```
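As a rough illustration of why larger denominators slow growth, the following standalone sketch (an assumed model, not PR code) tracks the capacity after repeated overflows in the worst case where consolidation reclaims nothing. It assumes `Vec::reserve` allocates exactly what is requested; in reality it may round up, so these are lower bounds on the real capacities.

```rust
// Standalone sketch: each overflow reserves an extra capacity / (slop - 1)
// elements, so `slop` controls the per-step growth factor of 1 + 1/(slop - 1).

/// Minimum capacity after `overflows` grow steps starting from `start`,
/// assuming consolidation never frees any space.
fn capacity_after(overflows: usize, start: usize, slop: usize) -> usize {
    let mut capacity = start;
    for _ in 0..overflows {
        // Mirror `self.data.reserve(capacity / (slop - 1))` from the diff.
        capacity += capacity / (slop - 1);
    }
    capacity
}

fn main() {
    // Doubling (slop = 2) multiplies the footprint by 16 in four steps...
    assert_eq!(capacity_after(4, 1000, 2), 16000);
    // ...while slop = 11 grows by roughly 10% per step.
    assert_eq!(capacity_after(4, 1000, 11), 1464);
}
```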
```diff
@@ -352,7 +368,7 @@ impl<D: Ord> ConsolidatingVec<D> {
         consolidate(&mut self.data);

         // We may have the opportunity to reclaim allocated memory.
-        // Given that `push` will double the capacity when the vector is more than half full, and
+        // Given that `push` will at most double the capacity when the vector is more than half full, and
         // we want to avoid entering into a resizing cycle, we choose to only shrink if the
         // vector's length is less than one fourth of its capacity.
         if self.data.len() < self.data.capacity() / 4 {
```
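The no-resizing-cycle argument in that comment still holds for every denominator, which can be checked with a small standalone sketch (an illustration of the reasoning, not code from the PR): shrinking happens only below 1/4 occupancy, growing only above (slop - 1)/slop occupancy, and 1/4 < (slop - 1)/slop whenever slop >= 2, so the two thresholds never overlap.

```rust
// Standalone sketch: verify the shrink threshold (1/4 of capacity) lies
// strictly below the grow threshold ((slop - 1) / slop of capacity).

/// True when 1/4 < (slop - 1)/slop, compared without floating point:
/// multiplying both sides by 4 * slop gives slop < 4 * (slop - 1).
fn thresholds_are_separated(slop: u32) -> bool {
    slop < 4 * (slop - 1)
}

fn main() {
    // Holds for doubling (slop = 2) and every slower growth rate checked here.
    for slop in 2..=64 {
        assert!(thresholds_are_separated(slop));
    }
}
```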
```diff
@@ -388,6 +404,7 @@ impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
         Self {
             data: Vec::from_iter(iter),
             min_capacity: 0,
+            growth_denominator: 2,
         }
     }
 }
```
**Review comment:**

> I'd be okay with defaulting this to our target (11?) and then opting out where we observe issues. Seems safe enough, considering that this trades off less memory for more CPU and we usually don't have CPU issues.