Skip to content

Commit

Permalink
[fix][broker] monitor the load store isConnected and fix testIsolatio…
Browse files Browse the repository at this point in the history
…nPolicy
  • Loading branch information
heesung-sn committed Dec 21, 2023
1 parent b944f10 commit df03c79
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,10 @@ private CompletableFuture<String> getOrSelectOwnerAsync(ServiceUnitId serviceUni
});
}
assignCounter.incrementSkip();

if (debug(conf, log)) {
log.info("Found the owner: {}.", broker.get());
}
// Already assigned, return it.
return CompletableFuture.completedFuture(broker.get());
});
Expand Down Expand Up @@ -887,19 +891,40 @@ private void monitor() {
// Monitor role
// Periodically check the role in case ZK watcher fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();

if (isChannelOwner) {
if (role != Leader) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the leader role.", role, isChannelOwner);
playLeader();
} else {
if (!topBundlesLoadDataStore.isConnected()) {
log.warn("Leader's topBundlesLoadDataStore is disconnected. Restarting it.");
topBundlesLoadDataStore.init();
}
if (!brokerLoadDataStore.isConnected()) {
log.warn("Leader's brokerLoadDataStore is disconnected. Restarting it.");
brokerLoadDataStore.init();
}
}
} else {
if (role != Follower) {
log.warn("Current role:{} does not match with the channel ownership:{}. "
+ "Playing the follower role.", role, isChannelOwner);
playFollower();
} else {
if (!topBundlesLoadDataStore.isConnected()) {
log.warn("Follower's topBundlesLoadDataStore is disconnected. Restarting it.");
topBundlesLoadDataStore.close();
topBundlesLoadDataStore.startProducer();
}
if (!brokerLoadDataStore.isConnected()) {
log.warn("Follower's brokerLoadDataStore is disconnected. Restarting it.");
brokerLoadDataStore.init();
}
}
}

} catch (Throwable e) {
log.error("Failed to get the channel ownership.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() {
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,9 @@ public interface LoadDataStore<T> extends Closeable {
*/
void startProducer() throws LoadDataStoreException;

/**
* Check if this store is connected.
*/
boolean isConnected();

}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,17 @@ public void startProducer() throws LoadDataStoreException {
}
}

@Override
public boolean isConnected() {
if (producer != null) {
return producer.isConnected();
}

// TODO: Currently, table view does not expose isConnected.
// Consider adding tableview.isConnected() in the future
return tableView != null;
}

@Override
public void close() throws IOException {
if (producer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,7 @@ protected CompletableFuture<Void> validateTopicOwnershipAsync(TopicName topicNam
.replaceQueryParam("authoritative", newAuthoritative)
.build();
// Redirect
if (log.isDebugEnabled()) {
log.debug("Redirecting the rest call to {}", redirect);
}
log.info("Redirecting the rest call to {}", redirect);
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}).exceptionally(ex -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};
configuration.setPreferLaterVersions(true);
doReturn(configuration).when(mockContext).brokerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};

var topBundleLoadDataStore = new LoadDataStore<TopBundlesLoadData>() {
Expand Down Expand Up @@ -470,6 +475,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};

BrokerRegistry brokerRegistry = mock(BrokerRegistry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,11 @@ public void startTableView() throws LoadDataStoreException {
public void startProducer() throws LoadDataStoreException {

}

@Override
public boolean isConnected() {
return true;
}
};

doReturn(conf).when(ctx).brokerConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.tests.integration.loadbalance;

import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT;
import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -266,9 +265,11 @@ public void testStopBroker() throws Exception {
}
}

String broker1 = admin.lookups().lookupTopic(topicName);
Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
String broker1 = admin.lookups().lookupTopic(topicName);
assertNotEquals(broker1, broker);
});

assertNotEquals(broker1, broker);
}

@Test(timeOut = 40 * 1000)
Expand Down Expand Up @@ -308,15 +309,15 @@ public void testAntiaffinityPolicy() throws PulsarAdminException {
assertEquals(result.size(), NUM_BROKERS);
}

@Test(timeOut = 40 * 1000)
@Test(timeOut = 90 * 1000, invocationCount = 30)
public void testIsolationPolicy() throws Exception {
final String namespaceIsolationPolicyName = "my-isolation-policy";
final String isolationEnabledNameSpace = DEFAULT_TENANT + "/my-isolation-policy" + nsSuffix;
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");

Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), NUM_BROKERS);
Expand Down Expand Up @@ -345,13 +346,25 @@ public void testIsolationPolicy() throws Exception {
}

final String topic = "persistent://" + isolationEnabledNameSpace + "/topic";
try {
admin.topics().createNonPartitionedTopic(topic);
} catch (PulsarAdminException.ConflictException e) {
//expected when retried
}
Awaitility.await().atMost(30, TimeUnit.SECONDS).until(

Check failure on line 349 in tests/integration/src/test/java/org/apache/pulsar/tests/integration/loadbalance/ExtensibleLoadManagerTest.java

View workflow job for this annotation

GitHub Actions / CI - Integration - LoadBalance

ExtensibleLoadManagerTest.testIsolationPolicy

Condition with org.apache.pulsar.tests.integration.loadbalance.ExtensibleLoadManagerTest was not fulfilled within 30 seconds.
() -> {
try {
admin.topics().createNonPartitionedTopic(topic);
return true;
} catch (PulsarAdminException.ConflictException e) {
return true;
//expected when retried
} catch (Exception e) {
return false;
}
});

Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String broker = admin.lookups().lookupTopic(topic);
// This isolated topic should be assigned to the primary broker, broker-0
assertEquals(extractBrokerIndex(broker), 0);
});

String broker = admin.lookups().lookupTopic(topic);

for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
Expand All @@ -360,28 +373,29 @@ public void testIsolationPolicy() throws Exception {
}
}

assertEquals(extractBrokerIndex(broker), 0);

broker = admin.lookups().lookupTopic(topic);

final String brokerName = broker;
retryStrategically((test) -> extractBrokerIndex(brokerName) == 1, 100, 200);
assertEquals(extractBrokerIndex(broker), 1);
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String broker = admin.lookups().lookupTopic(topic);
// This isolated topic should be assigned to the secondary broker, broker-1
assertEquals(extractBrokerIndex(broker), 1);
});

for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
if (name.contains("1")) {
container.stop();
}
}
try {
admin.lookups().lookupTopic(topic);
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
"Failed to select the new owner broker for bundle");
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
// This isolated topic cannot be assigned to the remaining broker, broker-2
try {
String broker = admin.lookups().lookupTopic(topic);
log.info("Looked upbroker {}", broker);
} catch (Exception ex) {
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
"Failed to select the new owner broker for bundle");
}
});
}

private String getBrokerUrl(int index) {
Expand Down

0 comments on commit df03c79

Please sign in to comment.