diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java index 2a8f899d370..bc4194e6c2d 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java @@ -22,6 +22,7 @@ import io.vertx.core.Future; import io.vertx.core.Promise; import io.vertx.core.Vertx; +import java.util.Collections; import org.apache.kafka.clients.admin.ConfigEntry; import org.apache.kafka.clients.admin.TopicListing; @@ -454,16 +455,19 @@ private static State observe(Reconciliation reconciliation, PlatformClient platf } enum Plan { - // Used for brokers that are initially healthy and require neither restart not reconfigure + // Used for nodes that are initially healthy and require neither restart nor reconfigure NOP, - // Used for brokers that are initially not healthy - RESTART_FIRST, - // Used in {@link #initialPlan(List, RollClient)} for brokers that require reconfigure + // Used for nodes that are initially not running + RESTART_NOT_RUNNING, + // Used for nodes that are not responsive to connections + RESTART_UNRESPONSIVE, + // Used in {@link #initialPlan(List, RollClient)} for nodes that may require reconfigure // before we know whether the actual config changes are reconfigurable MAYBE_RECONFIGURE, // Used in {@link #refinePlanForReconfigurability(Reconciliation, KafkaVersion, Function, String, RollClient, Map)} // once we know a MAYBE_RECONFIGURE node can actually be reconfigured RECONFIGURE, + // Used for nodes that have non-empty reasons to restart RESTART, // Used in {@link #initialPlan(List, RollClient)} for nodes that require waiting for // log recovery to complete @@ -737,8 +741,13 @@ public List loop() throws TimeoutException, InterruptedException, Execu } // 
Restart any initially unready nodes - if (!byPlan.getOrDefault(Plan.RESTART_FIRST, List.of()).isEmpty()) { - return restartUnReadyNodes(byPlan.get(Plan.RESTART_FIRST), totalNumOfControllerNodes); + if (!byPlan.getOrDefault(Plan.RESTART_NOT_RUNNING, List.of()).isEmpty()) { + return restartNotRunningNodes(byPlan.get(Plan.RESTART_NOT_RUNNING), totalNumOfControllerNodes); + } + + // Restart any nodes that are not responding to connection attempts + if (!byPlan.getOrDefault(Plan.RESTART_UNRESPONSIVE, List.of()).isEmpty()) { + return restartUnresponsiveNodes(byPlan.get(Plan.RESTART_UNRESPONSIVE)); } // If we get this far we know all nodes are ready @@ -895,30 +904,37 @@ private List waitForUnreadyNodes(List contexts, boolean ignore return contexts.stream().map(Context::nodeId).collect(Collectors.toList()); } - private List restartUnReadyNodes(List contexts, int totalNumOfControllers) throws TimeoutException { + private List restartNotRunningNodes(List contexts, int totalNumOfControllers) throws TimeoutException { + Set notRunningControllers = contexts.stream().filter(context -> context.currentRoles().controller()).collect(Collectors.toSet()); + + if (totalNumOfControllers > 1 && totalNumOfControllers == notRunningControllers.size()) { + LOGGER.warnCr(reconciliation, "None of the controller nodes are running, therefore restarting them all now to to give the best chance to recover and form a quorum"); + restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, notRunningControllers, postOperationTimeoutMs, maxRestarts); + return notRunningControllers.stream().map(Context::nodeId).toList(); + } + + Set notRunningAndHasOldRevision = contexts.stream().filter(context -> context.state().equals(State.NOT_RUNNING) && + context.reason().contains(RestartReason.POD_HAS_OLD_REVISION)).collect(Collectors.toSet()); + + if (notRunningAndHasOldRevision.size() > 0) { + restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, notRunningAndHasOldRevision, 
postOperationTimeoutMs, maxRestarts); + return notRunningAndHasOldRevision.stream().map(Context::nodeId).toList(); + } + + return waitForUnreadyNodes(contexts, false); + } + + private List restartUnresponsiveNodes(List contexts) throws TimeoutException { Set pureControllerNodesToRestart = new HashSet<>(); Set combinedNodesToRestart = new HashSet<>(); - var notRunningCombinedNodes = 0; - for (var context : contexts) { if (context.currentRoles().controller() && !context.currentRoles().broker()) { pureControllerNodesToRestart.add(context); } else if (context.currentRoles().controller() && context.currentRoles().broker()) { combinedNodesToRestart.add(context); - if (context.state().equals(State.NOT_RUNNING)) { - notRunningCombinedNodes++; - } } } - if (totalNumOfControllers > 1 && totalNumOfControllers == notRunningCombinedNodes) { - LOGGER.warnCr(reconciliation, "All controller nodes are combined and they are not running, therefore restarting them all now"); - // if all controller nodes (except a single node quorum) are combined and all of them are not running e.g. Pending, we need to restart them all at the same time to form the quorum. - // This is because until the quorum has been formed and broker process can connect to it, the combined nodes do not become ready. - restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, maxRestarts); - return combinedNodesToRestart.stream().map(Context::nodeId).toList(); - } - // restart in the following order: pure controllers, combined nodes and brokers Context nodeToRestart; if (pureControllerNodesToRestart.size() > 0) { @@ -929,28 +945,10 @@ private List restartUnReadyNodes(List contexts, int totalNumOf nodeToRestart = contexts.get(0); } - if (nodeToRestart.state() == State.NOT_RUNNING && !nodeToRestart.reason().contains(RestartReason.POD_HAS_OLD_REVISION)) { - // If the node is not running (e.g. 
unschedulable) then restarting it, likely won't make any difference. - // Proceeding and deleting another node may result in it not running too. Avoid restarting it unless it has an old revision. - LOGGER.warnCr(reconciliation, "Node {} has been already restarted but still not running. Therefore will not restart it", nodeToRestart); - } else { - restartNode(reconciliation, time, platformClient, nodeToRestart, maxRestarts); - } + restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, Collections.singleton(nodeToRestart), postOperationTimeoutMs, maxRestarts); + + return Collections.singletonList(nodeToRestart.nodeId()); - try { - long remainingTimeoutMs = awaitState(reconciliation, time, platformClient, agentClient, nodeToRestart, State.SERVING, postOperationTimeoutMs); - if (nodeToRestart.currentRoles().broker()) { - awaitPreferred(reconciliation, time, rollClient, nodeToRestart, remainingTimeoutMs); - } - } catch (TimeoutException e) { - LOGGER.warnCr(reconciliation, "Timed out waiting for node {} to become ready after a restart", nodeToRestart.nodeRef()); - if (nodeToRestart.numAttempts() >= maxAttempts) { - LOGGER.warnCr(reconciliation, "Reached the maximum attempt of waiting for node {} to become ready after a restart", nodeToRestart.nodeRef()); - throw e; - } - nodeToRestart.incrementNumAttempts(); - } - return List.of(nodeToRestart.nodeId()); } private Map> initialPlan(List contexts, RollClient rollClient) { @@ -958,7 +956,7 @@ private Map> initialPlan(List contexts, RollClient if (context.state() == State.NOT_RUNNING) { LOGGER.debugCr(reconciliation, "{} is in {} state therefore may get restarted first", context.nodeRef(), context.state()); context.reason().add(RestartReason.POD_STUCK); - return Plan.RESTART_FIRST; + return Plan.RESTART_NOT_RUNNING; } else if (context.state() == State.RECOVERING) { LOGGER.debugCr(reconciliation, "{} is in log recovery therefore will not be restarted", context.nodeRef()); @@ -967,7 +965,7 @@ private 
Map> initialPlan(List contexts, RollClient } else if (!rollClient.canConnectToNode(context.nodeRef(), context.currentRoles().controller())) { LOGGER.debugCr(reconciliation, "{} will be restarted because it does not seem to responding to connection attempt", context.nodeRef()); context.reason().add(RestartReason.POD_UNRESPONSIVE); - return Plan.RESTART_FIRST; + return Plan.RESTART_UNRESPONSIVE; } else { var reasons = context.reason(); diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java index 9b8d9c31979..079ccb20d03 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java @@ -1457,16 +1457,44 @@ public void shouldRollNodesIfAllNotRunning() throws ExecutionException, Interrup true, 3); - // the order we expect are pure controller, combined and broker only - assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 0); - - assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 1); + // we expect the controller nodes to be restarted together as none of the controllers are running + assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 0, 1); assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 2); assertNodesRestarted(platformClient, rollClient, nodeRefs, rr); } + @Test + public void shouldRollNodeIfNotRunningAndHasOldRevision() throws ExecutionException, InterruptedException, TimeoutException { + // given + PlatformClient platformClient = mock(PlatformClient.class); + RollClient rollClient = mock(RollClient.class); + AgentClient agentClient = mock(AgentClient.class); + var nodeRefs = new MockBuilder() + .addNodes(platformClient, true, true, 0) + .mockLeader(rollClient, 0) + .addTopic("topic-A", 
0) + .mockCanConnectToNodes(rollClient, true, 0) + .mockNodeState(platformClient, List.of(PlatformClient.NodeState.NOT_RUNNING, PlatformClient.NodeState.READY), 0) + .mockTopics(rollClient) + .done(); + + var rr = newRollingRestart(platformClient, + rollClient, + agentClient, + nodeRefs.values(), + RackRollingTest::podHasOldRevision, + EMPTY_CONFIG_SUPPLIER, + true, + 3); + + // we expect the not running node to be restarted because its pod has an old revision + assertNodesRestarted(platformClient, rollClient, nodeRefs, rr, 0); + + assertNodesRestarted(platformClient, rollClient, nodeRefs, rr); + } + @Test public void shouldRestartCombinedNodesIfAllNotRunning() throws ExecutionException, InterruptedException, TimeoutException { // given @@ -1508,17 +1536,15 @@ void shouldFailReconciliationIfControllerNodeNeverBecomeReady() { RollClient rollClient = mock(RollClient.class); AgentClient agentClient = mock(AgentClient.class); var nodeRefs = new MockBuilder() - .addNode(platformClient, false, true, 0) .addNode(platformClient, true, false, 1) - .mockNodeState(platformClient, List.of(PlatformClient.NodeState.NOT_RUNNING), 0) .mockNodeState(platformClient, List.of(PlatformClient.NodeState.NOT_RUNNING), 1) .mockTopics(rollClient) .done(); - var ex = assertThrows(TimeoutException.class, + var ex = assertThrows(RuntimeException.class, () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3)); - assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-1/1, currentRoles=NodeRoles[controller=true, broker=false], state=NOT_RUNNING, lastTransition=1970-01-01T00:00:00Z, reason=[POD_STUCK], numRestarts=0, numReconfigs=0, numAttempts=2]", ex.getMessage()); + assertEquals("java.util.concurrent.TimeoutException: Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-1/1, currentRoles=NodeRoles[controller=true, broker=false], state=NOT_RUNNING, 
lastTransition=1970-01-01T00:00:00Z, reason=[POD_STUCK], numRestarts=0, numReconfigs=0, numAttempts=2]", ex.getMessage()); Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); Mockito.verify(platformClient, times(0)).restartNode(eq(nodeRefs.get(1)), any()); @@ -1543,7 +1569,7 @@ void shouldFailReconciliationIfBrokerNodeNeverBecomeReady() { var ex = assertThrows(TimeoutException.class, () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3)); - assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:19Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]", ex.getMessage()); + assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:17Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]", ex.getMessage()); Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRefs.get(1)), any());