diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index d5a436cb56a29..b74a8fab13023 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -1066,7 +1066,9 @@ where self.replica_tx.clone(), ); - // Take this opportunity to clean up the history we should present. + // Apply all pending compaction commands and reduce the history to remove dataflows that + // have already been dropped and advance others to their most current as-of. + self.send_allow_compaction(); self.history.reduce(); // Replay the commands at the client, creating new dataflow identifiers. @@ -1723,11 +1725,19 @@ where .expect("frontiers don't regress"); } - // Produce `AllowCompaction` command. - self.send(ComputeCommand::AllowCompaction { - id, - frontier: new_since, - }); + // Update the allowed compaction for this collection. + // We defer reporting of non-empty compaction frontiers, to reduce churn when read holds + // are downgraded multiple times. Empty frontiers we report immediately, to ensure + // dataflows are cleaned up as quickly as possible. + if new_since.is_empty() { + collection.allowed_compaction = None; + self.send(ComputeCommand::AllowCompaction { + id, + frontier: new_since, + }) + } else { + collection.allowed_compaction = Some(new_since); + } } /// Fulfills a registered peek and cleans up associated state. @@ -2060,6 +2070,26 @@ where } } + /// Send pending `AllowCompaction` commands. + /// + /// To reduce network traffic and make the command stream less noisy, + /// [`Instance::apply_read_hold_change`] doesn't immediately emit an `AllowCompaction` command + /// when a read frontier changes, but instead stashes the new frontier. If another read + /// frontier downgrade occurs before the next call to this method, it overrides the previous + /// read frontier and only one `AllowCompaction` commands needs to be sent, instead of two. + fn send_allow_compaction(&mut self) { + let mut commands = Vec::with_capacity(self.collections.len()); + for (&id, collection) in &mut self.collections { + if let Some(frontier) = collection.allowed_compaction.take() { + commands.push(ComputeCommand::AllowCompaction { id, frontier }); + } + } + + for command in commands { + self.send(command); + } + } + /// Process pending maintenance work. /// /// This method is invoked periodically by the global controller. @@ -2069,6 +2099,7 @@ where pub fn maintain(&mut self) { self.rehydrate_failed_replicas(); self.downgrade_warmup_capabilities(); + self.send_allow_compaction(); self.schedule_collections(); self.cleanup_collections(); self.update_frontier_introspection(); @@ -2131,6 +2162,8 @@ struct CollectionState { /// Introspection state associated with this collection. introspection: CollectionIntrospection, + /// Allowed compaction that has yet to be communicated to replicas. + allowed_compaction: Option>, } impl CollectionState { @@ -2176,6 +2209,7 @@ impl CollectionState { storage_dependencies, compute_dependencies, introspection, + allowed_compaction: None, } }