Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Fix rate limiter token bucket and clock consistency issues causing excessive throttling and connection timeouts #23930

Merged
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
cccc564
[fix][broker] Fix rate limiting causing connections to time out
lhotari Feb 5, 2025
79c9cce
Fix test
lhotari Feb 5, 2025
9fa43a9
Add test that checks that clock leaping backward or forward would be …
lhotari Feb 5, 2025
f28af85
Improve DefaultMonotonicSnapshotClock so that requests don't get dela…
lhotari Feb 5, 2025
a99fe0c
Also test with small offsets
lhotari Feb 5, 2025
9072713
Improve test case
lhotari Feb 5, 2025
f719dbc
Use JMH blackhole in test
lhotari Feb 5, 2025
c60c4b8
Improve code coverage
lhotari Feb 6, 2025
696f9fb
Refactor: Split out logic for leap detection and monotonic tick updating
lhotari Feb 6, 2025
d6d60f7
Remove invalid test
lhotari Feb 6, 2025
0886054
Revert "Remove invalid test"
lhotari Feb 6, 2025
f4feda1
Add test mode to DefaultMonotonicSnapshotClock so that thread updates…
lhotari Feb 6, 2025
13aa2f7
Use test mode to fix test
lhotari Feb 6, 2025
8b36d71
Improve testability
lhotari Feb 6, 2025
d9fb30f
Add failing test case
lhotari Feb 6, 2025
8c851e2
Fix test
lhotari Feb 6, 2025
846cc89
Add failing test case for the negative tokens case
lhotari Feb 6, 2025
58f8b35
Don't handle leaps forward since those cannot be detected properly
lhotari Feb 6, 2025
91ae4c6
Remove separate test mode since it's not needed
lhotari Feb 7, 2025
487419e
Fix parameter
lhotari Feb 7, 2025
571b2c2
Enable batching of update requests by using a AtomicLong for the requ…
lhotari Feb 7, 2025
532998d
Removing remaining parts of the test mode
lhotari Feb 7, 2025
a21eee3
Remove synchronization from tick updater since the usual path is sing…
lhotari Feb 7, 2025
eb82ab9
Improve comments
lhotari Feb 7, 2025
d1dd4b7
Reorder methods
lhotari Feb 7, 2025
d5ea482
Improve javadoc
lhotari Feb 7, 2025
b19e0de
Reduce excessive logging in test
lhotari Feb 7, 2025
b46fbab
Reduce duplication in JMH test
lhotari Feb 7, 2025
73c2b8c
Add JMH benchmark for DefaultMonotonicSnapshotClock
lhotari Feb 7, 2025
a22da98
Add more instructions for running JMH benchmarks
lhotari Feb 7, 2025
4287838
Add unit tests for leap detection
lhotari Feb 7, 2025
12cb400
Move updating of requestCount outside of the synchronization blocks t…
lhotari Feb 7, 2025
31a1b30
Fix typo and improve comment
lhotari Feb 7, 2025
6e4a94f
Optimize token calculation performance and correctness
lhotari Feb 7, 2025
7463891
Improve eventual consistency test
lhotari Feb 7, 2025
b04752d
If condition when newTokens should be 0
lhotari Feb 7, 2025
ea4f7d5
Improve logic to update lastNanos so that races are prevented with CAS
lhotari Feb 7, 2025
ca7cae4
Add "getTokensUpdatesTokens" to AsyncTokenBucket to reduce eventual c…
lhotari Feb 7, 2025
543c978
Reduce test flakiness
lhotari Feb 7, 2025
d159198
Let MessageDispatchThrottlingTest#reset handle deletion
lhotari Feb 7, 2025
936cdb0
Reduce test flakiness for waiting to new rate to be applied
lhotari Feb 7, 2025
dae8220
Prevent NPEs in DispatchRateLimiter when limit has changed
lhotari Feb 7, 2025
d62f97e
Fix switchToConsistentTokensView behavior
lhotari Feb 7, 2025
1d68522
Revert using getTokensUpdatesTokens mode by default since eventual co…
lhotari Feb 7, 2025
38e9f7b
Rename getTokensUpdatesTokens to consistentTokensView
lhotari Feb 7, 2025
f5c3e57
Use consistent tokens view for SubscribeRateLimiter
lhotari Feb 7, 2025
b79ed85
Fix issue with restartBroker in tests
lhotari Feb 7, 2025
5910b3c
Ignore metadata change when broker isn't running
lhotari Feb 7, 2025
fda2337
Move dispatch throttling tests to broker-api group
lhotari Feb 7, 2025
759fafe
Use AssertJ for better error message
lhotari Feb 7, 2025
5e0a327
Improve test cleanup for retries
lhotari Feb 7, 2025
f024203
Use unique namespaces
lhotari Feb 7, 2025
9277208
Extract common base class to avoid test duplication
lhotari Feb 7, 2025
96781e9
Reduce flakiness
lhotari Feb 7, 2025
234cfc9
Refactor common config
lhotari Feb 7, 2025
8ef12ca
Fix flakiness
lhotari Feb 7, 2025
68ba451
Move MessagePublishThrottlingTest to broker-api test group
lhotari Feb 7, 2025
2d99778
Fix issue in lookupUrl change in test class
lhotari Feb 7, 2025
c67ceb0
Revisit startBroker method in test base class
lhotari Feb 7, 2025
6de794b
Revisit logic one more time in test class
lhotari Feb 7, 2025
f9aab2e
Refactor consistency settings in AsyncTokenBucket and add Javadocs
lhotari Feb 8, 2025
e4f4689
Attempt to fix flaky test MessageDispatchThrottlingTest
lhotari Feb 8, 2025
42fb876
Use consistent tokens view in flaky RGUsageMTAggrWaitForAllMsgsTest
lhotari Feb 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions microbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,29 @@ For fast recompiling of the benchmarks (without compiling Pulsar modules) and cr
mvn -Pmicrobench -pl microbench clean package
```

### Running specific benchmarks

Display help:

```shell
java -jar microbench/target/microbenchmarks.jar -h
```

Listing all benchmarks:

```shell
java -jar microbench/target/microbenchmarks.jar -l
```

Running specific benchmarks:

```shell
java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*"
```

Checking what benchmarks match the pattern:

```shell
java -jar microbench/target/microbenchmarks.jar ".*BenchmarkName.*" -lp
```

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(3)
@BenchmarkMode(Mode.Throughput)
Expand All @@ -59,23 +60,29 @@ public void teardown() {
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void consumeTokensBenchmark001Threads() {
asyncTokenBucket.consumeTokens(1);
public void consumeTokensBenchmark001Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole);
}

@Threads(10)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void consumeTokensBenchmark010Threads() {
asyncTokenBucket.consumeTokens(1);
public void consumeTokensBenchmark010Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole);
}

@Threads(100)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void consumeTokensBenchmark100Threads() {
public void consumeTokensBenchmark100Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole);
}

private void consumeTokenAndGetTokens(Blackhole blackhole) {
asyncTokenBucket.consumeTokens(1);
// blackhole is used to ensure that the compiler doesn't do dead code elimination
blackhole.consume(asyncTokenBucket.getTokens());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.qos;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@Fork(3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class DefaultMonotonicSnapshotClockBenchmark {
private DefaultMonotonicSnapshotClock monotonicSnapshotClock =
new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), System::nanoTime);

@TearDown(Level.Iteration)
public void teardown() {
monotonicSnapshotClock.close();
}

@Threads(1)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void getTickNanos001Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole, false);
}

@Threads(10)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void getTickNanos010Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole, false);
}

@Threads(100)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void getTickNanos100Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole, false);
}

@Threads(1)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void getTickNanosRequestSnapshot001Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole, true);
}

@Threads(10)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void getTickNanosRequestSnapshot010Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole, true);
}

@Threads(100)
@Benchmark
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
public void getTickNanosRequestSnapshot100Threads(Blackhole blackhole) {
consumeTokenAndGetTokens(blackhole, true);
}

private void consumeTokenAndGetTokens(Blackhole blackhole, boolean requestSnapshot) {
// blackhole is used to ensure that the compiler doesn't do dead code elimination
blackhole.consume(monotonicSnapshotClock.getTickNanos(requestSnapshot));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ public static void resetToDefaultEventualConsistentTokensView() {
protected AsyncTokenBucket(MonotonicSnapshotClock clockSource, long resolutionNanos) {
this.clockSource = clockSource;
this.resolutionNanos = resolutionNanos;
this.lastNanos = Long.MIN_VALUE;
}

public static FinalRateAsyncTokenBucketBuilder builder() {
Expand Down Expand Up @@ -155,20 +156,23 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea
throw new IllegalArgumentException("consumeTokens must be >= 0");
}
long currentNanos = clockSource.getTickNanos(forceUpdateTokens);
long newTokens = 0;
// check if the tokens should be updated immediately
if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
// calculate the number of new tokens since the last update
long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
// calculate the total amount of tokens to consume in this update
newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
}
if (newTokens > 0) {
// flush the pendingConsumedTokens by calling "sumThenReset"
long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset();
long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
// calculate the token delta by subtracting the consumed tokens from the new tokens
long tokenDelta = newTokens - currentPendingConsumedTokens;
// update the tokens and return the current token value
return TOKENS_UPDATER.updateAndGet(this,
currentTokens ->
// after adding new tokens, limit the tokens to the capacity
Math.min(currentTokens + newTokens, getCapacity())
// subtract the consumed tokens
- totalConsumedTokens);
// limit the tokens to the capacity of the bucket
currentTokens -> Math.min(currentTokens + tokenDelta, getCapacity())
// subtract the consumed tokens from the capped tokens
- consumeTokens);
} else {
// eventual consistent fast path, tokens are not updated immediately

Expand All @@ -177,8 +181,13 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea
pendingConsumedTokens.add(consumeTokens);
}

// return Long.MIN_VALUE if the current value of tokens is unknown due to the eventual consistency
return Long.MIN_VALUE;
if (forceUpdateTokens) {
// return the current tokens balance without updating the tokens and resetting the pendingConsumedTokens
return tokens - pendingConsumedTokens.sum();
lhotari marked this conversation as resolved.
Show resolved Hide resolved
} else {
// return Long.MIN_VALUE if the current value of tokens is unknown due to the eventual consistency
return Long.MIN_VALUE;
}
}
}

Expand Down Expand Up @@ -211,8 +220,16 @@ private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUp
*/
private long calculateNewTokensSinceLastUpdate(long currentNanos) {
long newTokens;
long previousLastNanos = LAST_NANOS_UPDATER.getAndSet(this, currentNanos);
if (previousLastNanos == 0) {
long previousLastNanos = LAST_NANOS_UPDATER.getAndUpdate(this,
currentLastNanos -> {
// update lastNanos only if at least resolutionNanos/2 nanoseconds has passed since the last update
if (currentNanos >= currentLastNanos + resolutionNanos / 2) {
return currentNanos;
} else {
return currentLastNanos;
}
});
if (previousLastNanos == Long.MIN_VALUE || previousLastNanos >= currentNanos) {
newTokens = 0;
} else {
long durationNanos = currentNanos - previousLastNanos + REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
Expand Down
Loading
Loading