Slow the rate of growth in ConsolidatingVec #31077

Status: Open. Wants to merge 2 commits into main.
8 changes: 8 additions & 0 deletions src/compute-types/src/dyncfgs.rs
@@ -99,6 +99,13 @@ pub const LGALLOC_SLOW_CLEAR_BYTES: Config<usize> = Config::new(
"Clear byte size per size class for every invocation",
);

/// The denominator `n` in the growth rate `1 + 1/(n-1)` for `ConsolidatingVec` growth.
pub const CONSOLIDATING_VEC_GROWTH_DENOMINATOR: Config<usize> = Config::new(
"consolidating_vec_growth_denominator",
2,
Review comment (Contributor):
I'd be okay with defaulting this to our target (11?) and then opting out where we observe issues. Seems safe enough, considering that this trades off less memory for more CPU and we usually don't have CPU issues.

"Denominator in growth rate for consolidating vector size",
);

/// The number of dataflows that may hydrate concurrently.
pub const HYDRATION_CONCURRENCY: Config<usize> = Config::new(
"compute_hydration_concurrency",
@@ -179,4 +186,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&ENABLE_COMPUTE_REPLICA_EXPIRATION)
.add(&COMPUTE_REPLICA_EXPIRATION_OFFSET)
.add(&COMPUTE_APPLY_COLUMN_DEMANDS)
.add(&CONSOLIDATING_VEC_GROWTH_DENOMINATOR)
}
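
To make the configured growth rate concrete, here is a small standalone sketch (illustration only, not part of the diff) of how the denominator maps to capacity growth, mirroring the `capacity / (denominator - 1)` reservation this PR introduces in `ConsolidatingVec::push`:

// Illustration only: minimum capacity progression under growth rate 1 + 1/(n - 1),
// assuming consolidation never frees enough space to avoid growing.
// `Vec::reserve` may allocate more than requested; this shows the intended lower bound.
fn main() {
    for denominator in [2usize, 4, 11] {
        let mut capacity = 128usize;
        let mut steps = Vec::new();
        for _ in 0..5 {
            capacity += capacity / (denominator - 1);
            steps.push(capacity);
        }
        println!("denominator {denominator:2}: {steps:?}");
    }
}

With the current default of 2 the capacity doubles each time (256, 512, ...), while a denominator of 11, as floated in the review comment above, grows by roughly 10% per step at the cost of more frequent consolidation.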
2 changes: 1 addition & 1 deletion src/compute/src/render/continual_task.rs
@@ -670,7 +670,7 @@ impl<D: Ord> SinkState<D, Timestamp> {
SinkState {
append_times: BTreeSet::new(),
append_times_progress: Antichain::from_elem(Timestamp::minimum()),
to_append: ConsolidatingVec::with_min_capacity(128),
to_append: ConsolidatingVec::new(128, 2),
to_append_progress: Antichain::from_elem(Timestamp::minimum()),
output_progress: Antichain::from_elem(Timestamp::minimum()),
}
39 changes: 28 additions & 11 deletions src/compute/src/sink/correction.rs
@@ -47,17 +47,24 @@ pub(super) struct Correction<D> {
metrics: SinkMetrics,
/// Per-worker persist sink metrics.
worker_metrics: SinkWorkerMetrics,
/// Configuration for `ConsolidatingVec` determining the rate of growth (doubling, or less).
growth_denominator: usize,
Review comment on lines +50 to +51 (Contributor):
There isn't anything that enforces "doubling or less" here, right? People could set the config to 1 or 0 and end up having a bad time. How about checking that in new and falling back to 2 on invalid values? Alternatively, redefine the semantics to have 0 mean doubling.

}

impl<D> Correction<D> {
/// Construct a new `Correction` instance.
pub fn new(metrics: SinkMetrics, worker_metrics: SinkWorkerMetrics) -> Self {
pub fn new(
metrics: SinkMetrics,
worker_metrics: SinkWorkerMetrics,
growth_denominator: usize,
) -> Self {
Self {
updates: Default::default(),
since: Antichain::from_elem(Timestamp::MIN),
total_size: Default::default(),
metrics,
worker_metrics,
growth_denominator,
}
}

@@ -125,7 +132,8 @@ impl<D: Data> Correction<D> {
use std::collections::btree_map::Entry;
match self.updates.entry(time) {
Entry::Vacant(entry) => {
let vec: ConsolidatingVec<_> = data.collect();
let mut vec: ConsolidatingVec<_> = data.collect();
vec.growth_denominator = self.growth_denominator;
new_size += (vec.len(), vec.capacity());
entry.insert(vec);
}
@@ -304,13 +312,22 @@ pub(crate) struct ConsolidatingVec<D> {
/// A lower bound for how small we'll shrink the Vec's capacity. NB: The cap
/// might start smaller than this.
min_capacity: usize,
/// Denominator in the growth rate, where 2 corresponds to doubling, and `n` to `1 + 1/(n-1)`.
///
/// If consolidation didn't free enough space, at least a linear amount, increase the capacity
/// The `slop` term describes the rate of growth, where we scale up by factors of 1/slop.
Review comment (Contributor):
Suggested change
/// The `slop` term describes the rate of growth, where we scale up by factors of 1/slop.
/// The `slop` term describes the rate of growth, where we scale up by factors of 1/(slop-1).

... right?

/// Setting `slop` to 2 results in doubling whenever the list is at least half full.
/// Larger numbers result in more conservative approaches that use more CPU, but less memory.
growth_denominator: usize,
}

impl<D: Ord> ConsolidatingVec<D> {
pub fn with_min_capacity(min_capacity: usize) -> Self {
/// Creates a new instance from the necessary configuration arguments.
pub fn new(min_capacity: usize, growth_denominator: usize) -> Self {
ConsolidatingVec {
data: Vec::new(),
min_capacity,
growth_denominator,
}
}
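
The review comment above asks for a guard against degenerate denominators. One possible shape for that check, as a hedged sketch rather than part of this PR: a configured value of 1 divides by zero in `capacity / (growth_denominator - 1)`, and 0 underflows the subtraction, so both could fall back to the previous doubling behavior.

// Sketch (not in the PR): clamp a configured growth denominator to a usable value.
// 0 or 1 would make `capacity / (denominator - 1)` underflow or divide by zero,
// so fall back to 2, i.e. the pre-existing doubling behavior.
fn sanitize_growth_denominator(configured: usize) -> usize {
    if configured < 2 { 2 } else { configured }
}

fn main() {
    assert_eq!(sanitize_growth_denominator(0), 2);
    assert_eq!(sanitize_growth_denominator(1), 2);
    assert_eq!(sanitize_growth_denominator(11), 11);
}

Applying something like this in `ConsolidatingVec::new` and `Correction::new` would address the reviewer's concern; the alternative mentioned there is to redefine 0 to mean doubling.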

@@ -326,21 +343,20 @@ impl<D: Ord> ConsolidatingVec<D> {

/// Pushes `item` into the vector.
///
/// If the vector does not have sufficient capacity, we try to consolidate and/or double its
/// capacity.
/// If the vector does not have sufficient capacity, we'll first consolidate and then increase
/// its capacity if the consolidated results still occupy a significant fraction of the vector.
///
/// The worst-case cost of this function is O(n log n) in the number of items the vector stores,
/// but amortizes to O(1).
/// but amortizes to O(log n).
pub fn push(&mut self, item: (D, Diff)) {
let capacity = self.data.capacity();
if self.data.len() == capacity {
// The vector is full. First, consolidate to try to recover some space.
self.consolidate();

// If consolidation didn't free at least half the available capacity, double the
// capacity. This ensures we won't consolidate over and over again with small gains.
if self.data.len() > capacity / 2 {
self.data.reserve(capacity);
let slop = self.growth_denominator;
if self.data.len() > capacity * (slop - 1) / slop {
self.data.reserve(capacity / (slop - 1));
}
}
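
As a concrete reading of the new condition in `push` (numbers are illustrative): with `growth_denominator = 4` and a capacity of 128, the vector grows only if consolidation leaves more than 128 * 3 / 4 = 96 live entries, and it then asks for at least 128 / 3 = 42 additional slots, roughly a one-third increase instead of a full doubling. A standalone sketch of the same arithmetic:

// Illustration of the growth check in `push`, with assumed numbers.
fn main() {
    let capacity = 128usize;
    let slop = 4usize; // growth_denominator
    for len_after_consolidation in [90usize, 100] {
        let grows = len_after_consolidation > capacity * (slop - 1) / slop;
        let additional = capacity / (slop - 1);
        println!("len {len_after_consolidation}: grows = {grows}, reserves at least {additional}");
    }
}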

@@ -352,7 +368,7 @@ impl<D: Ord> ConsolidatingVec<D> {
consolidate(&mut self.data);

// We may have the opportunity to reclaim allocated memory.
// Given that `push` will double the capacity when the vector is more than half full, and
// Given that `push` will at most double the capacity when the vector is more than half full, and
// we want to avoid entering into a resizing cycle, we choose to only shrink if the
// vector's length is less than one fourth of its capacity.
if self.data.len() < self.data.capacity() / 4 {
@@ -388,6 +404,7 @@ impl<D> FromIterator<(D, Diff)> for ConsolidatingVec<D> {
Self {
data: Vec::from_iter(iter),
min_capacity: 0,
growth_denominator: 2,
}
}
}
14 changes: 12 additions & 2 deletions src/compute/src/sink/materialized_view.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::{Collection, Hashable};
use futures::StreamExt;
use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DENOMINATOR;
use mz_compute_types::dyncfgs::ENABLE_MATERIALIZED_VIEW_SINK_V2;
use mz_compute_types::sinks::{ComputeSinkDesc, MaterializedViewSinkConnection};
use mz_ore::cast::CastFrom;
@@ -222,6 +223,8 @@
compute_state,
);

let growth_denominator = CONSOLIDATING_VEC_GROWTH_DENOMINATOR.get(&compute_state.worker_config);

let (written_batches, write_token) = write_batches(
sink_id.clone(),
operator_name.clone(),
@@ -233,6 +236,7 @@
&persist_errs,
Arc::clone(&persist_clients),
compute_state.read_only_rx.clone(),
growth_denominator,
);

let append_token = append_batches(
@@ -608,6 +612,7 @@ fn write_batches<G>(
persist_errs: &Stream<G, (DataflowError, Timestamp, Diff)>,
persist_clients: Arc<PersistClientCache>,
mut read_only: watch::Receiver<bool>,
growth_denominator: usize,
) -> (Stream<G, ProtoBatch>, Rc<dyn Any>)
where
G: Scope<Timestamp = Timestamp>,
@@ -667,8 +672,13 @@
// Contains `desired - persist`, reflecting the updates we would like to commit
// to `persist` in order to "correct" it to track `desired`. These collections are
// only modified by updates received from either the `desired` or `persist` inputs.
let mut correction_oks = Correction::new(sink_metrics.clone(), sink_worker_metrics.clone());
let mut correction_errs = Correction::new(sink_metrics, sink_worker_metrics);
let mut correction_oks = Correction::new(
sink_metrics.clone(),
sink_worker_metrics.clone(),
growth_denominator,
);
let mut correction_errs =
Correction::new(sink_metrics, sink_worker_metrics, growth_denominator);

// Contains descriptions of batches for which we know that we can
// write data. We got these from the "centralized" operator that
19 changes: 16 additions & 3 deletions src/compute/src/sink/materialized_view_v2.rs
@@ -115,6 +115,7 @@ use std::sync::Arc;

use differential_dataflow::{Collection, Hashable};
use futures::StreamExt;
use mz_compute_types::dyncfgs::CONSOLIDATING_VEC_GROWTH_DENOMINATOR;
use mz_ore::cast::CastFrom;
use mz_persist_client::batch::{Batch, ProtoBatch};
use mz_persist_client::cache::PersistClientCache;
@@ -204,13 +205,16 @@
&desired,
);

let growth_denominator = CONSOLIDATING_VEC_GROWTH_DENOMINATOR.get(&compute_state.worker_config);

let (batches, write_token) = write::render(
sink_id,
persist_api.clone(),
as_of,
&desired,
&persist,
&descs,
growth_denominator,
);

let append_token = append::render(sink_id, persist_api, active_worker_id, &descs, &batches);
@@ -668,6 +672,7 @@ mod write {
desired: &DesiredStreams<S>,
persist: &PersistStreams<S>,
descs: &Stream<S, BatchDescription>,
growth_denominator: usize,
) -> (BatchesStream<S>, PressOnDropButton)
where
S: Scope<Timestamp = Timestamp>,
@@ -702,7 +707,14 @@

let writer = persist_api.open_writer().await;
let sink_metrics = persist_api.open_metrics().await;
let mut state = State::new(sink_id, worker_id, writer, sink_metrics, as_of);
let mut state = State::new(
sink_id,
worker_id,
writer,
sink_metrics,
as_of,
growth_denominator,
);

loop {
// Read from the inputs, extract `desired` updates as positive contributions to
@@ -821,6 +833,7 @@
persist_writer: WriteHandle<SourceData, (), Timestamp, Diff>,
metrics: SinkMetrics,
as_of: Antichain<Timestamp>,
growth_denominator: usize,
) -> Self {
let worker_metrics = metrics.for_worker(worker_id);

@@ -833,8 +846,8 @@
worker_id,
persist_writer,
corrections: OkErr::new(
Correction::new(metrics.clone(), worker_metrics.clone()),
Correction::new(metrics, worker_metrics),
Correction::new(metrics.clone(), worker_metrics.clone(), growth_denominator),
Correction::new(metrics, worker_metrics, growth_denominator),
),
desired_frontiers: OkErr::new_frontiers(),
persist_frontiers: OkErr::new_frontiers(),