diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index d3119365ddfea..67bab9b12ffb1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -38,6 +38,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.stream.Collectors; @@ -178,6 +179,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager { private final UnloadCounter unloadCounter = new UnloadCounter(); private final SplitCounter splitCounter = new SplitCounter(); + // Record the ignored send msg count during unloading + @Getter + private final AtomicLong ignoredSendMsgCounter = new AtomicLong(); + // record unload metrics private final AtomicReference> unloadMetrics = new AtomicReference<>(); // record split metrics @@ -262,6 +267,10 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) { return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName()); } + public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) { + return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl; + } + public static ExtensibleLoadManagerImpl get(LoadManager loadManager) { if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) { throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'."); @@ -269,6 +278,15 @@ public static ExtensibleLoadManagerImpl get(LoadManager loadManager) { return loadManagerWrapper.get(); } + /** + * A static util func to get the ExtensibleLoadManagerImpl instance. + * @param pulsar PulsarService + * @return the ExtensibleLoadManagerImpl instance + */ + public static ExtensibleLoadManagerImpl get(PulsarService pulsar) { + return get(pulsar.getLoadManager().get()); + } + public static boolean debug(ServiceConfiguration config, Logger log) { return config.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled(); } @@ -286,6 +304,37 @@ public static void createSystemTopic(PulsarService pulsar, String topic) throws } } + /** + * Gets the assigned broker for the given topic. + * @param pulsar PulsarService instance + * @param topic Topic Name + * @return the assigned broker's BrokerLookupData instance. Empty, if not assigned by Extensible LoadManager. + */ + public static CompletableFuture> getAssignedBrokerLookupData(PulsarService pulsar, + String topic) { + if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) { + var topicName = TopicName.get(topic); + try { + return pulsar.getNamespaceService().getBundleAsync(topicName) + .thenCompose(bundle -> { + var loadManager = ExtensibleLoadManagerImpl.get(pulsar); + var assigned = loadManager.getServiceUnitStateChannel() + .getAssigned(bundle.toString()); + if (assigned.isPresent()) { + return loadManager.getBrokerRegistry().lookupAsync(assigned.get()); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + } + ); + } catch (Throwable e) { + log.error("Failed to lookup destination broker for topic:{}", topic, e); + return CompletableFuture.completedFuture(Optional.empty()); + } + } + return CompletableFuture.completedFuture(Optional.empty()); + } + @Override public void start() throws PulsarServerException { if (this.started) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java index 6e75fe91a914f..9be76e1b0f44d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannel.java @@ -131,6 +131,16 @@ public interface ServiceUnitStateChannel extends Closeable { */ CompletableFuture> getOwnerAsync(String serviceUnit); + /** + * Gets the assigned broker of the service unit. + * + * + * @param serviceUnit (e.g. bundle)) + * @return the future object of the assigned broker + */ + Optional getAssigned(String serviceUnit); + + /** * Checks if the target broker is the owner of the service unit. * diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java index 3cf16709cde1b..cff45b18ec8b7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java @@ -538,6 +538,41 @@ public CompletableFuture> getOwnerAsync(String serviceUnit) { } } + @Override + public Optional getAssigned(String serviceUnit) { + if (!validateChannelState(Started, true)) { + return Optional.empty(); + } + + ServiceUnitStateData data = tableview.get(serviceUnit); + if (data == null) { + return Optional.empty(); + } + ServiceUnitState state = state(data); + switch (state) { + case Owned, Assigning -> { + return Optional.of(data.dstBroker()); + } + case Releasing -> { + return Optional.ofNullable(data.dstBroker()); + } + case Splitting -> { + return Optional.of(data.sourceBroker()); + } + case Init, Free -> { + return Optional.empty(); + } + case Deleted -> { + log.warn("Trying to get the assigned broker from the deleted serviceUnit:{}", serviceUnit); + return Optional.empty(); + } + default -> { + log.warn("Trying to get the assigned broker from unknown state:{} serviceUnit:{}", state, serviceUnit); + return Optional.empty(); + } + } + } + private long getNextVersionId(String serviceUnit) { var data = tableview.get(serviceUnit); return getNextVersionId(data); @@ -732,14 +767,19 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { if (getOwnerRequest != null) { getOwnerRequest.complete(data.dstBroker()); } - stateChangeListeners.notify(serviceUnit, data, null); + if (isTargetBroker(data.dstBroker())) { - log(null, serviceUnit, data, null); pulsar.getNamespaceService() .onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit)); lastOwnEventHandledAt = System.currentTimeMillis(); - } else if (data.force() && isTargetBroker(data.sourceBroker())) { - closeServiceUnit(serviceUnit); + stateChangeListeners.notify(serviceUnit, data, null); + log(null, serviceUnit, data, null); + } else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) { + stateChangeListeners.notifyOnCompletion( + closeServiceUnit(serviceUnit, true), serviceUnit, data) + .whenComplete((__, e) -> log(e, serviceUnit, data, null)); + } else { + stateChangeListeners.notify(serviceUnit, data, null); } } @@ -755,15 +795,17 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) { private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) { if (isTargetBroker(data.sourceBroker())) { ServiceUnitStateData next; + CompletableFuture unloadFuture; if (isTransferCommand(data)) { next = new ServiceUnitStateData( Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data)); - // TODO: when close, pass message to clients to connect to the new broker + unloadFuture = closeServiceUnit(serviceUnit, false); } else { next = new ServiceUnitStateData( Free, null, data.sourceBroker(), getNextVersionId(data)); + unloadFuture = closeServiceUnit(serviceUnit, true); } - stateChangeListeners.notifyOnCompletion(closeServiceUnit(serviceUnit) + stateChangeListeners.notifyOnCompletion(unloadFuture .thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data) .whenComplete((__, e) -> log(e, serviceUnit, data, next)); } @@ -861,12 +903,13 @@ private CompletableFuture deferGetOwnerRequest(String serviceUnit) { } } - private CompletableFuture closeServiceUnit(String serviceUnit) { + private CompletableFuture closeServiceUnit(String serviceUnit, boolean disconnectClients) { long startTime = System.nanoTime(); MutableInt unloadedTopics = new MutableInt(); NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit); return pulsar.getBrokerService().unloadServiceUnit( bundle, + disconnectClients, true, pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS) @@ -875,8 +918,10 @@ private CompletableFuture closeServiceUnit(String serviceUnit) { return numUnloadedTopics; }) .whenComplete((__, ex) -> { - // clean up topics that failed to unload from the broker ownership cache - pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle); + if (disconnectClients) { + // clean up topics that failed to unload from the broker ownership cache + pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle); + } pulsar.getNamespaceService().onNamespaceBundleUnload(bundle); double unloadBundleTime = TimeUnit.NANOSECONDS .toMillis((System.nanoTime() - startTime)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java index e7cf23a042750..cdedac1136e4d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnedBundle.java @@ -136,7 +136,7 @@ public CompletableFuture handleUnloadRequest(PulsarService pulsar, long ti return pulsar.getNamespaceService().getOwnershipCache() .updateBundleState(this.bundle, false) .thenCompose(v -> pulsar.getBrokerService().unloadServiceUnit( - bundle, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit)) + bundle, true, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit)) .handle((numUnloadedTopics, ex) -> { if (ex != null) { // ignore topic-close failure to unload bundle diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 4e176c4fc0bd9..bc2b358200c87 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -956,6 +956,11 @@ protected void checkTopicFenced() throws BrokerServiceException { } } + @Override + public boolean isFenced() { + return isFenced; + } + protected CompletableFuture internalAddProducer(Producer producer) { if (isProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 93337bafd906f..da556e4422fe3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2197,8 +2197,10 @@ public CompletableFuture checkTopicNsOwnership(final String topic) { } public CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit, + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) { - CompletableFuture future = unloadServiceUnit(serviceUnit, closeWithoutWaitingClientDisconnect); + CompletableFuture future = unloadServiceUnit( + serviceUnit, disconnectClients, closeWithoutWaitingClientDisconnect); ScheduledFuture taskTimeout = executor().schedule(() -> { if (!future.isDone()) { log.warn("Unloading of {} has timed out", serviceUnit); @@ -2215,11 +2217,13 @@ public CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit, * Unload all the topic served by the broker service under the given service unit. * * @param serviceUnit + * @param disconnectClients disconnect clients * @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect * and forcefully close managed-ledger * @return */ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit, + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { List> closeFutures = new ArrayList<>(); topics.forEach((name, topicFuture) -> { @@ -2243,7 +2247,8 @@ private CompletableFuture unloadServiceUnit(NamespaceBundle serviceUnit } } closeFutures.add(topicFuture - .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) + .thenCompose(t -> t.isPresent() ? t.get().close( + disconnectClients, closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null))); } }); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index acaa7c02d197d..1a4490d6b1b46 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -39,6 +39,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.Topic.PublishContext; @@ -699,17 +700,21 @@ public void closeNow(boolean removeFromTopic) { isDisconnecting.set(false); } + public CompletableFuture disconnect() { + return disconnect(Optional.empty()); + } + /** * It closes the producer from server-side and sends command to client to disconnect producer from existing * connection without closing that connection. * * @return Completable future indicating completion of producer close */ - public CompletableFuture disconnect() { + public CompletableFuture disconnect(Optional assignedBrokerLookupData) { if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) { log.info("Disconnecting producer: {}", this); cnx.execute(() -> { - cnx.closeProducer(this); + cnx.closeProducer(this, assignedBrokerLookupData); closeNow(true); }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index da51eaea8fb39..e052384642302 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -81,6 +81,8 @@ import org.apache.pulsar.broker.authentication.AuthenticationState; import org.apache.pulsar.broker.intercept.BrokerInterceptor; import org.apache.pulsar.broker.limiter.ConnectionController; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; @@ -1598,7 +1600,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { remoteAddress, producerId); } producers.remove(producerId, producerFuture); - closeProducer(producerId, -1L); + closeProducer(producerId, -1L, Optional.empty()); return null; } } @@ -1760,6 +1762,19 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { printSendCommandDebug(send, headersAndPayload); } + PulsarService pulsar = getBrokerService().pulsar(); + if (producer.getTopic().isFenced() + && ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) { + long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar) + .getIgnoredSendMsgCounter().incrementAndGet(); + if (log.isDebugEnabled()) { + log.debug("Ignored send msg from:{}:{} to fenced topic:{} during unloading." + + " Ignored message count:{}.", + remoteAddress, send.getProducerId(), producer.getTopic().getName(), ignoredMsgCount); + } + return; + } + if (producer.isNonPersistentTopic()) { // avoid processing non-persist message if reached max concurrent-message limit if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) { @@ -3020,13 +3035,26 @@ protected void interceptCommand(BaseCommand command) throws InterceptException { public void closeProducer(Producer producer) { // removes producer-connection from map and send close command to producer safelyRemoveProducer(producer); - closeProducer(producer.getProducerId(), producer.getEpoch()); + closeProducer(producer.getProducerId(), producer.getEpoch(), Optional.empty()); + } + @Override + public void closeProducer(Producer producer, Optional assignedBrokerLookupData) { + // removes producer-connection from map and send close command to producer + safelyRemoveProducer(producer); + closeProducer(producer.getProducerId(), producer.getEpoch(), assignedBrokerLookupData); } - public void closeProducer(long producerId, long epoch) { + private void closeProducer(long producerId, long epoch, Optional assignedBrokerLookupData) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { - writeAndFlush(Commands.newCloseProducer(producerId, -1L)); + if (assignedBrokerLookupData.isPresent()) { + writeAndFlush(Commands.newCloseProducer(producerId, -1L, + assignedBrokerLookupData.get().pulsarServiceUrl(), + assignedBrokerLookupData.get().pulsarServiceUrlTls())); + } else { + writeAndFlush(Commands.newCloseProducer(producerId, -1L)); + } + // The client does not necessarily know that the producer is closed, but the connection is still // active, and there could be messages in flight already. We want to ignore these messages for a time // because they are expected. Once the interval has passed, the client should have received the @@ -3049,7 +3077,7 @@ public void closeConsumer(Consumer consumer) { closeConsumer(consumer.consumerId()); } - public void closeConsumer(long consumerId) { + private void closeConsumer(long consumerId) { if (getRemoteEndpointProtocolVersion() >= v5.getValue()) { writeAndFlush(Commands.newCloseConsumer(consumerId, -1L)); } else { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index c697639ff4fa1..6e2eb75a79512 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -196,6 +196,9 @@ CompletableFuture createSubscription(String subscriptionName, Init CompletableFuture close(boolean closeWithoutWaitingClientDisconnect); + CompletableFuture close( + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect); + void checkGC(); CompletableFuture checkClusterMigration(); @@ -331,6 +334,8 @@ default boolean isSystemTopic() { boolean isPersistent(); + boolean isFenced(); + /* ------ Transaction related ------ */ /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java index 94f934fec681e..c09d63a9232eb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TransportCnx.java @@ -21,8 +21,10 @@ import io.netty.handler.codec.haproxy.HAProxyMessage; import io.netty.util.concurrent.Promise; import java.net.SocketAddress; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; public interface TransportCnx { @@ -57,6 +59,7 @@ public interface TransportCnx { void removedProducer(Producer producer); void closeProducer(Producer producer); + void closeProducer(Producer producer, Optional assignedBrokerLookupData); void cancelPublishRateLimiting(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index f3857b5ad2aef..0d80de3aa6de3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.PulsarServerException; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.AbstractReplicator; @@ -484,14 +485,22 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, boolean c return deleteFuture; } + + @Override + public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect) { + return close(true, closeWithoutWaitingClientDisconnect); + } + /** * Close this topic - close all producers and subscriptions associated with this topic. * + * @param disconnectClients disconnect clients * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger * @return Completable future indicating completion of close operation */ @Override - public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect) { + public CompletableFuture close( + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { CompletableFuture closeFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -510,7 +519,12 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect List> futures = new ArrayList<>(); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); - producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (disconnectClients) { + futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( + brokerService.getPulsar(), topic).thenAccept(lookupData -> + producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData))) + )); + } if (topicPublishRateLimiter != null) { topicPublishRateLimiter.close(); } @@ -538,9 +552,13 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect // unload topic iterates over topics map and removing from the map with the same thread creates deadlock. // so, execute it in different thread brokerService.executor().execute(() -> { - brokerService.removeTopicFromCache(NonPersistentTopic.this); - unregisterTopicPolicyListener(); + + if (disconnectClients) { + brokerService.removeTopicFromCache(NonPersistentTopic.this); + unregisterTopicPolicyListener(); + } closeFuture.complete(null); + }); }).exceptionally(exception -> { log.error("[{}] Error closing topic", topic, exception); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index a8a921f3b624c..ba40a75651d63 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -84,6 +84,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; +import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl; import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateCompactionStrategy; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -1448,17 +1449,24 @@ public void deleteLedgerComplete(Object ctx) { } public CompletableFuture close() { - return close(false); + return close(true, false); + } + + @Override + public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect) { + return close(true, closeWithoutWaitingClientDisconnect); } /** * Close this topic - close all producers and subscriptions associated with this topic. * + * @param disconnectClients disconnect clients * @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger * @return Completable future indicating completion of close operation */ @Override - public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect) { + public CompletableFuture close( + boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) { CompletableFuture closeFuture = new CompletableFuture<>(); lock.writeLock().lock(); @@ -1481,7 +1489,12 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect futures.add(transactionBuffer.closeAsync()); replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect())); shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect())); - producers.values().forEach(producer -> futures.add(producer.disconnect())); + if (disconnectClients) { + futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData( + brokerService.getPulsar(), topic).thenAccept(lookupData -> + producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData))) + )); + } if (topicPublishRateLimiter != null) { topicPublishRateLimiter.close(); } @@ -1518,14 +1531,22 @@ public CompletableFuture close(boolean closeWithoutWaitingClientDisconnect ledger.asyncClose(new CloseCallback() { @Override public void closeComplete(Object ctx) { - // Everything is now closed, remove the topic from map - disposeTopic(closeFuture); + if (disconnectClients) { + // Everything is now closed, remove the topic from map + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } } @Override public void closeFailed(ManagedLedgerException exception, Object ctx) { log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception); - disposeTopic(closeFuture); + if (disconnectClients) { + disposeTopic(closeFuture); + } else { + closeFuture.complete(null); + } } }, null); }).exceptionally(exception -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java index c7cd55e183bee..158488bc84a7b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java @@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -98,6 +99,8 @@ import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.testcontext.PulsarTestContext; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.LookupService; import org.apache.pulsar.client.impl.TableViewImpl; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; @@ -119,6 +122,7 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -126,6 +130,7 @@ */ @Slf4j @Test(groups = "broker") +@SuppressWarnings("unchecked") public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { private PulsarService pulsar1; @@ -142,6 +147,8 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest { private final String defaultTestNamespace = "public/test"; + private LookupService lookupService; + @BeforeClass @Override public void setup() throws Exception { @@ -190,6 +197,7 @@ public void setup() throws Exception { admin.namespaces().createNamespace(defaultTestNamespace); admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace, Sets.newHashSet(this.conf.getClusterName())); + lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true); } } @@ -203,9 +211,10 @@ protected void cleanup() throws Exception { } @BeforeMethod(alwaysRun = true) - protected void initializeState() throws PulsarAdminException { + protected void initializeState() throws PulsarAdminException, IllegalAccessException { admin.namespaces().unload(defaultTestNamespace); reset(primaryLoadManager, secondaryLoadManager); + FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true); } @Test @@ -392,7 +401,7 @@ public boolean test(NamespaceBundle namespaceBundle) { admin.namespaces().unloadNamespaceBundle(topicName.getNamespace(), bundle.getBundleRange(), dstBrokerUrl); Awaitility.await().untilAsserted(() -> { assertEquals(onloadCount.get(), 3); - assertEquals(unloadCount.get(), 2); + assertEquals(unloadCount.get(), 3); //one from releasing and one from owned }); assertEquals(admin.lookups().lookupTopic(topicName.toString()), dstBrokerServiceUrl); @@ -406,6 +415,109 @@ public boolean test(NamespaceBundle namespaceBundle) { assertTrue(ex.getMessage().contains("cannot be transfer to same broker")); } } + @DataProvider(name = "isPersistentTopicTest") + public Object[][] isPersistentTopicTest() { + return new Object[][] { { true }, { false }}; + } + @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest") + public void testTransferClientReconnectionWithoutLookup(boolean isPersistentTopicTest) throws Exception { + String topicType = isPersistentTopicTest? "persistent" : "non-persistent"; + String topic = topicType + "://" + defaultTestNamespace + "/test-transfer-client-reconnect"; + TopicName topicName = TopicName.get(topic); + + AtomicInteger lookupCount = new AtomicInteger(); + var lookup = spyLookupService(lookupCount, topicName); + var producer = pulsarClient.newProducer().topic(topic).create(); + int lookupCountBeforeUnload = lookupCount.get(); + + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get(); + String broker = admin.lookups().lookupTopic(topic); + String dstBrokerUrl = pulsar1.getLookupServiceAddress(); + String dstBrokerServiceUrl; + if (broker.equals(pulsar1.getBrokerServiceUrl())) { + dstBrokerUrl = pulsar2.getLookupServiceAddress(); + dstBrokerServiceUrl = pulsar2.getBrokerServiceUrl(); + } else { + dstBrokerServiceUrl = pulsar1.getBrokerServiceUrl(); + } + checkOwnershipState(broker, bundle); + + final String finalDstBrokerUrl = dstBrokerUrl; + CompletableFuture.runAsync(() -> { + try { + admin.namespaces().unloadNamespaceBundle( + defaultTestNamespace, bundle.getBundleRange(), finalDstBrokerUrl); + } catch (PulsarAdminException e) { + throw new RuntimeException(e); + } + } + ); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + try { + producer.send("hi".getBytes()); + String newOwner = admin.lookups().lookupTopic(topic); + assertEquals(dstBrokerServiceUrl, newOwner); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } catch (PulsarAdminException e) { + throw new RuntimeException(e); + } + }); + assertTrue(producer.isConnected()); + verify(lookup, times(lookupCountBeforeUnload)).getBroker(topicName); + producer.close(); + } + + + + @Test(timeOut = 30 * 1000, dataProvider = "isPersistentTopicTest") + public void testUnloadClientReconnectionWithLookup(boolean isPersistentTopicTest) throws Exception { + String topicType = isPersistentTopicTest? "persistent" : "non-persistent"; + String topic = topicType + "://" + defaultTestNamespace + "/test-unload-client-reconnect-" + + isPersistentTopicTest; + TopicName topicName = TopicName.get(topic); + + AtomicInteger lookupCount = new AtomicInteger(); + var lookup = spyLookupService(lookupCount, topicName); + + var producer = pulsarClient.newProducer().topic(topic).create(); + int lookupCountBeforeUnload = lookupCount.get(); + + NamespaceBundle bundle = getBundleAsync(pulsar1, TopicName.get(topic)).get(); + CompletableFuture.runAsync(() -> { + try { + admin.namespaces().unloadNamespaceBundle( + defaultTestNamespace, bundle.getBundleRange()); + } catch (PulsarAdminException e) { + throw new RuntimeException(e); + } + } + ); + MutableInt sendCount = new MutableInt(); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + try { + producer.send("hi".getBytes()); + assertEquals(sendCount.incrementAndGet(), 10); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + }); + assertTrue(producer.isConnected()); + verify(lookup, times(lookupCountBeforeUnload + 1)).getBroker(topicName); + producer.close(); + } + + private LookupService spyLookupService(AtomicInteger lookupCount, TopicName topicName) + throws IllegalAccessException { + var lookup = spy(lookupService); + FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookup, true); + doAnswer(invocationOnMock -> { + lookupCount.incrementAndGet(); + return invocationOnMock.callRealMethod(); + }).when(lookup).getBroker(topicName); + return lookup; + } private void checkOwnershipState(String broker, NamespaceBundle bundle) throws ExecutionException, InterruptedException { @@ -550,9 +662,11 @@ public void testCheckOwnershipPresentWithSystemNamespace() throws Exception { @Test public void testMoreThenOneFilter() throws Exception { - TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception"); + // Use a different namespace to avoid flaky test failures + // from unloading the default namespace and the following topic policy lookups at the init state step + String namespace = "public/my-namespace"; + TopicName topicName = TopicName.get(namespace + "/test-filter-has-exception"); NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get(); - String lookupServiceAddress1 = pulsar1.getLookupServiceAddress(); doReturn(List.of(new MockBrokerFilter() { @Override @@ -570,10 +684,18 @@ public CompletableFuture> filterAsync(Map brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get(); - assertTrue(brokerLookupData.isPresent()); - assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + assertTrue(brokerLookupData.isPresent()); + assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress()); + assertEquals(brokerLookupData.get().getPulsarServiceUrl(), + pulsar1.getAdminClient().lookups().lookupTopic(topicName.toString())); + assertEquals(brokerLookupData.get().getPulsarServiceUrl(), + pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString())); + }); + + admin.namespaces().deleteNamespace(namespace, true); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index 31705264fba6c..df1bfd12d3eaf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -101,6 +101,7 @@ import org.testng.annotations.Test; @Test(groups = "broker") +@SuppressWarnings("unchecked") public class ServiceUnitStateChannelTest extends MockedPulsarServiceBaseTest { private PulsarService pulsar1; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index 9e3d9e3a41340..c92127457aaf2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -102,7 +102,7 @@ public void setup() throws Exception { nsService = mock(NamespaceService.class); brokerService = mock(BrokerService.class); doReturn(CompletableFuture.completedFuture(1)).when(brokerService) - .unloadServiceUnit(any(), anyBoolean(), anyLong(), any()); + .unloadServiceUnit(any(), anyBoolean(), anyBoolean(), anyLong(), any()); doReturn(config).when(pulsar).getConfiguration(); doReturn(nsService).when(pulsar).getNamespaceService(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index f3e8b2354b344..ba79f1b824765 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -34,6 +34,7 @@ import io.netty.util.concurrent.Promise; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.net.URI; import java.net.URISyntaxException; import java.nio.channels.ClosedChannelException; import java.util.Arrays; @@ -801,11 +802,32 @@ protected void handleError(CommandError error) { @Override protected void handleCloseProducer(CommandCloseProducer closeProducer) { - log.info("[{}] Broker notification of Closed producer: {}", remoteAddress, closeProducer.getProducerId()); final long producerId = closeProducer.getProducerId(); ProducerImpl producer = producers.remove(producerId); if (producer != null) { - producer.connectionClosed(this); + if (closeProducer.hasAssignedBrokerServiceUrl() || closeProducer.hasAssignedBrokerServiceUrlTls()) { + try { + final URI uri = new URI(producer.client.conf.isUseTls() + ? closeProducer.getAssignedBrokerServiceUrlTls() + : closeProducer.getAssignedBrokerServiceUrl()); + log.info("[{}] Broker notification of Closed producer: {}. Redirecting to {}.", + remoteAddress, closeProducer.getProducerId(), uri); + producer.getConnectionHandler().connectionClosed( + this, Optional.of(0L), Optional.of(uri)); + } catch (Throwable e) { + log.error("[{}] Invalid redirect url {}/{} for {}", remoteAddress, + closeProducer.hasAssignedBrokerServiceUrl() + ? closeProducer.getAssignedBrokerServiceUrl() : "", + closeProducer.hasAssignedBrokerServiceUrlTls() + ? closeProducer.getAssignedBrokerServiceUrlTls() : "", + closeProducer.getRequestId(), e); + producer.connectionClosed(this); + } + } else { + log.info("[{}] Broker notification of Closed producer: {}.", + remoteAddress, closeProducer.getProducerId()); + producer.connectionClosed(this); + } } else { log.warn("Producer with id {} not found while closing producer ", producerId); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java index 6979914274e85..600dc17a1b09a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java @@ -19,6 +19,8 @@ package org.apache.pulsar.client.impl; import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -66,6 +68,10 @@ protected ConnectionHandler(HandlerState state, Backoff backoff, Connection conn } protected void grabCnx() { + grabCnx(Optional.empty()); + } + + protected void grabCnx(Optional hostURI) { if (!duringConnect.compareAndSet(false, true)) { log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection", state.topic, state.getHandlerName()); @@ -87,7 +93,12 @@ protected void grabCnx() { try { CompletableFuture cnxFuture; - if (state.redirectedClusterURI != null) { + if (hostURI.isPresent()) { + InetSocketAddress address = InetSocketAddress.createUnresolved( + hostURI.get().getHost(), + hostURI.get().getPort()); + cnxFuture = state.client.getConnection(address, address, randomKeyForSelectConnection); + } else if (state.redirectedClusterURI != null) { if (state.topic == null) { InetSocketAddress address = InetSocketAddress.createUnresolved(state.redirectedClusterURI.getHost(), state.redirectedClusterURI.getPort()); @@ -156,6 +167,10 @@ void reconnectLater(Throwable exception) { } public void connectionClosed(ClientCnx cnx) { + connectionClosed(cnx, Optional.empty(), Optional.empty()); + } + + public void connectionClosed(ClientCnx cnx, Optional initialConnectionDelayMs, Optional hostUrl) { lastConnectionClosedTimestamp = System.currentTimeMillis(); duringConnect.set(false); state.client.getCnxPool().releaseConnection(cnx); @@ -165,14 +180,14 @@ public void connectionClosed(ClientCnx cnx) { state.topic, state.getHandlerName(), state.getState()); return; } - long delayMs = backoff.next(); + long delayMs = initialConnectionDelayMs.orElse(backoff.next()); state.setState(State.Connecting); log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", state.topic, state.getHandlerName(), cnx.channel(), delayMs / 1000.0); state.client.timer().newTimeout(timeout -> { log.info("[{}] [{}] Reconnecting after timeout", state.topic, state.getHandlerName()); - grabCnx(); + grabCnx(hostUrl); }, delayMs, TimeUnit.MILLISECONDS); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index cf0cd820a6d10..ff116d2406b40 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -60,6 +60,7 @@ import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn; import org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse; import org.apache.pulsar.common.api.proto.CommandAuthChallenge; +import org.apache.pulsar.common.api.proto.CommandCloseProducer; import org.apache.pulsar.common.api.proto.CommandConnect; import org.apache.pulsar.common.api.proto.CommandConnected; import org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse; @@ -761,11 +762,28 @@ public static ByteBuf newTopicMigrated(ResourceType type, long resourceId, Strin return serializeWithSize(cmd); } - public static ByteBuf newCloseProducer(long producerId, long requestId) { + public static ByteBuf newCloseProducer( + long producerId, long requestId) { + return newCloseProducer(producerId, requestId, null, null); + } + + public static ByteBuf newCloseProducer( + long producerId, long requestId, String assignedBrokerUrl, String assignedBrokerUrlTls) { BaseCommand cmd = localCmd(Type.CLOSE_PRODUCER); - cmd.setCloseProducer() - .setProducerId(producerId) - .setRequestId(requestId); + CommandCloseProducer commandCloseProducer = cmd.setCloseProducer() + .setProducerId(producerId) + .setRequestId(requestId); + + if (assignedBrokerUrl != null) { + commandCloseProducer + .setAssignedBrokerServiceUrl(assignedBrokerUrl); + } + + if (assignedBrokerUrlTls != null){ + commandCloseProducer + .setAssignedBrokerServiceUrlTls(assignedBrokerUrlTls); + } + return serializeWithSize(cmd); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index afe193eeb7e9d..2c350aaf8a10e 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -641,6 +641,8 @@ message CommandTopicMigrated { message CommandCloseProducer { required uint64 producer_id = 1; required uint64 request_id = 2; + optional string assignedBrokerServiceUrl = 3; + optional string assignedBrokerServiceUrlTls = 4; } message CommandCloseConsumer {