diff --git a/src/main/java/com/github/streamshub/DataGenerator.java b/src/main/java/com/github/streamshub/DataGenerator.java index a8b998b..4120d78 100644 --- a/src/main/java/com/github/streamshub/DataGenerator.java +++ b/src/main/java/com/github/streamshub/DataGenerator.java @@ -8,6 +8,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -89,6 +90,10 @@ public class DataGenerator { @ConfigProperty(name = "datagen.partitions-per-topic", defaultValue = "1") int partitionsPerTopic; + @Inject + @ConfigProperty(name = "datagen.topic-replication-factor") + Optional topicReplicationFactor; + @Inject @ConfigProperty(name = "datagen.topic-name-template", defaultValue = TOPIC_NAME_TEMPLATE) String topicNameTemplate; @@ -121,12 +126,11 @@ public class DataGenerator { Map> recordsConsumed = new ConcurrentHashMap<>(); static ExecutorService virtualExec = Executors.newVirtualThreadPerTaskExecutor(); - static Faker faker = new Faker(); static JsonProvider json = JsonProvider.provider(); AtomicBoolean running = new AtomicBoolean(true); - Random generator = new Random(); - + Random generator; + Faker faker; void start(@Observes Startup startupEvent /* NOSONAR */) { if (!datagenEnabled) { @@ -134,6 +138,8 @@ void start(@Observes Startup startupEvent /* NOSONAR */) { return; } + generator = new Random(); + faker = new Faker(generator); AtomicInteger clientCount = new AtomicInteger(0); adminConfigs.forEach((clusterKey, configProperties) -> @@ -303,7 +309,7 @@ void initialize(String clusterKey, Admin adminClient, List topics, int p } while (--deleteTopicsMax > 0 && !remainingTopics.isEmpty()); var newTopics = topics.stream() - .map(t -> new NewTopic(t, partitionsPerTopic, (short) 3) + .map(t -> new NewTopic(t, Optional.of(partitionsPerTopic), topicReplicationFactor) .configs(Map.of( // 10 MiB "segment.bytes", Integer.toString(10 * 1024 * 1024), @@ -339,7 +345,6 @@ void produce(String clusterKey, Producer producer, List long start = System.currentTimeMillis(); long rate = 100 * ((start / 10000) % 5) + 10; - for (int i = 0; i < rate; i++) { if (!running.get()) { return; diff --git a/src/main/java/com/github/streamshub/health/CounterProgressCheck.java b/src/main/java/com/github/streamshub/health/CounterProgressCheck.java index a61a303..5314d70 100644 --- a/src/main/java/com/github/streamshub/health/CounterProgressCheck.java +++ b/src/main/java/com/github/streamshub/health/CounterProgressCheck.java @@ -13,9 +13,15 @@ import org.apache.kafka.common.TopicPartition; import org.eclipse.microprofile.health.HealthCheckResponse; import org.jboss.logging.Logger; +import org.jboss.logging.Logger.Level; abstract class CounterProgressCheck { + static final Instant STARTUP = Instant.now(); + static final Duration MINS3 = Duration.ofMinutes(3); + static final Duration MINS4 = Duration.ofMinutes(4); + static final Duration MINS5 = Duration.ofMinutes(5); + @Inject Logger log; @@ -47,9 +53,20 @@ HealthCheckResponse check(Map> recordsCounts) } else { var lastUpdate = latestRecordActivityTimes .computeIfAbsent(clusterKey, k -> new ConcurrentHashMap<>(prevCounts.size())) - .get(partition); + .getOrDefault(partition, STARTUP); + + Duration timeStale = Duration.between(lastUpdate, now); + Level level; + + if (timeStale.compareTo(MINS3) < 0) { + level = Level.DEBUG; + } else if (timeStale.compareTo(MINS4) < 0) { + level = Level.INFO; + } else { + level = Level.WARN; + } - log.infof("Counter %s/%s in cluster %s unchanged from %d at %s", + log.logf(level, "Counter %s/%s in cluster %s unchanged from %d at %s", checkName, partition, clusterKey, prevCount, lastUpdate); latestRecordActivityTimes @@ -60,7 +77,7 @@ HealthCheckResponse check(Map> recordsCounts) })); } - Instant inactiveTime = Instant.now().minus(Duration.ofMinutes(5)); + Instant inactiveTime = Instant.now().minus(MINS5); long inactivePartitions = latestRecordActivityTimes.values() .stream()