-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] defer the ownership checks if the owner is inactive (ExtensibleLoadManager) #21811
Conversation
…ip (ExtensibleLoadManager)
6e42ab0
to
c43e55f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me, had some cosmetic recommendations.
ownerAfterDeferred -> ownerAfterDeferred == null ? Optional.empty() | ||
: Optional.of(ownerAfterDeferred)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we simplify this to Optional.ofNullable
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks. updated.
var override = getOverrideInactiveBrokerStateData( | ||
orphanData, selectedBroker, inactiveBroker); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
var override = getOverrideInactiveBrokerStateData( | |
orphanData, selectedBroker, inactiveBroker); | |
var override = getOverrideInactiveBrokerStateData(orphanData, selectedBroker, inactiveBroker); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks. updated.
} catch (Throwable e) { | ||
return false; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove this and add ignoreExceptions
to the Awaitility
chain above, similar to instances below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
@@ -259,9 +281,11 @@ public void testStopBroker() throws Exception { | |||
} | |||
} | |||
|
|||
String broker1 = admin.lookups().lookupTopic(topicName); | |||
Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wait period here is larger than the test timeout itself.
Awaitility.waitAtMost(60, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { | |
Awaitility.waitAtMost(40, TimeUnit.SECONDS).ignoreExceptions().untilAsserted(() -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Increased the test time, instead.
@@ -380,8 +404,7 @@ public void testIsolationPolicy() throws Exception { | |||
fail(); | |||
} catch (Exception ex) { | |||
log.error("Failed to lookup topic: ", ex); | |||
assertThat(ex.getMessage()).containsAnyOf("Failed to look up a broker", | |||
"Failed to select the new owner broker for bundle"); | |||
assertThat(ex.getMessage()).containsAnyOf("Failed to select the new owner broker for bundle"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:
assertThat(ex.getMessage()).containsAnyOf("Failed to select the new owner broker for bundle"); | |
assertThat(ex.getMessage()).contains("Failed to select the new owner broker for bundle"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated
public void testActiveGetOwner() | ||
throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
public void testActiveGetOwner() | |
throws IllegalAccessException, ExecutionException, InterruptedException, TimeoutException { | |
public void testActiveGetOwner() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks. updated.
try { | ||
channel1.getOwnerAsync(bundle).get(); | ||
fail(); | ||
} catch (Exception e) { | ||
if (e.getCause() instanceof TimeoutException) { | ||
// expected | ||
} else { | ||
fail(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reads a bit more clearly IMO, but I haven't seen it widely used in our tests:
try { | |
channel1.getOwnerAsync(bundle).get(); | |
fail(); | |
} catch (Exception e) { | |
if (e.getCause() instanceof TimeoutException) { | |
// expected | |
} else { | |
fail(); | |
} | |
} | |
var ex = expectThrows(ExecutionException.class, () -> channel1.getOwnerAsync(bundle).get()); | |
assertTrue(ex.getCause() instanceof TimeoutException); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks. updated.
doReturn(CompletableFuture.completedFuture(Optional.empty())) | ||
.when(loadManager).selectAsync(any(), any()); | ||
var leaderChannel = channel1; | ||
String leader = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS).get(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we rename this to leader1
, to stay consistent with the naming scheme?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks. updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
...ava/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelImpl.java
Show resolved
Hide resolved
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## master #21811 +/- ##
============================================
- Coverage 73.58% 73.54% -0.04%
- Complexity 32254 32278 +24
============================================
Files 1858 1858
Lines 138021 138110 +89
Branches 15111 15137 +26
============================================
+ Hits 101558 101579 +21
- Misses 28606 28670 +64
- Partials 7857 7861 +4
Flags with carried forward coverage won't be shown. Click here to find out more.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
… (ExtensibleLoadManager) (apache#21811)
… (ExtensibleLoadManager) (#21811)
Motivation
When a broker shutdown or gets killed, the leader broker will get notified and soon reassign the orphan bundle ownerships. Meanwhile, the ownership lookup to these orphan bundles should be deferred til the new brokers own them. This will further reduce the lookup request retries when brokers are shutdown or get killed.
Modifications
ServiceUnitStateChannelImpl.getOwnerAsync
, additionally, check if the current owner broker is active or not in the broker registry. If active, return the current owner. Otherwise, defer the ownership lookup requests til the new owner owns it.Free
.writeLoadReportOnZookeeper
writeResourceQuotasToZooKeeper
Verifying this change
Added a test case
testActiveGetOwner
to cover this.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: heesung-sn#58