Skip to content

Commit

Permalink
[improve][broker] check if the owner is active when returning ownersh…
Browse files Browse the repository at this point in the history
…ip (ExtensibleLoadManager)
  • Loading branch information
heesung-sn committed Dec 27, 2023
1 parent 0dd1672 commit c43e55f
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,11 @@ public void setLoadReportForceUpdateFlag() {
@Override
public void writeLoadReportOnZookeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
public void writeResourceQuotasToZooKeeper() throws Exception {
// No-op, this operation is not useful, the load data reporter will automatically write.
throw new UnsupportedOperationException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,30 @@ public boolean isOwner(String serviceUnit) {
return isOwner(serviceUnit, lookupServiceAddress);
}

private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {
CompletableFuture<Optional<String>> activeOwner = owner.isPresent()
? brokerRegistry.lookupAsync(owner.get()).thenApply(lookupData -> lookupData.flatMap(__ -> owner))
: CompletableFuture.completedFuture(Optional.empty());


return activeOwner
.thenCompose(broker -> broker
.map(__ -> activeOwner)
.orElseGet(() -> deferGetOwnerRequest(serviceUnit).thenApply(
ownerAfterDeferred -> ownerAfterDeferred == null ? Optional.empty()
: Optional.of(ownerAfterDeferred))))
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed to get active owner broker. serviceUnit:{}, state:{}, owner:{}",
serviceUnit, state, owner, e);
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
});
}

public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
if (!validateChannelState(Started, true)) {
return CompletableFuture.failedFuture(
Expand All @@ -498,18 +522,13 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
ownerLookUpCounters.get(state).getTotal().incrementAndGet();
switch (state) {
case Owned -> {
return CompletableFuture.completedFuture(Optional.of(data.dstBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.dstBroker()));
}
case Splitting -> {
return CompletableFuture.completedFuture(Optional.of(data.sourceBroker()));
return getActiveOwnerAsync(serviceUnit, state, Optional.of(data.sourceBroker()));
}
case Assigning, Releasing -> {
return deferGetOwnerRequest(serviceUnit).whenComplete((__, e) -> {
if (e != null) {
ownerLookUpCounters.get(state).getFailure().incrementAndGet();
}
}).thenApply(
broker -> broker == null ? Optional.empty() : Optional.of(broker));
return getActiveOwnerAsync(serviceUnit, state, Optional.empty());
}
case Init, Free -> {
return CompletableFuture.completedFuture(Optional.empty());
Expand Down Expand Up @@ -812,9 +831,14 @@ private void handleFreeEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(null);
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.sourceBroker())) {
log(null, serviceUnit, data, null);
stateChangeListeners.notifyOnCompletion(
data.force() ? closeServiceUnit(serviceUnit, true)
: CompletableFuture.completedFuture(0), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand Down Expand Up @@ -1202,38 +1226,44 @@ private void scheduleCleanup(String broker, long delayInSecs) {


private ServiceUnitStateData getOverrideInactiveBrokerStateData(ServiceUnitStateData orphanData,
String selectedBroker,
Optional<String> selectedBroker,
String inactiveBroker) {


if (selectedBroker.isEmpty()) {
return new ServiceUnitStateData(Free, null, inactiveBroker,
true, getNextVersionId(orphanData));
}

if (orphanData.state() == Splitting) {
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker.get(),
Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
true, getNextVersionId(orphanData));
} else {
return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker,
return new ServiceUnitStateData(Owned, selectedBroker.get(), inactiveBroker,
true, getNextVersionId(orphanData));
}
}

private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
Optional<String> selectedBroker = selectBroker(serviceUnit, inactiveBroker);
if (selectedBroker.isPresent()) {
var override = getOverrideInactiveBrokerStateData(
orphanData, selectedBroker.get(), inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
} else {
log.error("Failed to override the ownership serviceUnit:{} orphanData:{}. Empty selected broker. "
if (selectedBroker.isEmpty()) {
log.warn("Empty selected broker for ownership serviceUnit:{} orphanData:{}."
+ "totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
}
var override = getOverrideInactiveBrokerStateData(
orphanData, selectedBroker, inactiveBroker);
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
}

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
Expand Down Expand Up @@ -1335,7 +1365,7 @@ private synchronized void doCleanup(String broker) {
broker,
cleanupTime,
orphanServiceUnitCleanupCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());

}
Expand Down Expand Up @@ -1524,7 +1554,7 @@ protected void monitorOwnerships(List<String> brokers) {
inactiveBrokers, inactiveBrokers.size(),
orphanServiceUnitCleanupCnt,
serviceUnitTombstoneCleanupCnt,
totalCleanupErrorCntStart - totalCleanupErrorCnt.get(),
totalCleanupErrorCnt.get() - totalCleanupErrorCntStart,
printCleanupMetrics());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
Expand Down Expand Up @@ -1560,6 +1561,89 @@ public void testOverrideOrphanStateData()
cleanTableViews();
}

@Test(priority = 19)
public void testActiveGetOwner()
throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException {


// set the bundle owner is the broker
String broker = lookupServiceAddress2;
String bundle = "public/owned/0xfffffff0_0xffffffff";
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
var owner = channel1.getOwnerAsync(bundle).get(5, TimeUnit.SECONDS).get();
assertEquals(owner, broker);

// simulate the owner is inactive
var spyRegistry = spy(new BrokerRegistryImpl(pulsar));
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(spyRegistry).lookupAsync(eq(broker));
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", spyRegistry , true);
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 1000, true);


// verify getOwnerAsync times out because the owner is inactive now.
long start = System.currentTimeMillis();
try {
channel1.getOwnerAsync(bundle).get();
fail();
} catch (Exception e) {
if (e.getCause() instanceof TimeoutException) {
// expected
} else {
fail();
}
}
assertTrue(System.currentTimeMillis() - start >= 1000);

// simulate ownership cleanup(no selected owner) by the leader channel
doReturn(CompletableFuture.completedFuture(Optional.empty()))
.when(loadManager).selectAsync(any(), any());
var leaderChannel = channel1;
String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
String leader2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get();
assertEquals(leader, leader2);
if (leader.equals(lookupServiceAddress2)) {
leaderChannel = channel2;
}
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

// verify the ownership cleanup, and channel's getOwnerAsync returns empty result without timeout
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 20 * 1000, true);
start = System.currentTimeMillis();
assertTrue(channel1.getOwnerAsync(bundle).get().isEmpty());
assertTrue(System.currentTimeMillis() - start < 20_000);

// simulate ownership cleanup(lookupServiceAddress1 selected owner) by the leader channel
overrideTableViews(bundle,
new ServiceUnitStateData(Owned, broker, null, 1));
doReturn(CompletableFuture.completedFuture(Optional.of(lookupServiceAddress1)))
.when(loadManager).selectAsync(any(), any());
leaderChannel.handleMetadataSessionEvent(SessionReestablished);
FieldUtils.writeDeclaredField(leaderChannel, "lastMetadataSessionEventTimestamp",
System.currentTimeMillis() - (MAX_CLEAN_UP_DELAY_TIME_IN_SECS * 1000 + 1000), true);
getCleanupJobs(leaderChannel).clear();
leaderChannel.handleBrokerRegistrationEvent(broker, NotificationType.Deleted);

// verify the ownership cleanup, and channel's getOwnerAsync returns lookupServiceAddress1 without timeout
start = System.currentTimeMillis();
assertEquals(lookupServiceAddress1, channel1.getOwnerAsync(bundle).get().get());
assertTrue(System.currentTimeMillis() - start < 20_000);

// test clean-up
FieldUtils.writeDeclaredField(channel1,
"inFlightStateWaitingTimeInMillis", 30 * 1000, true);
FieldUtils.writeDeclaredField(channel1,
"brokerRegistry", registry , true);
cleanTableViews();

}

private static ConcurrentHashMap<String, CompletableFuture<Optional<String>>> getOwnerRequests(
ServiceUnitStateChannel channel) throws IllegalAccessException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,28 @@ public void startBroker() {
brokerContainer.start();
}
});
String topicName = "persistent://" + DEFAULT_NAMESPACE + "/startBrokerCheck";
Awaitility.await().atMost(120, TimeUnit.SECONDS).until(
() -> {
for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) {
try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(
brokerContainer.getHttpServiceUrl()).build()) {
if (admin.brokers().getActiveBrokers(clusterName).size() != NUM_BROKERS) {
return false;
}
try {
admin.topics().createPartitionedTopic(topicName, 10);
} catch (PulsarAdminException.ConflictException e) {
// expected
}
admin.lookups().lookupPartitionedTopic(topicName);
} catch (Throwable e) {
return false;
}
}
return true;
}
);
}
}

Expand Down Expand Up @@ -259,9 +281,11 @@ public void testStopBroker() throws Exception {
}
}

String broker1 = admin.lookups().lookupTopic(topicName);
Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String broker1 = admin.lookups().lookupTopic(topicName);
assertNotEquals(broker1, broker);
});

assertNotEquals(broker1, broker);
}

@Test(timeOut = 80 * 1000)
Expand Down Expand Up @@ -309,7 +333,7 @@ public void testIsolationPolicy() throws Exception {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");

Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(10, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), NUM_BROKERS);
Expand Down Expand Up @@ -350,14 +374,14 @@ public void testIsolationPolicy() throws Exception {
}
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 2);
}
);

Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
Awaitility.await().atMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> {
String ownerBroker = admin.lookups().lookupTopic(topic);
assertEquals(extractBrokerIndex(ownerBroker), 1);
});
Expand All @@ -369,7 +393,7 @@ public void testIsolationPolicy() throws Exception {
}
}

Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(
Awaitility.await().atMost(30, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(
() -> {
List<String> activeBrokers = admin.brokers().getActiveBrokers();
assertEquals(activeBrokers.size(), 1);
Expand All @@ -380,8 +404,7 @@ public void testIsolationPolicy() throws Exception {
fail();
} catch (Exception ex) {
log.error("Failed to lookup topic: ", ex);
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker",
"Failed to select the new owner broker for bundle");
assertThat(ex.getMessage()).containsAnyOf("Failed to select the new owner broker for bundle");
}
}

Expand Down

0 comments on commit c43e55f

Please sign in to comment.