Skip to content

Commit

Permalink
compute: consolidate AllowCompaction commands
Browse files Browse the repository at this point in the history
Defer the transmission of AllowCompaction commands to periodic
controller maintenance, to reduce their volume and remove some noise
from the command stream.
  • Loading branch information
teskje committed Jan 17, 2025
1 parent 843f53b commit 1f358e6
Showing 1 changed file with 26 additions and 5 deletions.
31 changes: 26 additions & 5 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1723,11 +1723,8 @@ where
.expect("frontiers don't regress");
}

// Produce `AllowCompaction` command.
self.send(ComputeCommand::AllowCompaction {
id,
frontier: new_since,
});
// Update the allowed compaction for this collection.
collection.allowed_compaction = Some(new_since);
}

/// Fulfills a registered peek and cleans up associated state.
Expand Down Expand Up @@ -2060,6 +2057,26 @@ where
}
}

/// Send pending `AllowCompaction` commands.
///
/// To reduce network traffic and make the command stream less noisy,
/// [`Instance::apply_read_hold_change`] doesn't immediately emit an `AllowCompaction` command
/// when a read frontier changes, but instead stashes the new frontier. If another read
/// frontier downgrade occurs before the next call to this method, it overrides the previous
/// read frontier and only one `AllowCompaction` commands needs to be sent, instead of two.
fn send_allow_compaction(&mut self) {
let mut commands = Vec::with_capacity(self.collections.len());
for (&id, collection) in &mut self.collections {
if let Some(frontier) = collection.allowed_compaction.take() {
commands.push(ComputeCommand::AllowCompaction { id, frontier });
}
}

for command in commands {
self.send(command);
}
}

/// Process pending maintenance work.
///
/// This method is invoked periodically by the global controller.
Expand All @@ -2069,6 +2086,7 @@ where
pub fn maintain(&mut self) {
self.rehydrate_failed_replicas();
self.downgrade_warmup_capabilities();
self.send_allow_compaction();
self.schedule_collections();
self.cleanup_collections();
self.update_frontier_introspection();
Expand Down Expand Up @@ -2131,6 +2149,8 @@ struct CollectionState<T: ComputeControllerTimestamp> {

/// Introspection state associated with this collection.
introspection: CollectionIntrospection<T>,
/// Allowed compaction that has yet to be communicated to replicas.
allowed_compaction: Option<Antichain<T>>,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
Expand Down Expand Up @@ -2176,6 +2196,7 @@ impl<T: ComputeControllerTimestamp> CollectionState<T> {
storage_dependencies,
compute_dependencies,
introspection,
allowed_compaction: None,
}
}

Expand Down

0 comments on commit 1f358e6

Please sign in to comment.