Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-307 Added assignedBrokerUrl to CloseProducerCmd… #53

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -178,6 +179,10 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
private final UnloadCounter unloadCounter = new UnloadCounter();
private final SplitCounter splitCounter = new SplitCounter();

// Record the ignored send msg count during unloading
@Getter
private final AtomicLong ignoredSendMsgCounter = new AtomicLong();

// record unload metrics
private final AtomicReference<List<Metrics>> unloadMetrics = new AtomicReference<>();
// record split metrics
Expand Down Expand Up @@ -262,13 +267,26 @@ public static boolean isLoadManagerExtensionEnabled(ServiceConfiguration conf) {
return ExtensibleLoadManagerImpl.class.getName().equals(conf.getLoadManagerClassName());
}

/**
 * Checks whether the load manager currently held by the given PulsarService is the extensible one.
 *
 * <p>The check is made against {@code ExtensibleLoadManagerWrapper} because that is the only type
 * {@link #get(LoadManager)} accepts for the very same object. Testing for
 * {@code ExtensibleLoadManagerImpl} here would disagree with it: a caller that guards
 * {@code get(pulsar)} with this method would either never enter the guarded branch or hit the
 * {@code IllegalArgumentException} thrown by {@code get(LoadManager)}.
 *
 * @param pulsar PulsarService whose load manager reference is inspected
 * @return true if the held load manager is an {@code ExtensibleLoadManagerWrapper}
 */
public static boolean isLoadManagerExtensionEnabled(PulsarService pulsar) {
    return pulsar.getLoadManager().get() instanceof ExtensibleLoadManagerWrapper;
}

/**
 * Unwraps the ExtensibleLoadManagerImpl from a load manager.
 *
 * @param loadManager the load manager, expected to be an ExtensibleLoadManagerWrapper
 * @return the wrapped ExtensibleLoadManagerImpl instance
 * @throws IllegalArgumentException if the load manager is not an ExtensibleLoadManagerWrapper
 */
public static ExtensibleLoadManagerImpl get(LoadManager loadManager) {
    if (loadManager instanceof ExtensibleLoadManagerWrapper wrapper) {
        return wrapper.get();
    }
    throw new IllegalArgumentException("The load manager should be 'ExtensibleLoadManagerWrapper'.");
}

/**
 * Static helper that resolves the ExtensibleLoadManagerImpl from the load manager held by a PulsarService.
 * @param pulsar PulsarService
 * @return the ExtensibleLoadManagerImpl instance
 */
public static ExtensibleLoadManagerImpl get(PulsarService pulsar) {
    final LoadManager currentLoadManager = pulsar.getLoadManager().get();
    return get(currentLoadManager);
}

/**
 * Tells whether debug output should be produced.
 *
 * @param config broker configuration carrying the load balancer debug-mode flag
 * @param log logger whose debug level is consulted when the flag is off
 * @return true if the debug-mode flag is on or the logger has debug enabled
 */
public static boolean debug(ServiceConfiguration config, Logger log) {
    if (config.isLoadBalancerDebugModeEnabled()) {
        return true;
    }
    return log.isDebugEnabled();
}
Expand All @@ -286,6 +304,37 @@ public static void createSystemTopic(PulsarService pulsar, String topic) throws
}
}

/**
 * Looks up the broker assigned to the bundle that owns the given topic.
 *
 * @param pulsar PulsarService instance
 * @param topic Topic Name
 * @return a future of the assigned broker's BrokerLookupData. The Optional is empty if the extensible
 *         load manager is not enabled in the configuration, if no broker is assigned, or if starting
 *         the lookup throws synchronously.
 */
public static CompletableFuture<Optional<BrokerLookupData>> getAssignedBrokerLookupData(PulsarService pulsar,
                                                                                        String topic) {
    // Nothing to report unless the extensible load manager is the configured one.
    if (!ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar.getConfig())) {
        return CompletableFuture.completedFuture(Optional.empty());
    }

    var topicName = TopicName.get(topic);
    try {
        return pulsar.getNamespaceService().getBundleAsync(topicName).thenCompose(bundle -> {
            var manager = ExtensibleLoadManagerImpl.get(pulsar);
            var assignedBroker = manager.getServiceUnitStateChannel().getAssigned(bundle.toString());
            if (assignedBroker.isEmpty()) {
                return CompletableFuture.completedFuture(Optional.empty());
            }
            return manager.getBrokerRegistry().lookupAsync(assignedBroker.get());
        });
    } catch (Throwable e) {
        // Only failures thrown while setting up the async chain land here.
        log.error("Failed to lookup destination broker for topic:{}", topic, e);
        return CompletableFuture.completedFuture(Optional.empty());
    }
}

@Override
public void start() throws PulsarServerException {
if (this.started) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,16 @@ public interface ServiceUnitStateChannel extends Closeable {
*/
CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit);

/**
 * Gets the assigned broker of the service unit.
 *
 * @param serviceUnit the service unit to look up (e.g. a bundle)
 * @return an Optional holding the assigned broker, or an empty Optional if no broker is assigned
 */
Optional<String> getAssigned(String serviceUnit);


/**
* Checks if the target broker is the owner of the service unit.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,41 @@ public CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit) {
}
}

/**
 * Resolves the assigned broker of a service unit from the table view entry for that unit.
 *
 * <p>Returns empty when {@code validateChannelState(Started, true)} fails, when the table view has
 * no entry for the service unit, or when the entry's state carries no assignment.
 */
@Override
public Optional<String> getAssigned(String serviceUnit) {
    if (!validateChannelState(Started, true)) {
        return Optional.empty();
    }

    ServiceUnitStateData entry = tableview.get(serviceUnit);
    if (entry == null) {
        return Optional.empty();
    }

    ServiceUnitState currentState = state(entry);
    return switch (currentState) {
        case Owned, Assigning -> Optional.of(entry.dstBroker());
        // The destination broker may be absent while releasing, hence ofNullable.
        case Releasing -> Optional.ofNullable(entry.dstBroker());
        case Splitting -> Optional.of(entry.sourceBroker());
        case Init, Free -> Optional.empty();
        case Deleted -> {
            log.warn("Trying to get the assigned broker from the deleted serviceUnit:{}", serviceUnit);
            yield Optional.empty();
        }
        default -> {
            log.warn("Trying to get the assigned broker from unknown state:{} serviceUnit:{}",
                    currentState, serviceUnit);
            yield Optional.empty();
        }
    };
}

private long getNextVersionId(String serviceUnit) {
var data = tableview.get(serviceUnit);
return getNextVersionId(data);
Expand Down Expand Up @@ -732,14 +767,19 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(data.dstBroker());
}
stateChangeListeners.notify(serviceUnit, data, null);

if (isTargetBroker(data.dstBroker())) {
log(null, serviceUnit, data, null);
pulsar.getNamespaceService()
.onNamespaceBundleOwned(LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit));
lastOwnEventHandledAt = System.currentTimeMillis();
} else if (data.force() && isTargetBroker(data.sourceBroker())) {
closeServiceUnit(serviceUnit);
stateChangeListeners.notify(serviceUnit, data, null);
log(null, serviceUnit, data, null);
} else if ((data.force() || isTransferCommand(data)) && isTargetBroker(data.sourceBroker())) {
stateChangeListeners.notifyOnCompletion(
closeServiceUnit(serviceUnit, true), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
} else {
stateChangeListeners.notify(serviceUnit, data, null);
}
}

Expand All @@ -755,15 +795,17 @@ private void handleAssignEvent(String serviceUnit, ServiceUnitStateData data) {
private void handleReleaseEvent(String serviceUnit, ServiceUnitStateData data) {
if (isTargetBroker(data.sourceBroker())) {
ServiceUnitStateData next;
CompletableFuture<Integer> unloadFuture;
if (isTransferCommand(data)) {
next = new ServiceUnitStateData(
Assigning, data.dstBroker(), data.sourceBroker(), getNextVersionId(data));
// TODO: when close, pass message to clients to connect to the new broker
unloadFuture = closeServiceUnit(serviceUnit, false);
} else {
next = new ServiceUnitStateData(
Free, null, data.sourceBroker(), getNextVersionId(data));
unloadFuture = closeServiceUnit(serviceUnit, true);
}
stateChangeListeners.notifyOnCompletion(closeServiceUnit(serviceUnit)
stateChangeListeners.notifyOnCompletion(unloadFuture
.thenCompose(__ -> pubAsync(serviceUnit, next)), serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, next));
}
Expand Down Expand Up @@ -861,12 +903,13 @@ private CompletableFuture<String> deferGetOwnerRequest(String serviceUnit) {
}
}

private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
private CompletableFuture<Integer> closeServiceUnit(String serviceUnit, boolean disconnectClients) {
long startTime = System.nanoTime();
MutableInt unloadedTopics = new MutableInt();
NamespaceBundle bundle = LoadManagerShared.getNamespaceBundle(pulsar, serviceUnit);
return pulsar.getBrokerService().unloadServiceUnit(
bundle,
disconnectClients,
true,
pulsar.getConfig().getNamespaceBundleUnloadingTimeoutMs(),
TimeUnit.MILLISECONDS)
Expand All @@ -875,8 +918,10 @@ private CompletableFuture<Integer> closeServiceUnit(String serviceUnit) {
return numUnloadedTopics;
})
.whenComplete((__, ex) -> {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
if (disconnectClients) {
// clean up topics that failed to unload from the broker ownership cache
pulsar.getBrokerService().cleanUnloadedTopicFromCache(bundle);
}
pulsar.getNamespaceService().onNamespaceBundleUnload(bundle);
double unloadBundleTime = TimeUnit.NANOSECONDS
.toMillis((System.nanoTime() - startTime));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public CompletableFuture<Void> handleUnloadRequest(PulsarService pulsar, long ti
return pulsar.getNamespaceService().getOwnershipCache()
.updateBundleState(this.bundle, false)
.thenCompose(v -> pulsar.getBrokerService().unloadServiceUnit(
bundle, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
bundle, true, closeWithoutWaitingClientDisconnect, timeout, timeoutUnit))
.handle((numUnloadedTopics, ex) -> {
if (ex != null) {
// ignore topic-close failure to unload bundle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,11 @@ protected void checkTopicFenced() throws BrokerServiceException {
}
}

/**
 * Reports the current value of this topic's fenced flag.
 *
 * @return the value of the {@code isFenced} field
 */
@Override
public boolean isFenced() {
    return this.isFenced;
}

protected CompletableFuture<Void> internalAddProducer(Producer producer) {
if (isProducersExceeded(producer)) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2197,8 +2197,10 @@ public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
}

public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) {
CompletableFuture<Integer> future = unloadServiceUnit(serviceUnit, closeWithoutWaitingClientDisconnect);
CompletableFuture<Integer> future = unloadServiceUnit(
serviceUnit, disconnectClients, closeWithoutWaitingClientDisconnect);
ScheduledFuture<?> taskTimeout = executor().schedule(() -> {
if (!future.isDone()) {
log.warn("Unloading of {} has timed out", serviceUnit);
Expand All @@ -2215,11 +2217,13 @@ public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
* Unload all the topic served by the broker service under the given service unit.
*
* @param serviceUnit
* @param disconnectClients disconnect clients
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect
* and forcefully close managed-ledger
* @return
*/
private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit,
boolean disconnectClients,
boolean closeWithoutWaitingClientDisconnect) {
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
topics.forEach((name, topicFuture) -> {
Expand All @@ -2243,7 +2247,8 @@ private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit
}
}
closeFutures.add(topicFuture
.thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect)
.thenCompose(t -> t.isPresent() ? t.get().close(
disconnectClients, closeWithoutWaitingClientDisconnect)
: CompletableFuture.completedFuture(null)));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
Expand Down Expand Up @@ -699,17 +700,21 @@ public void closeNow(boolean removeFromTopic) {
isDisconnecting.set(false);
}

/**
 * Disconnects this producer without supplying any assigned-broker lookup data.
 *
 * @return the future returned by {@code disconnect(Optional)} when called with an empty Optional
 */
public CompletableFuture<Void> disconnect() {
    final Optional<BrokerLookupData> noAssignedBroker = Optional.empty();
    return disconnect(noAssignedBroker);
}

/**
* It closes the producer from server-side and sends command to client to disconnect producer from existing
* connection without closing that connection.
*
* @return Completable future indicating completion of producer close
*/
public CompletableFuture<Void> disconnect() {
public CompletableFuture<Void> disconnect(Optional<BrokerLookupData> assignedBrokerLookupData) {
if (!closeFuture.isDone() && isDisconnecting.compareAndSet(false, true)) {
log.info("Disconnecting producer: {}", this);
cnx.execute(() -> {
cnx.closeProducer(this);
cnx.closeProducer(this, assignedBrokerLookupData);
closeNow(true);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.limiter.ConnectionController;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
Expand Down Expand Up @@ -1598,7 +1600,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {
remoteAddress, producerId);
}
producers.remove(producerId, producerFuture);
closeProducer(producerId, -1L);
closeProducer(producerId, -1L, Optional.empty());
return null;
}
}
Expand Down Expand Up @@ -1760,6 +1762,19 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
printSendCommandDebug(send, headersAndPayload);
}

PulsarService pulsar = getBrokerService().pulsar();
if (producer.getTopic().isFenced()
&& ExtensibleLoadManagerImpl.isLoadManagerExtensionEnabled(pulsar)) {
long ignoredMsgCount = ExtensibleLoadManagerImpl.get(pulsar)
.getIgnoredSendMsgCounter().incrementAndGet();
if (log.isDebugEnabled()) {
log.debug("Ignored send msg from:{}:{} to fenced topic:{} during unloading."
+ " Ignored message count:{}.",
remoteAddress, send.getProducerId(), producer.getTopic().getName(), ignoredMsgCount);
}
return;
}

if (producer.isNonPersistentTopic()) {
// avoid processing non-persist message if reached max concurrent-message limit
if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
Expand Down Expand Up @@ -3020,13 +3035,26 @@ protected void interceptCommand(BaseCommand command) throws InterceptException {
public void closeProducer(Producer producer) {
// removes producer-connection from map and send close command to producer
safelyRemoveProducer(producer);
closeProducer(producer.getProducerId(), producer.getEpoch());
closeProducer(producer.getProducerId(), producer.getEpoch(), Optional.empty());
}

/**
 * Removes the producer from this connection and sends it a close command, passing along the
 * assigned broker's lookup data when one is supplied.
 *
 * @param producer the producer to close
 * @param assignedBrokerLookupData lookup data of the assigned broker; may be empty
 */
@Override
public void closeProducer(Producer producer, Optional<BrokerLookupData> assignedBrokerLookupData) {
    // Drop the producer-connection mapping first, then send the close command to the producer.
    safelyRemoveProducer(producer);
    final long producerId = producer.getProducerId();
    final long epoch = producer.getEpoch();
    closeProducer(producerId, epoch, assignedBrokerLookupData);
}

public void closeProducer(long producerId, long epoch) {
private void closeProducer(long producerId, long epoch, Optional<BrokerLookupData> assignedBrokerLookupData) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
writeAndFlush(Commands.newCloseProducer(producerId, -1L));
if (assignedBrokerLookupData.isPresent()) {
writeAndFlush(Commands.newCloseProducer(producerId, -1L,
assignedBrokerLookupData.get().pulsarServiceUrl(),
assignedBrokerLookupData.get().pulsarServiceUrlTls()));
} else {
writeAndFlush(Commands.newCloseProducer(producerId, -1L));
}

// The client does not necessarily know that the producer is closed, but the connection is still
// active, and there could be messages in flight already. We want to ignore these messages for a time
// because they are expected. Once the interval has passed, the client should have received the
Expand All @@ -3049,7 +3077,7 @@ public void closeConsumer(Consumer consumer) {
closeConsumer(consumer.consumerId());
}

public void closeConsumer(long consumerId) {
private void closeConsumer(long consumerId) {
if (getRemoteEndpointProtocolVersion() >= v5.getValue()) {
writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

/**
 * Closes this topic.
 *
 * @param disconnectClients NOTE(review): presumably whether connected clients are disconnected as part
 *                          of the close — confirm against the implementations
 * @param closeWithoutWaitingClientDisconnect NOTE(review): presumably whether to close without waiting
 *                          for clients to disconnect — confirm against the implementations
 * @return a future representing the close operation
 */
CompletableFuture<Void> close(
        boolean disconnectClients, boolean closeWithoutWaitingClientDisconnect);

void checkGC();

CompletableFuture<Void> checkClusterMigration();
Expand Down Expand Up @@ -331,6 +334,8 @@ default boolean isSystemTopic() {

boolean isPersistent();

/**
 * Reports whether this topic is currently fenced.
 *
 * @return true if the topic is fenced
 */
boolean isFenced();

/* ------ Transaction related ------ */

/**
Expand Down
Loading
Loading