fix: improve health checks with initial values (#19)
Signed-off-by: Michael Edgar <[email protected]>
MikeEdgar authored Jan 18, 2024
1 parent c1f66a0 commit 9641b96
Showing 3 changed files with 39 additions and 35 deletions.
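At a glance, the commit makes four related changes: the per-partition produced/consumed counters are seeded with explicit zeros before the Kafka admin client is created, the repeated CompletionException unwrapping is factored into a causeIfCompletionException helper, the admin health check is renamed to admin-connectivity and measured against the configured clusters rather than whichever clients happen to exist, and the record-activity check now reports DOWN when it has no activity data at all. The seeding matters because a check that merely counts inactive partitions passes vacuously over an empty map. A minimal, self-contained sketch of that pitfall (all names here are illustrative, not from the repository):

import java.util.Map;

// Illustrative only: an empty counter map lets an
// "any inactive partitions?" liveness check pass vacuously.
public class VacuousCheckSketch {

    // UP unless some known partition has a zero count (the unguarded rule)
    static boolean upWithoutGuard(Map<String, Long> counts) {
        return counts.values().stream().noneMatch(c -> c == 0L);
    }

    // The guarded rule: an empty map is itself a DOWN condition
    static boolean upWithGuard(Map<String, Long> counts) {
        return !counts.isEmpty() && counts.values().stream().noneMatch(c -> c == 0L);
    }

    public static void main(String[] args) {
        System.out.println(upWithoutGuard(Map.of())); // true: vacuously UP
        System.out.println(upWithGuard(Map.of()));    // false: correctly DOWN

        // Pre-seeded counters are visible to the check from the start
        System.out.println(upWithGuard(Map.of("topic-0", 0L, "topic-1", 0L))); // false until records flow
    }
}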
56 changes: 28 additions & 28 deletions src/main/java/com/github/eyefloaters/DataGenerator.java
@@ -132,9 +132,6 @@ void start(@Observes Startup startupEvent /* NOSONAR */) {
virtualExec.submit(() -> {
MDC.put(CLUSTER_NAME_KEY, clusterKey);

Admin adminClient = Admin.create(configProperties);
adminClients.put(clusterKey, adminClient);

var allTopics = IntStream
.range(0, consumerCount)
.mapToObj(groupNumber -> IntStream
@@ -143,6 +140,11 @@ void start(@Observes Startup startupEvent /* NOSONAR */) {
.flatMap(Function.identity())
.toList();

initializeCounts(clusterKey, allTopics);

Admin adminClient = Admin.create(configProperties);
adminClients.put(clusterKey, adminClient);

initialize(clusterKey, adminClient, allTopics, partitionsPerTopic);

IntStream.range(0, consumerCount).forEach(groupNumber -> {
@@ -209,6 +211,18 @@ void stop(@Observes Shutdown shutdownEvent /* NOSONAR */) throws Exception {
virtualExec.awaitTermination(10, TimeUnit.SECONDS);
}

void initializeCounts(String clusterKey, List<String> topics) {
Map<TopicPartition, Long> initialCounts = topics.stream()
.flatMap(topic -> IntStream
.range(0, partitionsPerTopic)
.mapToObj(p -> new TopicPartition(topic, p)))
.map(topicPartition -> Map.entry(topicPartition, 0L))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

recordsProduced.put(clusterKey, new ConcurrentHashMap<>(initialCounts));
recordsConsumed.put(clusterKey, new ConcurrentHashMap<>(initialCounts));
}

void initialize(String clusterKey, Admin adminClient, List<String> topics, int partitionsPerTopic) {
adminClient.describeCluster()
.clusterId()
@@ -238,9 +252,7 @@ void initialize(String clusterKey, Admin adminClient, List<String> topics, int p
.entrySet()
.stream()
.map(e -> e.getValue().toCompletionStage().exceptionally(error -> {
if (error instanceof CompletionException ce) {
error = ce.getCause();
}
error = causeIfCompletionException(error);

if (error instanceof GroupNotEmptyException) {
log.warnf("Consumer group %s is not empty and cannot be deleted", e.getKey());
@@ -262,9 +274,7 @@ void initialize(String clusterKey, Admin adminClient, List<String> topics, int p
.entrySet()
.stream()
.map(e -> e.getValue().toCompletionStage().exceptionally(error -> {
if (error instanceof CompletionException ce) {
error = ce.getCause();
}
error = causeIfCompletionException(error);

if (!(error instanceof UnknownTopicOrPartitionException)) {
log.warnf(error, "Error deleting topic %s: %s", e.getKey(), error.getMessage());
@@ -292,10 +302,7 @@ void initialize(String clusterKey, Admin adminClient, List<String> topics, int p
.entrySet()
.stream()
.map(e -> e.getValue().toCompletionStage().exceptionally(error -> {
if (error instanceof CompletionException ce) {
error = ce.getCause();
}

error = causeIfCompletionException(error);
log.warnf(error, "Error creating topic %s: %s", e.getKey(), error.getMessage());
return null;
}))
@@ -304,16 +311,10 @@ void initialize(String clusterKey, Admin adminClient, List<String> topics, int p
.thenRun(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5)))
.thenRun(() -> log.infof("Topics created: %s", topics))
.join();
}

Map<TopicPartition, Long> initialCounts = topics.stream()
.flatMap(topic -> IntStream
.range(0, partitionsPerTopic)
.mapToObj(p -> new TopicPartition(topic, p)))
.map(topicPartition -> Map.entry(topicPartition, 0L))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

recordsProduced.put(clusterKey, new ConcurrentHashMap<>(initialCounts));
recordsConsumed.put(clusterKey, new ConcurrentHashMap<>(initialCounts));
Throwable causeIfCompletionException(Throwable thrown) {
return thrown instanceof CompletionException ? thrown.getCause() : thrown;
}

void produce(String clusterKey, Producer<byte[], byte[]> producer, List<String> topics) {
@@ -345,7 +346,7 @@ void produce(String clusterKey, Producer<byte[], byte[]> producer, List<String>
var currentCount = incrementAndGet(recordsProduced, clusterKey, topicPartition);

if (currentCount % 5_000 == 0) {
log.infof("Produced %d records to %s/%s (since startup)", currentCount, clusterKey, topicPartition);
log.infof("Produced %d records to %s (since startup)", currentCount, topicPartition);
}
})
.exceptionally(error -> {
@@ -379,7 +380,7 @@ long incrementAndGet(Map<String, Map<TopicPartition, Long>> counters, String cl
.compute(topicPartition, (k, v) -> v == null ? 1 : v + 1);
}

void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, long offset) {
void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, Long offset) {
var earliest = getOffset(adminClient, topicPartition, OffsetSpec.earliest());
var latest = getOffset(adminClient, topicPartition, OffsetSpec.latest());

@@ -388,16 +389,15 @@ void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, long o
long diff = latest.join() - earliest.join();

if (diff >= 5_000) {
log.infof("Offset diff is %d, truncating topic %s, partition %d to offset %d",
diff, topicPartition.topic(), topicPartition.partition(), offset);
log.infof("Offset diff is %d, truncating partition %d to offset %d",
diff, topicPartition, offset);
// Truncate the partition up to the given offset
var recordsToDelete = Map.of(topicPartition, RecordsToDelete.beforeOffset(offset));
return adminClient.deleteRecords(recordsToDelete)
.all()
.toCompletionStage();
} else {
log.debugf("Offset diff is %d for topic %s, partition %d at offset %d",
diff, topicPartition.topic(), topicPartition.partition(), offset);
log.debugf("Offset diff is %d for partition %d at offset %d", diff, topicPartition, offset);
return CompletableFuture.completedStage(null);
}
}, virtualExec)
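That concludes the changes to DataGenerator.java (the viewer truncates the surrounding context). One detail worth spelling out is the new causeIfCompletionException helper: once a KafkaFuture is adapted with toCompletionStage(), downstream exceptionally handlers generally receive the failure wrapped in java.util.concurrent.CompletionException, so the cause must be unwrapped before instanceof checks against Kafka's exception types (GroupNotEmptyException, UnknownTopicOrPartitionException) can match. A standalone illustration of the same unwrap, outside Kafka (not repository code):

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnwrapSketch {

    // Same shape as the helper introduced in this commit
    static Throwable causeIfCompletionException(Throwable thrown) {
        return thrown instanceof CompletionException ? thrown.getCause() : thrown;
    }

    public static void main(String[] args) {
        CompletableFuture.failedFuture(new IllegalStateException("boom"))
                .thenApply(Object::toString) // a dependent stage wraps the failure
                .exceptionally(error -> {
                    // error is a CompletionException here; unwrap before matching
                    Throwable cause = causeIfCompletionException(error);
                    System.out.println(cause.getClass().getSimpleName()); // IllegalStateException
                    return null;
                })
                .join();
    }
}

The second changed file, whose path the diff viewer omits here, is the admin connectivity health check: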
@@ -15,16 +15,20 @@
@ApplicationScoped
public class AdminConnectivityCheck implements HealthCheck {

@Inject
@Named("adminConfigs")
Map<String, Map<String, Object>> adminConfigs;

@Inject
@Named("adminClients")
Map<String, Admin> adminClients;

@Override
public HealthCheckResponse call() {
var builder = HealthCheckResponse.builder().name("generator-liveness");
var builder = HealthCheckResponse.builder().name("admin-connectivity");
boolean up = true;

long configuredClusters = adminClients.size();
long configuredClusters = adminConfigs.size();
long availableClusters = adminClients.values()
.stream()
.map(client -> client.describeCluster()
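The remainder of call() is cut off by the viewer; presumably it counts the clusters that answer describeCluster() and compares the result with configuredClusters, which after this change comes from the injected adminConfigs map, so a cluster that never produced a working Admin client still counts against the check. A rough sketch of how such a comparison could conclude, against the MicroProfile Health API (assumed shape, not the repository's actual code):

import org.eclipse.microprofile.health.HealthCheckResponse;

public class ConnectivitySketch {

    // Sketch: UP only when every configured cluster is reachable.
    // The field names mirror the diff; the comparison itself is assumed.
    static HealthCheckResponse respond(long configuredClusters, long availableClusters) {
        return HealthCheckResponse.builder()
                .name("admin-connectivity")
                .withData("configuredClusters", configuredClusters)
                .withData("availableClusters", availableClusters)
                .status(availableClusters >= configuredClusters)
                .build();
    }
}

The third changed file, again with its path omitted by the viewer, is the record-activity health check; its check() method summarizes per-partition counts and last-activity timestamps: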
@@ -65,16 +65,16 @@ HealthCheckResponse check(Map<String, Map<TopicPartition, Long>> recordsCounts)
.flatMap(Collection::stream)
.map(Map.Entry::getValue)
.min(Instant::compareTo)
.map(Object::toString)
.orElse("NA");
.orElse(Instant.EPOCH)
.toString();
String latestActivity = latestRecordActivityTimes.values()
.stream()
.map(Map::entrySet)
.flatMap(Collection::stream)
.map(Map.Entry::getValue)
.max(Instant::compareTo)
.map(Object::toString)
.orElse("NA");
.orElse(Instant.EPOCH)
.toString();
long currentCount = recordsCounts
.values()
.stream()
@@ -90,7 +90,7 @@ HealthCheckResponse check(Map<String, Map<TopicPartition, Long>> recordsCounts)

boolean up = true;

if (inactivePartitions > 0) {
if (inactivePartitions > 0 || latestRecordActivityTimes.isEmpty()) {
up = false;
}

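A closing note on the Instant.EPOCH fallback above: replacing the "NA" placeholder keeps the earliest/latest activity fields machine-parsable timestamps, with 1970-01-01T00:00:00Z serving as an unambiguous "never" sentinel, and the added isEmpty() guard turns "no activity data at all" into a DOWN status instead of a vacuous UP. A trivial standalone demonstration (class name illustrative):

import java.time.Instant;
import java.util.List;

public class EpochSentinelSketch {

    public static void main(String[] args) {
        // With no recorded activity, min() yields an empty Optional;
        // the commit substitutes Instant.EPOCH for the old string "NA".
        String earliest = List.<Instant>of().stream()
                .min(Instant::compareTo)
                .orElse(Instant.EPOCH)
                .toString();

        System.out.println(earliest); // 1970-01-01T00:00:00Z
    }
}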
