Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] do not clean owned bundles from inactive source brokers… #75

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS

private static final String ELECTION_ROOT = "/loadbalance/extension/leader";

// Immutable set of the three load-manager topic names; consulted by isInternalTopic(...) and iterated by
// closeInternalTopics().
private static final Set<String> INTERNAL_TOPICS =
Set.of(BROKER_LOAD_DATA_STORE_TOPIC, TOP_BUNDLES_LOAD_DATA_STORE_TOPIC, TOPIC);

private PulsarService pulsar;

private ServiceConfiguration conf;
Expand Down Expand Up @@ -828,7 +831,8 @@ public void close() throws PulsarServerException {
}

/**
 * Tells whether the given topic name belongs to the load manager's internal topics.
 *
 * <p>A name matches when it is a member of {@code INTERNAL_TOPICS} or when it starts with
 * {@code TOPIC}, {@code BROKER_LOAD_DATA_STORE_TOPIC} or {@code TOP_BUNDLES_LOAD_DATA_STORE_TOPIC}.
 *
 * @param topic the topic name to test
 * @return {@code true} when the name matches one of the internal topics, {@code false} otherwise
 */
public static boolean isInternalTopic(String topic) {
// NOTE(review): the two consecutive `return` lines below look like the pre-change and post-change
// versions of the same statement shown together -- confirm which one is the live code.
return topic.startsWith(TOPIC)
return INTERNAL_TOPICS.contains(topic)
|| topic.startsWith(TOPIC)
|| topic.startsWith(BROKER_LOAD_DATA_STORE_TOPIC)
|| topic.startsWith(TOP_BUNDLES_LOAD_DATA_STORE_TOPIC);
}
Expand Down Expand Up @@ -993,5 +997,26 @@ public void disableBroker() throws Exception {
serviceUnitStateChannel.cleanOwnerships();
leaderElectionService.close();
brokerRegistry.unregister();
// Close the internal topics (if owned any) after giving up the possible leader role,
// so that the subsequent lookups could hit the next leader.
closeInternalTopics();
}

/**
 * Closes every topic listed in {@code INTERNAL_TOPICS} that is currently loaded on this broker.
 *
 * <p>This is best-effort: a failure to look up or close a topic is logged and does not abort the
 * remaining ones, and the overall wait is bounded by the configured namespace bundle unloading
 * timeout. Nothing is thrown to the caller.
 */
private void closeInternalTopics() {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (String name : INTERNAL_TOPICS) {
        futures.add(pulsar.getBrokerService().getTopicIfExists(name)
                // Chain the close future so the wait below covers the close itself, not only the lookup.
                .thenCompose(topicOptional -> topicOptional
                        .map(topic -> topic.close(true))
                        .orElseGet(() -> CompletableFuture.<Void>completedFuture(null)))
                .exceptionally(e -> {
                    // Keep the cause in the log; the failure is swallowed so the other topics still close.
                    log.warn("Failed to close internal topic:{}", name, e);
                    return null;
                }));
    }
    try {
        FutureUtil.waitForAll(futures)
                .get(pulsar.getConfiguration().getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the caller can still observe the interruption.
        Thread.currentThread().interrupt();
        log.warn("Failed to wait for closing internal topics", e);
    } catch (Throwable e) {
        log.warn("Failed to wait for closing internal topics", e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -638,20 +638,13 @@ public CompletableFuture<String> publishAssignEventAsync(String serviceUnit, Str
}

/**
 * Publishes {@code override} as the new state of {@code serviceUnit} through {@code pubAsync}.
 *
 * <p>Before publishing, the channel state is validated with {@code validateChannelState(Started, true)}
 * and the total counter of the {@code Override} event type is incremented.
 *
 * @param serviceUnit the service unit whose state is overridden
 * @param orphanData the state being replaced; in the visible code it is only used in the failure log
 * @param override the state data to publish
 * @return a future that yields {@code null} once the publish future completes
 * @throws IllegalStateException if the channel state validation fails
 */
private CompletableFuture<Void> publishOverrideEventAsync(String serviceUnit,
ServiceUnitStateData orphanData,
ServiceUnitStateData override) {
if (!validateChannelState(Started, true)) {
throw new IllegalStateException("Invalid channel state:" + channelState.name());
}
EventType eventType = EventType.Override;
eventCounters.get(eventType).getTotal().incrementAndGet();
// NOTE(review): two `return pubAsync(...)` statements follow; they look like the pre-change and
// post-change versions shown together -- confirm which one is the live code. The first variant also
// bumps the failure counter and logs when the publish fails.
return pubAsync(serviceUnit, override).whenComplete((__, e) -> {
if (e != null) {
eventCounters.get(eventType).getFailure().incrementAndGet();
log.error("Failed to override serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override, e);
}
}).thenApply(__ -> null);
return pubAsync(serviceUnit, override).thenApply(__ -> null);
}

public CompletableFuture<Void> publishUnloadEventAsync(Unload unload) {
Expand Down Expand Up @@ -1307,24 +1300,49 @@ private void scheduleCleanup(String broker, long delayInSecs) {

/**
 * Replaces the orphaned state of {@code serviceUnit} with an override state and publishes it.
 *
 * <p>The override carries the version returned by {@code getNextVersionId(orphanData)}. A new broker is
 * chosen with {@code selectBroker(serviceUnit, inactiveBroker)}; when none is returned the override
 * state is {@code Free}. Failures are logged and counted in {@code totalCleanupErrorCnt}; nothing is
 * rethrown to the caller.
 *
 * @param serviceUnit the service unit whose ownership is overridden
 * @param orphanData the current state data of the service unit
 * @param inactiveBroker the broker passed to {@code selectBroker} for exclusion from the selection
 */
private void overrideOwnership(String serviceUnit, ServiceUnitStateData orphanData, String inactiveBroker) {
final var version = getNextVersionId(orphanData);
// NOTE(review): the body below appears to hold two variants of the same logic shown together: a
// synchronous one assigning `final var override` and an asynchronous one inside the `try` block --
// confirm which one is the live code.
final var override = selectBroker(serviceUnit, inactiveBroker).map(selectedBroker -> {
if (orphanData.state() == Splitting) {
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
} else {
return new ServiceUnitStateData(Owned, selectedBroker, inactiveBroker, true, version);
}
}).orElseGet(() -> new ServiceUnitStateData(Free, null, inactiveBroker, true, version));
log.info("Overriding ownership serviceUnit:{} from orphanData:{} to overrideData:{}",
serviceUnit, orphanData, override);
publishOverrideEventAsync(serviceUnit, orphanData, override)
.exceptionally(e -> {
log.error(
"Failed to override the ownership serviceUnit:{} orphanData:{}. "
+ "Failed to publish override event. totalCleanupErrorCnt:{}",
serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet());
return null;
});
// This variant blocks on the selection and the publish, bounded by
// config.getMetadataStoreOperationTimeoutSeconds(); any failure lands in the catch block below.
try {
selectBroker(serviceUnit, inactiveBroker)
.thenApply(selectedOpt ->
selectedOpt.map(selectedBroker -> {
if (orphanData.state() == Splitting) {
// if Splitting, set orphan.dstBroker() as dst to indicate where it was from.
// (The src broker runs handleSplitEvent.)
return new ServiceUnitStateData(Splitting, orphanData.dstBroker(), selectedBroker,
Map.copyOf(orphanData.splitServiceUnitToDestBroker()), true, version);
} else if (orphanData.state() == Owned) {
// if Owned, set orphan.dstBroker() as source to clean it up in case it is still
// alive.
return new ServiceUnitStateData(Owned, selectedBroker,
selectedBroker.equals(orphanData.dstBroker()) ? null :
orphanData.dstBroker(),
true, version);
} else {
// if Assigning or Releasing, set orphan.sourceBroker() as source
// to clean it up in case it is still alive.
return new ServiceUnitStateData(Owned, selectedBroker,
selectedBroker.equals(orphanData.sourceBroker()) ? null :
orphanData.sourceBroker(),
true, version);
}
// If no broker is selected(available), free the ownership.
// If the previous owner is still active, it will close the bundle(topic) ownership.
}).orElseGet(() -> new ServiceUnitStateData(Free, null,
orphanData.state() == Owned ? orphanData.dstBroker() : orphanData.sourceBroker(),
true,
version)))
.thenCompose(override -> {
log.info(
"Overriding inactiveBroker:{}, ownership serviceUnit:{} from orphanData:{} to "
+ "overrideData:{}",
inactiveBroker, serviceUnit, orphanData, override);
return publishOverrideEventAsync(serviceUnit, override);
}).get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS);
} catch (Throwable e) {
log.error(
"Failed to override inactiveBroker:{} ownership serviceUnit:{} orphanData:{}. "
+ "totalCleanupErrorCnt:{}",
inactiveBroker, serviceUnit, orphanData, totalCleanupErrorCnt.incrementAndGet(), e);
}
}

private void waitForCleanups(String broker, boolean excludeSystemTopics, int maxWaitTimeInMillis) {
Expand Down Expand Up @@ -1440,60 +1458,13 @@ private synchronized void doCleanup(String broker) {

}

/**
 * Asks the load manager to pick a broker for {@code serviceUnit}, passing {@code inactiveBroker} in the
 * set of brokers handed to {@code selectAsync} (presumably an exclusion set -- TODO confirm against
 * the load manager API).
 *
 * @param serviceUnit the service unit (bundle) to place
 * @param inactiveBroker the broker to pass in the set; the second variant below accepts {@code null}
 *                       and passes an empty set instead
 * @return the selection result; empty when no broker was selected
 */
// NOTE(review): two variants of this method appear together below -- a blocking one returning
// Optional<String> that waits up to inFlightStateWaitingTimeInMillis and maps any failure to
// Optional.empty(), and an asynchronous one returning CompletableFuture<Optional<String>> -- confirm
// which one is the live code.
private Optional<String> selectBroker(String serviceUnit, String inactiveBroker) {
try {
return loadManager.selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
Set.of(inactiveBroker), LookupOptions.builder().build())
.get(inFlightStateWaitingTimeInMillis, MILLISECONDS);
} catch (Throwable e) {
log.error("Failed to select a broker for serviceUnit:{}", serviceUnit);
}
return Optional.empty();
private CompletableFuture<Optional<String>> selectBroker(String serviceUnit, String inactiveBroker) {
return getLoadManager().selectAsync(
LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit),
inactiveBroker == null ? Set.of() : Set.of(inactiveBroker),
LookupOptions.builder().build());
}

/**
 * Builds the roll-forward state for a service unit: an {@code Owned} state held by a newly selected
 * broker.
 *
 * @param serviceUnit the service unit to roll forward
 * @param inactiveBroker the broker handed to {@code selectBroker}
 * @param nextVersionId the version id to stamp on the new state data
 * @return the {@code Owned} state data, or an empty optional when no broker was selected
 */
private Optional<ServiceUnitStateData> getRollForwardStateData(String serviceUnit,
                                                               String inactiveBroker,
                                                               long nextVersionId) {
    return selectBroker(serviceUnit, inactiveBroker)
            .map(newOwner -> new ServiceUnitStateData(Owned, newOwner, true, nextVersionId));
}


/**
 * Computes the state that should replace an orphaned in-flight state.
 *
 * <ul>
 *   <li>{@code Assigning}: roll forward via {@code getRollForwardStateData}, handing it the orphan's
 *       destination broker.</li>
 *   <li>{@code Splitting}: a {@code Splitting} state with the orphan's destination and source brokers
 *       and a copy of its split map.</li>
 *   <li>{@code Releasing}: roll back to the source broker when it is in {@code availableBrokers};
 *       otherwise roll forward, handing the source broker to {@code getRollForwardStateData}.</li>
 * </ul>
 *
 * @param serviceUnit the orphaned service unit
 * @param orphanData the orphaned state data
 * @param availableBrokers the brokers checked for the rollback decision
 * @return the override state data, or an empty optional when the roll-forward yields none
 * @throws IllegalStateException if the orphan state is none of the three states above
 */
private Optional<ServiceUnitStateData> getOverrideInFlightStateData(
        String serviceUnit, ServiceUnitStateData orphanData,
        Set<String> availableBrokers) {
    final long version = getNextVersionId(orphanData);
    switch (orphanData.state()) {
        case Assigning:
            return getRollForwardStateData(serviceUnit, orphanData.dstBroker(), version);
        case Splitting:
            return Optional.of(new ServiceUnitStateData(Splitting,
                    orphanData.dstBroker(), orphanData.sourceBroker(),
                    Map.copyOf(orphanData.splitServiceUnitToDestBroker()),
                    true, version));
        case Releasing:
            // Roll back to the source broker when it is available; otherwise roll forward.
            return availableBrokers.contains(orphanData.sourceBroker())
                    ? Optional.of(new ServiceUnitStateData(Owned, orphanData.sourceBroker(), true, version))
                    : getRollForwardStateData(serviceUnit, orphanData.sourceBroker(), version);
        default:
            final String failure = String.format(
                    "Failed to get the overrideStateData from serviceUnit=%s, orphanData=%s",
                    serviceUnit, orphanData);
            log.error(failure);
            throw new IllegalStateException(failure);
    }
}

@VisibleForTesting
protected void monitorOwnerships(List<String> brokers) {
Expand Down Expand Up @@ -1521,7 +1492,7 @@ protected void monitorOwnerships(List<String> brokers) {
long startTime = System.nanoTime();
Set<String> inactiveBrokers = new HashSet<>();
Set<String> activeBrokers = new HashSet<>(brokers);
Map<String, ServiceUnitStateData> orphanServiceUnits = new HashMap<>();
Map<String, ServiceUnitStateData> timedOutInFlightStateServiceUnits = new HashMap<>();
int serviceUnitTombstoneCleanupCnt = 0;
int orphanServiceUnitCleanupCnt = 0;
long totalCleanupErrorCntStart = totalCleanupErrorCnt.get();
Expand All @@ -1533,20 +1504,27 @@ protected void monitorOwnerships(List<String> brokers) {
String srcBroker = stateData.sourceBroker();
var state = stateData.state();

if (isActiveState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
if (state == Owned && (StringUtils.isBlank(dstBroker) || !activeBrokers.contains(dstBroker))) {
inactiveBrokers.add(dstBroker);
continue;
}

if (isInFlightState(state) && StringUtils.isNotBlank(srcBroker) && !activeBrokers.contains(srcBroker)) {
inactiveBrokers.add(srcBroker);
continue;
}
if (isActiveState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
if (isInFlightState(state) && StringUtils.isNotBlank(dstBroker) && !activeBrokers.contains(dstBroker)) {
inactiveBrokers.add(dstBroker);
continue;
}
if (isActiveState(state) && isInFlightState(state)

if (isInFlightState(state)
&& now - stateData.timestamp() > inFlightStateWaitingTimeInMillis) {
orphanServiceUnits.put(serviceUnit, stateData);
timedOutInFlightStateServiceUnits.put(serviceUnit, stateData);
continue;
}


if (!isActiveState(state) && now - stateData.timestamp() > stateTombstoneDelayTimeInMillis) {
log.info("Found semi-terminal states to tombstone"
+ " serviceUnit:{}, stateData:{}", serviceUnit, stateData);
Expand All @@ -1562,37 +1540,21 @@ protected void monitorOwnerships(List<String> brokers) {
}
}

// Skip cleaning orphan bundles if inactiveBrokers exist. This is a bigger problem.

if (!inactiveBrokers.isEmpty()) {
for (String inactiveBroker : inactiveBrokers) {
handleBrokerDeletionEvent(inactiveBroker);
}
} else if (!orphanServiceUnits.isEmpty()) {
for (var etr : orphanServiceUnits.entrySet()) {
}

// timedOutInFlightStateServiceUnits are the in-flight ones although their src and dst brokers are known to
// be active.
if (!timedOutInFlightStateServiceUnits.isEmpty()) {
for (var etr : timedOutInFlightStateServiceUnits.entrySet()) {
var orphanServiceUnit = etr.getKey();
var orphanData = etr.getValue();
var overrideData = getOverrideInFlightStateData(
orphanServiceUnit, orphanData, activeBrokers);
if (overrideData.isPresent()) {
log.info("Overriding in-flight state ownership serviceUnit:{} "
+ "from orphanData:{} to overrideData:{}",
orphanServiceUnit, orphanData, overrideData);
publishOverrideEventAsync(orphanServiceUnit, orphanData, overrideData.get())
.whenComplete((__, e) -> {
if (e != null) {
log.error("Failed cleaning the ownership orphanServiceUnit:{}, orphanData:{}, "
+ "cleanupErrorCnt:{}.",
orphanServiceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart, e);
}
});
orphanServiceUnitCleanupCnt++;
} else {
log.warn("Failed get the overrideStateData from orphanServiceUnit:{}, orphanData:{},"
+ " cleanupErrorCnt:{}. will retry..",
orphanServiceUnit, orphanData,
totalCleanupErrorCnt.incrementAndGet() - totalCleanupErrorCntStart);
}
overrideOwnership(orphanServiceUnit, orphanData, null);
orphanServiceUnitCleanupCnt++;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2250,6 +2250,15 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
topics.forEach((name, topicFuture) -> {
TopicName topicName = TopicName.get(name);
if (serviceUnit.includes(topicName)) {
if (ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)
&& ExtensibleLoadManagerImpl.isInternalTopic(topicName.toString())) {
if (ExtensibleLoadManagerImpl.debug(pulsar.getConfiguration(), log)) {
log.info("[{}] Skip unloading ExtensibleLoadManager internal topics. Such internal topic "
+ "should be closed when shutting down the broker.", topicName);
}
return;
}

// Topic needs to be unloaded
log.info("[{}] Unloading topic", topicName);
if (topicFuture.isCompletedExceptionally()) {
Expand Down
Loading
Loading