diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java index d1421fcd5f3..54eca32b358 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java @@ -505,6 +505,7 @@ private Future maybeRollKafkaKraft(Set nodes, kafka.getKafkaVersion(), logging, operationTimeoutMs, + 0L, 1, 3, 3, diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java index 2a8f899d370..a174d8b8f7b 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java @@ -348,6 +348,7 @@ private void restartInParallel(Reconciliation reconciliation, AgentClient agentClient, Set batch, long timeoutMs, + long postRestartDelayMs, int maxRestarts) throws TimeoutException { for (Context context : batch) { restartNode(reconciliation, time, platformClient, context, maxRestarts); @@ -370,6 +371,8 @@ private void restartInParallel(Reconciliation reconciliation, } } } + //added delay between batch restarts which is configurable + time.sleep(postRestartDelayMs, 0); } private static Map> refinePlanForReconfigurability(Reconciliation reconciliation, @@ -486,7 +489,8 @@ enum Plan { * @param kafkaConfigProvider Kafka configuration provider * @param totalNumOfControllerNodes The total number of controller nodes * @param kafkaLogging Kafka logging configuration - * @param postOperationTimeoutMs The maximum time in milliseconds to wait after a restart or reconfigure + * @param postOperationTimeoutMs The maximum time in milliseconds to wait after a restart or reconfigur + * @param postRestartDelayMs Cooldown delay for next rolling restart * @param maxRestartBatchSize The maximum number of nodes that might be restarted at once * @param maxRestarts The maximum number of restart that can be done for a node * @param maxReconfigs The maximum number of reconfiguration that can be done for a node @@ -507,6 +511,7 @@ public static RackRolling rollingRestart(PodOperator podOperator, KafkaVersion kafkaVersion, String kafkaLogging, long postOperationTimeoutMs, + long postRestartDelayMs, int maxRestartBatchSize, int maxRestarts, int maxReconfigs, @@ -530,6 +535,7 @@ public static RackRolling rollingRestart(PodOperator podOperator, kafkaConfigProvider, kafkaLogging, postOperationTimeoutMs, + postRestartDelayMs, maxRestartBatchSize, maxRestarts, maxReconfigs, @@ -551,6 +557,7 @@ protected static RackRolling rollingRestart(Time time, Function kafkaConfigProvider, String desiredLogging, long postOperationTimeoutMs, + long postRestartDelayMs, int maxRestartBatchSize, int maxRestarts, int maxReconfigs, @@ -568,6 +575,7 @@ protected static RackRolling rollingRestart(Time time, kafkaConfigProvider, desiredLogging, postOperationTimeoutMs, + postRestartDelayMs, maxRestartBatchSize, maxRestarts, maxReconfigs, @@ -590,6 +598,7 @@ protected static RackRolling rollingRestart(Time time, private final Function kafkaConfigProvider; private final String desiredLogging; private final long postOperationTimeoutMs; + private final long postRestartDelayMs; private final int maxRestartBatchSize; private final int maxRestarts; private final int maxReconfigs; @@ -608,6 +617,7 @@ protected static RackRolling rollingRestart(Time time, * @param kafkaConfigProvider Kafka configuration provider * @param desiredLogging Kafka logging configuration * @param postOperationTimeoutMs The maximum time in milliseconds to wait after a restart or reconfigure + * @param postRestartDelayMs Cooldown delay for next rolling restart * @param maxRestartBatchSize The maximum number of nodes that might be restarted at once * @param maxRestarts The maximum number of restart that can be done for a node * @param maxReconfigs The maximum number of reconfiguration that can be done for a node @@ -625,6 +635,7 @@ public RackRolling(Time time, Function kafkaConfigProvider, String desiredLogging, long postOperationTimeoutMs, + long postRestartDelayMs, int maxRestartBatchSize, int maxRestarts, int maxReconfigs, @@ -640,6 +651,7 @@ public RackRolling(Time time, this.kafkaConfigProvider = kafkaConfigProvider; this.desiredLogging = desiredLogging; this.postOperationTimeoutMs = postOperationTimeoutMs; + this.postRestartDelayMs = postRestartDelayMs; this.maxRestartBatchSize = maxRestartBatchSize; this.maxRestarts = maxRestarts; this.maxReconfigs = maxReconfigs; @@ -830,7 +842,7 @@ private List restartNodes(List nodesToRestart, int totalNumOfC var batchOfContexts = nodesToRestart.stream().filter(context -> batchOfIds.contains(context.nodeId())).collect(Collectors.toSet()); LOGGER.debugCr(reconciliation, "Restart batch: {}", batchOfContexts); // restart a batch - restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, maxRestarts); + restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, postRestartDelayMs, maxRestarts); return batchOfIds.stream().toList(); } @@ -915,7 +927,7 @@ private List restartUnReadyNodes(List contexts, int totalNumOf LOGGER.warnCr(reconciliation, "All controller nodes are combined and they are not running, therefore restarting them all now"); // if all controller nodes (except a single node quorum) are combined and all of them are not running e.g. Pending, we need to restart them all at the same time to form the quorum. // This is because until the quorum has been formed and broker process can connect to it, the combined nodes do not become ready. - restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, maxRestarts); + restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, postRestartDelayMs, maxRestarts); return combinedNodesToRestart.stream().map(Context::nodeId).toList(); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java index 9b8d9c31979..c974f61b512 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java @@ -305,6 +305,7 @@ private RackRolling newRollingRestart(PlatformClient platformClient, kafkaConfigProvider, null, 120_000, + 0, maxRestartsBatchSize, 1, 1, @@ -318,6 +319,7 @@ private void doRollingRestart(PlatformClient platformClient, Collection nodeRefList, Function reason, Function kafkaConfigProvider, + long postRestartDelayMs, int maxRestartsBatchSize, int maxRestart) throws ExecutionException, InterruptedException, TimeoutException { @@ -334,6 +336,7 @@ private void doRollingRestart(PlatformClient platformClient, kafkaConfigProvider, null, 120_000, + postRestartDelayMs, maxRestartsBatchSize, maxRestart, 1, @@ -358,7 +361,7 @@ void shouldNotRestartBrokersWithNoTopicsIfAllHealthyAndNoReason() throws Executi .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); @@ -382,7 +385,7 @@ void shouldRestartBrokerWithNoTopicIfReasonManualRolling() throws ExecutionExcep .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); @@ -406,7 +409,7 @@ void shouldRestartBrokerIfReasonManualRolling() throws ExecutionException, Inter .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); @@ -432,7 +435,7 @@ void shouldThrowMaxRestartsExceededIfBrokerRestartsMoreThanMaxRestarts() { // when var ex = assertThrows(MaxRestartsExceededException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1)); //then assertEquals("Node pool-kafka-0/0 has been restarted 1 times", ex.getMessage()); @@ -494,7 +497,7 @@ void shouldNotThrowExceptionIfAllPreferredLeaderNotElectedAfterRestart() throws 0) .done(); - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1); Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(0)), any(), any()); Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(1)), any(), any()); @@ -526,7 +529,7 @@ void shouldNotThrowExceptionIfAllPreferredLeadersNotElectedAfterReconfig() throw .mockElectLeaders(rollClient, List.of(1), 0) .done(); - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 10000L, 1, 1); Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRefs.get(0)), any(), any()); Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(1)), any(), any()); @@ -556,7 +559,7 @@ void shouldRepeatAllPreferredLeaderElectionCallsUntilAllPreferredLeaderElected() .mockElectLeaders(rollClient, List.of(1, 1, 1, 1, 0), 0) .done().get(0); - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1); Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRef), any()); @@ -580,7 +583,7 @@ void shouldThrowExceptionIfNodeNotAbleToRecoverAfterRestart() { .done().get(0); var te = assertThrows(UnrestartableNodesException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 3)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 3)); assertEquals("The max attempts (2) to wait for this node pool-kafka-0/0 to finish performing log recovery has been reached. There are 0 logs and 0 segments left to recover.", te.getMessage()); @@ -607,7 +610,7 @@ void shouldThrowExceptionWithRecoveryProgressIfNodeIsInRecovery() { .done().get(0); var te = assertThrows(RuntimeException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1)); assertEquals("The max attempts (2) to wait for this node pool-kafka-0/0 to finish performing log recovery has been reached. There are 100 logs and 300 segments left to recover.", te.getMessage()); @@ -633,7 +636,7 @@ void shouldNotRestartUnreadyNodes() { .done().get(0); var te = assertThrows(RuntimeException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 1)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 1)); assertEquals("java.util.concurrent.TimeoutException: Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:00:00Z, reason=[], numRestarts=0, numReconfigs=0, numAttempts=2]", te.getMessage()); @@ -695,7 +698,7 @@ void shouldReconfigureBrokerIfChangedReconfigurableParameter() throws ExecutionE .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 10000L, 1, 1); // then Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRef), any(), any()); @@ -724,7 +727,7 @@ void shouldRestartBrokerIfChangedNonReconfigurableParameter() throws ExecutionEx .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "auto.leader.rebalance.enable=false", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "auto.leader.rebalance.enable=false", 10000L, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRef), any(), any()); @@ -753,7 +756,7 @@ void shouldReconfigureBrokerIfChangedReconfigurableLoggingParameter() throws Exe .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "log.retention.ms=1000", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "log.retention.ms=1000", 10000L, 1, 1); // then Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRef), any(), any()); @@ -780,7 +783,7 @@ void shouldNotRestartBrokersIfHealthyAndNoReason() throws ExecutionException, In .done(); // when - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 3, 1); + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 10000L, 3, 1); // then for (var nodeRef : nodeRefs.values()) { @@ -812,7 +815,7 @@ void shouldRestartBrokersIfReasonManualRolling() throws ExecutionException, Inte // when doRollingRestart(platformClient, rollClient, agentClient, - nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1); + nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 3, 1); // then for (var nodeRef : nodeRefs.values()) { @@ -1186,7 +1189,7 @@ public void shouldRollOddSizedQuorumOneControllerBehind() throws ExecutionExcept .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 3, 1), "Expect UnrestartableNodesException because neither controller 0 nor 1 can be restarted without impacting the quorum health"); // We should be able to restart only the controller that is behind @@ -1221,7 +1224,7 @@ public void shouldNotRollEvenSizedQuorumTwoControllersBehind() { // we should not restart any controllers as the majority have not caught up to the leader assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 3, 1), "Expect UnrestartableNodesException because none of the controllers can be restarted without impacting the quorum health"); for (var nodeRef : nodeRefs.values()) { @@ -1295,7 +1298,7 @@ public void shouldNotRollControllersWithInvalidTimestamp() { .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 3, 1), "Expect UnrestartableNodesException because of invalid timestamps for controller 0 and 2"); for (var nodeRef : nodeRefs.values()) { @@ -1323,7 +1326,7 @@ public void shouldNotRollControllersWithInvalidLeader() { .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 3, 1), "Expect UnrestartableNodesException because of invalid quorum leader"); for (var nodeRef : nodeRefs.values()) { @@ -1385,7 +1388,7 @@ public void shouldRollTwoNodesQuorumOneControllerBehind() throws ExecutionExcept .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 10000L, 3, 1), "Expect UnrestartableNodesException because of controller 2 has fallen behind therefore controller 1 cannot be restarted"); //only the controller that has fallen behind should be restarted Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRefs.get(2)), any()); @@ -1516,7 +1519,7 @@ void shouldFailReconciliationIfControllerNodeNeverBecomeReady() { .done(); var ex = assertThrows(TimeoutException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3)); + () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 3)); assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-1/1, currentRoles=NodeRoles[controller=true, broker=false], state=NOT_RUNNING, lastTransition=1970-01-01T00:00:00Z, reason=[POD_STUCK], numRestarts=0, numReconfigs=0, numAttempts=2]", ex.getMessage()); @@ -1541,9 +1544,9 @@ void shouldFailReconciliationIfBrokerNodeNeverBecomeReady() { //If nodes never got into running state, we have to exit the inner loop somehow. var ex = assertThrows(TimeoutException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3)); + () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 10000L, 1, 3)); - assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:19Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]", ex.getMessage()); + assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:29Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]", ex.getMessage()); Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRefs.get(1)), any());