
Commit

[fix][broker] timeout when broker registry hangs and monitor broker registry (ExtensibleLoadManagerImpl only) (#23382) (#23510)
heesung-sn authored Oct 25, 2024
1 parent 5c54c3b commit 0ee3e00
Showing 10 changed files with 310 additions and 61 deletions.
@@ -48,6 +48,7 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService.State;
@@ -368,20 +369,26 @@ public void isReady(@Suspended AsyncResponse asyncResponse) {
@ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 307, message = "Current broker is not the target broker"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion) {
@QueryParam("topicVersion") TopicVersion topicVersion,
@QueryParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> maybeRedirectToBroker(
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
if (!isRedirectException(ex)) {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
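The hunk above adds an optional brokerId query parameter to the health-check endpoint and issues an HTTP 307 redirect when the receiving broker is not the target. A minimal sketch of driving that per-broker check through the Pulsar admin client; the service URL and broker id below are illustrative placeholders, not values from this change.

import java.util.Optional;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.TopicVersion;

public class BrokerHealthCheckExample {
    public static void main(String[] args) throws Exception {
        // Assumption: a broker reachable at localhost:8080; replace with a real web service URL.
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build()) {
            // Passing a broker id lets the receiving broker redirect (307) when it is not the target.
            admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of("broker-1:8080")).get();
            System.out.println("health check passed");
        }
    }
}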
@@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable {
*/
boolean isStarted();

/**
* Returns whether the broker has been registered.
*/
boolean isRegistered();

/**
* Register local broker to metadata store.
*/
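The new isRegistered() accessor distinguishes a registry that has merely started from one whose ephemeral node is confirmed in the metadata store. A hypothetical helper, sketched only to show how a caller might consume the flag (awaitRegistered is not part of this commit, and the interface's package is assumed to be the load-balancer extensions module).

import org.apache.pulsar.broker.loadbalance.extensions.BrokerRegistry;

public class RegistrationAwait {
    // Hypothetical helper: block until the registry reports that registration completed.
    static void awaitRegistered(BrokerRegistry registry, long timeoutMillis) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        // Registration is asynchronous, so poll until the ephemeral node is confirmed.
        while (!registry.isRegistered()) {
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("broker did not register within " + timeoutMillis + " ms");
            }
            Thread.sleep(100);
        }
    }
}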
@@ -52,6 +52,8 @@
@Slf4j
public class BrokerRegistryImpl implements BrokerRegistry {

private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000;

private final PulsarService pulsar;

private final ServiceConfiguration conf;
@@ -77,10 +79,11 @@ protected enum State {
@VisibleForTesting
final AtomicReference<State> state = new AtomicReference<>(State.Init);

public BrokerRegistryImpl(PulsarService pulsar) {
@VisibleForTesting
BrokerRegistryImpl(PulsarService pulsar, MetadataCache<BrokerLookupData> brokerLookupDataMetadataCache) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache;
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
@@ -98,6 +101,10 @@ public BrokerRegistryImpl(PulsarService pulsar) {
pulsar.getBrokerVersion());
}

public BrokerRegistryImpl(PulsarService pulsar) {
this(pulsar, pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class));
}

@Override
public synchronized void start() throws PulsarServerException {
if (!this.state.compareAndSet(State.Init, State.Started)) {
@@ -117,6 +124,12 @@ public boolean isStarted() {
return state == State.Started || state == State.Registered;
}

@Override
public boolean isRegistered() {
final var state = this.state.get();
return state == State.Registered;
}

@Override
public CompletableFuture<Void> registerAsync() {
final var state = this.state.get();
@@ -126,12 +139,35 @@ public CompletableFuture<Void> registerAsync() {
}
log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state);
return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.thenAccept(__ -> {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
.orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
.whenComplete((__, ex) -> {
if (ex == null) {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
} else {
log.error("[{}] Failed registering self", getBrokerId(), ex);
}
});
}

private void doRegisterAsyncWithRetries(int retry, CompletableFuture<Void> future) {
pulsar.getExecutor().schedule(() -> {
registerAsync().whenComplete((__, e) -> {
if (e != null) {
doRegisterAsyncWithRetries(retry + 1, future);
} else {
future.complete(null);
}
});
}, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), TimeUnit.MILLISECONDS);
}

private CompletableFuture<Void> registerAsyncWithRetries() {
var retryFuture = new CompletableFuture<Void>();
doRegisterAsyncWithRetries(0, retryFuture);
return retryFuture;
}

@Override
public synchronized void unregister() throws MetadataStoreException {
if (state.compareAndSet(State.Registered, State.Unregistering)) {
@@ -218,17 +254,26 @@ private void handleMetadataStoreNotification(Notification t) {
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);

CompletableFuture<Void> register;
if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) {
registerAsync();
}
if (listeners.isEmpty()) {
return;
this.state.set(State.Started);
register = registerAsyncWithRetries();
} else {
register = CompletableFuture.completedFuture(null);
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
// Make sure to run the listeners after re-registration completes.
register.thenAccept(__ -> {
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
}
});
});

} catch (RejectedExecutionException e) {
// Executor is shutting down
}
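In BrokerRegistryImpl, registration is now bounded by metadataStoreOperationTimeoutSeconds and, after the ephemeral node is deleted, retried with a capped quadratic backoff of min(1000 ms, retry * retry * 50 ms) before the listeners run. A self-contained sketch of that retry and backoff shape, using stand-in names rather than the registry's own fields.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class CappedQuadraticRetry {
    private static final long MAX_DELAY_MILLIS = 1000;

    // Keeps retrying until the task succeeds; delays grow 0, 50, 200, 450, 800 and then cap at 1000 ms.
    static void retry(ScheduledExecutorService scheduler,
                      Supplier<CompletableFuture<Void>> task,
                      int attempt,
                      CompletableFuture<Void> result) {
        long delay = Math.min(MAX_DELAY_MILLIS, (long) attempt * attempt * 50);
        scheduler.schedule(() -> {
            task.get().whenComplete((__, e) -> {
                if (e != null) {
                    retry(scheduler, task, attempt + 1, result);
                } else {
                    result.complete(null);
                }
            });
        }, delay, TimeUnit.MILLISECONDS);
    }
}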
@@ -36,8 +36,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -1007,8 +1009,12 @@ protected void monitor() {
return;
}

// Monitor broker registry
// Periodically check the broker registry in case metadata store fails.
validateBrokerRegistry();

// Monitor role
// Periodically check the role in case ZK watcher fails.
// Periodically check the role in case metadata store fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
@@ -1093,4 +1099,15 @@ boolean running() {
private boolean disabled() {
return state.get() == State.DISABLED;
}

private void validateBrokerRegistry()
throws ExecutionException, InterruptedException, TimeoutException {
var timeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
var lookup = brokerRegistry.lookupAsync(brokerRegistry.getBrokerId()).get(timeout, TimeUnit.SECONDS);
if (lookup.isEmpty()) {
log.warn("Found this broker:{} has not registered yet. Trying to register it",
brokerRegistry.getBrokerId());
brokerRegistry.registerAsync().get(timeout, TimeUnit.SECONDS);
}
}
}
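The validateBrokerRegistry() method above blocks on the lookup and on re-registration for at most metadataStoreOperationTimeoutSeconds, so a hung metadata store can no longer wedge the monitor thread. A minimal sketch of that bounded-wait pattern in isolation; the future and the 30-second value are placeholders, not the broker's configuration.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitExample {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> lookup = new CompletableFuture<>(); // stands in for a metadata store read
        long timeoutSeconds = 30; // assumption: a value in the spirit of metadataStoreOperationTimeoutSeconds
        try {
            String value = lookup.get(timeoutSeconds, TimeUnit.SECONDS);
            System.out.println("lookup returned: " + value);
        } catch (TimeoutException e) {
            // Control returns to the caller; the next monitor run can retry the check.
            System.err.println("metadata store lookup timed out after " + timeoutSeconds + "s");
        }
    }
}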
@@ -87,6 +87,7 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
@@ -98,6 +99,7 @@
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
import org.apache.pulsar.common.util.FutureUtil;
@@ -121,13 +123,16 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10;
private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
private final Map<String, CompletableFuture<String>> getOwnerRequests;
private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;

private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;
private TableView<ServiceUnitStateData> tableview;
@@ -267,7 +272,7 @@ public void cancelOwnershipMonitor() {
@Override
public void cleanOwnerships() {
disable();
doCleanup(brokerId);
doCleanup(brokerId, true);
}

@Override
@@ -383,6 +388,12 @@ protected LeaderElectionService getLeaderElectionService() {
.get().getLeaderElectionService();
}

@VisibleForTesting
protected PulsarAdmin getPulsarAdmin() throws PulsarServerException {
return pulsar.getAdminClient();
}

@Override
public synchronized void close() throws PulsarServerException {
channelState = Closed;
boolean debug = debug();
@@ -491,6 +502,14 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {

// If this broker's registration does not exist (possibly because the connection to the metadata store
// is unstable), return the owner without checking its activeness.
// The broker then serves lookups on a best-effort basis while the metadata store connection is unstable.
if (!brokerRegistry.isRegistered()) {
return CompletableFuture.completedFuture(owner);
}

return dedupeGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
@@ -1259,19 +1278,25 @@ private MetadataState getMetadataState() {
}

private void handleBrokerCreationEvent(String broker) {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}

if (!cleanupJobs.isEmpty() && cleanupJobs.containsKey(broker)) {
healthCheckBrokerAsync(broker)
.thenAccept(__ -> {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}
}
});
}
}
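handleBrokerCreationEvent now cancels a pending cleanup only after the returning broker passes a health check, instead of cancelling on the registration event alone. A sketch of the underlying map-of-futures bookkeeping; the class and method names here are illustrative, not the channel's actual fields.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class CleanupJobRegistry {
    private final Map<String, CompletableFuture<Void>> cleanupJobs = new ConcurrentHashMap<>();

    void schedule(String broker, CompletableFuture<Void> job) {
        cleanupJobs.put(broker, job);
    }

    // Invoked only after the returning broker has answered a health check.
    void cancelIfPending(String broker) {
        CompletableFuture<Void> pending = cleanupJobs.remove(broker);
        if (pending != null) {
            pending.cancel(false); // false: do not interrupt a cleanup that is already running
        }
    }
}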

@@ -1318,7 +1343,7 @@ private void scheduleCleanup(String broker, long delayInSecs) {
var future = CompletableFuture
.runAsync(() -> {
try {
doCleanup(broker);
doCleanup(broker, false);
} catch (Throwable e) {
log.error("Failed to run the cleanup job for the broker {}, "
+ "totalCleanupErrorCnt:{}.",
@@ -1422,7 +1447,38 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
System.currentTimeMillis() - started);
}

private synchronized void doCleanup(String broker) {
private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
CompletableFuture<Void> future = new CompletableFuture<>();
doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future);
return future;
}

private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
.whenComplete((__, e) -> {
if (e == null) {
log.info("Completed health-check broker :{}", brokerId, e);
future.complete(null);
return;
}
if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
log.error("Failed health-check broker :{}", brokerId, e);
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
} else {
pulsar.getExecutor()
.schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
MILLISECONDS);
}
});
} catch (PulsarServerException e) {
future.completeExceptionally(e);
}
}

private synchronized void doCleanup(String broker, boolean gracefully) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
.isEmpty()) {
@@ -1435,6 +1491,23 @@ private synchronized void doCleanup(String broker) {
return;
}

// if not gracefully, verify the broker is inactive by health-check.
if (!gracefully) {
try {
healthCheckBrokerAsync(broker).get(
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
log.warn("Found that the broker to clean is healthy. Skip the broker:{}'s orphan bundle cleanup",
broker);
return;
} catch (Exception e) {
if (debug()) {
log.info("Failed to check broker:{} health", broker, e);
}
log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker);
}
}


long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
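When the cleanup was not requested gracefully, doCleanup(broker, gracefully) above first probes the supposedly dead broker via the admin health check (retried up to MAX_BROKER_HEALTH_CHECK_RETRY times) and skips the orphan-bundle cleanup if the broker still answers. A condensed sketch of that guard; the admin handle and timeout parameter are placeholders for the channel's own dependencies.

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.TopicVersion;

public class OrphanCleanupGuard {
    // Returns true only when the broker fails its health check and its bundles should be released.
    static boolean shouldCleanup(PulsarAdmin admin, String brokerId, long timeoutSeconds) {
        try {
            admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
                    .get(timeoutSeconds, TimeUnit.SECONDS);
            return false; // the broker answered, so treat its deletion notification as transient
        } catch (Exception e) {
            return true;  // no healthy answer within the timeout: proceed with the orphan bundle cleanup
        }
    }
}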