Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] timeout when broker registry hangs and monitor broker registry (ExtensibleLoadManagerImpl only) #23382

Merged
merged 6 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService.State;
Expand Down Expand Up @@ -368,20 +369,26 @@ public void isReady(@Suspended AsyncResponse asyncResponse) {
@ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 307, message = "Current broker is not the target broker"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion) {
@QueryParam("topicVersion") TopicVersion topicVersion,
@QueryParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> maybeRedirectToBroker(
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
if (!isRedirectException(ex)) {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ public interface BrokerRegistry extends AutoCloseable {
*/
boolean isStarted();

/**
* Return the broker has been registered.
*/
boolean isRegistered();

/**
* Register local broker to metadata store.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
@Slf4j
public class BrokerRegistryImpl implements BrokerRegistry {

private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000;

private final PulsarService pulsar;

private final ServiceConfiguration conf;
Expand All @@ -77,10 +79,11 @@ protected enum State {
@VisibleForTesting
final AtomicReference<State> state = new AtomicReference<>(State.Init);

public BrokerRegistryImpl(PulsarService pulsar) {
@VisibleForTesting
BrokerRegistryImpl(PulsarService pulsar, MetadataCache<BrokerLookupData> brokerLookupDataMetadataCache) {
this.pulsar = pulsar;
this.conf = pulsar.getConfiguration();
this.brokerLookupDataMetadataCache = pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class);
this.brokerLookupDataMetadataCache = brokerLookupDataMetadataCache;
this.scheduler = pulsar.getLoadManagerExecutor();
this.listeners = new ArrayList<>();
this.brokerIdKeyPath = keyPath(pulsar.getBrokerId());
Expand All @@ -99,6 +102,10 @@ public BrokerRegistryImpl(PulsarService pulsar) {
pulsar.getConfig().lookupProperties());
}

public BrokerRegistryImpl(PulsarService pulsar) {
this(pulsar, pulsar.getLocalMetadataStore().getMetadataCache(BrokerLookupData.class));
}

@Override
public synchronized void start() throws PulsarServerException {
if (!this.state.compareAndSet(State.Init, State.Started)) {
Expand All @@ -118,6 +125,12 @@ public boolean isStarted() {
return state == State.Started || state == State.Registered;
}

@Override
public boolean isRegistered() {
final var state = this.state.get();
return state == State.Registered;
}

@Override
public CompletableFuture<Void> registerAsync() {
final var state = this.state.get();
Expand All @@ -127,12 +140,35 @@ public CompletableFuture<Void> registerAsync() {
}
log.info("[{}] Started registering self to {} (state: {})", getBrokerId(), brokerIdKeyPath, state);
return brokerLookupDataMetadataCache.put(brokerIdKeyPath, brokerLookupData, EnumSet.of(CreateOption.Ephemeral))
.thenAccept(__ -> {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
.orTimeout(pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), TimeUnit.SECONDS)
.whenComplete((__, ex) -> {
if (ex == null) {
this.state.set(State.Registered);
log.info("[{}] Finished registering self", getBrokerId());
} else {
log.error("[{}] Failed registering self", getBrokerId(), ex);
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

private void doRegisterAsyncWithRetries(int retry, CompletableFuture<Void> future) {
pulsar.getExecutor().schedule(() -> {
registerAsync().whenComplete((__, e) -> {
if (e != null) {
doRegisterAsyncWithRetries(retry + 1, future);
} else {
future.complete(null);
}
});
}, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), TimeUnit.MILLISECONDS);
}

private CompletableFuture<Void> registerAsyncWithRetries() {
var retryFuture = new CompletableFuture<Void>();
doRegisterAsyncWithRetries(0, retryFuture);
return retryFuture;
}

@Override
public synchronized void unregister() throws MetadataStoreException {
if (state.compareAndSet(State.Registered, State.Unregistering)) {
Expand Down Expand Up @@ -219,17 +255,26 @@ private void handleMetadataStoreNotification(Notification t) {
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);

CompletableFuture<Void> register;
if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) {
registerAsync();
}
if (listeners.isEmpty()) {
return;
this.state.set(State.Started);
register = registerAsyncWithRetries();
} else {
register = CompletableFuture.completedFuture(null);
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
// Make sure to run the listeners after re-registered.
register.thenAccept(__ -> {
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
}
});
});

} catch (RejectedExecutionException e) {
// Executor is shutting down
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -987,8 +989,12 @@ protected void monitor() {
return;
}

// Monitor broker registry
// Periodically check the broker registry in case metadata store fails.
validateBrokerRegistry();

// Monitor role
// Periodically check the role in case ZK watcher fails.
// Periodically check the role in case metadata store fails.
var isChannelOwner = serviceUnitStateChannel.isChannelOwner();
if (isChannelOwner) {
// System topic config might fail due to the race condition
Expand Down Expand Up @@ -1087,5 +1093,15 @@ private boolean isPersistentSystemTopicUsed() {
.equals(pulsar.getConfiguration().getLoadManagerServiceUnitStateTableViewClassName());
}

private void validateBrokerRegistry()
throws ExecutionException, InterruptedException, TimeoutException {
var timeout = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
var lookup = brokerRegistry.lookupAsync(brokerRegistry.getBrokerId()).get(timeout, TimeUnit.SECONDS);
if (lookup.isEmpty()) {
log.warn("Found this broker:{} has not registered yet. Trying to register it",
brokerRegistry.getBrokerId());
brokerRegistry.registerAsync().get(timeout, TimeUnit.SECONDS);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
Expand All @@ -108,13 +110,16 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10;
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
private final Map<String, CompletableFuture<String>> getOwnerRequests;
private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;

private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;

Expand Down Expand Up @@ -350,6 +355,11 @@ protected LeaderElectionService getLeaderElectionService() {
.get().getLeaderElectionService();
}

@VisibleForTesting
protected PulsarAdmin getPulsarAdmin() throws PulsarServerException {
return pulsar.getAdminClient();
}

@Override
public synchronized void close() throws PulsarServerException {
channelState = Closed;
Expand Down Expand Up @@ -448,6 +458,14 @@ private CompletableFuture<Optional<String>> getActiveOwnerAsync(
String serviceUnit,
ServiceUnitState state,
Optional<String> owner) {

// If this broker's registry does not exist(possibly suffering from connecting to the metadata store),
// we return the owner without its activeness check.
// This broker tries to serve lookups on a best efforts basis when metadata store connection is unstable.
if (!brokerRegistry.isRegistered()) {
return CompletableFuture.completedFuture(owner);
}

return dedupeGetOwnerRequest(serviceUnit)
.thenCompose(newOwner -> {
if (newOwner == null) {
Expand Down Expand Up @@ -1255,19 +1273,25 @@ private MetadataState getMetadataState() {
}

private void handleBrokerCreationEvent(String broker) {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}

if (!cleanupJobs.isEmpty() && cleanupJobs.containsKey(broker)) {
healthCheckBrokerAsync(broker)
.thenAccept(__ -> {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}
}
});
}
}

Expand Down Expand Up @@ -1431,6 +1455,37 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
System.currentTimeMillis() - started);
}

private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
CompletableFuture<Void> future = new CompletableFuture<>();
doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future);
return future;
}

private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
.whenComplete((__, e) -> {
if (e == null) {
log.info("Completed health-check broker :{}", brokerId, e);
future.complete(null);
return;
}
if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
log.error("Failed health-check broker :{}", brokerId, e);
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
} else {
pulsar.getExecutor()
.schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
MILLISECONDS);
}
});
} catch (PulsarServerException e) {
future.completeExceptionally(e);
}
}

private synchronized void doCleanup(String broker, boolean gracefully) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
Expand All @@ -1444,6 +1499,23 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
return;
}

// if not gracefully, verify the broker is inactive by health-check.
if (!gracefully) {
try {
healthCheckBrokerAsync(broker).get(
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
log.warn("Found that the broker to clean is healthy. Skip the broker:{}'s orphan bundle cleanup",
broker);
return;
} catch (Exception e) {
if (debug()) {
log.info("Failed to check broker:{} health", broker, e);
}
log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker);
}
}


long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
Expand Down
Loading
Loading