Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement PscMetadataClient #44

Merged
merged 7 commits into from
Sep 11, 2024
Merged

Implement PscMetadataClient #44

merged 7 commits into from
Sep 11, 2024

Conversation

jeffxiang
Copy link
Contributor

@jeffxiang jeffxiang commented Aug 28, 2024

Implement PscMetadataClient which allows backend-agnostic metadata queries and operations, similar to Kafka's AdminClient. This implementation does not yet achieve API parity with KafkaAdminClient, but rather implements the methods needed for full Flink 1.15 upgrade and integration. Other gaps in the API can be added over time.

Backend agnostic behavior is achieved via a similar design as PscConsumer and PscProducer, where the calls to the API's at runtime (e.g. listOffsets()) will create a backend-specific implementation of the metadata client (PscBackendMetadataClient) if one is not already registered and created in the PscMetadataClient instance for that particular backend and cluster. As such, each public API in PscMetadataClient accepts a TopicUri which can be "incomplete", i.e. excluding the topic and includes everything up to the cluster. This is enough for the ServiceDiscoveryManager to identify the correct backend service and endpoints to connect to.

Unit and integration tests were added to ensure expected behavior and correctness of API's in PscMetadataClient under different scenarios.

@jeffxiang jeffxiang requested a review from a team as a code owner August 28, 2024 22:38
* @throws InterruptedException
* @throws TimeoutException
*/
public List<TopicRn> listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make a ClusterUri class, maybe as a subclass of TopicUri to avoid confusion? The current contract is that a topic URI has to have a valid topic as the last component, which I think we shouldn't change.

Plus, a variable names clusterUri of type TopicUri is rather confusing.

Copy link
Contributor Author

@jeffxiang jeffxiang Sep 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just took a stab at this to see how much work would be involved. It is non-trivial. Introducing a ClusterUri class will require major refactoring of existing service discovery classes and their logic. The current logic in TopicUri and subclasses does not require a valid topic as the last component (surprisingly), and service discovery mechanism works out of the box without the topic in TopicUri.

I tried two approaches to introduce ClusterUri - ClusterUri extends BaseTopicUri and ClusterUri implements TopicUri. Both of these approaches still required decent effort to refactor some commonly used classes such as service discovery, and both approaches introduces the risk of someone erroneously supplying a ClusterUri instance to the regular consumer/producer API's that require a TopicUri (since TopicUri is now a superclass of ClusterUri).

A third approach is to create separate Cluster* classes equivalent to TopicRn, TopicUri, BaseTopicUri, and KafkaTopicUri. This will still require major refactoring of service discovery and is more work.

I do think that we should refactor at some point to introduce the concept of ClusterUri, but given the effort I think the refactoring itself deserves a separate PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for looking into this. If too much effort is involved, we can leave it in the backlog.
For clusterUri what is a sample input we'd be passing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A sample input for clusterUri would be:

plaintext:/rn:kafka:env:cloud_region::cluster:

note that this is the same as getTopicUriPrefix() in BaseTopicUri: https://github.com/pinterest/psc/blob/3.2/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java#L124-L126

which is basically constructed via the protocol + getTopicRnPrefixString(): https://github.com/pinterest/psc/blob/3.2/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java#L37

and getTopicRnPrefixString() is just a regular TopicRn but without the topic: https://github.com/pinterest/psc/blob/main/psc/src/main/java/com/pinterest/psc/common/TopicRn.java#L43-L45

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! That makes sense. I was trying to see if we can rename clusterUri to better associate it with TopicUri as type. But that's not a big concern. We can leave it as is.

@jeffxiang jeffxiang merged commit ce370c3 into 3.2 Sep 11, 2024
1 check passed
@jeffxiang jeffxiang deleted the metadata_client branch September 11, 2024 19:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants