Skip to content

Commit

Permalink
add health check verfiication
Browse files Browse the repository at this point in the history
  • Loading branch information
heesung-sn committed Oct 2, 2024
1 parent 0bb0dd8 commit ead0246
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService.State;
Expand Down Expand Up @@ -368,20 +369,26 @@ public void isReady(@Suspended AsyncResponse asyncResponse) {
@ApiOperation(value = "Run a healthCheck against the broker")
@ApiResponses(value = {
@ApiResponse(code = 200, message = "Everything is OK"),
@ApiResponse(code = 307, message = "Current broker is not the target broker"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@ApiParam(value = "Topic Version")
@QueryParam("topicVersion") TopicVersion topicVersion) {
@QueryParam("topicVersion") TopicVersion topicVersion,
@QueryParam("brokerId") String brokerId) {
validateSuperUserAccessAsync()
.thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> maybeRedirectToBroker(
StringUtils.isBlank(brokerId) ? pulsar().getBrokerId() : brokerId))
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
asyncResponse.resume(Response.ok("ok").build());
}).exceptionally(ex -> {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
if (!isRedirectException(ex)) {
LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
}
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public class BrokerRegistryImpl implements BrokerRegistry {

private final PulsarService pulsar;

private static final int MAX_REGISTER_RETRY = 10;

private static final int MAX_REGISTER_RETRY_DELAY_IN_MILLIS = 1000;

private final ServiceConfiguration conf;

private final BrokerLookupData brokerLookupData;
Expand Down Expand Up @@ -149,6 +153,35 @@ public CompletableFuture<Void> registerAsync() {
});
}

private void doRegisterAsyncWithRetries(int retry, CompletableFuture<Void> future) {
this.scheduler.schedule(() -> {
registerAsync().whenComplete((__, e) -> {
if (e != null) {
if (retry == MAX_REGISTER_RETRY) {
future.completeExceptionally(new PulsarServerException("Stopped registering self retries", e));
} else {
doRegisterAsyncWithRetries(retry + 1, future);
}
} else {
future.complete(null);
}
});
}, Math.min(MAX_REGISTER_RETRY_DELAY_IN_MILLIS, retry * retry * 50), TimeUnit.MILLISECONDS);
}

private CompletableFuture<Void> registerAsyncWithRetries() {
var future = registerAsync();
return future.handle((__, e) -> {
if (e != null) {
var retryFuture = new CompletableFuture<Void>();
doRegisterAsyncWithRetries(1, retryFuture);
return retryFuture.join();
} else {
return null;
}
});
}

@Override
public synchronized void unregister() throws MetadataStoreException {
if (state.compareAndSet(State.Registered, State.Unregistering)) {
Expand Down Expand Up @@ -235,17 +268,25 @@ private void handleMetadataStoreNotification(Notification t) {
// The registered node is an ephemeral node that could be deleted when the metadata store client's session
// is expired. In this case, we should register again.
final var brokerId = t.getPath().substring(LOADBALANCE_BROKERS_ROOT.length() + 1);

CompletableFuture<Void> register;
if (t.getType() == NotificationType.Deleted && getBrokerId().equals(brokerId)) {
registerAsync();
register = registerAsyncWithRetries();
} else {
register = CompletableFuture.completedFuture(null);
}
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
// Make sure to run the listeners after re-registered.
register.thenAccept(__ -> {
if (listeners.isEmpty()) {
return;
}
this.scheduler.submit(() -> {
for (BiConsumer<String, NotificationType> listener : listeners) {
listener.accept(brokerId, t.getType());
}
});
});

} catch (RejectedExecutionException e) {
// Executor is shutting down
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,13 @@
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.Reflections;
Expand All @@ -108,13 +110,16 @@ public class ServiceUnitStateChannelImpl implements ServiceUnitStateChannel {
private static final long MIN_CLEAN_UP_DELAY_TIME_IN_SECS = 0; // 0 secs to clean immediately
private static final long MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS = 10;
private static final long MAX_OWNED_BUNDLE_COUNT_DELAY_TIME_IN_MILLIS = 10 * 60 * 1000;
private static final long MAX_BROKER_HEALTH_CHECK_RETRY = 3;
private static final long MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS = 1000;
private final PulsarService pulsar;
private final ServiceConfiguration config;
private final Schema<ServiceUnitStateData> schema;
private final Map<String, CompletableFuture<String>> getOwnerRequests;
private final String brokerId;
private final Map<String, CompletableFuture<Void>> cleanupJobs;
private final StateChangeListeners stateChangeListeners;

private BrokerRegistry brokerRegistry;
private LeaderElectionService leaderElectionService;

Expand Down Expand Up @@ -350,6 +355,11 @@ protected LeaderElectionService getLeaderElectionService() {
.get().getLeaderElectionService();
}

@VisibleForTesting
protected PulsarAdmin getPulsarAdmin() throws PulsarServerException {
return pulsar.getAdminClient();
}

@Override
public synchronized void close() throws PulsarServerException {
channelState = Closed;
Expand Down Expand Up @@ -1255,20 +1265,24 @@ private MetadataState getMetadataState() {
}

private void handleBrokerCreationEvent(String broker) {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}
}

healthCheckBrokerAsync(broker)
.thenAccept(__ -> {
CompletableFuture<Void> future = cleanupJobs.remove(broker);
if (future != null) {
future.cancel(false);
totalInactiveBrokerCleanupCancelledCnt++;
log.info("Successfully cancelled the ownership cleanup for broker:{}."
+ " Active cleanup job count:{}",
broker, cleanupJobs.size());
} else {
if (debug()) {
log.info("No needs to cancel the ownership cleanup for broker:{}."
+ " There was no scheduled cleanup job. Active cleanup job count:{}",
broker, cleanupJobs.size());
}
}
});
}

private void handleBrokerDeletionEvent(String broker) {
Expand Down Expand Up @@ -1431,6 +1445,37 @@ private void waitForCleanups(String broker, boolean excludeSystemTopics, int max
System.currentTimeMillis() - started);
}

private CompletableFuture<Void> healthCheckBrokerAsync(String brokerId) {
CompletableFuture<Void> future = new CompletableFuture<>();
doHealthCheckBrokerAsyncWithRetries(brokerId, 0, future);
return future;
}

private void doHealthCheckBrokerAsyncWithRetries(String brokerId, int retry, CompletableFuture<Void> future) {
try {
var admin = getPulsarAdmin();
admin.brokers().healthcheckAsync(TopicVersion.V2, Optional.of(brokerId))
.whenComplete((__, e) -> {
if (e == null) {
log.info("Completed health-check broker :{}", brokerId, e);
future.complete(null);
return;
}
if (retry == MAX_BROKER_HEALTH_CHECK_RETRY) {
log.error("Failed health-check broker :{}", brokerId, e);
future.completeExceptionally(FutureUtil.unwrapCompletionException(e));
} else {
pulsar.getExecutor()
.schedule(() -> doHealthCheckBrokerAsyncWithRetries(brokerId, retry + 1, future),
Math.min(MAX_BROKER_HEALTH_CHECK_DELAY_IN_MILLIS, retry * retry * 50),
MILLISECONDS);
}
});
} catch (PulsarServerException e) {
future.completeExceptionally(e);
}
}

private synchronized void doCleanup(String broker, boolean gracefully) {
try {
if (getChannelOwnerAsync().get(MAX_CHANNEL_OWNER_ELECTION_WAITING_TIME_IN_SECS, TimeUnit.SECONDS)
Expand All @@ -1444,6 +1489,23 @@ private synchronized void doCleanup(String broker, boolean gracefully) {
return;
}

// if not gracefully, verify the broker is inactive by health-check.
if (!gracefully) {
try {
healthCheckBrokerAsync(broker).get(
pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds(), SECONDS);
log.warn("Found that the broker to clean is healthy. Skip the broker:{}'s orphan bundle cleanup",
broker);
return;
} catch (Exception e) {
if (debug()) {
log.info("Failed to check broker:{} health", broker, e);
}
log.info("Checked the broker:{} health. Continue the orphan bundle cleanup", broker);
}
}


long startTime = System.nanoTime();
log.info("Started ownership cleanup for the inactive broker:{}", broker);
int orphanServiceUnitCleanupCnt = 0;
Expand Down
Loading

0 comments on commit ead0246

Please sign in to comment.