From 443ade66dc0b4f2a2ccd707c6e2c51eedf4dc05d Mon Sep 17 00:00:00 2001 From: Heesung Sohn <103456639+heesung-sn@users.noreply.github.com> Date: Sat, 30 Dec 2023 23:05:29 -0800 Subject: [PATCH] [improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) (#21811) --- .../ExtensibleLoadManagerWrapper.java | 2 - .../channel/ServiceUnitStateChannelImpl.java | 86 ++++++++++++------- .../channel/ServiceUnitStateChannelTest.java | 75 ++++++++++++++++ .../ExtensibleLoadManagerTest.java | 56 +++++++++--- 4 files changed, 177 insertions(+), 42 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 18e949537dedb..cd1561cb70e2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() { @Override public void writeLoadReportOnZookeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override public void writeResourceQuotasToZooKeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index bd7e032a24a0c..ad6bac1feeb57 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -498,6 +498,27 @@ public boolean isOwner(String serviceUnit) { return isOwner(serviceUnit, lookupServiceAddress); } + private CompletableFuture> getActiveOwnerAsync( + String serviceUnit, + ServiceUnitState state, + Optional owner) { + CompletableFuture> activeOwner = owner.isPresent() + ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner)) + : CompletableFuture.completedFuture(Optional.empty()); + + return activeOwner + .thenCompose(broker -> broker + .map(__ -> activeOwner) + .orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable))) + .whenComplete((__, e) -> { + if (e != null) { + log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}", + serviceUnit, state, owner, e); + ownerLookUpCounters.get(state).getFailure().incrementAndGet(); + } + }); + } + public CompletableFuture> getOwnerAsync(String serviceUnit) { if (!validateChannelState(Started, true)) { return CompletableFuture.failedFuture( @@ -509,18 +530,13 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { ownerLookUpCounters.get(state).getTotal().incrementAndGet(); switch (state) { case Owned -> { - return CompletableFuture.completedFuture(Optional.of(data.dstBroker())); + return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker())); } case Splitting -> { - return CompletableFuture.completedFuture(Optional.of(data.sourceBroker())); + return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker())); } case Assigning, Releasing -> { - return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> { - if (e != null) { - ownerLookUpCounters.get(state).getFailure().incrementAndGet(); - } - }).thenApply( - broker -> broker == null ? Optional.empty() : Optional.of(broker)); + return getActiveOwnerAsync(serviceUnit, state, Optional.empty()); } case Init, Free -> { return CompletableFuture.completedFuture(Optional.empty()); @@ -781,9 +797,14 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.complete(null); } - stateChangeListeners.notify(serviceUnit, data, null); + if (isTargetBroker(data.sourceBroker())) { - log(null, serviceUnit, data, null); + stateChangeListeners.notifyOnCompletion( + data.force() ? closeServiceUnit(serviceUnit) + : CompletableFuture.completedFuture(0), serviceUnit, data) + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + } else { + stateChangeListeners.notify(serviceUnit, data, null); } } @@ -1168,38 +1189,43 @@ private void scheduleCleanup(String broker, long delayInSecs) { private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData, - String selectedBroker, + Optional selectedBroker, String inactiveBroker) { + + + if (selectedBroker.isEmpty()) { + return new ServiceUnitStateData(Free, null, inactiveBroker, + true, getNextVersionId(orphanData)); + } + if (orphanData.state() == Splitting) { - return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker, + return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(), Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, getNextVersionId(orphanData)); } else { - return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker, + return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker, true, getNextVersionId(orphanData)); } } private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) { Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); - if (selectedBroker.isPresent()) { - var override = getOverrideInactiveBrokerStateData( - orphanData, selectedBroker.get(), inactiveBroker); - log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", - serviceUnit, orphanData, override); - publishOverrideEventAsync(serviceUnit, orphanData, override) - .exceptionally(e -> { - log.error( - "Failed to override the ownership serviceUnit:{} orphanData:{}. " - + "Failed to publish override event. totalCleanupErrorCnt:{}", - serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); - return null; - }); - } else { - log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. " + if (selectedBroker.isEmpty()) { + log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}." + "totalCleanupErrorCnt:{}", serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); } + var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker); + log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", + serviceUnit, orphanData, override); + publishOverrideEventAsync(serviceUnit, orphanData, override) + .exceptionally(e -> { + log.error( + "Failed to override the ownership serviceUnit:{} orphanData:{}. " + + "Failed to publish override event. totalCleanupErrorCnt:{}", + serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); + return null; + }); } private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) { @@ -1301,7 +1327,7 @@ private synchronized void doCleanup(String broker) { broker, cleanupTime, orphanServiceUnitCleanupCnt, - totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), + totalCleanupErrorCnt.get() - totalCleanupErrorCntStart, printCleanupMetrics()); } @@ -1490,7 +1516,7 @@ protected void monitorOwnerships(List brokers) { inactiveBrokers, inactiveBrokers.size(), orphanServiceUnitCleanupCnt, serviceUnitTombstoneCleanupCnt, - totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), + totalCleanupErrorCnt.get() - totalCleanupErrorCntStart, printCleanupMetrics()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index acf87ec750007..1cffc3c626e27 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertThrows; +import static org.testng.Assert.expectThrows; import static org.testng.AssertJUnit.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -1557,6 +1558,80 @@ public void testOverrideOrphanStateData() cleanTableViews(); } + @Test(priority = 19) + public void testActiveGetOwner() throws Exception { + + + // set the bundle owner is the broker + String broker = lookupServiceAddress2; + String bundle = "public/owned/0xfffffff0_0xffffffff"; + overrideTableViews(bundle, + new ServiceUnitStateData(Owned, broker, null, 1)); + var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get(); + assertEquals(owner, broker); + + // simulate the owner is inactive + var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); + doReturn(CompletableFuture.completedFuture(Optional.empty())) + .when(spyRegistry).lookupAsync(eq(broker)); + FieldUtils.writeDeclaredField(channel1, + "brokerRegistry", spyRegistry , true); + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 1000, true); + + + // verify getOwnerAsync times out because the owner is inactive now. + long start = System.currentTimeMillis(); + var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get()); + assertTrue(ex.getCause() instanceof TimeoutException); + assertTrue(System.currentTimeMillis() - start >= 1000); + + // simulate ownership cleanup(no selected owner) by the leader channel + doReturn(CompletableFuture.completedFuture(Optional.empty())) + .when(loadManager).selectAsync(any(), any()); + var leaderChannel = channel1; + String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); + String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); + assertEquals(leader1, leader2); + if (leader1.equals(lookupServiceAddress2)) { + leaderChannel = channel2; + } + leaderChannel.handleMetadataSessionEvent(SessionReestablished); + FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", + System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + + // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 20 * 1000, true); + start = System.currentTimeMillis(); + assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty()); + assertTrue(System.currentTimeMillis() - start < 20_000); + + // simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel + overrideTableViews(bundle, + new ServiceUnitStateData(Owned, broker, null, 1)); + doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))) + .when(loadManager).selectAsync(any(), any()); + leaderChannel.handleMetadataSessionEvent(SessionReestablished); + FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", + System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + getCleanupJobs(leaderChannel).clear(); + leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + + // verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout + start = System.currentTimeMillis(); + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get()); + assertTrue(System.currentTimeMillis() - start < 20_000); + + // test clean-up + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + FieldUtils.writeDeclaredField(channel1, + "brokerRegistry", registry , true); + cleanTableViews(); + + } private static ConcurrentHashMap>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 23abf50bdb063..e262b27fe2306 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -129,6 +129,26 @@ public void startBroker() { brokerContainer.start(); } }); + String topicName = "persistent://" + DEFAULT_NAMESPACE + "/startBrokerCheck"; + Awaitility.await().atMost(120, TimeUnit.SECONDS).ignoreExceptions().until( + () -> { + for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) { + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl( + brokerContainer.getHttpServiceUrl()).build()) { + if (admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) { + return false; + } + try { + admin.topics().createPartitionedTopic(topicName, 10); + } catch (PulsarAdminException.ConflictException e) { + // expected + } + admin.lookups().lookupPartitionedTopic(topicName); + } + } + return true; + } + ); } } @@ -245,7 +265,7 @@ public void testDeleteNamespace() throws Exception { assertFalse(admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace)); } - @Test(timeOut = 40 * 1000) + @Test(timeOut = 120 * 1000) public void testStopBroker() throws Exception { String topicName = "persistent://" + DEFAULT_NAMESPACE + "/test-stop-broker-topic"; @@ -261,9 +281,11 @@ public void testStopBroker() throws Exception { } } - String broker1 = admin.lookups().lookupTopic(topicName); + Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String broker1 = admin.lookups().lookupTopic(topicName); + assertNotEquals(broker1, broker); + }); - assertNotEquals(broker1, broker); } @Test(timeOut = 40 * 1000) @@ -311,7 +333,7 @@ public void testIsolationPolicy() throws PulsarAdminException { parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); - Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( () -> { List activeBrokers = admin.brokers().getActiveBrokers(); assertEquals(activeBrokers.size(), NUM_BROKERS); @@ -347,6 +369,7 @@ public void testIsolationPolicy() throws PulsarAdminException { } String broker = admin.lookups().lookupTopic(topic); + assertEquals(extractBrokerIndex(broker), 0); for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -355,11 +378,17 @@ public void testIsolationPolicy() throws PulsarAdminException { } } - assertEquals(extractBrokerIndex(broker), 0); - - broker = admin.lookups().lookupTopic(topic); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( + () -> { + List activeBrokers = admin.brokers().getActiveBrokers(); + assertEquals(activeBrokers.size(), 2); + } + ); - assertEquals(extractBrokerIndex(broker), 1); + Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String ownerBroker = admin.lookups().lookupTopic(topic); + assertEquals(extractBrokerIndex(ownerBroker), 1); + }); for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -367,13 +396,20 @@ public void testIsolationPolicy() throws PulsarAdminException { container.stop(); } } + + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( + () -> { + List activeBrokers = admin.brokers().getActiveBrokers(); + assertEquals(activeBrokers.size(), 1); + } + ); + try { admin.lookups().lookupTopic(topic); fail(); } catch (Exception ex) { log.error("Failed to lookup topic: ", ex); - assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker", - "Failed to select the new owner broker for bundle"); + assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle"); } }