Skip to content

Commit

Permalink
[improve][broker] Add ServiceUnitStateTableView (ExtensibleLoadManage…
Browse files Browse the repository at this point in the history
…rImpl only)
  • Loading branch information
heesung-sn committed Sep 13, 2024
1 parent 13c19b5 commit d9e101a
Show file tree
Hide file tree
Showing 32 changed files with 2,986 additions and 1,116 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,9 @@ loadBalancerNamespaceBundleSplitConditionHitCountThreshold=3
# (only used in load balancer extension logics)
loadBalancerServiceUnitStateTombstoneDelayTimeInSeconds=3600

# Name of ServiceUnitStateTableView implementation class to use
loadManagerServiceUnitStateTableViewClassName=org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl


### --- Replication --- ###

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2874,6 +2874,14 @@ public double getLoadBalancerBandwidthOutResourceWeight() {
)
private boolean loadBalancerMultiPhaseBundleUnload = true;

// Fully-qualified class name of the ServiceUnitStateTableView implementation to load.
// The default below is ServiceUnitStateTableViewImpl from the extensions.channel package
// and matches the value shipped in conf/broker.conf.
@FieldContext(
dynamic = false,
category = CATEGORY_LOAD_BALANCER,
doc = "Name of ServiceUnitStateTableView implementation class to use"
)
private String loadManagerServiceUnitStateTableViewClassName =
"org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl";

/**** --- Replication. --- ****/
@FieldContext(
category = CATEGORY_REPLICATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import static java.lang.String.format;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Follower;
import static org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl.Role.Leader;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateTableViewImpl.TOPIC;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Label.Success;
import static org.apache.pulsar.broker.loadbalance.extensions.models.SplitDecision.Reason.Admin;
import static org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.getNamespaceBundle;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -41,20 +40,16 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitState;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLoadData;
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
import org.apache.pulsar.broker.loadbalance.extensions.data.TopBundlesLoadData;
Expand Down Expand Up @@ -209,46 +204,18 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager, BrokerS
/**
* Get all the bundles that are owned by this broker.
*
* <p>This wrapper performs no asynchronous work: it calls {@link #getOwnedServiceUnits()}
* on the calling thread and wraps the result in an already-completed future.
*
* @return a completed future holding the result of {@link #getOwnedServiceUnits()}
* @deprecated call {@link #getOwnedServiceUnits()} directly instead.
*/
@Deprecated
public CompletableFuture<Set<NamespaceBundle>> getOwnedServiceUnitsAsync() {
return CompletableFuture.completedFuture(getOwnedServiceUnits());
}

public Set<NamespaceBundle> getOwnedServiceUnits() {
if (!started) {
log.warn("Failed to get owned service units, load manager is not started.");
return CompletableFuture.completedFuture(Collections.emptySet());
return Collections.emptySet();
}

String brokerId = brokerRegistry.getBrokerId();
Set<Map.Entry<String, ServiceUnitStateData>> entrySet = serviceUnitStateChannel.getOwnershipEntrySet();
Set<NamespaceBundle> ownedServiceUnits = entrySet.stream()
.filter(entry -> {
var stateData = entry.getValue();
return stateData.state() == ServiceUnitState.Owned
&& StringUtils.isNotBlank(stateData.dstBroker())
&& stateData.dstBroker().equals(brokerId);
}).map(entry -> {
var bundle = entry.getKey();
return getNamespaceBundle(pulsar, bundle);
}).collect(Collectors.toSet());
// Add heartbeat and SLA monitor namespace bundle.
NamespaceName heartbeatNamespace = NamespaceService.getHeartbeatNamespace(brokerId, pulsar.getConfiguration());
NamespaceName heartbeatNamespaceV2 = NamespaceService
.getHeartbeatNamespaceV2(brokerId, pulsar.getConfiguration());
NamespaceName slaMonitorNamespace = NamespaceService
.getSLAMonitorNamespace(brokerId, pulsar.getConfiguration());
return pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(heartbeatNamespace)
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get heartbeat namespace bundle.", e);
return null;
}).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(heartbeatNamespaceV2))
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get heartbeat namespace V2 bundle.", e);
return null;
}).thenCompose(__ -> pulsar.getNamespaceService().getNamespaceBundleFactory()
.getFullBundleAsync(slaMonitorNamespace))
.thenAccept(fullBundle -> ownedServiceUnits.add(fullBundle)).exceptionally(e -> {
log.warn("Failed to get SLA Monitor namespace bundle.", e);
return null;
}).thenApply(__ -> ownedServiceUnits);
return serviceUnitStateChannel.getOwnedServiceUnits();
}

@Override
Expand Down Expand Up @@ -799,8 +766,8 @@ public void close() throws PulsarServerException {
monitorTask.cancel(true);
}

this.brokerLoadDataStore.close();
this.topBundlesLoadDataStore.close();
this.brokerLoadDataStore.shutdown();
this.topBundlesLoadDataStore.shutdown();
this.unloadScheduler.close();
this.splitScheduler.close();
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ public enum ServiceUnitState {

Deleted; // deleted in the system (semi-terminal state)

private static final Map<ServiceUnitState, Set<ServiceUnitState>> validTransitions = Map.of(

/**
* Selects which transition table {@code isValidTransition} consults:
* {@code SystemTopic} maps to {@code validTransitionsOverSystemTopic}, and any other value
* falls through to {@code validTransitionsOverMetadataStore}.
*/
public enum StorageType {
SystemTopic,
MetadataStore;
}

private static final Map<ServiceUnitState, Set<ServiceUnitState>> validTransitionsOverSystemTopic = Map.of(
// (Init -> all states) transitions are required
// when the topic is compacted in the middle of assign, transfer or split.
Init, Set.of(Free, Owned, Assigning, Releasing, Splitting, Deleted),
Expand All @@ -54,12 +60,24 @@ public enum ServiceUnitState {
Deleted, Set.of(Init)
);

// Allowed (from -> to) transitions consulted by isValidTransition when the storage type
// is not SystemTopic. Unlike the system-topic table, Init may only move to Assigning here.
// NOTE(review): presumably the topic-compaction case that motivates the broad Init
// transitions over the system topic does not apply to a metadata store -- confirm.
private static final Map<ServiceUnitState, Set<ServiceUnitState>> validTransitionsOverMetadataStore = Map.of(
Init, Set.of(Assigning),
Free, Set.of(Assigning),
Owned, Set.of(Splitting, Releasing),
Assigning, Set.of(Owned),
Releasing, Set.of(Assigning, Free),
Splitting, Set.of(Deleted),
Deleted, Set.of(Init)
);

private static final Set<ServiceUnitState> inFlightStates = Set.of(
Assigning, Releasing, Splitting
);

public static boolean isValidTransition(ServiceUnitState from, ServiceUnitState to) {
Set<ServiceUnitState> transitions = validTransitions.get(from);
public static boolean isValidTransition(ServiceUnitState from, ServiceUnitState to, StorageType storageType) {
Set<ServiceUnitState> transitions =
(storageType == StorageType.SystemTopic) ? validTransitionsOverSystemTopic.get(from)
: validTransitionsOverMetadataStore.get(from);
return transitions.contains(to);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.loadbalance.extensions.manager.StateChangeListener;
import org.apache.pulsar.broker.loadbalance.extensions.models.Split;
import org.apache.pulsar.broker.loadbalance.extensions.models.Unload;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.extended.SessionEvent;

/**
* Defines the ServiceUnitStateChannel interface.
Expand All @@ -51,140 +52,79 @@ public interface ServiceUnitStateChannel extends Closeable {
void close() throws PulsarServerException;

/**
* Asynchronously gets the current owner broker of the system topic in this channel.
* @return the service url without the protocol prefix, 'http://'. e.g. broker-xyz:abcd
*
* ServiceUnitStateChannel elects the separate leader as the owner broker of the system topic in this channel.
* Asynchronously gets the current owner broker of this channel.
* @return a future that completes with the optional owner brokerId of this channel
*/
CompletableFuture<Optional<String>> getChannelOwnerAsync();

/**
* Asynchronously checks if the current broker is the owner broker of the system topic in this channel.
* @return True if the current broker is the owner. Otherwise, false.
* Asynchronously checks if the current broker is the owner broker of this channel.
* @return a future that completes with true if the current broker is the owner, false otherwise
*/
CompletableFuture<Boolean> isChannelOwnerAsync();

/**
* Checks if the current broker is the owner broker of the system topic in this channel.
* Checks if the current broker is the owner broker of this channel.
* @return True if the current broker is the owner. Otherwise, false.
* @throws ExecutionException
* @throws InterruptedException
* @throws TimeoutException
*/
boolean isChannelOwner();

/**
* Handles the metadata session events to track
* if the connection between the broker and metadata store is stable or not.
* This will be registered as a metadata SessionEvent listener.
*
* The stability of the metadata connection is important
* to determine how to handle the broker deletion(unavailable) event notified from the metadata store.
*
* Please refer to handleBrokerRegistrationEvent(String broker, NotificationType type) for more details.
*
* @param event metadata session events
*/
void handleMetadataSessionEvent(SessionEvent event);

/**
* Handles the broker registration event from the broker registry.
* This will be registered as a broker registry listener.
*
* Case 1: If NotificationType is Deleted,
* it will schedule a clean-up operation to release the ownerships of the deleted broker.
*
* Sub-case1: If the metadata connection has been stable for long time,
* it will immediately execute the cleanup operation to guarantee high-availability.
*
* Sub-case2: If the metadata connection has been stable only for short time,
* it will defer the clean-up operation for some time and execute it.
* This is to gracefully handle the case when metadata connection is flaky --
* If the deleted broker comes back very soon,
* we better cancel the clean-up operation for high-availability.
*
* Sub-case3: If the metadata connection is unstable,
* it will not schedule the clean-up operation, as the broker-metadata connection is lost.
* The brokers will continue to serve existing topics connections,
* and we better not to interrupt the existing topic connections for high-availability.
*
*
* Case 2: If NotificationType is Created,
* it will cancel any scheduled clean-up operation if still not executed.
*
* @param broker notified broker
* @param type notification type
*/
void handleBrokerRegistrationEvent(String broker, NotificationType type);
boolean isChannelOwner() throws ExecutionException, InterruptedException, TimeoutException;

/**
* Asynchronously gets the current owner broker of the service unit.
*
*
* @param serviceUnit (e.g. bundle)
* @return the future object of the owner broker
*
* Case 1: If the service unit is owned, it returns the completed future object with the current owner.
* Case 2: If the service unit's assignment is ongoing, it returns the non-completed future object.
* Sub-case1: If the assigned broker is available and finally takes the ownership,
* the future object will complete and return the owner broker.
* Sub-case2: If the assigned broker does not take the ownership in time,
* the future object will time out.
* Case 3: If none of them, it returns Optional.empty().
* @return a future that completes with the optional owner brokerId of the service unit
*/
CompletableFuture<Optional<String>> getOwnerAsync(String serviceUnit);

/**
* Gets the assigned broker of the service unit.
*
* Gets the broker currently assigned to the service unit (returned directly, not as a future).
*
* @param serviceUnit (e.g. bundle)
* @return the future object of the assigned broker
* @return assigned brokerId
*/
Optional<String> getAssigned(String serviceUnit);


/**
* Checks if the target broker is the owner of the service unit.
*
*
* @param serviceUnit (e.g. bundle)
* @param targetBroker
* @return true if the target broker is the owner. false if unknown.
* @param targetBrokerId
* @return true if the target brokerId is the owner brokerId. false if unknown.
*/
boolean isOwner(String serviceUnit, String targetBroker);
boolean isOwner(String serviceUnit, String targetBrokerId);

/**
* Checks if the current broker is the owner of the service unit.
*
*
* @param serviceUnit (e.g. bundle)
* @return true if the current broker is the owner. false if unknown.
*/
boolean isOwner(String serviceUnit);

/**
* Asynchronously publishes the service unit assignment event to the system topic in this channel.
* It de-duplicates assignment events if there is any ongoing assignment event for the same service unit.
* Asynchronously publishes the service unit assignment event to this channel.
* @param serviceUnit (e.g. bundle)
* @param broker the assigned broker
* @return the completable future object with the owner broker
* case 1: If the assigned broker is available and takes the ownership,
* the future object will complete and return the owner broker.
* The returned owner broker could be different from the input broker (due to assignment race-condition).
* case 2: If the assigned broker does not take the ownership in time,
* the future object will time out.
* @param brokerId the assigned brokerId
* @return a future that completes with the owner brokerId
*/
CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String broker);
CompletableFuture<String> publishAssignEventAsync(String serviceUnit, String brokerId);

/**
* Asynchronously publishes the service unit unload event to the system topic in this channel.
* Asynchronously publishes the service unit unload event to this channel.
* @param unload (unload specification object)
* @return the completable future object staged from the event message sendAsync.
* @return a future to track the completion of the operation
*/
CompletableFuture<Void> publishUnloadEventAsync(Unload unload);

/**
* Asynchronously publishes the bundle split event to the system topic in this channel.
* Asynchronously publishes the bundle split event to this channel.
* @param split (split specification object)
* @return the completable future object staged from the event message sendAsync.
* @return a future to track the completion of the operation
*/
CompletableFuture<Void> publishSplitEventAsync(Split split);

Expand All @@ -195,18 +135,24 @@ public interface ServiceUnitStateChannel extends Closeable {
List<Metrics> getMetrics();

/**
* Add a state change listener.
* Adds a state change listener.
*
* @param listener State change listener.
*/
void listen(StateChangeListener listener);

/**
* Returns service unit ownership entry set.
* @return a set of service unit ownership entries
* Returns the service unit ownership entry set (returned directly, not as a future).
* @return the set of service unit ownership entries
*/
Set<Map.Entry<String, ServiceUnitStateData>> getOwnershipEntrySet();

/**
* Returns the service units owned by this broker (returned directly, not as a future).
* @return the set of owned service units
*/
Set<NamespaceBundle> getOwnedServiceUnits();

/**
* Schedules ownership monitor to periodically check and correct invalid ownership states.
*/
Expand All @@ -218,7 +164,7 @@ public interface ServiceUnitStateChannel extends Closeable {
void cancelOwnershipMonitor();

/**
* Cleans the service unit ownerships from the current broker's channel.
* Cleans (gives up) any service unit ownerships held by this broker.
*/
void cleanOwnerships();
}
Loading

0 comments on commit d9e101a

Please sign in to comment.