Skip to content

Commit

Permalink
use exponential backoff with capped sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
k0b3rIT committed Jan 15, 2025
1 parent b70fb07 commit 529cb16
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,11 @@ public static void maybeUpdateConfig(Set<AlterConfigOp> configsToAlter,
* @param scaleMs the scale for computing the delay
* @param base the base for computing the delay
* @param maxAttempts the max number of attempts on calling the function
* @param maxSleepMs the maximum sleep time in milliseconds between retries; a value of 0 or less leaves the sleep time uncapped
* @return {@code false} if the function requires a retry, but it cannot be retried, because the max attempts have been exceeded.
* {@code true} if the function stopped requiring a retry before exceeding the max attempts.
*/
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts, int maxSleepMs) {
if (maxAttempts > 0) {
int attempts = 0;
long timeToSleep = scaleMs;
Expand All @@ -179,6 +180,9 @@ public static boolean retry(Supplier<Boolean> function, long scaleMs, int base,
return false;
}
timeToSleep *= base;
if (maxSleepMs > 0 && timeToSleep > maxSleepMs) {
timeToSleep = maxSleepMs;
}
Thread.sleep(timeToSleep);
} catch (InterruptedException ignored) {

Expand All @@ -200,7 +204,21 @@ public static boolean retry(Supplier<Boolean> function, long scaleMs, int base,
* {@code true} if the function stopped requiring a retry before exceeding the max attempts.
*/
public static boolean retry(Supplier<Boolean> function, int maxAttempts) {
return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts);
return retry(function, DEFAULT_RETRY_BACKOFF_SCALE_MS, DEFAULT_RETRY_BACKOFF_BASE, maxAttempts, -1);
}

/**
 * Convenience overload of {@code retry} that places no upper bound on the sleep time between attempts.
 * The call is forwarded to the five-argument variant with {@code maxSleepMs} set to -1, which that
 * variant treats as "do not cap the sleep time".
 *
 * @param function the code to call; it is called again while it keeps returning {@code true}
 * @param scaleMs the scale for computing the delay
 * @param base the base for computing the delay
 * @param maxAttempts the max number of attempts on calling the function
 * @return {@code false} if the function still requires a retry but the max attempts have been exceeded,
 * {@code true} if the function stopped requiring a retry before exceeding the max attempts.
 */
public static boolean retry(Supplier<Boolean> function, long scaleMs, int base, int maxAttempts) {
    // A non-positive cap disables the sleep-time limit in the delegate.
    final int uncappedSleepMs = -1;
    return retry(function, scaleMs, base, maxAttempts, uncappedSleepMs);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void configure(Map<String, ?> configs) {
_currentPartitionAssignment = Collections.emptySet();

LOG.info("Waiting for metrics reporter topic [{}] to be available in the Kafka cluster.", _metricReporterTopic);
if (!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), 5, 1, metricTopicAssertAttempts)) {
if (!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), 2000, 2, metricTopicAssertAttempts, 30_000)) {
throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic
+ "] in the Kafka cluster.");
}
Expand Down

0 comments on commit 529cb16

Please sign in to comment.