diff --git a/src/main/java/com/github/eyefloaters/DataGenerator.java b/src/main/java/com/github/eyefloaters/DataGenerator.java index 873b0f7..686629d 100644 --- a/src/main/java/com/github/eyefloaters/DataGenerator.java +++ b/src/main/java/com/github/eyefloaters/DataGenerator.java @@ -132,9 +132,6 @@ void start(@Observes Startup startupEvent /* NOSONAR */) { virtualExec.submit(() -> { MDC.put(CLUSTER_NAME_KEY, clusterKey); - Admin adminClient = Admin.create(configProperties); - adminClients.put(clusterKey, adminClient); - var allTopics = IntStream .range(0, consumerCount) .mapToObj(groupNumber -> IntStream @@ -143,6 +140,11 @@ void start(@Observes Startup startupEvent /* NOSONAR */) { .flatMap(Function.identity()) .toList(); + initializeCounts(clusterKey, allTopics); + + Admin adminClient = Admin.create(configProperties); + adminClients.put(clusterKey, adminClient); + initialize(clusterKey, adminClient, allTopics, partitionsPerTopic); IntStream.range(0, consumerCount).forEach(groupNumber -> { @@ -209,6 +211,18 @@ void stop(@Observes Shutdown shutdownEvent /* NOSONAR */) throws Exception { virtualExec.awaitTermination(10, TimeUnit.SECONDS); } + void initializeCounts(String clusterKey, List topics) { + Map initialCounts = topics.stream() + .flatMap(topic -> IntStream + .range(0, partitionsPerTopic) + .mapToObj(p -> new TopicPartition(topic, p))) + .map(topicPartition -> Map.entry(topicPartition, 0L)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + recordsProduced.put(clusterKey, new ConcurrentHashMap<>(initialCounts)); + recordsConsumed.put(clusterKey, new ConcurrentHashMap<>(initialCounts)); + } + void initialize(String clusterKey, Admin adminClient, List topics, int partitionsPerTopic) { adminClient.describeCluster() .clusterId() @@ -238,9 +252,7 @@ void initialize(String clusterKey, Admin adminClient, List topics, int p .entrySet() .stream() .map(e -> e.getValue().toCompletionStage().exceptionally(error -> { - if (error instanceof 
CompletionException ce) { - error = ce.getCause(); - } + error = causeIfCompletionException(error); if (error instanceof GroupNotEmptyException) { log.warnf("Consumer group %s is not empty and cannot be deleted", e.getKey()); @@ -262,9 +274,7 @@ void initialize(String clusterKey, Admin adminClient, List topics, int p .entrySet() .stream() .map(e -> e.getValue().toCompletionStage().exceptionally(error -> { - if (error instanceof CompletionException ce) { - error = ce.getCause(); - } + error = causeIfCompletionException(error); if (!(error instanceof UnknownTopicOrPartitionException)) { log.warnf(error, "Error deleting topic %s: %s", e.getKey(), error.getMessage()); @@ -292,10 +302,7 @@ void initialize(String clusterKey, Admin adminClient, List topics, int p .entrySet() .stream() .map(e -> e.getValue().toCompletionStage().exceptionally(error -> { - if (error instanceof CompletionException ce) { - error = ce.getCause(); - } - + error = causeIfCompletionException(error); log.warnf(error, "Error creating topic %s: %s", e.getKey(), error.getMessage()); return null; })) @@ -304,16 +311,10 @@ void initialize(String clusterKey, Admin adminClient, List topics, int p .thenRun(() -> LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5))) .thenRun(() -> log.infof("Topics created: %s", topics)) .join(); + } - Map initialCounts = topics.stream() - .flatMap(topic -> IntStream - .range(0, partitionsPerTopic) - .mapToObj(p -> new TopicPartition(topic, p))) - .map(topicPartition -> Map.entry(topicPartition, 0L)) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - - recordsProduced.put(clusterKey, new ConcurrentHashMap<>(initialCounts)); - recordsConsumed.put(clusterKey, new ConcurrentHashMap<>(initialCounts)); + Throwable causeIfCompletionException(Throwable thrown) { + return thrown instanceof CompletionException ? 
thrown.getCause() : thrown; } void produce(String clusterKey, Producer producer, List topics) { @@ -345,7 +346,7 @@ void produce(String clusterKey, Producer producer, List var currentCount = incrementAndGet(recordsProduced, clusterKey, topicPartition); if (currentCount % 5_000 == 0) { - log.infof("Produced %d records to %s/%s (since startup)", currentCount, clusterKey, topicPartition); + log.infof("Produced %d records to %s (since startup)", currentCount, topicPartition); } }) .exceptionally(error -> { @@ -379,7 +380,7 @@ long incrementAndGet(Map> counters, String cl .compute(topicPartition, (k, v) -> v == null ? 1 : v + 1); } - void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, long offset) { + void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, Long offset) { var earliest = getOffset(adminClient, topicPartition, OffsetSpec.earliest()); var latest = getOffset(adminClient, topicPartition, OffsetSpec.latest()); @@ -388,16 +389,15 @@ void maybeDeleteRecords(Admin adminClient, TopicPartition topicPartition, long o long diff = latest.join() - earliest.join(); if (diff >= 5_000) { - log.infof("Offset diff is %d, truncating topic %s, partition %d to offset %d", - diff, topicPartition.topic(), topicPartition.partition(), offset); + log.infof("Offset diff is %d, truncating partition %s to offset %d", + diff, topicPartition, offset); // Truncate the topic to the up to the previous offset var recordsToDelete = Map.of(topicPartition, RecordsToDelete.beforeOffset(offset)); return adminClient.deleteRecords(recordsToDelete) .all() .toCompletionStage(); } else { - log.debugf("Offset diff is %d for topic %s, partition %d at offset %d", - diff, topicPartition.topic(), topicPartition.partition(), offset); + log.debugf("Offset diff is %d for partition %s at offset %d", diff, topicPartition, offset); return CompletableFuture.completedStage(null); } }, virtualExec) diff --git 
a/src/main/java/com/github/eyefloaters/health/AdminConnectivityCheck.java b/src/main/java/com/github/eyefloaters/health/AdminConnectivityCheck.java index 840ebd5..f145e1c 100644 --- a/src/main/java/com/github/eyefloaters/health/AdminConnectivityCheck.java +++ b/src/main/java/com/github/eyefloaters/health/AdminConnectivityCheck.java @@ -15,16 +15,20 @@ @ApplicationScoped public class AdminConnectivityCheck implements HealthCheck { + @Inject + @Named("adminConfigs") + Map> adminConfigs; + @Inject @Named("adminClients") Map adminClients; @Override public HealthCheckResponse call() { - var builder = HealthCheckResponse.builder().name("generator-liveness"); + var builder = HealthCheckResponse.builder().name("admin-connectivity"); boolean up = true; - long configuredClusters = adminClients.size(); + long configuredClusters = adminConfigs.size(); long availableClusters = adminClients.values() .stream() .map(client -> client.describeCluster() diff --git a/src/main/java/com/github/eyefloaters/health/CounterProgressCheck.java b/src/main/java/com/github/eyefloaters/health/CounterProgressCheck.java index ddf169c..dc31e35 100644 --- a/src/main/java/com/github/eyefloaters/health/CounterProgressCheck.java +++ b/src/main/java/com/github/eyefloaters/health/CounterProgressCheck.java @@ -65,16 +65,16 @@ HealthCheckResponse check(Map> recordsCounts) .flatMap(Collection::stream) .map(Map.Entry::getValue) .min(Instant::compareTo) - .map(Object::toString) - .orElse("NA"); + .orElse(Instant.EPOCH) + .toString(); String latestActivity = latestRecordActivityTimes.values() .stream() .map(Map::entrySet) .flatMap(Collection::stream) .map(Map.Entry::getValue) .max(Instant::compareTo) - .map(Object::toString) - .orElse("NA"); + .orElse(Instant.EPOCH) + .toString(); long currentCount = recordsCounts .values() .stream() @@ -90,7 +90,7 @@ HealthCheckResponse check(Map> recordsCounts) boolean up = true; - if (inactivePartitions > 0) { + if (inactivePartitions > 0 || 
latestRecordActivityTimes.isEmpty()) { up = false; }