Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce cool down interval between restarts #10

Open
wants to merge 1 commit into
base: rackrolling-update
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,7 @@ private Future<Void> maybeRollKafkaKraft(Set<NodeRef> nodes,
kafka.getKafkaVersion(),
logging,
operationTimeoutMs,
0L,
1,
3,
3,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,7 @@ private void restartInParallel(Reconciliation reconciliation,
AgentClient agentClient,
Set<Context> batch,
long timeoutMs,
long postRestartDelayMs,
int maxRestarts) throws TimeoutException {
for (Context context : batch) {
restartNode(reconciliation, time, platformClient, context, maxRestarts);
Expand All @@ -370,6 +371,8 @@ private void restartInParallel(Reconciliation reconciliation,
}
}
}
//added delay between batch restarts which is configurable
time.sleep(postRestartDelayMs, 0);
}

private static Map<Plan, List<Context>> refinePlanForReconfigurability(Reconciliation reconciliation,
Expand Down Expand Up @@ -486,7 +489,8 @@ enum Plan {
* @param kafkaConfigProvider Kafka configuration provider
* @param totalNumOfControllerNodes The total number of controller nodes
* @param kafkaLogging Kafka logging configuration
* @param postOperationTimeoutMs The maximum time in milliseconds to wait after a restart or reconfigure
* @param postOperationTimeoutMs The maximum time in milliseconds to wait after a restart or reconfigur
* @param postRestartDelayMs Cooldown delay for next rolling restart
* @param maxRestartBatchSize The maximum number of nodes that might be restarted at once
* @param maxRestarts The maximum number of restart that can be done for a node
* @param maxReconfigs The maximum number of reconfiguration that can be done for a node
Expand All @@ -507,6 +511,7 @@ public static RackRolling rollingRestart(PodOperator podOperator,
KafkaVersion kafkaVersion,
String kafkaLogging,
long postOperationTimeoutMs,
long postRestartDelayMs,
int maxRestartBatchSize,
int maxRestarts,
int maxReconfigs,
Expand All @@ -530,6 +535,7 @@ public static RackRolling rollingRestart(PodOperator podOperator,
kafkaConfigProvider,
kafkaLogging,
postOperationTimeoutMs,
postRestartDelayMs,
maxRestartBatchSize,
maxRestarts,
maxReconfigs,
Expand All @@ -551,6 +557,7 @@ protected static RackRolling rollingRestart(Time time,
Function<Integer, String> kafkaConfigProvider,
String desiredLogging,
long postOperationTimeoutMs,
long postRestartDelayMs,
int maxRestartBatchSize,
int maxRestarts,
int maxReconfigs,
Expand All @@ -568,6 +575,7 @@ protected static RackRolling rollingRestart(Time time,
kafkaConfigProvider,
desiredLogging,
postOperationTimeoutMs,
postRestartDelayMs,
maxRestartBatchSize,
maxRestarts,
maxReconfigs,
Expand All @@ -590,6 +598,7 @@ protected static RackRolling rollingRestart(Time time,
private final Function<Integer, String> kafkaConfigProvider;
private final String desiredLogging;
private final long postOperationTimeoutMs;
private final long postRestartDelayMs;
private final int maxRestartBatchSize;
private final int maxRestarts;
private final int maxReconfigs;
Expand All @@ -608,6 +617,7 @@ protected static RackRolling rollingRestart(Time time,
* @param kafkaConfigProvider Kafka configuration provider
* @param desiredLogging Kafka logging configuration
* @param postOperationTimeoutMs The maximum time in milliseconds to wait after a restart or reconfigure
* @param postRestartDelayMs Cooldown delay for next rolling restart
* @param maxRestartBatchSize The maximum number of nodes that might be restarted at once
* @param maxRestarts The maximum number of restart that can be done for a node
* @param maxReconfigs The maximum number of reconfiguration that can be done for a node
Expand All @@ -625,6 +635,7 @@ public RackRolling(Time time,
Function<Integer, String> kafkaConfigProvider,
String desiredLogging,
long postOperationTimeoutMs,
long postRestartDelayMs,
int maxRestartBatchSize,
int maxRestarts,
int maxReconfigs,
Expand All @@ -640,6 +651,7 @@ public RackRolling(Time time,
this.kafkaConfigProvider = kafkaConfigProvider;
this.desiredLogging = desiredLogging;
this.postOperationTimeoutMs = postOperationTimeoutMs;
this.postRestartDelayMs = postRestartDelayMs;
this.maxRestartBatchSize = maxRestartBatchSize;
this.maxRestarts = maxRestarts;
this.maxReconfigs = maxReconfigs;
Expand Down Expand Up @@ -830,7 +842,7 @@ private List<Integer> restartNodes(List<Context> nodesToRestart, int totalNumOfC
var batchOfContexts = nodesToRestart.stream().filter(context -> batchOfIds.contains(context.nodeId())).collect(Collectors.toSet());
LOGGER.debugCr(reconciliation, "Restart batch: {}", batchOfContexts);
// restart a batch
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, maxRestarts);
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, postRestartDelayMs, maxRestarts);

return batchOfIds.stream().toList();
}
Expand Down Expand Up @@ -915,7 +927,7 @@ private List<Integer> restartUnReadyNodes(List<Context> contexts, int totalNumOf
LOGGER.warnCr(reconciliation, "All controller nodes are combined and they are not running, therefore restarting them all now");
// if all controller nodes (except a single node quorum) are combined and all of them are not running e.g. Pending, we need to restart them all at the same time to form the quorum.
// This is because until the quorum has been formed and broker process can connect to it, the combined nodes do not become ready.
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, maxRestarts);
restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, postRestartDelayMs, maxRestarts);
return combinedNodesToRestart.stream().map(Context::nodeId).toList();
}

Expand Down
Loading