Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compute: consolidate AllowCompaction commands #31085

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions src/compute-client/src/controller/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,9 @@ where
self.replica_tx.clone(),
);

// Take this opportunity to clean up the history we should present.
// Apply all pending compaction commands and reduce the history to remove dataflows that
// have already been dropped and advance others to their most current as-of.
self.send_allow_compaction();
self.history.reduce();

// Replay the commands at the client, creating new dataflow identifiers.
Expand Down Expand Up @@ -1723,11 +1725,19 @@ where
.expect("frontiers don't regress");
}

// Produce `AllowCompaction` command.
self.send(ComputeCommand::AllowCompaction {
id,
frontier: new_since,
});
// Update the allowed compaction for this collection.
// We defer reporting of non-empty compaction frontiers, to reduce churn when read holds
// are downgraded multiple times. Empty frontiers we report immediately, to ensure
// dataflows are cleaned up as quickly as possible.
if new_since.is_empty() {
collection.allowed_compaction = None;
self.send(ComputeCommand::AllowCompaction {
id,
frontier: new_since,
})
} else {
collection.allowed_compaction = Some(new_since);
}
}

/// Fulfills a registered peek and cleans up associated state.
Expand Down Expand Up @@ -2060,6 +2070,26 @@ where
}
}

/// Send pending `AllowCompaction` commands.
///
/// To reduce network traffic and make the command stream less noisy,
/// [`Instance::apply_read_hold_change`] doesn't immediately emit an `AllowCompaction` command
/// when a read frontier changes, but instead stashes the new frontier. If another read
/// frontier downgrade occurs before the next call to this method, it overrides the previous
/// read frontier and only one `AllowCompaction` commands needs to be sent, instead of two.
fn send_allow_compaction(&mut self) {
let mut commands = Vec::with_capacity(self.collections.len());
for (&id, collection) in &mut self.collections {
if let Some(frontier) = collection.allowed_compaction.take() {
commands.push(ComputeCommand::AllowCompaction { id, frontier });
}
}

for command in commands {
self.send(command);
}
}

/// Process pending maintenance work.
///
/// This method is invoked periodically by the global controller.
Expand All @@ -2069,6 +2099,7 @@ where
pub fn maintain(&mut self) {
self.rehydrate_failed_replicas();
self.downgrade_warmup_capabilities();
self.send_allow_compaction();
self.schedule_collections();
self.cleanup_collections();
self.update_frontier_introspection();
Expand Down Expand Up @@ -2131,6 +2162,8 @@ struct CollectionState<T: ComputeControllerTimestamp> {

/// Introspection state associated with this collection.
introspection: CollectionIntrospection<T>,
/// Allowed compaction that has yet to be communicated to replicas.
allowed_compaction: Option<Antichain<T>>,
}

impl<T: ComputeControllerTimestamp> CollectionState<T> {
Expand Down Expand Up @@ -2176,6 +2209,7 @@ impl<T: ComputeControllerTimestamp> CollectionState<T> {
storage_dependencies,
compute_dependencies,
introspection,
allowed_compaction: None,
}
}

Expand Down
Loading