Skip to content

Commit

Permalink
Make metadata client preserve protocol in describeTopicUris()
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 30, 2024
1 parent 560ebf4 commit 11feb61
Show file tree
Hide file tree
Showing 14 changed files with 168 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,39 @@

package com.pinterest.flink.connector.psc.source.enumerator.subscriber;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.TopicUriMetadata;
import com.pinterest.psc.metadata.client.PscMetadataClient;

import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** The base implementations of {@link PscSubscriber}. */
class PscSubscriberUtils {

private PscSubscriberUtils() {}

static Map<TopicRn, TopicRnMetadata> getAllTopicRnMetadata(PscMetadataClient metadataClient, TopicUri clusterUri) {
static Map<TopicUri, TopicUriMetadata> getAllTopicUriMetadata(PscMetadataClient metadataClient, TopicUri clusterUri) {
try {
List<TopicRn> allTopicRns = metadataClient.listTopicRns(clusterUri, Duration.ofMillis(Long.MAX_VALUE));
return getTopicRnMetadata(metadataClient, clusterUri, allTopicRns);
return getTopicUriMetadata(metadataClient, clusterUri, allTopicRns.stream().map(rn -> new BaseTopicUri(clusterUri.getProtocol(), rn)).collect(Collectors.toList()));
} catch (Exception e) {
throw new RuntimeException("Failed to get metadata for all topics.", e);
}
}

static Map<TopicRn, TopicRnMetadata> getTopicRnMetadata(
PscMetadataClient metadataClient, TopicUri clusterUri, List<TopicRn> topicRns) {
static Map<TopicUri, TopicUriMetadata> getTopicUriMetadata(
PscMetadataClient metadataClient, TopicUri clusterUri, List<TopicUri> topicUris) {
try {
return metadataClient.describeTopicRns(clusterUri, new HashSet<>(topicRns), Duration.ofMillis(Long.MAX_VALUE));
return metadataClient.describeTopicUris(clusterUri, new HashSet<>(topicUris), Duration.ofMillis(Long.MAX_VALUE));
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to get metadata for topicRns %s.", topicRns), e);
String.format("Failed to get metadata for topicUris %s.", topicUris), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.TopicUriMetadata;
import com.pinterest.psc.metadata.client.PscMetadataClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -34,7 +34,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicRnMetadata;
import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicUriMetadata;

/**
* A subscriber to a fixed list of topics. The subscribed topics must have existed in the PSC
Expand All @@ -43,26 +43,34 @@
class PscTopicUriListSubscriber implements PscSubscriber {
private static final long serialVersionUID = -6917603843104947866L;
private static final Logger LOG = LoggerFactory.getLogger(PscTopicUriListSubscriber.class);
private final List<TopicRn> topicRns;
private final List<TopicUri> topicUris;

PscTopicUriListSubscriber(List<String> topicUris) {
this.topicRns = topicUris.stream().map(topicUri -> {
this.topicUris = topicUris.stream().map(topicUri -> {
try {
return BaseTopicUri.validate(topicUri).getTopicRn();
return BaseTopicUri.validate(topicUri);
} catch (TopicUriSyntaxException e) {
throw new RuntimeException(e);
}
}).collect(Collectors.toList());
}

/**
* Get a set of subscribed {@link TopicUriPartition}s. This method will preserve the protocol of the
* supplied topicUris.
*
* @param metadataClient The admin client used to retrieve subscribed topic partitions.
* @param clusterUri The cluster URI to subscribe to.
* @return A set of subscribed {@link TopicUriPartition}s
*/
@Override
public Set<TopicUriPartition> getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) {
LOG.debug("Fetching descriptions for topicRns: {}", topicRns);
final Map<TopicRn, TopicRnMetadata> topicMetadata =
getTopicRnMetadata(metadataClient, clusterUri, topicRns);
LOG.debug("Fetching descriptions for topicUris: {}", topicUris);
final Map<TopicUri, TopicUriMetadata> topicMetadata =
getTopicUriMetadata(metadataClient, clusterUri, topicUris);

Set<TopicUriPartition> subscribedPartitions = new HashSet<>();
for (TopicRnMetadata topic : topicMetadata.values()) {
for (TopicUriMetadata topic : topicMetadata.values()) {
subscribedPartitions.addAll(topic.getTopicUriPartitions());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.TopicUriMetadata;
import com.pinterest.psc.metadata.client.PscMetadataClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,7 +31,7 @@
import java.util.Set;
import java.util.regex.Pattern;

import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getAllTopicRnMetadata;
import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getAllTopicUriMetadata;

/**
* A subscriber to a topic name pattern. Note that this pattern should match only the topic name itself. The pattern
Expand All @@ -46,10 +46,18 @@ class TopicNamePatternSubscriber implements PscSubscriber {
this.topicNamePattern = topicNamePattern;
}

/**
* Get a set of subscribed {@link TopicUriPartition}s. This method will return a set of TopicUriPartitions whose
* protocols match the clusterUri's protocol.
*
* @param metadataClient The admin client used to retrieve subscribed topic partitions.
* @param clusterUri The cluster URI to subscribe to.
* @return A set of subscribed {@link TopicUriPartition}s
*/
@Override
public Set<TopicUriPartition> getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) {
LOG.debug("Fetching descriptions for all topics on PubSub cluster");
final Map<TopicRn, TopicRnMetadata> allTopicRnMetadata = getAllTopicRnMetadata(metadataClient, clusterUri);
final Map<TopicUri, TopicUriMetadata> allTopicRnMetadata = getAllTopicUriMetadata(metadataClient, clusterUri);

Set<TopicUriPartition> subscribedTopicUriPartitions = new HashSet<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.TopicUriMetadata;
import com.pinterest.psc.metadata.client.PscMetadataClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -32,7 +32,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicRnMetadata;
import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicUriMetadata;

/** A subscriber for a partition set. */
class TopicUriPartitionSetSubscriber implements PscSubscriber {
Expand All @@ -46,22 +46,21 @@ class TopicUriPartitionSetSubscriber implements PscSubscriber {

@Override
public Set<TopicUriPartition> getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) {
final List<TopicRn> topicRns =
final List<TopicUri> topicUris =
subscribedPartitions.stream()
.map(TopicUriPartition::getTopicUri)
.map(TopicUri::getTopicRn)
.collect(Collectors.toList());

LOG.debug("Fetching descriptions for topics: {}", topicRns);
final Map<TopicRn, TopicRnMetadata> topicRnMetadata =
getTopicRnMetadata(metadataClient, clusterUri, topicRns);
LOG.debug("Fetching descriptions for topics: {}", topicUris);
final Map<TopicUri, TopicUriMetadata> topicUriMetadata =
getTopicUriMetadata(metadataClient, clusterUri, topicUris);

Set<TopicUriPartition> existingSubscribedPartitions = new HashSet<>();

for (TopicUriPartition subscribedPartition : this.subscribedPartitions) {
if (topicRnMetadata.containsKey(subscribedPartition.getTopicUri().getTopicRn())
if (topicUriMetadata.containsKey(subscribedPartition.getTopicUri())
&& partitionExistsInTopic(
subscribedPartition, topicRnMetadata.get(subscribedPartition.getTopicUri().getTopicRn()))) {
subscribedPartition, topicUriMetadata.get(subscribedPartition.getTopicUri()))) {
existingSubscribedPartitions.add(subscribedPartition);
} else {
throw new RuntimeException(
Expand All @@ -74,7 +73,7 @@ && partitionExistsInTopic(
return existingSubscribedPartitions;
}

private boolean partitionExistsInTopic(TopicUriPartition partition, TopicRnMetadata topicRnMetadata) {
return topicRnMetadata.getTopicUriPartitions().size() > partition.getPartition();
private boolean partitionExistsInTopic(TopicUriPartition partition, TopicUriMetadata topicUriMetadata) {
return topicUriMetadata.getTopicUriPartitions().size() > partition.getPartition();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ public class PscSubscriberTest {
private static final String TOPIC_URI1 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC1;
private static final String TOPIC2 = "pattern-topic";
private static final String TOPIC_URI2 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC2;
private static final String TOPIC_URI1_SECURE = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX.replace("plaintext:/", "secure:/") + TOPIC1;
private static final TopicUriPartition NON_EXISTING_TOPIC_URI_PARTITION = new TopicUriPartition(
PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "removed",
0);
private static AdminClient adminClient;
private static PscMetadataClient pscMetadataClient;
private static PscMetadataClient pscMetadataClientSecure;

@BeforeClass
public static void setup() throws Throwable {
Expand All @@ -60,6 +62,7 @@ public static void setup() throws Throwable {
PscSourceTestEnv.createTestTopic(TOPIC_URI2);
adminClient = PscSourceTestEnv.getAdminClient();
pscMetadataClient = PscSourceTestEnv.getMetadataClient();
pscMetadataClientSecure = PscSourceTestEnv.getSecureMetadataClient();
}

@AfterClass
Expand All @@ -82,6 +85,28 @@ public void testTopicUriListSubscriber() {
assertEquals(expectedSubscribedPartitions, subscribedPartitions);
}

@Test
public void testTopicUriListSubscriberPreservesProtocol() {
PscSubscriber subscriber =
PscSubscriber.getTopicUriListSubscriber(Arrays.asList(TOPIC_URI1_SECURE, TOPIC_URI2));
final Set<TopicUriPartition> subscribedPartitionsPlaintext =
subscriber.getSubscribedTopicUriPartitions(pscMetadataClient, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI);
final Set<TopicUriPartition> subscribedPartitionsSecure =
subscriber.getSubscribedTopicUriPartitions(pscMetadataClientSecure, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI_SECURE);

subscribedPartitionsPlaintext.forEach(tup -> {
if (tup.getTopicUri().getTopic().equals(TOPIC1)) {
assertEquals(TOPIC_URI1_SECURE, tup.getTopicUri().getTopicUriAsString());
} else if (tup.getTopicUri().getTopic().equals(TOPIC2)) {
assertEquals(TOPIC_URI2, tup.getTopicUri().getTopicUriAsString());
} else {
throw new RuntimeException("Unexpected topic: " + tup.getTopicUri().getTopic());
}
});

assertEquals(subscribedPartitionsPlaintext, subscribedPartitionsSecure);
}

@Test
public void testNonExistingTopic() {
final PscSubscriber subscriber =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ public static PscMetadataClient getMetadataClient() throws ConfigurationExceptio
return new PscMetadataClient(PscConfigurationUtils.propertiesToPscConfiguration(props));
}

public static PscMetadataClient getSecureMetadataClient() throws ConfigurationException {
Properties props = new Properties();
props.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "psc-source-test-env-metadata-client-secure");
putDiscoveryProperties(props, brokerConnectionStrings, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_SECURE_PREFIX);
return new PscMetadataClient(PscConfigurationUtils.propertiesToPscConfiguration(props));
}

public static PscConsumer<String, Integer> getConsumer() throws ConfigurationException, ConsumerException {
Properties props = new Properties();
props.putAll(standardPscConsumerConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@
*/
public abstract class PscTestEnvironmentWithKafkaAsPubSub {
public static String PSC_TEST_TOPIC_URI_PREFIX = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:cloud_region1::cluster1:";
public static String PSC_TEST_TOPIC_URI_SECURE_PREFIX = "secure:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:cloud_region1::cluster1:";
public static TopicUri PSC_TEST_CLUSTER_URI;
public static TopicUri PSC_TEST_CLUSTER_URI_SECURE;

static {
try {
PSC_TEST_CLUSTER_URI = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_PREFIX);
PSC_TEST_CLUSTER_URI_SECURE = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_SECURE_PREFIX);
} catch (TopicUriSyntaxException e) {
throw new RuntimeException("Unable to validate clusterUri", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.pinterest.psc.metadata.client;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.MessageId;
import com.pinterest.psc.common.TestUtils;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
Expand All @@ -16,7 +15,7 @@
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.integration.KafkaCluster;
import com.pinterest.psc.metadata.MetadataUtils;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.TopicUriMetadata;
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.serde.IntegerDeserializer;
Expand Down Expand Up @@ -135,38 +134,38 @@ public void testListTopicRns() throws Exception {
}

/**
* Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, Duration)} returns the correct
* Tests that {@link PscMetadataClient#describeTopicUris(TopicUri, Collection, Duration)} returns the correct
* metadata for the supplied topic RNs
*
* @throws Exception
*/
@Test
public void testDescribeTopicRns() throws Exception {
public void testDescribeTopicUris() throws Exception {
PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
Map<TopicRn, TopicRnMetadata> topicRnDescriptionMap = client.describeTopicRns(
Map<TopicUri, TopicUriMetadata> topicUriDescriptionMap = client.describeTopicUris(
BaseTopicUri.validate(clusterUriStr),
new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)),
new HashSet<>(Arrays.asList(topic1Uri, topic2Uri, topic3Uri)),
Duration.ofMillis(10000)
);
assertEquals(3, topicRnDescriptionMap.size());
assertEquals(3, topicUriDescriptionMap.size());

assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicRn());
assertEquals(partitions1, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().size());
assertEquals(topic1Uri, topicUriDescriptionMap.get(topic1Uri).getTopicUri());
assertEquals(partitions1, topicUriDescriptionMap.get(topic1Uri).getTopicUriPartitions().size());
for (int i = 0; i < partitions1; i++) {
assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn());
assertEquals(i, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getPartition());
assertEquals(topic1Uri, topicUriDescriptionMap.get(topic1Uri).getTopicUriPartitions().get(i).getTopicUri());
assertEquals(i, topicUriDescriptionMap.get(topic1Uri).getTopicUriPartitions().get(i).getPartition());
}
assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicRn());
assertEquals(partitions2, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().size());
assertEquals(topic2Uri, topicUriDescriptionMap.get(topic2Uri).getTopicUri());
assertEquals(partitions2, topicUriDescriptionMap.get(topic2Uri).getTopicUriPartitions().size());
for (int i = 0; i < partitions2; i++) {
assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn());
assertEquals(i, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getPartition());
assertEquals(topic2Uri, topicUriDescriptionMap.get(topic2Uri).getTopicUriPartitions().get(i).getTopicUri());
assertEquals(i, topicUriDescriptionMap.get(topic2Uri).getTopicUriPartitions().get(i).getPartition());
}
assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicRn());
assertEquals(partitions3, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().size());
assertEquals(topic3Uri, topicUriDescriptionMap.get(topic3Uri).getTopicUri());
assertEquals(partitions3, topicUriDescriptionMap.get(topic3Uri).getTopicUriPartitions().size());
for (int i = 0; i < partitions3; i++) {
assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn());
assertEquals(i, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getPartition());
assertEquals(topic3Uri, topicUriDescriptionMap.get(topic3Uri).getTopicUriPartitions().get(i).getTopicUri());
assertEquals(i, topicUriDescriptionMap.get(topic3Uri).getTopicUriPartitions().get(i).getPartition());
}
client.close();
}
Expand Down
10 changes: 10 additions & 0 deletions psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.pinterest.psc.metadata;

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;

/**
* Utility class for common metadata logic
Expand All @@ -24,4 +26,12 @@ public static TopicRn createTopicRn(TopicUri topicUri, String topicName) {
topicName
);
}

public static TopicUri createTopicUri(String topic, TopicRn clusterRn, String protocol) {
try {
return BaseTopicUri.validate(protocol + ":" + TopicUri.SEPARATOR + clusterRn.getTopicRnPrefixString() + topic);
} catch (TopicUriSyntaxException e) {
throw new RuntimeException("Failed to create topic URI for topic " + topic, e);
}
}
}
Loading

0 comments on commit 11feb61

Please sign in to comment.