From b98f7fff2dcf41783b102eb9174db0cfe399e495 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 2 Oct 2024 19:00:08 +0800 Subject: [PATCH 1/2] [improve][broker] Add error logs if the broker failed to register itself for metadata node deletion --- .../broker/loadbalance/extensions/BrokerRegistryImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index a13b332e6eb5f..0a9f562f461b3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -220,7 +220,11 @@ private void handleMetadataStoreNotification(Notification t) { // is expired. In this case, we should register again. final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1); if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) { - registerAsync(); + registerAsync().exceptionally(e -> { + log.error("[{}] Failed to register self to {} (state: {})", getBrokerId(), brokerIdKeyPath, + state.get(), e); + return null; + }); } if (listeners.isEmpty()) { return; From f0b236f0b3840be41818fe06cd504cc780ec84aa Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 2 Oct 2024 19:09:43 +0800 Subject: [PATCH 2/2] Add the retry logic --- .../extensions/BrokerRegistryImpl.java | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java index 0a9f562f461b3..7a4a12a242321 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/BrokerRegistryImpl.java @@ -220,11 +220,7 @@ private void handleMetadataStoreNotification(Notification t) { // is expired. In this case, we should register again. final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1); if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) { - registerAsync().exceptionally(e -> { - log.error("[{}] Failed to register self to {} (state: {})", getBrokerId(), brokerIdKeyPath, - state.get(), e); - return null; - }); + registerWithRetry(); } if (listeners.isEmpty()) { return; @@ -239,6 +235,18 @@ private void handleMetadataStoreNotification(Notification t) { } } + private void registerWithRetry() { + registerAsync().exceptionallyAsync(e -> { + log.error("[{}] Failed to register self to {} (state: {}), retry registering", getBrokerId(), + brokerIdKeyPath, state.get(), e); + // Keep retrying registering itself with a fixed delay time because all lookup operations rely on the + // successful registering. + // Don't use the load manager executor because that thread might be blocked by some lookup operations. + pulsar.getExecutor().schedule(this::registerWithRetry, 100, TimeUnit.MILLISECONDS); + return null; + }); + } + @VisibleForTesting protected static boolean isVerifiedNotification(Notification t) { return t.getPath().startsWith(LOADBALANCE_BROKERS_ROOT + "/")