Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Nov 3, 2023
1 parent a97e2e0 commit 7f0b981
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl;
}

public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
log(null, serviceUnit, data, null);
} else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
closeServiceUnit(serviceUnit, false), serviceUnit, data)
closeServiceUnit(serviceUnit, true), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
Expand All @@ -799,11 +799,11 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTransferCommand(data)) {
next = new ServiceUnitStateData(
Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, true);
unloadFuture = closeServiceUnit(serviceUnit, false);
} else {
next = new ServiceUnitStateData(
Free, null, data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, false);
unloadFuture = closeServiceUnit(serviceUnit, true);
}
stateChangeListeners.notifyOnCompletion(unloadFuture
.thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
Expand Down Expand Up @@ -903,13 +903,13 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
}
}

private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean closeWithoutDisconnectingClients) {
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean disconnectClients) {
long startTime = System.nanoTime();
MutableInt unloadedTopics = new MutableInt();
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
closeWithoutDisconnectingClients,
disconnectClients,
true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS)
Expand All @@ -918,7 +918,7 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean
return numUnloadedTopics;
})
.whenComplete((__, ex) -> {
if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public CompletableFuture<Void> handleUnloadRequest(PulsarService pulsar, long ti
return pulsar.getNamespaceService().getOwnershipCache()
.updateBundleState(this.bundle, false)
.thenCompose(v -> pulsar.getBrokerService().unloadServiceUnit(
bundle, false, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
bundle, true, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
.handle((numUnloadedTopics, ex) -> {
if (ex != null) {
// ignore topic-close failure to unload bundle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2197,10 +2197,10 @@ public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
}

public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
boolean closeWithoutDisconnectingClients,
boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) {
CompletableFuture<Integer> future = unloadServiceUnit(
serviceUnit, closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect);
serviceUnit, disconnectClients, closeWithoutWaitingClientDisconnect);
ScheduledFuture<?> taskTimeout = executor().schedule(() -> {
if (!future.isDone()) {
log.warn("Unloading of {} has timed out", serviceUnit);
Expand All @@ -2217,13 +2217,13 @@ public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
* Unload all the topic served by the broker service under the given service unit.
*
* @param serviceUnit
* @param closeWithoutDisconnectingClients don't disconnect clients
* @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect
* and forcefully close managed-ledger
* @return
*/
private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
boolean closeWithoutDisconnectingClients,
boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect) {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
topics.forEach((name, topicFuture) -> {
Expand All @@ -2248,7 +2248,7 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(
closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect)
disconnectClients, closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1762,11 +1762,10 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
printSendCommandDebug(send, headersAndPayload);
}


ServiceConfiguration conf = getBrokerService().pulsar().getConfiguration();
PulsarService pulsar = getBrokerService().pulsar();
if (producer.getTopic().isFenced()
&& ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(conf)) {
long ignoredMsgCount = ExtensibleLoadManagerImpl.get(getBrokerService().pulsar())
&& ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
.getIgnoredSendMsgCounter().incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("Ignored send msg from:{}:{} to fenced topic:{} during unloading."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

CompletableFuture<Void> close(
boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect);
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect);

void checkGC();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,19 +488,19 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c

@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
return close(false, closeWithoutWaitingClientDisconnect);
return close(true, closeWithoutWaitingClientDisconnect);
}

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
* @param closeWithoutDisconnectingClients don't disconnect clients
* @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(
boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand All @@ -519,7 +519,7 @@ public CompletableFuture<Void> close(
List<CompletableFuture<Void>> futures = new ArrayList<>();

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData ->
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
Expand Down Expand Up @@ -553,7 +553,7 @@ public CompletableFuture<Void> close(
// so, execute it in different thread
brokerService.executor().execute(() -> {

if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,24 +1449,24 @@ public void deleteLedgerComplete(Object ctx) {
}

public CompletableFuture<Void> close() {
return close(false, false);
return close(true, false);
}

@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
return close(false, closeWithoutWaitingClientDisconnect);
return close(true, closeWithoutWaitingClientDisconnect);
}

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
* @param closeWithoutDisconnectingClients don't disconnect clients
* @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(
boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand All @@ -1489,7 +1489,7 @@ public CompletableFuture<Void> close(
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData ->
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
Expand Down Expand Up @@ -1531,21 +1531,21 @@ public CompletableFuture<Void> close(
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (closeWithoutDisconnectingClients) {
closeFuture.complete(null);
} else {
if (disconnectClients) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (closeWithoutDisconnectingClients) {
closeFuture.complete(null);
} else {
if (disconnectClients) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,9 +662,11 @@ public void testCheckOwnershipPresentWithSystemNamespace() throws Exception {

@Test
public void testMoreThenOneFilter() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception");
// Use a different namespace to avoid flaky test failures
// from unloading the default namespace and the following topic policy lookups at the init state step
String namespace = "public/my-namespace";
TopicName topicName = TopicName.get(namespace + "/test-filter-has-exception");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();

String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
doReturn(List.of(new MockBrokerFilter() {
@Override
Expand All @@ -682,10 +684,18 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
return FutureUtil.failedFuture(new BrokerFilterException("Test"));
}
})).when(primaryLoadManager).getBrokerFilterPipeline();

admin.namespaces().createNamespace(namespace);
Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
assertTrue(brokerLookupData.isPresent());
assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(brokerLookupData.isPresent());
assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
pulsar1.getAdminClient().lookups().lookupTopic(topicName.toString()));
assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString()));
});

admin.namespaces().deleteNamespace(namespace, true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void reconnectLater(Throwable exception) {
}

public void connectionClosed(ClientCnx cnx) {
connectionClosed(cnx, null, Optional.empty());
connectionClosed(cnx, Optional.empty(), Optional.empty());
}

public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDelayMs, Optional<URI> hostUrl) {
Expand Down

0 comments on commit 7f0b981

Please sign in to comment.