From c43e55f5a9324684829eeb615bd891cb19faa8cc Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Sun, 24 Dec 2023 14:37:39 -0800 Subject: [PATCH] [improve][broker] check if the owner is active when returning ownership (ExtensibleLoadManager) --- .../ExtensibleLoadManagerWrapper.java | 2 - .../channel/ServiceUnitStateChannelImpl.java | 90 ++++++++++++------- .../channel/ServiceUnitStateChannelTest.java | 84 +++++++++++++++++ .../ExtensibleLoadManagerTest.java | 39 ++++++-- 4 files changed, 175 insertions(+), 40 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 18e949537dedb..cd1561cb70e2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() { @Override public void writeLoadReportOnZookeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override public void writeResourceQuotasToZooKeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 713d98b72507e..e88f7d783a021 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -487,6 +487,30 @@ public boolean isOwner(String serviceUnit) { return isOwner(serviceUnit, lookupServiceAddress); } + private CompletableFuture> getActiveOwnerAsync( + String serviceUnit, + ServiceUnitState state, + Optional owner) { + CompletableFuture> activeOwner = owner.isPresent() + ? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner)) + : CompletableFuture.completedFuture(Optional.empty()); + + + return activeOwner + .thenCompose(broker -> broker + .map(__ -> activeOwner) + .orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply( + ownerAfterDeferred -> ownerAfterDeferred == null ? Optional.empty() + : Optional.of(ownerAfterDeferred)))) + .whenComplete((__, e) -> { + if (e != null) { + log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}", + serviceUnit, state, owner, e); + ownerLookUpCounters.get(state).getFailure().incrementAndGet(); + } + }); + } + public CompletableFuture> getOwnerAsync(String serviceUnit) { if (!validateChannelState(Started, true)) { return CompletableFuture.failedFuture( @@ -498,18 +522,13 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { ownerLookUpCounters.get(state).getTotal().incrementAndGet(); switch (state) { case Owned -> { - return CompletableFuture.completedFuture(Optional.of(data.dstBroker())); + return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker())); } case Splitting -> { - return CompletableFuture.completedFuture(Optional.of(data.sourceBroker())); + return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker())); } case Assigning, Releasing -> { - return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> { - if (e != null) { - ownerLookUpCounters.get(state).getFailure().incrementAndGet(); - } - }).thenApply( - broker -> broker == null ? Optional.empty() : Optional.of(broker)); + return getActiveOwnerAsync(serviceUnit, state, Optional.empty()); } case Init, Free -> { return CompletableFuture.completedFuture(Optional.empty()); @@ -812,9 +831,14 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.complete(null); } - stateChangeListeners.notify(serviceUnit, data, null); + if (isTargetBroker(data.sourceBroker())) { - log(null, serviceUnit, data, null); + stateChangeListeners.notifyOnCompletion( + data.force() ? closeServiceUnit(serviceUnit, true) + : CompletableFuture.completedFuture(0), serviceUnit, data) + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + } else { + stateChangeListeners.notify(serviceUnit, data, null); } } @@ -1202,38 +1226,44 @@ private void scheduleCleanup(String broker, long delayInSecs) { private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData, - String selectedBroker, + Optional selectedBroker, String inactiveBroker) { + + + if (selectedBroker.isEmpty()) { + return new ServiceUnitStateData(Free, null, inactiveBroker, + true, getNextVersionId(orphanData)); + } + if (orphanData.state() == Splitting) { - return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker, + return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(), Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, getNextVersionId(orphanData)); } else { - return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker, + return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker, true, getNextVersionId(orphanData)); } } private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) { Optional selectedBroker = selectBroker(serviceUnit, inactiveBroker); - if (selectedBroker.isPresent()) { - var override = getOverrideInactiveBrokerStateData( - orphanData, selectedBroker.get(), inactiveBroker); - log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", - serviceUnit, orphanData, override); - publishOverrideEventAsync(serviceUnit, orphanData, override) - .exceptionally(e -> { - log.error( - "Failed to override the ownership serviceUnit:{} orphanData:{}. " - + "Failed to publish override event. totalCleanupErrorCnt:{}", - serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); - return null; - }); - } else { - log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. " + if (selectedBroker.isEmpty()) { + log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}." + "totalCleanupErrorCnt:{}", serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); } + var override = getOverrideInactiveBrokerStateData( + orphanData, selectedBroker, inactiveBroker); + log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}", + serviceUnit, orphanData, override); + publishOverrideEventAsync(serviceUnit, orphanData, override) + .exceptionally(e -> { + log.error( + "Failed to override the ownership serviceUnit:{} orphanData:{}. " + + "Failed to publish override event. totalCleanupErrorCnt:{}", + serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet()); + return null; + }); } private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) { @@ -1335,7 +1365,7 @@ private synchronized void doCleanup(String broker) { broker, cleanupTime, orphanServiceUnitCleanupCnt, - totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), + totalCleanupErrorCnt.get() - totalCleanupErrorCntStart, printCleanupMetrics()); } @@ -1524,7 +1554,7 @@ protected void monitorOwnerships(List brokers) { inactiveBrokers, inactiveBrokers.size(), orphanServiceUnitCleanupCnt, serviceUnitTombstoneCleanupCnt, - totalCleanupErrorCntStart - totalCleanupErrorCnt.get(), + totalCleanupErrorCnt.get() - totalCleanupErrorCntStart, printCleanupMetrics()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index f99a167ff4883..5a23accfebe1a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -39,6 +39,7 @@ import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertThrows; +import static org.testng.Assert.fail; import static org.testng.AssertJUnit.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; @@ -1560,6 +1561,89 @@ public void testOverrideOrphanStateData() cleanTableViews(); } + @Test(priority = 19) + public void testActiveGetOwner() + throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { + + + // set the bundle owner is the broker + String broker = lookupServiceAddress2; + String bundle = "public/owned/0xfffffff0_0xffffffff"; + overrideTableViews(bundle, + new ServiceUnitStateData(Owned, broker, null, 1)); + var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get(); + assertEquals(owner, broker); + + // simulate the owner is inactive + var spyRegistry = spy(new BrokerRegistryImpl(pulsar)); + doReturn(CompletableFuture.completedFuture(Optional.empty())) + .when(spyRegistry).lookupAsync(eq(broker)); + FieldUtils.writeDeclaredField(channel1, + "brokerRegistry", spyRegistry , true); + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 1000, true); + + + // verify getOwnerAsync times out because the owner is inactive now. + long start = System.currentTimeMillis(); + try { + channel1.getOwnerAsync(bundle).get(); + fail(); + } catch (Exception e) { + if (e.getCause() instanceof TimeoutException) { + // expected + } else { + fail(); + } + } + assertTrue(System.currentTimeMillis() - start >= 1000); + + // simulate ownership cleanup(no selected owner) by the leader channel + doReturn(CompletableFuture.completedFuture(Optional.empty())) + .when(loadManager).selectAsync(any(), any()); + var leaderChannel = channel1; + String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); + String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); + assertEquals(leader, leader2); + if (leader.equals(lookupServiceAddress2)) { + leaderChannel = channel2; + } + leaderChannel.handleMetadataSessionEvent(SessionReestablished); + FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", + System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + + // verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 20 * 1000, true); + start = System.currentTimeMillis(); + assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty()); + assertTrue(System.currentTimeMillis() - start < 20_000); + + // simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel + overrideTableViews(bundle, + new ServiceUnitStateData(Owned, broker, null, 1)); + doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1))) + .when(loadManager).selectAsync(any(), any()); + leaderChannel.handleMetadataSessionEvent(SessionReestablished); + FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp", + System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true); + getCleanupJobs(leaderChannel).clear(); + leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted); + + // verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout + start = System.currentTimeMillis(); + assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get()); + assertTrue(System.currentTimeMillis() - start < 20_000); + + // test clean-up + FieldUtils.writeDeclaredField(channel1, + "inFlightStateWaitingTimeInMillis", 30 * 1000, true); + FieldUtils.writeDeclaredField(channel1, + "brokerRegistry", registry , true); + cleanTableViews(); + + } private static ConcurrentHashMap>> getOwnerRequests( ServiceUnitStateChannel channel) throws IllegalAccessException { diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index 64a05503e1970..8c99ec10c0e66 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -125,6 +125,28 @@ public void startBroker() { brokerContainer.start(); } }); + String topicName = "persistent://" + DEFAULT_NAMESPACE + "/startBrokerCheck"; + Awaitility.await().atMost(120, TimeUnit.SECONDS).until( + () -> { + for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) { + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl( + brokerContainer.getHttpServiceUrl()).build()) { + if (admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) { + return false; + } + try { + admin.topics().createPartitionedTopic(topicName, 10); + } catch (PulsarAdminException.ConflictException e) { + // expected + } + admin.lookups().lookupPartitionedTopic(topicName); + } catch (Throwable e) { + return false; + } + } + return true; + } + ); } } @@ -259,9 +281,11 @@ public void testStopBroker() throws Exception { } } - String broker1 = admin.lookups().lookupTopic(topicName); + Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String broker1 = admin.lookups().lookupTopic(topicName); + assertNotEquals(broker1, broker); + }); - assertNotEquals(broker1, broker); } @Test(timeOut = 80 * 1000) @@ -309,7 +333,7 @@ public void testIsolationPolicy() throws Exception { parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); - Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( () -> { List activeBrokers = admin.brokers().getActiveBrokers(); assertEquals(activeBrokers.size(), NUM_BROKERS); @@ -350,14 +374,14 @@ public void testIsolationPolicy() throws Exception { } } - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted( + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( () -> { List activeBrokers = admin.brokers().getActiveBrokers(); assertEquals(activeBrokers.size(), 2); } ); - Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { String ownerBroker = admin.lookups().lookupTopic(topic); assertEquals(extractBrokerIndex(ownerBroker), 1); }); @@ -369,7 +393,7 @@ public void testIsolationPolicy() throws Exception { } } - Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted( + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted( () -> { List activeBrokers = admin.brokers().getActiveBrokers(); assertEquals(activeBrokers.size(), 1); @@ -380,8 +404,7 @@ public void testIsolationPolicy() throws Exception { fail(); } catch (Exception ex) { log.error("Failed to lookup topic: ", ex); - assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker", - "Failed to select the new owner broker for bundle"); + assertThat(ex.getMessage()).containsAnyOf("Failed to select the new owner broker for bundle"); } }