From 1ddcf75e40f8c0e87ed58ce490893ee070472013 Mon Sep 17 00:00:00 2001 From: Gantigmaa Selenge Date: Tue, 2 Jul 2024 16:25:04 +0100 Subject: [PATCH] Add configurable wait time between restart and preferred leader election. Send elect request only if there are partitions that need leader re-elections. After reconfiguration, wait for nodes to have SERVING state, rather than LEADING_ALL_PREFERRED (this is from the proposal PR comment). Refactor tests to reflect the reconfiguration change, and accurately test logging reconfiguration. Signed-off-by: Gantigmaa Selenge --- .../operator/assembly/KafkaReconciler.java | 1 + .../operator/resource/rolling/Alarm.java | 2 +- .../resource/rolling/RackRolling.java | 45 ++++------ .../resource/rolling/RollClientImpl.java | 19 +++-- .../resource/rolling/RackRollingTest.java | 85 ++++++++++--------- 5 files changed, 78 insertions(+), 74 deletions(-) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java index d1421fcd5f3..198415bf663 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaReconciler.java @@ -505,6 +505,7 @@ private Future maybeRollKafkaKraft(Set nodes, kafka.getKafkaVersion(), logging, operationTimeoutMs, + 10000L, 1, 3, 3, diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java index 19f65ac9ede..b3403c24314 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/Alarm.java @@ -44,7 +44,7 @@ private Alarm(Time time, long deadline, Supplier timeoutMessageSupplier) } /** - * Creates an Alerm + * Creates an Alarm * @param time The source of time * @param timeoutMs The timeout for this alarm.
* @param timeoutMessageSupplier The exception message diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java index 2a8f899d370..38e638747be 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRolling.java @@ -348,6 +348,7 @@ private void restartInParallel(Reconciliation reconciliation, AgentClient agentClient, Set batch, long timeoutMs, + long waitBetweenRestartAndPreferredLeaderElection, int maxRestarts) throws TimeoutException { for (Context context : batch) { restartNode(reconciliation, time, platformClient, context, maxRestarts); @@ -357,6 +358,7 @@ private void restartInParallel(Reconciliation reconciliation, try { remainingTimeoutMs = awaitState(reconciliation, time, platformClient, agentClient, context, State.SERVING, remainingTimeoutMs); if (context.currentRoles().broker()) { + time.sleep(waitBetweenRestartAndPreferredLeaderElection, 0); awaitPreferred(reconciliation, time, rollClient, context, remainingTimeoutMs); } } catch (TimeoutException e) { @@ -507,6 +509,7 @@ public static RackRolling rollingRestart(PodOperator podOperator, KafkaVersion kafkaVersion, String kafkaLogging, long postOperationTimeoutMs, + long waitBetweenRestartAndPreferredLeaderElection, int maxRestartBatchSize, int maxRestarts, int maxReconfigs, @@ -530,6 +533,7 @@ public static RackRolling rollingRestart(PodOperator podOperator, kafkaConfigProvider, kafkaLogging, postOperationTimeoutMs, + waitBetweenRestartAndPreferredLeaderElection, maxRestartBatchSize, maxRestarts, maxReconfigs, @@ -551,6 +555,7 @@ protected static RackRolling rollingRestart(Time time, Function kafkaConfigProvider, String desiredLogging, long postOperationTimeoutMs, + long waitBetweenRestartAndPreferredLeaderElection, int maxRestartBatchSize, int maxRestarts, int maxReconfigs, @@ -568,6 +573,7 @@ protected static RackRolling rollingRestart(Time time, kafkaConfigProvider, desiredLogging, postOperationTimeoutMs, + waitBetweenRestartAndPreferredLeaderElection, maxRestartBatchSize, maxRestarts, maxReconfigs, @@ -590,6 +596,7 @@ protected static RackRolling rollingRestart(Time time, private final Function kafkaConfigProvider; private final String desiredLogging; private final long postOperationTimeoutMs; + private final long waitBetweenRestartAndPreferredLeaderElectionMs; private final int maxRestartBatchSize; private final int maxRestarts; private final int maxReconfigs; @@ -625,6 +632,7 @@ public RackRolling(Time time, Function kafkaConfigProvider, String desiredLogging, long postOperationTimeoutMs, + long waitBetweenRestartAndPreferredLeaderElection, int maxRestartBatchSize, int maxRestarts, int maxReconfigs, @@ -640,6 +648,7 @@ public RackRolling(Time time, this.kafkaConfigProvider = kafkaConfigProvider; this.desiredLogging = desiredLogging; this.postOperationTimeoutMs = postOperationTimeoutMs; + this.waitBetweenRestartAndPreferredLeaderElectionMs = waitBetweenRestartAndPreferredLeaderElection; this.maxRestartBatchSize = maxRestartBatchSize; this.maxRestarts = maxRestarts; this.maxReconfigs = maxReconfigs; @@ -726,7 +735,7 @@ public List loop() throws TimeoutException, InterruptedException, Execu // We want to give nodes chance to get ready before we try to connect to the or consider them for rolling. 
// This is important especially for nodes which were just started. LOGGER.debugCr(reconciliation, "Waiting for nodes {} to become ready before initialising plan in case they just started", unreadyNodes); - waitForUnreadyNodes(unreadyNodes, true); + awaitReadiness(unreadyNodes, true); } var byPlan = initialPlan(contexts, rollClient); @@ -774,7 +783,7 @@ public List loop() throws TimeoutException, InterruptedException, Execu // from taking out a node each time (due, e.g. to a configuration error). LOGGER.debugCr(reconciliation, "Nodes {} do not need to be restarted", unreadyNodes); LOGGER.debugCr(reconciliation, "Waiting for non-restarted nodes {} to become ready", unreadyNodes); - return waitForUnreadyNodes(unreadyNodes, false); + return awaitReadiness(unreadyNodes, false); } } @@ -830,34 +839,22 @@ private List restartNodes(List nodesToRestart, int totalNumOfC var batchOfContexts = nodesToRestart.stream().filter(context -> batchOfIds.contains(context.nodeId())).collect(Collectors.toSet()); LOGGER.debugCr(reconciliation, "Restart batch: {}", batchOfContexts); // restart a batch - restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, maxRestarts); + restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, batchOfContexts, postOperationTimeoutMs, waitBetweenRestartAndPreferredLeaderElectionMs, maxRestarts); return batchOfIds.stream().toList(); } private List reconfigureNodes(List contexts) { - List reconfiguredNode = List.of(); for (var context : contexts) { - // TODO decide whether to support canary reconfiguration for cluster-scoped configs (nice to have) try { reconfigureNode(reconciliation, time, rollClient, context, maxReconfigs); } catch (RuntimeException e) { return List.of(context.nodeId()); } - time.sleep(postOperationTimeoutMs / 2, 0); - // TODO decide whether we need an explicit healthcheck here - // or at least to know that the kube health check probe will have failed at the time - // we break to OUTER (We need to test a scenario of breaking configuration change, does this sleep catch it?) - awaitPreferred(reconciliation, time, rollClient, context, postOperationTimeoutMs / 2); - // termination condition - if (contexts.stream().allMatch(context2 -> context2.state().equals(State.LEADING_ALL_PREFERRED))) { - LOGGER.debugCr(reconciliation, "Terminate: All nodes leading preferred replicas after reconfigure"); - break; - } - reconfiguredNode = List.of(context.nodeId()); } - return reconfiguredNode; + awaitReadiness(contexts, false); + return contexts.stream().map(Context::nodeId).collect(Collectors.toList()); } private List waitForLogRecovery(List contexts) { @@ -877,7 +874,7 @@ private List waitForLogRecovery(List contexts) { return contexts.stream().map(Context::nodeId).collect(Collectors.toList()); } - private List waitForUnreadyNodes(List contexts, boolean ignoreTimeout) { + private List awaitReadiness(List contexts, boolean ignoreTimeout) { long remainingTimeoutMs = postOperationTimeoutMs; for (Context context : contexts) { try { @@ -915,7 +912,7 @@ private List restartUnReadyNodes(List contexts, int totalNumOf LOGGER.warnCr(reconciliation, "All controller nodes are combined and they are not running, therefore restarting them all now"); // if all controller nodes (except a single node quorum) are combined and all of them are not running e.g. Pending, we need to restart them all at the same time to form the quorum. 
// This is because until the quorum has been formed and broker process can connect to it, the combined nodes do not become ready. - restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, maxRestarts); + restartInParallel(reconciliation, time, platformClient, rollClient, agentClient, combinedNodesToRestart, postOperationTimeoutMs, waitBetweenRestartAndPreferredLeaderElectionMs, maxRestarts); return combinedNodesToRestart.stream().map(Context::nodeId).toList(); } @@ -987,14 +984,8 @@ private Map> initialPlan(List contexts, RollClient // If a pure controller's configuration has changed, it should have non-empty reasons to restart. return Plan.NOP; } else { - if (context.numReconfigs() > 0 - && context.state() == State.LEADING_ALL_PREFERRED) { - LOGGER.debugCr(reconciliation, "{} has already been reconfigured", context.nodeRef()); - return Plan.NOP; - } else { - LOGGER.debugCr(reconciliation, "{} may need to be reconfigured", context.nodeRef()); - return Plan.MAYBE_RECONFIGURE; - } + LOGGER.debugCr(reconciliation, "{} may need to be reconfigured", context.nodeRef()); + return Plan.MAYBE_RECONFIGURE; } } })); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java index 1bfe7a7c4b2..b9cb02e4397 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/rolling/RollClientImpl.java @@ -306,18 +306,21 @@ public int tryElectAllPreferredLeaders(NodeRef nodeRef) { for (TopicPartitionInfo topicPartitionInfo : td.partitions()) { if (!topicPartitionInfo.replicas().isEmpty() && topicPartitionInfo.replicas().get(0).id() == nodeRef.nodeId() // this node is preferred leader - && topicPartitionInfo.leader().id() != nodeRef.nodeId()) { // this onde is not current leader + && topicPartitionInfo.leader().id() != nodeRef.nodeId()) { // this node is not current leader toElect.add(new TopicPartition(td.name(), topicPartitionInfo.partition())); } } } - - var electionResults = brokerAdmin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get(); - - long count = electionResults.values().stream() - .filter(Optional::isPresent) - .count(); - return count > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) count; + if (toElect.size() > 0) { + var electionResults = brokerAdmin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get(); + + long count = electionResults.values().stream() + .filter(Optional::isPresent) + .count(); + return count > Integer.MAX_VALUE ? 
Integer.MAX_VALUE : (int) count; + } else { + return 0; + } } catch (InterruptedException e) { throw new UncheckedInterruptedException(e); } catch (ExecutionException e) { diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java index 9b8d9c31979..87801f17f49 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/operator/resource/rolling/RackRollingTest.java @@ -45,6 +45,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; public class RackRollingTest { @@ -215,6 +216,17 @@ MockBuilder mockDescribeConfigs(RollClient rollClient, Set nodeConf return this; } + MockBuilder mockDescribeConfigsWithUpdatedResult(RollClient rollClient, Configs currentConfigs, Configs updatedConfigs, int nodeId) { + when(rollClient.describeControllerConfigs(any())) + .thenReturn(Map.of(nodeId, currentConfigs)) + .thenReturn(Map.of(nodeId, updatedConfigs)); + + when(rollClient.describeBrokerConfigs(any())) + .thenReturn(Map.of(nodeId, currentConfigs)) + .thenReturn(Map.of(nodeId, updatedConfigs)); + return this; + } + MockBuilder mockQuorumLastCaughtUpTimestamps(RollClient rollClient, Map quorumState) { doReturn(quorumState) .when(rollClient) @@ -305,6 +317,7 @@ private RackRolling newRollingRestart(PlatformClient platformClient, kafkaConfigProvider, null, 120_000, + 0, maxRestartsBatchSize, 1, 1, @@ -318,6 +331,7 @@ private void doRollingRestart(PlatformClient platformClient, Collection nodeRefList, Function reason, Function kafkaConfigProvider, + String desiredLogging, int maxRestartsBatchSize, int maxRestart) throws ExecutionException, InterruptedException, TimeoutException { @@ -332,8 +346,9 @@ private void doRollingRestart(PlatformClient platformClient, KafkaVersionTestUtils.getLatestVersion(), true, kafkaConfigProvider, - null, + desiredLogging, 120_000, + 0, maxRestartsBatchSize, maxRestart, 1, @@ -358,7 +373,7 @@ void shouldNotRestartBrokersWithNoTopicsIfAllHealthyAndNoReason() throws Executi .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); @@ -382,7 +397,7 @@ void shouldRestartBrokerWithNoTopicIfReasonManualRolling() throws ExecutionExcep .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); @@ -406,7 +421,7 @@ void shouldRestartBrokerIfReasonManualRolling() throws ExecutionException, Inter .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), 
RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); @@ -432,7 +447,7 @@ void shouldThrowMaxRestartsExceededIfBrokerRestartsMoreThanMaxRestarts() { // when var ex = assertThrows(MaxRestartsExceededException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 1)); //then assertEquals("Node pool-kafka-0/0 has been restarted 1 times", ex.getMessage()); @@ -451,7 +466,6 @@ void shouldRestartIfMaxReconfigExceeded() throws ExecutionException, Interrupted .addTopic("topic-A", 0) .mockTopics(rollClient) .mockDescribeConfigs(rollClient, Set.of(new ConfigEntry("compression.type", "zstd")), Set.of(), 0) - .mockElectLeaders(rollClient, List.of(0), 0) .done().get(0); var rr = newRollingRestart(platformClient, @@ -494,7 +508,7 @@ void shouldNotThrowExceptionIfAllPreferredLeaderNotElectedAfterRestart() throws 0) .done(); - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 1, 1); Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(0)), any(), any()); Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(1)), any(), any()); @@ -526,7 +540,7 @@ void shouldNotThrowExceptionIfAllPreferredLeadersNotElectedAfterReconfig() throw .mockElectLeaders(rollClient, List.of(1), 0) .done(); - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, serverId -> "compression.type=snappy", null, 1, 1); Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRefs.get(0)), any(), any()); Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRefs.get(1)), any(), any()); @@ -556,7 +570,7 @@ void shouldRepeatAllPreferredLeaderElectionCallsUntilAllPreferredLeaderElected() .mockElectLeaders(rollClient, List.of(1, 1, 1, 1, 0), 0) .done().get(0); - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 1); Mockito.verify(rollClient, never()).reconfigureNode(any(), any(), any()); Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRef), any()); @@ -580,7 +594,7 @@ void shouldThrowExceptionIfNodeNotAbleToRecoverAfterRestart() { .done().get(0); var te = assertThrows(UnrestartableNodesException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 3)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 3)); assertEquals("The max attempts (2) to wait for this node pool-kafka-0/0 to finish performing log recovery has been reached. 
There are 0 logs and 0 segments left to recover.", te.getMessage()); @@ -607,7 +621,7 @@ void shouldThrowExceptionWithRecoveryProgressIfNodeIsInRecovery() { .done().get(0); var te = assertThrows(RuntimeException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, 1, 1)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::podUnresponsive, EMPTY_CONFIG_SUPPLIER, null, 1, 1)); assertEquals("The max attempts (2) to wait for this node pool-kafka-0/0 to finish performing log recovery has been reached. There are 100 logs and 300 segments left to recover.", te.getMessage()); @@ -633,7 +647,7 @@ void shouldNotRestartUnreadyNodes() { .done().get(0); var te = assertThrows(RuntimeException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 1)); + () -> doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 1)); assertEquals("java.util.concurrent.TimeoutException: Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:00:00Z, reason=[], numRestarts=0, numReconfigs=0, numAttempts=2]", te.getMessage()); @@ -676,7 +690,6 @@ void shouldRestartUnresponsiveNode() throws ExecutionException, InterruptedExcep @Test void shouldReconfigureBrokerIfChangedReconfigurableParameter() throws ExecutionException, InterruptedException, TimeoutException { - // given PlatformClient platformClient = mock(PlatformClient.class); RollClient rollClient = mock(RollClient.class); @@ -687,21 +700,18 @@ void shouldReconfigureBrokerIfChangedReconfigurableParameter() throws ExecutionE .mockCanConnectToNodes(rollClient, true, 0) .mockBrokerState(agentClient, List.of(BrokerState.RUNNING, BrokerState.NOT_RUNNING, BrokerState.STARTING, BrokerState.RECOVERY, BrokerState.RUNNING), 0) .addTopic("topic-A", 0) - .mockDescribeConfigs(rollClient, - Set.of(new ConfigEntry("compression.type", "zstd")), - Set.of(), - 0) - .mockElectLeaders(rollClient, 0) + .mockDescribeConfigsWithUpdatedResult(rollClient, + new Configs(new Config(Set.of(new ConfigEntry("compression.type", "zstd"))), new Config(Set.of())), + new Configs(new Config(Set.of(new ConfigEntry("compression.type", "snappy"))), new Config(Set.of())), + 0) .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "compression.type=snappy", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "compression.type=snappy", null, 0, 0); // then Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRef), any(), any()); - Mockito.verify(platformClient, never()).restartNode(eq(nodeRef), any()); - Mockito.verify(rollClient, times(1)).tryElectAllPreferredLeaders(eq(nodeRef)); - } + Mockito.verify(platformClient, never()).restartNode(eq(nodeRef), any());} @Test void shouldRestartBrokerIfChangedNonReconfigurableParameter() throws ExecutionException, InterruptedException, TimeoutException { @@ -724,7 +734,7 @@ void shouldRestartBrokerIfChangedNonReconfigurableParameter() throws ExecutionEx .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), 
RackRollingTest::noReasons, serverId -> "auto.leader.rebalance.enable=false", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "auto.leader.rebalance.enable=false", null, 1, 1); // then Mockito.verify(rollClient, never()).reconfigureNode(eq(nodeRef), any(), any()); @@ -746,19 +756,18 @@ void shouldReconfigureBrokerIfChangedReconfigurableLoggingParameter() throws Exe .mockCanConnectToNodes(rollClient, true, 0) .addTopic("topic-A", 0) .mockTopics(rollClient) - .mockDescribeConfigs(rollClient, - Set.of(), - Set.of(new ConfigEntry("org.apache.kafka", "DEBUG")), 0) - .mockElectLeaders(rollClient, 0) + .mockDescribeConfigsWithUpdatedResult(rollClient, + new Configs(new Config(Set.of()), new Config(Set.of())), + new Configs(new Config(Set.of()), new Config(Set.of(new ConfigEntry("org.apache.kafka", "DEBUG"), new ConfigEntry("root", "WARN")))), + 0) .done().get(0); // when - doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, serverId -> "log.retention.ms=1000", 1, 1); + doRollingRestart(platformClient, rollClient, agentClient, List.of(nodeRef), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, "log4j.logger.org.apache.kafka=DEBUG", 1, 1); // then Mockito.verify(rollClient, times(1)).reconfigureNode(eq(nodeRef), any(), any()); Mockito.verify(platformClient, never()).restartNode(eq(nodeRef), any()); - Mockito.verify(rollClient, times(1)).tryElectAllPreferredLeaders(eq(nodeRef)); } @Test @@ -780,7 +789,7 @@ void shouldNotRestartBrokersIfHealthyAndNoReason() throws ExecutionException, In .done(); // when - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 3, 1); + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 3, 1); // then for (var nodeRef : nodeRefs.values()) { @@ -812,7 +821,7 @@ void shouldRestartBrokersIfReasonManualRolling() throws ExecutionException, Inte // when doRollingRestart(platformClient, rollClient, agentClient, - nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1); + nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1); // then for (var nodeRef : nodeRefs.values()) { @@ -1186,7 +1195,7 @@ public void shouldRollOddSizedQuorumOneControllerBehind() throws ExecutionExcept .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1), "Expect UnrestartableNodesException because neither controller 0 nor 1 can be restarted without impacting the quorum health"); // We should be able to restart only the controller that is behind @@ -1221,7 +1230,7 @@ public void shouldNotRollEvenSizedQuorumTwoControllersBehind() { // we should not restart any controllers as the majority have not caught up to the leader assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1), "Expect 
UnrestartableNodesException because none of the controllers can be restarted without impacting the quorum health"); for (var nodeRef : nodeRefs.values()) { @@ -1295,7 +1304,7 @@ public void shouldNotRollControllersWithInvalidTimestamp() { .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1), "Expect UnrestartableNodesException because of invalid timestamps for controller 0 and 2"); for (var nodeRef : nodeRefs.values()) { @@ -1323,7 +1332,7 @@ public void shouldNotRollControllersWithInvalidLeader() { .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1), "Expect UnrestartableNodesException because of invalid quorum leader"); for (var nodeRef : nodeRefs.values()) { @@ -1385,7 +1394,7 @@ public void shouldRollTwoNodesQuorumOneControllerBehind() throws ExecutionExcept .done(); assertThrows(UnrestartableNodesException.class, () -> - doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, 3, 1), + doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::manualRolling, EMPTY_CONFIG_SUPPLIER, null, 3, 1), "Expect UnrestartableNodesException because of controller 2 has fallen behind therefore controller 1 cannot be restarted"); //only the controller that has fallen behind should be restarted Mockito.verify(platformClient, times(1)).restartNode(eq(nodeRefs.get(2)), any()); @@ -1516,7 +1525,7 @@ void shouldFailReconciliationIfControllerNodeNeverBecomeReady() { .done(); var ex = assertThrows(TimeoutException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3)); + () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 3)); assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-1/1, currentRoles=NodeRoles[controller=true, broker=false], state=NOT_RUNNING, lastTransition=1970-01-01T00:00:00Z, reason=[POD_STUCK], numRestarts=0, numReconfigs=0, numAttempts=2]", ex.getMessage()); @@ -1541,7 +1550,7 @@ void shouldFailReconciliationIfBrokerNodeNeverBecomeReady() { //If nodes never got into running state, we have to exit the inner loop somehow. var ex = assertThrows(TimeoutException.class, - () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, 1, 3)); + () -> doRollingRestart(platformClient, rollClient, agentClient, nodeRefs.values(), RackRollingTest::noReasons, EMPTY_CONFIG_SUPPLIER, null, 1, 3)); assertEquals("Failed to reach SERVING within 120000 ms: Context[nodeRef=pool-kafka-0/0, currentRoles=NodeRoles[controller=false, broker=true], state=NOT_READY, lastTransition=1970-01-01T00:10:19Z, reason=[POD_STUCK], numRestarts=2, numReconfigs=0, numAttempts=2]", ex.getMessage());
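
For context on the tryElectAllPreferredLeaders change above, which now sends an elect request only when at least one partition actually needs a preferred-leader election: the following is a minimal, self-contained sketch of that guard against the standard Kafka Admin API. The class, method name, and parameters here are illustrative placeholders and do not match the Strimzi class, which derives the partition set for a given NodeRef from its topic descriptions.

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

class PreferredLeaderElectionSketch {
    /**
     * Triggers a preferred-leader election for the given partitions and returns the
     * number of partitions whose preferred leader was not elected. Returns 0 straight
     * away when there is nothing to elect, so no request is sent to the cluster.
     */
    static int tryElectPreferredLeaders(Admin admin, Set<TopicPartition> toElect)
            throws InterruptedException, ExecutionException {
        if (toElect.isEmpty()) {
            // Nothing needs electing: skip the ELECT_LEADERS request entirely.
            return 0;
        }
        Map<TopicPartition, Optional<Throwable>> results =
                admin.electLeaders(ElectionType.PREFERRED, toElect).partitions().get();
        // An empty Optional means the election for that partition succeeded;
        // a present Optional carries the error for that partition.
        long notElected = results.values().stream().filter(Optional::isPresent).count();
        return notElected > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) notElected;
    }
}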