Skip to content

Commit

Permalink
[improve][broker] defer the ownership checks if the owner is inactive…
Browse files Browse the repository at this point in the history
… (ExtensibleLoadManager) (apache#21855)
  • Loading branch information
heesung-sn committed Jan 5, 2024
1 parent 7632083 commit 857792f
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() {
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,27 @@ public boolean isOwner(String serviceUnit) {
return isOwner(serviceUnit, lookupServiceAddress);
}

private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());

return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(Optional::ofNullable)))
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
Expand All @@ -509,18 +530,13 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
return CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker()));
}
case Splitting -> {
return CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
}).thenApply(
broker -> broker == null ? Optional.empty() : Optional.of(broker));
return getActiveOwnerAsync(serviceUnit, state, Optional.empty());
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down Expand Up @@ -781,9 +797,14 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
stateChangeListeners.notifyOnCompletion(
data.force() ? closeServiceUnit(serviceUnit)
: CompletableFuture.completedFuture(0), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand Down Expand Up @@ -1167,38 +1188,43 @@ private void scheduleCleanup(String broker, long delayInSecs) {


private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
String selectedBroker,
Optional<String> selectedBroker,
String inactiveBroker) {


if (selectedBroker.isEmpty()) {
return new ServiceUnitStateData(Free, null, inactiveBroker,
true, getNextVersionId(orphanData));
}

if (orphanData.state() == Splitting) {
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker,
return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker,
true, getNextVersionId(orphanData));
}
}

private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
if (selectedBroker.isPresent()) {
var override = getOverrideInactiveBrokerStateData(
orphanData, selectedBroker.get(), inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
} else {
log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. "
if (selectedBroker.isEmpty()) {
log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}."
+ "totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
}
var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
}

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
Expand Down Expand Up @@ -1300,7 +1326,7 @@ private synchronized void doCleanup(String broker) {
broker,
cleanupTime,
orphanServiceUnitCleanupCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());

}
Expand Down Expand Up @@ -1489,7 +1515,7 @@ protected void monitorOwnerships(List<String> brokers) {
inactiveBrokers, inactiveBrokers.size(),
orphanServiceUnitCleanupCnt,
serviceUnitTombstoneCleanupCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThrows;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.expectThrows;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -1557,6 +1558,80 @@ public void testOverrideOrphanStateData()
cleanTableViews();
}

@Test(priority = 19)
public void testActiveGetOwner() throws Exception {


// set the bundle owner is the broker
String broker = lookupServiceAddress2;
String bundle = "public/owned/0xfffffff0_0xffffffff";
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
assertEquals(owner, broker);

// simulate the owner is inactive
var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(spyRegistry).lookupAsync(eq(broker));
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", spyRegistry , true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1000, true);


// verify getOwnerAsync times out because the owner is inactive now.
long start = System.currentTimeMillis();
var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get());
assertTrue(ex.getCause() instanceof TimeoutException);
assertTrue(System.currentTimeMillis() - start >= 1000);

// simulate ownership cleanup(no selected owner) by the leader channel
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(loadManager).selectAsync(any(), any());
var leaderChannel = channel1;
String leader1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
assertEquals(leader1, leader2);
if (leader1.equals(lookupServiceAddress2)) {
leaderChannel = channel2;
}
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

// verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);

// simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
.when(loadManager).selectAsync(any(), any());
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
getCleanupJobs(leaderChannel).clear();
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

// verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout
start = System.currentTimeMillis();
assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
assertTrue(System.currentTimeMillis() - start < 20_000);

// test clean-up
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", registry , true);
cleanTableViews();

}

private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,26 @@ public void startBroker() {
brokerContainer.start();
}
});
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/startBrokerCheck";
Awaitility.await().atMost(120, TimeUnit.SECONDS).ignoreExceptions().until(
() -> {
for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(
brokerContainer.getHttpServiceUrl()).build()) {
if (admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
return false;
}
try {
admin.topics().createPartitionedTopic(topicName, 10);
} catch (PulsarAdminException.ConflictException e) {
// expected
}
admin.lookups().lookupPartitionedTopic(topicName);
}
}
return true;
}
);
}
}

Expand Down Expand Up @@ -245,7 +265,7 @@ public void testDeleteNamespace() throws Exception {
assertFalse(admin.namespaces().getNamespaces(DEFAULT_TENANT).contains(namespace));
}

@Test(timeOut = 40 * 1000)
@Test(timeOut = 120 * 1000)
public void testStopBroker() throws Exception {
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/test-stop-broker-topic";

Expand All @@ -261,9 +281,11 @@ public void testStopBroker() throws Exception {
}
}

String broker1 = admin.lookups().lookupTopic(topicName);
Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String broker1 = admin.lookups().lookupTopic(topicName);
assertNotEquals(broker1, broker);
});

assertNotEquals(broker1, broker);
}

@Test(timeOut = 40 * 1000)
Expand Down Expand Up @@ -311,7 +333,7 @@ public void testIsolationPolicy() throws PulsarAdminException {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");

Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), NUM_BROKERS);
Expand Down Expand Up @@ -347,6 +369,7 @@ public void testIsolationPolicy() throws PulsarAdminException {
}

String broker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(broker), 0);

for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
Expand All @@ -355,25 +378,38 @@ public void testIsolationPolicy() throws PulsarAdminException {
}
}

assertEquals(extractBrokerIndex(broker), 0);

broker = admin.lookups().lookupTopic(topic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 2);
}
);

assertEquals(extractBrokerIndex(broker), 1);
Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String ownerBroker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(ownerBroker), 1);
});

for (BrokerContainer container : pulsarCluster.getBrokers()) {
String name = container.getHostName();
if (name.contains("1")) {
container.stop();
}
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 1);
}
);

try {
admin.lookups().lookupTopic(topic);
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
"Failed to select the new owner broker for bundle");
assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle");
}
}

Expand Down

0 comments on commit 857792f

Please sign in to comment.