diff --git a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsUtils.java b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsUtils.java index 633cd6dab..edcbe6489 100644 --- a/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsUtils.java +++ b/cruise-control-metrics-reporter/src/main/java/com/linkedin/kafka/cruisecontrol/metricsreporter/CruiseControlMetricsUtils.java @@ -163,10 +163,11 @@ public static void maybeUpdateConfig(Set configsToAlter, * @param scaleMs the scale for computing the delay * @param base the base for computing the delay * @param maxAttempts the max number of attempts on calling the function + * @param maxSleepMs the maximum sleep time between retries, in milliseconds; a non-positive value disables the cap * @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded. * {@code true} if the function stopped requiring a retry before exceeding the max attempts. */ - public static boolean retry(Supplier function, long scaleMs, int base, int maxAttempts) { + public static boolean retry(Supplier function, long scaleMs, int base, int maxAttempts, int maxSleepMs) { if (maxAttempts > 0) { int attempts = 0; long timeToSleep = scaleMs; @@ -179,6 +180,9 @@ public static boolean retry(Supplier function, long scaleMs, int base, return false; } timeToSleep *= base; + if (maxSleepMs > 0 && timeToSleep > maxSleepMs) { + timeToSleep = maxSleepMs; + } Thread.sleep(timeToSleep); } catch (InterruptedException ignored) { @@ -200,7 +204,21 @@ public static boolean retry(Supplier function, long scaleMs, int base, * {@code true} if the function stopped requiring a retry before exceeding the max attempts. 
*/ public static boolean retry(Supplier function, int maxAttempts) { - return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts); + return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts, -1); + } + + /** + * Retries the {@code Supplier} function while it returns {@code true} and for the specified max number of attempts. + * It uses -1 as maxSleepMs, to not limit the sleep time between retries. + * @param function the code to call and retry if needed + * @param scaleMs the scale for computing the delay + * @param base the base for computing the delay + * @param maxAttempts the max number of attempts on calling the function + * @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded. + * {@code true} if the function stopped requiring a retry before exceeding the max attempts. + */ + public static boolean retry(Supplier function, long scaleMs, int base, int maxAttempts) { + return retry(function, scaleMs, base, maxAttempts, -1); } /** diff --git a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java index 5ff505958..93d11feb1 100644 --- a/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java +++ b/cruise-control/src/main/java/com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsReporterSampler.java @@ -186,7 +186,7 @@ public void configure(Map configs) { _currentPartitionAssignment = Collections.emptySet(); LOG.info("Waiting for metrics reporter topic [{}] to be available in the Kafka cluster.", _metricReporterTopic); - if (!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), 5, 1, metricTopicAssertAttempts)) { + if 
(!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), 2000, 2, metricTopicAssertAttempts, 30_000)) { throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic + "] in the Kafka cluster."); }