-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] PIP-307 Added assignedBrokerUrl to CloseProducerCmd to skip lookups upon producer reconnections during unloading #21408
Conversation
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
Show resolved
Hide resolved
/** | ||
* Close this topic - close all producers and subscriptions associated with this topic. | ||
* | ||
* @param closeWithoutDisconnectingClients don't disconnect clients |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the parameter should be disconnectingClients
, it will be clearer to other developers. But it is not this PR's responsibility.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, I was following the closeWithoutWaitingClientDisconnect
's convention. Probably we need a minor PR to fix this naming, if we want to refactor it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, the naming is confusing at first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, disconnectingClients
is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated.
...test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java
Outdated
Show resolved
Hide resolved
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
Show resolved
Hide resolved
025aac7
to
2d42d6e
Compare
final URI uri = new URI(producer.client.conf.isUseTls() | ||
? closeProducer.getAssignedBrokerServiceUrlTls() | ||
: closeProducer.getAssignedBrokerServiceUrl()); | ||
log.info("[{}] Broker notification of Closed producer: {}. Redirecting to {}.", | ||
remoteAddress, closeProducer.getProducerId(), uri); | ||
producer.getConnectionHandler().connectionClosed(this, 0L, Optional.of(uri)); | ||
} catch (URISyntaxException e) { | ||
log.error("[{}] Invalid redirect url {}/{} for {}", remoteAddress, | ||
closeProducer.getAssignedBrokerServiceUrl(), | ||
closeProducer.getAssignedBrokerServiceUrlTls(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GetAssignedBrokerServiceUrl
or GetAssignedBrokerServiceUrlTls
could throw an InvalidStateException
if the respective value is not present.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. I will update this part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall, left a couple of cosmetic suggestions and questions.
} | ||
); | ||
} catch (Throwable e) { | ||
log.error("Failed to DestinationBrokerLookupData for topic:{}", topic, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: can we improve the error message here?
log.error("Failed to DestinationBrokerLookupData for topic:{}", topic, e); | |
log.error("Failed to lookup destination broker for topic:{}", topic, e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure. I will update this.
if (data.dstBroker() != null) { | ||
return Optional.of(data.dstBroker()); | ||
} | ||
return Optional.empty(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we can simplify this to:
if (data.dstBroker() != null) { | |
return Optional.of(data.dstBroker()); | |
} | |
return Optional.empty(); | |
return Optional.ofNullable(data.dstBroker()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure. I will update this.
@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) { | |||
if (getOwnerRequest != null) { | |||
getOwnerRequest.complete(data.dstBroker()); | |||
} | |||
stateChangeListeners.notify(serviceUnit, data, null); | |||
CompletableFuture<Integer> ownFuture = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The null assignment doesn't seem necessary here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure. I will update this.
stateChangeListeners.notifyOnCompletion(ownFuture, serviceUnit, data) | ||
.whenComplete((__, e) -> log(e, serviceUnit, data, null)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: why this is needed now?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point.
I think we need to log the first and second if
cases only.
Let me update this.
/** | ||
* Close this topic - close all producers and subscriptions associated with this topic. | ||
* | ||
* @param closeWithoutDisconnectingClients don't disconnect clients |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, the naming is confusing at first.
connectionClosed(cnx, null, Optional.empty()); | ||
} | ||
|
||
public void connectionClosed(ClientCnx cnx, Long initialConnectionDelayMs, Optional<URI> hostUrl) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just an observation: it's not clear to me when we're using a raw reference and when an Optional. Seems like we could've used Optional<Long> initialConnectionDelayMs
just as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. I will make this Optional<Long> initialConnectionDelayMs
… to skip lookups upon producer reconnections during unloading
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work! Left some trival comments.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Outdated
Show resolved
Hide resolved
/** | ||
* Close this topic - close all producers and subscriptions associated with this topic. | ||
* | ||
* @param closeWithoutDisconnectingClients don't disconnect clients |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, disconnectingClients
is better.
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
Show resolved
Hide resolved
final URI uri = new URI(producer.client.conf.isUseTls() | ||
? closeProducer.getAssignedBrokerServiceUrlTls() | ||
: closeProducer.getAssignedBrokerServiceUrl()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for fixing the similar issue below, but this one is still present.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for checking this.
I changed the catch clause to catch IllegalStateException
as well in case the broker does not return both urls.
Nice, looks good! |
PIP: #20748
Motivation
Please refer to the pip #20748
There will be a separate PR for the similar changes on
CloseConsumerCmd
.Modifications
Please refer to the pip #20748
topic.close(..)
will be called at the source broker two times:Releasing
, without disconnecting producersOwned
, to fully close the producers with assigned broker informationVerifying this change
Added new tests In
ExtensibleLoadManagerTest
to cover this logic.Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: heesung-sn#53