-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement PscMetadataClient #44
Conversation
* @throws InterruptedException | ||
* @throws TimeoutException | ||
*/ | ||
public List<TopicRn> listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we make a ClusterUri
class, maybe as a subclass of TopicUri
to avoid confusion? The current contract is that a topic URI has to have a valid topic as the last component, which I think we shouldn't change.
Plus, a variable names clusterUri
of type TopicUri
is rather confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just took a stab at this to see how much work would be involved. It is non-trivial. Introducing a ClusterUri
class will require major refactoring of existing service discovery classes and their logic. The current logic in TopicUri
and subclasses does not require a valid topic as the last component (surprisingly), and service discovery mechanism works out of the box without the topic in TopicUri.
I tried two approaches to introduce ClusterUri - ClusterUri extends BaseTopicUri
and ClusterUri implements TopicUri
. Both of these approaches still required decent effort to refactor some commonly used classes such as service discovery, and both approaches introduces the risk of someone erroneously supplying a ClusterUri instance to the regular consumer/producer API's that require a TopicUri (since TopicUri is now a superclass of ClusterUri).
A third approach is to create separate Cluster* classes equivalent to TopicRn, TopicUri, BaseTopicUri, and KafkaTopicUri. This will still require major refactoring of service discovery and is more work.
I do think that we should refactor at some point to introduce the concept of ClusterUri, but given the effort I think the refactoring itself deserves a separate PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for looking into this. If too much effort is involved, we can leave it in the backlog.
For clusterUri
what is a sample input we'd be passing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A sample input for clusterUri
would be:
plaintext:/rn:kafka:env:cloud_region::cluster:
note that this is the same as getTopicUriPrefix()
in BaseTopicUri
: https://github.com/pinterest/psc/blob/3.2/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java#L124-L126
which is basically constructed via the protocol
+ getTopicRnPrefixString()
: https://github.com/pinterest/psc/blob/3.2/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java#L37
and getTopicRnPrefixString()
is just a regular TopicRn
but without the topic: https://github.com/pinterest/psc/blob/main/psc/src/main/java/com/pinterest/psc/common/TopicRn.java#L43-L45
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! That makes sense. I was trying to see if we can rename clusterUri
to better associate it with TopicUri
as type. But that's not a big concern. We can leave it as is.
Implement
PscMetadataClient
which allows backend-agnostic metadata queries and operations, similar to Kafka'sAdminClient
. This implementation does not yet achieve API parity withKafkaAdminClient
, but rather implements the methods needed for full Flink 1.15 upgrade and integration. Other gaps in the API can be added over time.Backend agnostic behavior is achieved via a similar design as
PscConsumer
andPscProducer
, where the calls to the API's at runtime (e.g.listOffsets()
) will create a backend-specific implementation of the metadata client (PscBackendMetadataClient
) if one is not already registered and created in thePscMetadataClient
instance for that particular backend and cluster. As such, each public API inPscMetadataClient
accepts aTopicUri
which can be "incomplete", i.e. excluding the topic and includes everything up to the cluster. This is enough for theServiceDiscoveryManager
to identify the correct backend service and endpoints to connect to.Unit and integration tests were added to ensure expected behavior and correctness of API's in
PscMetadataClient
under different scenarios.