diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java index 581183cf95ad3..9e0aa4c40ea95 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java @@ -504,6 +504,10 @@ private CompletableFuture getOrSelectOwnerAsync(ServiceUnitId serviceUni }); } assignCounter.incrementSkip(); + + if (debug(conf, log)) { + log.info("Found the owner: {}.", broker.get()); + } // Already assigned, return it. return CompletableFuture.completedFuture(broker.get()); }); @@ -887,19 +891,40 @@ private void monitor() { // Monitor role // Periodically check the role in case ZK watcher fails. var isChannelOwner = serviceUnitStateChannel.isChannelOwner(); + if (isChannelOwner) { if (role != Leader) { log.warn("Current role:{} does not match with the channel ownership:{}. " + "Playing the leader role.", role, isChannelOwner); playLeader(); + } else { + if (!topBundlesLoadDataStore.isConnected()) { + log.warn("Leader's topBundlesLoadDataStore is disconnected. Restarting it."); + topBundlesLoadDataStore.init(); + } + if (!brokerLoadDataStore.isConnected()) { + log.warn("Leader's brokerLoadDataStore is disconnected. Restarting it."); + brokerLoadDataStore.init(); + } } } else { if (role != Follower) { log.warn("Current role:{} does not match with the channel ownership:{}. " + "Playing the follower role.", role, isChannelOwner); playFollower(); + } else { + if (!topBundlesLoadDataStore.isConnected()) { + log.warn("Follower's topBundlesLoadDataStore is disconnected. Restarting it."); + topBundlesLoadDataStore.close(); + topBundlesLoadDataStore.startProducer(); + } + if (!brokerLoadDataStore.isConnected()) { + log.warn("Follower's brokerLoadDataStore is disconnected. Restarting it."); + brokerLoadDataStore.init(); + } } } + } catch (Throwable e) { log.error("Failed to get the channel ownership.", e); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java index 18e949537dedb..cd1561cb70e2d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerWrapper.java @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() { @Override public void writeLoadReportOnZookeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override public void writeResourceQuotasToZooKeeper() throws Exception { // No-op, this operation is not useful, the load data reporter will automatically write. - throw new UnsupportedOperationException(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java index a7deeeaad8a5c..cdb91614e234b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/LoadDataStore.java @@ -103,4 +103,9 @@ public interface LoadDataStore extends Closeable { */ void startProducer() throws LoadDataStoreException; + /** + * Check if this store is connected. + */ + boolean isConnected(); + } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java index ead0a7081fd37..5e811cac47d74 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.java @@ -128,6 +128,17 @@ public void startProducer() throws LoadDataStoreException { } } + @Override + public boolean isConnected() { + if (producer != null) { + return producer.isConnected(); + } + + // TODO: Currently, table view does not expose isConnected. + // Consider adding tableview.isConnected() in the future + return tableView != null; + } + @Override public void close() throws IOException { if (producer != null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index e8192cde3fdf3..74f04d3d75165 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -782,9 +782,7 @@ protected CompletableFuture validateTopicOwnershipAsync(TopicName topicNam .replaceQueryParam("authoritative", newAuthoritative) .build(); // Redirect - if (log.isDebugEnabled()) { - log.debug("Redirecting the rest call to {}", redirect); - } + log.info("Redirecting the rest call to {}", redirect); throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); } }).exceptionally(ex -> { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java index a120ef473e9a5..1dc4a121cdc91 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/filter/BrokerFilterTestBase.java @@ -109,6 +109,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; configuration.setPreferLaterVersions(true); doReturn(configuration).when(mockContext).brokerConfiguration(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java index 4eec612477758..440abf766e8d6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/scheduler/TransferShedderTest.java @@ -402,6 +402,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; var topBundleLoadDataStore = new LoadDataStore() { @@ -470,6 +475,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; BrokerRegistry brokerRegistry = mock(BrokerRegistry.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java index b1e09bf2f3afb..343c54b0085fb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/strategy/LeastResourceUsageWithWeightTest.java @@ -271,6 +271,11 @@ public void startTableView() throws LoadDataStoreException { public void startProducer() throws LoadDataStoreException { } + + @Override + public boolean isConnected() { + return true; + } }; doReturn(conf).when(ctx).brokerConfiguration(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java index bbb92f6a6cf4c..a910cc507e352 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java @@ -19,7 +19,6 @@ package org.apache.pulsar.tests.integration.loadbalance; import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT; -import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically; import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -266,9 +265,11 @@ public void testStopBroker() throws Exception { } } - String broker1 = admin.lookups().lookupTopic(topicName); + Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> { + String broker1 = admin.lookups().lookupTopic(topicName); + assertNotEquals(broker1, broker); + }); - assertNotEquals(broker1, broker); } @Test(timeOut = 40 * 1000) @@ -308,7 +309,7 @@ public void testAntiaffinityPolicy() throws PulsarAdminException { assertEquals(result.size(), NUM_BROKERS); } - @Test(timeOut = 40 * 1000) + @Test(timeOut = 90 * 1000, invocationCount = 30) public void testIsolationPolicy() throws Exception { final String namespaceIsolationPolicyName = "my-isolation-policy"; final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix; @@ -316,7 +317,7 @@ public void testIsolationPolicy() throws Exception { parameters1.put("min_limit", "1"); parameters1.put("usage_threshold", "100"); - Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted( () -> { List activeBrokers = admin.brokers().getActiveBrokers(); assertEquals(activeBrokers.size(), NUM_BROKERS); @@ -345,13 +346,25 @@ public void testIsolationPolicy() throws Exception { } final String topic = "persistent://" + isolationEnabledNameSpace + "/topic"; - try { - admin.topics().createNonPartitionedTopic(topic); - } catch (PulsarAdminException.ConflictException e) { - //expected when retried - } + Awaitility.await().atMost(30, TimeUnit.SECONDS).until( + () -> { + try { + admin.topics().createNonPartitionedTopic(topic); + return true; + } catch (PulsarAdminException.ConflictException e) { + return true; + //expected when retried + } catch (Exception e) { + return false; + } + }); + + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String broker = admin.lookups().lookupTopic(topic); + // This isolated topic should be assigned to the primary broker, broker-0 + assertEquals(extractBrokerIndex(broker), 0); + }); - String broker = admin.lookups().lookupTopic(topic); for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -360,13 +373,11 @@ public void testIsolationPolicy() throws Exception { } } - assertEquals(extractBrokerIndex(broker), 0); - - broker = admin.lookups().lookupTopic(topic); - - final String brokerName = broker; - retryStrategically((test) -> extractBrokerIndex(brokerName) == 1, 100, 200); - assertEquals(extractBrokerIndex(broker), 1); + Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { + String broker = admin.lookups().lookupTopic(topic); + // This isolated topic should be assigned to the secondary broker, broker-1 + assertEquals(extractBrokerIndex(broker), 1); + }); for (BrokerContainer container : pulsarCluster.getBrokers()) { String name = container.getHostName(); @@ -374,14 +385,17 @@ public void testIsolationPolicy() throws Exception { container.stop(); } } - try { - admin.lookups().lookupTopic(topic); - fail(); - } catch (Exception ex) { - log.error("Failed to lookup topic: ", ex); - assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker", - "Failed to select the new owner broker for bundle"); - } + + Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { + // This isolated topic cannot be assigned to the remaining broker, broker-2 + try { + String broker = admin.lookups().lookupTopic(topic); + log.info("Looked upbroker {}", broker); + } catch (Exception ex) { + assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker", + "Failed to select the new owner broker for bundle"); + } + }); } private String getBrokerUrl(int index) {