Skip to content

Commit

Permalink
compute: consolidate AllowCompaction commands
Browse files Browse the repository at this point in the history
Defer the transmission of AllowCompaction commands to periodic
controller maintenance, to reduce their volume and remove some noise
from the command stream.
  • Loading branch information
teskje committed Jan 17, 2025
1 parent 843f53b commit e8407de
Showing 1 changed file with 40 additions and 6 deletions.
46 changes: 40 additions & 6 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,9 @@ where
self.replica_tx.clone(),
);

// Take this opportunity to clean up the history we should present.
// Apply all pending compaction commands and reduce the history to remove dataflows that
// have already been dropped and advance others to their most current as-of.
self.send_allow_compaction();
self.history.reduce();

// Replay the commands at the client, creating new dataflow identifiers.
Expand Down Expand Up @@ -1723,11 +1725,19 @@ where
.expect("frontiers don't regress");
}

// Produce `AllowCompaction` command.
self.send(ComputeCommand::AllowCompaction {
id,
frontier: new_since,
});
// Update the allowed compaction for this collection.
// We defer reporting of non-empty compaction frontiers, to reduce churn when read holds
// are downgraded multiple times. Empty frontiers we report immediately, to ensure
// dataflows are cleaned up as quickly as possible.
if new_since.is_empty() {
collection.allowed_compaction = None;
self.send(ComputeCommand::AllowCompaction {
id,
frontier: new_since,
})
} else {
collection.allowed_compaction = Some(new_since);
}
}

/// Fulfills a registered peek and cleans up associated state.
Expand Down Expand Up @@ -2060,6 +2070,26 @@ where
}
}

/// Send pending `AllowCompaction` commands.
///
/// To reduce network traffic and make the command stream less noisy,
/// [`Instance::apply_read_hold_change`] doesn't immediately emit an `AllowCompaction` command
/// when a read frontier changes, but instead stashes the new frontier. If another read
/// frontier downgrade occurs before the next call to this method, it overrides the previous
/// read frontier and only one `AllowCompaction` commands needs to be sent, instead of two.
fn send_allow_compaction(&mut self) {
let mut commands = Vec::with_capacity(self.collections.len());
for (&id, collection) in &mut self.collections {
if let Some(frontier) = collection.allowed_compaction.take() {
commands.push(ComputeCommand::AllowCompaction { id, frontier });
}
}

for command in commands {
self.send(command);
}
}

/// Process pending maintenance work.
///
/// This method is invoked periodically by the global controller.
Expand All @@ -2069,6 +2099,7 @@ where
pub fn maintain(&mut self) {
self.rehydrate_failed_replicas();
self.downgrade_warmup_capabilities();
self.send_allow_compaction();
self.schedule_collections();
self.cleanup_collections();
self.update_frontier_introspection();
Expand Down Expand Up @@ -2131,6 +2162,8 @@ struct CollectionState<T: ComputeControllerTimestamp> {

/// Introspection state associated with this collection.
introspection: CollectionIntrospection<T>,
/// Allowed compaction that has yet to be communicated to replicas.
allowed_compaction: Option<Antichain<T>>,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
Expand Down Expand Up @@ -2176,6 +2209,7 @@ impl<T: ComputeControllerTimestamp> CollectionState<T> {
storage_dependencies,
compute_dependencies,
introspection,
allowed_compaction: None,
}
}

Expand Down

0 comments on commit e8407de

Please sign in to comment.