Skip to content

Commit

Permalink
resolved comments
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Nov 3, 2023
1 parent a97e2e0 commit 5bc4a23
Show file tree
Hide file tree
Showing 9 changed files with 51 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerImpl;
}

public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
if (!(loadManager instanceof ExtensibleLoadManagerWrapper loadManagerWrapper)) {
throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,7 +776,7 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
log(null, serviceUnit, data, null);
} else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
closeServiceUnit(serviceUnit, false), serviceUnit, data)
closeServiceUnit(serviceUnit, true), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
Expand All @@ -799,11 +799,11 @@ private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTransferCommand(data)) {
next = new ServiceUnitStateData(
Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, true);
unloadFuture = closeServiceUnit(serviceUnit, false);
} else {
next = new ServiceUnitStateData(
Free, null, data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, false);
unloadFuture = closeServiceUnit(serviceUnit, true);
}
stateChangeListeners.notifyOnCompletion(unloadFuture
.thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
Expand Down Expand Up @@ -903,13 +903,13 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
}
}

private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean closeWithoutDisconnectingClients) {
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean disconnectClients) {
long startTime = System.nanoTime();
MutableInt unloadedTopics = new MutableInt();
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
closeWithoutDisconnectingClients,
disconnectClients,
true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS)
Expand All @@ -918,7 +918,7 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean
return numUnloadedTopics;
})
.whenComplete((__, ex) -> {
if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2197,10 +2197,10 @@ public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
}

public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
boolean closeWithoutDisconnectingClients,
boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) {
CompletableFuture<Integer> future = unloadServiceUnit(
serviceUnit, closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect);
serviceUnit, disconnectClients, closeWithoutWaitingClientDisconnect);
ScheduledFuture<?> taskTimeout = executor().schedule(() -> {
if (!future.isDone()) {
log.warn("Unloading of {} has timed out", serviceUnit);
Expand All @@ -2217,13 +2217,13 @@ public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
* Unload all the topic served by the broker service under the given service unit.
*
* @param serviceUnit
* @param closeWithoutDisconnectingClients don't disconnect clients
* @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect
* and forcefully close managed-ledger
* @return
*/
private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
boolean closeWithoutDisconnectingClients,
boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect) {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
topics.forEach((name, topicFuture) -> {
Expand All @@ -2248,7 +2248,7 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(
closeWithoutDisconnectingClients, closeWithoutWaitingClientDisconnect)
disconnectClients, closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1762,11 +1762,10 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
printSendCommandDebug(send, headersAndPayload);
}


ServiceConfiguration conf = getBrokerService().pulsar().getConfiguration();
PulsarService pulsar = getBrokerService().pulsar();
if (producer.getTopic().isFenced()
&& ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(conf)) {
long ignoredMsgCount = ExtensibleLoadManagerImpl.get(getBrokerService().pulsar())
&& ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
.getIgnoredSendMsgCounter().incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("Ignored send msg from:{}:{} to fenced topic:{} during unloading."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init
CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

CompletableFuture<Void> close(
boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect);
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect);

void checkGC();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,19 +488,19 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c

@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
return close(false, closeWithoutWaitingClientDisconnect);
return close(true, closeWithoutWaitingClientDisconnect);
}

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
* @param closeWithoutDisconnectingClients don't disconnect clients
* @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(
boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand All @@ -519,7 +519,7 @@ public CompletableFuture<Void> close(
List<CompletableFuture<Void>> futures = new ArrayList<>();

replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData ->
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
Expand Down Expand Up @@ -553,7 +553,7 @@ public CompletableFuture<Void> close(
// so, execute it in different thread
brokerService.executor().execute(() -> {

if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
brokerService.removeTopicFromCache(NonPersistentTopic.this);
unregisterTopicPolicyListener();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,24 +1449,24 @@ public void deleteLedgerComplete(Object ctx) {
}

public CompletableFuture<Void> close() {
return close(false, false);
return close(true, false);
}

@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
return close(false, closeWithoutWaitingClientDisconnect);
return close(true, closeWithoutWaitingClientDisconnect);
}

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
* @param closeWithoutDisconnectingClients don't disconnect clients
* @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(
boolean closeWithoutDisconnectingClients, boolean closeWithoutWaitingClientDisconnect) {
boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

lock.writeLock().lock();
Expand All @@ -1489,7 +1489,7 @@ public CompletableFuture<Void> close(
futures.add(transactionBuffer.closeAsync());
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
shadowReplicators.forEach((__, replicator) -> futures.add(replicator.disconnect()));
if (!closeWithoutDisconnectingClients) {
if (disconnectClients) {
futures.add(ExtensibleLoadManagerImpl.getAssignedBrokerLookupData(
brokerService.getPulsar(), topic).thenAccept(lookupData ->
producers.values().forEach(producer -> futures.add(producer.disconnect(lookupData)))
Expand Down Expand Up @@ -1531,21 +1531,21 @@ public CompletableFuture<Void> close(
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
if (closeWithoutDisconnectingClients) {
closeFuture.complete(null);
} else {
if (disconnectClients) {
// Everything is now closed, remove the topic from map
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
if (closeWithoutDisconnectingClients) {
closeFuture.complete(null);
} else {
if (disconnectClients) {
disposeTopic(closeFuture);
} else {
closeFuture.complete(null);
}
}
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,9 +662,11 @@ public void testCheckOwnershipPresentWithSystemNamespace() throws Exception {

@Test
public void testMoreThenOneFilter() throws Exception {
TopicName topicName = TopicName.get(defaultTestNamespace + "/test-filter-has-exception");
// Use a different namespace to avoid flaky test failures
// from unloading the default namespace and the following topic policy lookups at the init state step
String namespace = "public/my-namespace";
TopicName topicName = TopicName.get(namespace + "/test-filter-has-exception");
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();

String lookupServiceAddress1 = pulsar1.getLookupServiceAddress();
doReturn(List.of(new MockBrokerFilter() {
@Override
Expand All @@ -682,10 +684,18 @@ public CompletableFuture<Map<String, BrokerLookupData>> filterAsync(Map<String,
return FutureUtil.failedFuture(new BrokerFilterException("Test"));
}
})).when(primaryLoadManager).getBrokerFilterPipeline();

admin.namespaces().createNamespace(namespace);
Optional<BrokerLookupData> brokerLookupData = primaryLoadManager.assign(Optional.empty(), bundle).get();
assertTrue(brokerLookupData.isPresent());
assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
Awaitility.waitAtMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
assertTrue(brokerLookupData.isPresent());
assertEquals(brokerLookupData.get().getWebServiceUrl(), pulsar2.getWebServiceAddress());
assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
pulsar1.getAdminClient().lookups().lookupTopic(topicName.toString()));
assertEquals(brokerLookupData.get().getPulsarServiceUrl(),
pulsar2.getAdminClient().lookups().lookupTopic(topicName.toString()));
});

admin.namespaces().deleteNamespace(namespace, true);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ void reconnectLater(Throwable exception) {
}

public void connectionClosed(ClientCnx cnx) {
connectionClosed(cnx, null, Optional.empty());
connectionClosed(cnx, Optional.empty(), Optional.empty());
}

public void connectionClosed(ClientCnx cnx, Optional<Long> initialConnectionDelayMs, Optional<URI> hostUrl) {
Expand Down

0 comments on commit 5bc4a23

Please sign in to comment.