Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-307 Added assignedBrokerUrl to CloseProducerCmd to skip lookups upon producer reconnections during unloading #21408

Merged
merged 4 commits into from
Nov 6, 2023

Conversation

heesung-sn
Copy link
Contributor

PIP: #20748

Motivation

Please refer to the pip #20748

There will be a separate PR for the similar changes on CloseConsumerCmd.

Modifications

Please refer to the pip #20748

  • Added assignedBrokerUrls to CloseProducerCmd to skip client lookups upon producer reconnections during topic unloading
  • Upon bundle transfer, topic.close(..) will be called at the source broker two times:
    • At Releasing, without disconnecting producers
    • At Owned, to fully close the producers with assigned broker information

Verifying this change

  • Make sure that the change passes the CI checks.

Added new tests In ExtensibleLoadManagerTest to cover this logic.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: heesung-sn#53

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Oct 20, 2023
/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
* @param closeWithoutDisconnectingClients don't disconnect clients
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I think the parameter should be disconnectingClients, it will be clearer to other developers. But it is not this PR's responsibility.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I was following the closeWithoutWaitingClientDisconnect's convention. Probably we need a minor PR to fix this naming, if we want to refactor it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, the naming is confusing at first.

Copy link
Contributor

@gaoran10 gaoran10 Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, disconnectingClients is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated.

@heesung-sn heesung-sn force-pushed the pip-307 branch 2 times, most recently from 025aac7 to 2d42d6e Compare October 27, 2023 22:11
Comment on lines 810 to 819
final URI uri = new URI(producer.client.conf.isUseTls()
? closeProducer.getAssignedBrokerServiceUrlTls()
: closeProducer.getAssignedBrokerServiceUrl());
log.info("[{}] Broker notification of Closed producer: {}. Redirecting to {}.",
remoteAddress, closeProducer.getProducerId(), uri);
producer.getConnectionHandler().connectionClosed(this, 0L, Optional.of(uri));
} catch (URISyntaxException e) {
log.error("[{}] Invalid redirect url {}/{} for {}", remoteAddress,
closeProducer.getAssignedBrokerServiceUrl(),
closeProducer.getAssignedBrokerServiceUrlTls(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetAssignedBrokerServiceUrl or GetAssignedBrokerServiceUrlTls could throw an InvalidStateException if the respective value is not present.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I will update this part.

Copy link
Contributor

@dragosvictor dragosvictor left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good overall, left a couple of cosmetic suggestions and questions.

}
);
} catch (Throwable e) {
log.error("Failed to DestinationBrokerLookupData for topic:{}", topic, e);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: can we improve the error message here?

Suggested change
log.error("Failed to DestinationBrokerLookupData for topic:{}", topic, e);
log.error("Failed to lookup destination broker for topic:{}", topic, e);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. I will update this.

Comment on lines 557 to 560
if (data.dstBroker() != null) {
return Optional.of(data.dstBroker());
}
return Optional.empty();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we can simplify this to:

Suggested change
if (data.dstBroker() != null) {
return Optional.of(data.dstBroker());
}
return Optional.empty();
return Optional.ofNullable(data.dstBroker());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. I will update this.

@@ -732,15 +770,20 @@ private void handleOwnEvent(String serviceUnit, ServiceUnitStateData data) {
if (getOwnerRequest != null) {
getOwnerRequest.complete(data.dstBroker());
}
stateChangeListeners.notify(serviceUnit, data, null);
CompletableFuture<Integer> ownFuture = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The null assignment doesn't seem necessary here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. I will update this.

Comment on lines 785 to 786
stateChangeListeners.notifyOnCompletion(ownFuture, serviceUnit, data)
.whenComplete((__, e) -> log(e, serviceUnit, data, null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: why this is needed now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.
I think we need to log the first and second if cases only.

Let me update this.

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
* @param closeWithoutDisconnectingClients don't disconnect clients
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, the naming is confusing at first.

connectionClosed(cnx, null, Optional.empty());
}

public void connectionClosed(ClientCnx cnx, Long initialConnectionDelayMs, Optional<URI> hostUrl) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an observation: it's not clear to me when we're using a raw reference and when an Optional. Seems like we could've used Optional<Long> initialConnectionDelayMs just as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I will make this Optional<Long> initialConnectionDelayMs

Copy link
Contributor

@gaoran10 gaoran10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great work! Left some trival comments.

/**
* Close this topic - close all producers and subscriptions associated with this topic.
*
* @param closeWithoutDisconnectingClients don't disconnect clients
Copy link
Contributor

@gaoran10 gaoran10 Nov 3, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, disconnectingClients is better.

Comment on lines +810 to +812
final URI uri = new URI(producer.client.conf.isUseTls()
? closeProducer.getAssignedBrokerServiceUrlTls()
: closeProducer.getAssignedBrokerServiceUrl());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for fixing the similar issue below, but this one is still present.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for checking this.

I changed the catch clause to catch IllegalStateException as well in case the broker does not return both urls.

@dragosvictor
Copy link
Contributor

Nice, looks good!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants