Skip to content

Commit

Permalink
Handle orphaned rate-limit buckets more gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
MinnDevelopment committed Nov 18, 2023
1 parent b7937d0 commit 18fe3cd
Showing 1 changed file with 12 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,8 +231,8 @@ private void runBucket(Bucket bucket)
return;
// Schedule a new bucket worker if no worker is running
MiscUtil.locked(lock, () ->
rateLimitQueue.computeIfAbsent(bucket,
(k) -> config.getPool().schedule(bucket, bucket.getRateLimit(), TimeUnit.MILLISECONDS)));
rateLimitQueue.computeIfAbsent(bucket,
k -> config.getPool().schedule(bucket, bucket.getRateLimit(), TimeUnit.MILLISECONDS)));
}

private long parseLong(String input)
Expand All @@ -252,9 +252,9 @@ private long getNow()
return System.currentTimeMillis();
}

private void updateBucket(Route.CompiledRoute route, Response response)
private Bucket updateBucket(Route.CompiledRoute route, Response response)
{
MiscUtil.locked(lock, () ->
return MiscUtil.locked(lock, () ->
{
try
{
Expand Down Expand Up @@ -302,14 +302,16 @@ else if (cloudflare)
boolean firstHit = hitRatelimit.add(baseRoute) && retryAfter < 60000;
// Update the bucket to the new information
bucket.remaining = 0;
bucket.reset = getNow() + retryAfter;
bucket.reset = now + retryAfter;
// don't log warning if we hit the rate limit for the first time, likely due to initialization of the bucket
// unless its a long retry-after delay (more than a minute)
if (firstHit)
log.debug("Encountered 429 on route {} with bucket {} Retry-After: {} ms Scope: {}", baseRoute, bucket.bucketId, retryAfter, scope);
else
log.warn("Encountered 429 on route {} with bucket {} Retry-After: {} ms Scope: {}", baseRoute, bucket.bucketId, retryAfter, scope);
}

log.trace("Updated bucket {} to retry after {}", bucket.bucketId, bucket.reset - now);
return bucket;
}

Expand Down Expand Up @@ -367,7 +369,8 @@ public void enqueue(Work request)

public void retry(Work request)
{
requests.addFirst(request);
if (!moveRequest(request))
requests.addFirst(request);
}

public long getReset()
Expand Down Expand Up @@ -423,7 +426,7 @@ public Queue<Work> getRequests()
return requests;
}

protected Boolean moveRequest(Work request)
protected boolean moveRequest(Work request)
{
return MiscUtil.locked(lock, () ->
{
Expand All @@ -433,9 +436,8 @@ protected Boolean moveRequest(Work request)
{
bucket.enqueue(request);
runBucket(bucket);
return true;
}
return false;
return bucket != this;
});
}

Expand Down Expand Up @@ -477,16 +479,9 @@ public void run()
}

Work request = requests.removeFirst();
if (request.isSkipped())
if (request.isSkipped() || moveRequest(request))
continue;

// Check if a bucket has been discovered and initialized for this route
if (isUninit())
{
boolean shouldSkip = moveRequest(request);
if (shouldSkip) continue;
}

if (execute(request)) break;
}

Expand Down

0 comments on commit 18fe3cd

Please sign in to comment.