From 5bc4a23dce7d32966326b27451321369f6808ce4 Mon Sep 17 00:00:00 2001 From: Heesung Sohn Date: Fri, 3 Nov 2023 15:08:07 -0700 Subject: [PATCH] resolved comments --- .../extensions/ExtensibleLoadManagerImpl.java | 4 ++++ .../channel/ServiceUnitStateChannelImpl.java | 12 +++++----- .../pulsar/broker/service/BrokerService.java | 10 ++++----- .../pulsar/broker/service/ServerCnx.java | 7 +++--- .../apache/pulsar/broker/service/Topic.java | 2 +- .../nonpersistent/NonPersistentTopic.java | 10 ++++----- .../service/persistent/PersistentTopic.java | 22 +++++++++---------- .../ExtensibleLoadManagerImplTest.java | 20 ++++++++++++----- .../pulsar/client/impl/ConnectionHandler.java | 2 +- 9 files changed, 51 insertions(+), 38 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 14c81a6a492159..67bab9b12ffb18 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -267,6 +267,10 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) { return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName()); } + public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) { + return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl; + } + public static ExtensibleLoadManagerImpl get(LoadManager loadManager) { if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) { throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'."); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 1fe7b83d77db1c..cff45b18ec8b7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -776,7 +776,7 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { log(null, serviceUnit, data, null); } else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) { stateChangeListeners.notifyOnCompletion( - closeServiceUnit(serviceUnit, false), serviceUnit, data) + closeServiceUnit(serviceUnit, true), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, null)); } else { stateChangeListeners.notify(serviceUnit, data, null); @@ -799,11 +799,11 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { if (isTransferCommand(data)) { next = new ServiceUnitStateData( Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data)); - unloadFuture = closeServiceUnit(serviceUnit, true); + unloadFuture = closeServiceUnit(serviceUnit, false); } else { next = new ServiceUnitStateData( Free, null, data.sourceBroker(), getNextVersionId(data)); - unloadFuture = closeServiceUnit(serviceUnit, false); + unloadFuture = closeServiceUnit(serviceUnit, true); } stateChangeListeners.notifyOnCompletion(unloadFuture .thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data) @@ -903,13 +903,13 @@ private CompletableFuture deferGetOwnerRequest(String serviceUnit) { } } - private CompletableFuture closeServiceUnit(String serviceUnit, boolean closeWithoutDisconnectingClients) { + private CompletableFuture closeServiceUnit(String serviceUnit, boolean disconnectClients) { long startTime = System.nanoTime(); MutableInt unloadedTopics = new MutableInt(); NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit); return pulsar.getBrokerService().unloadServiceUnit( bundle, - closeWithoutDisconnectingClients, + disconnectClients, true, pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS) @@ -918,7 +918,7 @@ private CompletableFuture closeServiceUnit(String serviceUnit, boolean return numUnloadedTopics; }) .whenComplete((__, ex) -> { - if (!closeWithoutDisconnectingClients) { + if (disconnectClients) { // clean up topics that failed to unload from the broker ownership cache pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e74801ca455768..da556e4422fe3d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2197,10 +2197,10 @@ public CompletableFuture checkTopicNsOwnership(final String topic) { } public CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit, - boolean closeWithoutDisconnectingClients, + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) { CompletableFuture future = unloadServiceUnit( - serviceUnit, closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect); + serviceUnit, disconnectClients, closeWithoutWaitingClientDisconnect); ScheduledFuture taskTimeout = executor().schedule(() -> { if (!future.isDone()) { log.warn("Unloading of {} has timed out", serviceUnit); @@ -2217,13 +2217,13 @@ public CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit, * Unload all the topic served by the broker service under the given service unit. * * @param serviceUnit - * @param closeWithoutDisconnectingClients don't disconnect clients + * @param disconnectClients disconnect clients * @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect * and forcefully close managed-ledger * @return */ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit, - boolean closeWithoutDisconnectingClients, + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { List> closeFutures = new ArrayList<>(); topics.forEach((name, topicFuture) -> { @@ -2248,7 +2248,7 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit } closeFutures.add(topicFuture .thenCompose(t -> t.isPresent() ? t.get().close( - closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect) + disconnectClients, closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null))); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 4a43649e902329..e0523846423028 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1762,11 +1762,10 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { printSendCommandDebug(send, headersAndPayload); } - - ServiceConfiguration conf = getBrokerService().pulsar().getConfiguration(); + PulsarService pulsar = getBrokerService().pulsar(); if (producer.getTopic().isFenced() - && ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(conf)) { - long ignoredMsgCount = ExtensibleLoadManagerImpl.get(getBrokerService().pulsar()) + && ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar) .getIgnoredSendMsgCounter().incrementAndGet(); if (log.isDebugEnabled()) { log.debug("Ignored send msg from:{}:{} to fenced topic:{} during unloading." diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 41040a9f83036e..6e2eb75a795123 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -197,7 +197,7 @@ CompletableFuture createSubscription(String subscriptionName, Init CompletableFuture close(boolean closeWithoutWaitingClientDisconnect); CompletableFuture close( - boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect); + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect); void checkGC(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index c299652adf8421..0d80de3aa6de37 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -488,19 +488,19 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c @Override public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect) { - return close(false, closeWithoutWaitingClientDisconnect); + return close(true, closeWithoutWaitingClientDisconnect); } /** * Close this topic - close all producers and subscriptions associated with this topic. * - * @param closeWithoutDisconnectingClients don't disconnect clients + * @param disconnectClients disconnect clients * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger * @return Completable future indicating completion of close operation */ @Override public CompletableFuture close( - boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) { + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { CompletableFuture closeFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -519,7 +519,7 @@ public CompletableFuture close( List> futures = new ArrayList<>(); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - if (!closeWithoutDisconnectingClients) { + if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData))) @@ -553,7 +553,7 @@ public CompletableFuture close( // so, execute it in different thread brokerService.executor().execute(() -> { - if (!closeWithoutDisconnectingClients) { + if (disconnectClients) { brokerService.removeTopicFromCache(NonPersistentTopic.this); unregisterTopicPolicyListener(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 66a8f9da0cc2ed..ba40a75651d633 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1449,24 +1449,24 @@ public void deleteLedgerComplete(Object ctx) { } public CompletableFuture close() { - return close(false, false); + return close(true, false); } @Override public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect) { - return close(false, closeWithoutWaitingClientDisconnect); + return close(true, closeWithoutWaitingClientDisconnect); } /** * Close this topic - close all producers and subscriptions associated with this topic. * - * @param closeWithoutDisconnectingClients don't disconnect clients + * @param disconnectClients disconnect clients * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger * @return Completable future indicating completion of close operation */ @Override public CompletableFuture close( - boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) { + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { CompletableFuture closeFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -1489,7 +1489,7 @@ public CompletableFuture close( futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); - if (!closeWithoutDisconnectingClients) { + if (disconnectClients) { futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( brokerService.getPulsar(), topic).thenAccept(lookupData -> producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData))) @@ -1531,21 +1531,21 @@ public CompletableFuture close( ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { - if (closeWithoutDisconnectingClients) { - closeFuture.complete(null); - } else { + if (disconnectClients) { // Everything is now closed, remove the topic from map disposeTopic(closeFuture); + } else { + closeFuture.complete(null); } } @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - if (closeWithoutDisconnectingClients) { - closeFuture.complete(null); - } else { + if (disconnectClients) { disposeTopic(closeFuture); + } else { + closeFuture.complete(null); } } }, null); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index b8873ebb6bbb5c..158488bc84a7b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -662,9 +662,11 @@ public void testCheckOwnershipPresentWithSystemNamespace() throws Exception { @Test public void testMoreThenOneFilter() throws Exception { - TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception"); + // Use a different namespace to avoid flaky test failures + // from unloading the default namespace and the following topic policy lookups at the init state step + String namespace = "public/my-namespace"; + TopicName topicName = TopicName.get(namespace + "/test-filter-has-exception"); NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); - String lookupServiceAddress1 = pulsar1.getLookupServiceAddress(); doReturn(List.of(new MockBrokerFilter() { @Override @@ -682,10 +684,18 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); - assertTrue(brokerLookupData.isPresent()); - assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(brokerLookupData.isPresent()); + assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); + assertEquals(brokerLookupData.get().getPulsarServiceUrl(), + pulsar1.getAdminClient().lookups().lookupTopic(topicName.toString())); + assertEquals(brokerLookupData.get().getPulsarServiceUrl(), + pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString())); + }); + + admin.namespaces().deleteNamespace(namespace, true); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 7361c27155ccd5..600dc17a1b09a7 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -167,7 +167,7 @@ void reconnectLater(Throwable exception) { } public void connectionClosed(ClientCnx cnx) { - connectionClosed(cnx, null, Optional.empty()); + connectionClosed(cnx, Optional.empty(), Optional.empty()); } public void connectionClosed(ClientCnx cnx, Optional initialConnectionDelayMs, Optional hostUrl) {