diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberUtils.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberUtils.java index 3280dd2..77ce5af 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberUtils.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberUtils.java @@ -18,37 +18,39 @@ package com.pinterest.flink.connector.psc.source.enumerator.subscriber; +import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscMetadataClient; import java.time.Duration; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; /** The base implementations of {@link PscSubscriber}. */ class PscSubscriberUtils { private PscSubscriberUtils() {} - static Map getAllTopicRnMetadata(PscMetadataClient metadataClient, TopicUri clusterUri) { + static Map getAllTopicUriMetadata(PscMetadataClient metadataClient, TopicUri clusterUri) { try { List allTopicRns = metadataClient.listTopicRns(clusterUri, Duration.ofMillis(Long.MAX_VALUE)); - return getTopicRnMetadata(metadataClient, clusterUri, allTopicRns); + return getTopicUriMetadata(metadataClient, clusterUri, allTopicRns.stream().map(rn -> new BaseTopicUri(clusterUri.getProtocol(), rn)).collect(Collectors.toList())); } catch (Exception e) { throw new RuntimeException("Failed to get metadata for all topics.", e); } } - static Map getTopicRnMetadata( - PscMetadataClient metadataClient, TopicUri clusterUri, List topicRns) { + static Map getTopicUriMetadata( + PscMetadataClient metadataClient, TopicUri clusterUri, List topicUris) { try { - return metadataClient.describeTopicRns(clusterUri, new HashSet<>(topicRns), Duration.ofMillis(Long.MAX_VALUE)); + return metadataClient.describeTopicUris(clusterUri, new HashSet<>(topicUris), Duration.ofMillis(Long.MAX_VALUE)); } catch (Exception e) { throw new RuntimeException( - String.format("Failed to get metadata for topicRns %s.", topicRns), e); + String.format("Failed to get metadata for topicUris %s.", topicUris), e); } } } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java index d9ef48b..edd4c27 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscTopicUriListSubscriber.java @@ -23,7 +23,7 @@ import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.exception.startup.TopicUriSyntaxException; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscMetadataClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +34,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicRnMetadata; +import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicUriMetadata; /** * A subscriber to a fixed list of topics. The subscribed topics must have existed in the PSC @@ -43,26 +43,34 @@ class PscTopicUriListSubscriber implements PscSubscriber { private static final long serialVersionUID = -6917603843104947866L; private static final Logger LOG = LoggerFactory.getLogger(PscTopicUriListSubscriber.class); - private final List topicRns; + private final List topicUris; PscTopicUriListSubscriber(List topicUris) { - this.topicRns = topicUris.stream().map(topicUri -> { + this.topicUris = topicUris.stream().map(topicUri -> { try { - return BaseTopicUri.validate(topicUri).getTopicRn(); + return BaseTopicUri.validate(topicUri); } catch (TopicUriSyntaxException e) { throw new RuntimeException(e); } }).collect(Collectors.toList()); } + /** + * Get a set of subscribed {@link TopicUriPartition}s. This method will preserve the protocol of the + * supplied topicUris. + * + * @param metadataClient The admin client used to retrieve subscribed topic partitions. + * @param clusterUri The cluster URI to subscribe to. + * @return A set of subscribed {@link TopicUriPartition}s + */ @Override public Set getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) { - LOG.debug("Fetching descriptions for topicRns: {}", topicRns); - final Map topicMetadata = - getTopicRnMetadata(metadataClient, clusterUri, topicRns); + LOG.debug("Fetching descriptions for topicUris: {}", topicUris); + final Map topicMetadata = + getTopicUriMetadata(metadataClient, clusterUri, topicUris); Set subscribedPartitions = new HashSet<>(); - for (TopicRnMetadata topic : topicMetadata.values()) { + for (TopicUriMetadata topic : topicMetadata.values()) { subscribedPartitions.addAll(topic.getTopicUriPartitions()); } diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java index fe0086d..4b5cfc5 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicNamePatternSubscriber.java @@ -21,7 +21,7 @@ import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscMetadataClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +31,7 @@ import java.util.Set; import java.util.regex.Pattern; -import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getAllTopicRnMetadata; +import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getAllTopicUriMetadata; /** * A subscriber to a topic name pattern. Note that this pattern should match only the topic name itself. The pattern @@ -46,10 +46,18 @@ class TopicNamePatternSubscriber implements PscSubscriber { this.topicNamePattern = topicNamePattern; } + /** + * Get a set of subscribed {@link TopicUriPartition}s. This method will return a set of TopicUriPartitions whose + * protocols match the clusterUri's protocol. + * + * @param metadataClient The admin client used to retrieve subscribed topic partitions. + * @param clusterUri The cluster URI to subscribe to. + * @return A set of subscribed {@link TopicUriPartition}s + */ @Override public Set getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) { LOG.debug("Fetching descriptions for all topics on PubSub cluster"); - final Map allTopicRnMetadata = getAllTopicRnMetadata(metadataClient, clusterUri); + final Map allTopicRnMetadata = getAllTopicUriMetadata(metadataClient, clusterUri); Set subscribedTopicUriPartitions = new HashSet<>(); diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicUriPartitionSetSubscriber.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicUriPartitionSetSubscriber.java index 7958cd9..abd6df7 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicUriPartitionSetSubscriber.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/TopicUriPartitionSetSubscriber.java @@ -21,7 +21,7 @@ import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscMetadataClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +32,7 @@ import java.util.Set; import java.util.stream.Collectors; -import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicRnMetadata; +import static com.pinterest.flink.connector.psc.source.enumerator.subscriber.PscSubscriberUtils.getTopicUriMetadata; /** A subscriber for a partition set. */ class TopicUriPartitionSetSubscriber implements PscSubscriber { @@ -46,22 +46,21 @@ class TopicUriPartitionSetSubscriber implements PscSubscriber { @Override public Set getSubscribedTopicUriPartitions(PscMetadataClient metadataClient, TopicUri clusterUri) { - final List topicRns = + final List topicUris = subscribedPartitions.stream() .map(TopicUriPartition::getTopicUri) - .map(TopicUri::getTopicRn) .collect(Collectors.toList()); - LOG.debug("Fetching descriptions for topics: {}", topicRns); - final Map topicRnMetadata = - getTopicRnMetadata(metadataClient, clusterUri, topicRns); + LOG.debug("Fetching descriptions for topics: {}", topicUris); + final Map topicUriMetadata = + getTopicUriMetadata(metadataClient, clusterUri, topicUris); Set existingSubscribedPartitions = new HashSet<>(); for (TopicUriPartition subscribedPartition : this.subscribedPartitions) { - if (topicRnMetadata.containsKey(subscribedPartition.getTopicUri().getTopicRn()) + if (topicUriMetadata.containsKey(subscribedPartition.getTopicUri()) && partitionExistsInTopic( - subscribedPartition, topicRnMetadata.get(subscribedPartition.getTopicUri().getTopicRn()))) { + subscribedPartition, topicUriMetadata.get(subscribedPartition.getTopicUri()))) { existingSubscribedPartitions.add(subscribedPartition); } else { throw new RuntimeException( @@ -74,7 +73,7 @@ && partitionExistsInTopic( return existingSubscribedPartitions; } - private boolean partitionExistsInTopic(TopicUriPartition partition, TopicRnMetadata topicRnMetadata) { - return topicRnMetadata.getTopicUriPartitions().size() > partition.getPartition(); + private boolean partitionExistsInTopic(TopicUriPartition partition, TopicUriMetadata topicUriMetadata) { + return topicUriMetadata.getTopicUriPartitions().size() > partition.getPartition(); } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberTest.java index 7b1be48..0e03c05 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/enumerator/subscriber/PscSubscriberTest.java @@ -47,11 +47,13 @@ public class PscSubscriberTest { private static final String TOPIC_URI1 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC1; private static final String TOPIC2 = "pattern-topic"; private static final String TOPIC_URI2 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC2; + private static final String TOPIC_URI1_SECURE = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX.replace("plaintext:/", "secure:/") + TOPIC1; private static final TopicUriPartition NON_EXISTING_TOPIC_URI_PARTITION = new TopicUriPartition( PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "removed", 0); private static AdminClient adminClient; private static PscMetadataClient pscMetadataClient; + private static PscMetadataClient pscMetadataClientSecure; @BeforeClass public static void setup() throws Throwable { @@ -60,6 +62,7 @@ public static void setup() throws Throwable { PscSourceTestEnv.createTestTopic(TOPIC_URI2); adminClient = PscSourceTestEnv.getAdminClient(); pscMetadataClient = PscSourceTestEnv.getMetadataClient(); + pscMetadataClientSecure = PscSourceTestEnv.getSecureMetadataClient(); } @AfterClass @@ -82,6 +85,28 @@ public void testTopicUriListSubscriber() { assertEquals(expectedSubscribedPartitions, subscribedPartitions); } + @Test + public void testTopicUriListSubscriberPreservesProtocol() { + PscSubscriber subscriber = + PscSubscriber.getTopicUriListSubscriber(Arrays.asList(TOPIC_URI1_SECURE, TOPIC_URI2)); + final Set subscribedPartitionsPlaintext = + subscriber.getSubscribedTopicUriPartitions(pscMetadataClient, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI); + final Set subscribedPartitionsSecure = + subscriber.getSubscribedTopicUriPartitions(pscMetadataClientSecure, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI_SECURE); + + subscribedPartitionsPlaintext.forEach(tup -> { + if (tup.getTopicUri().getTopic().equals(TOPIC1)) { + assertEquals(TOPIC_URI1_SECURE, tup.getTopicUri().getTopicUriAsString()); + } else if (tup.getTopicUri().getTopic().equals(TOPIC2)) { + assertEquals(TOPIC_URI2, tup.getTopicUri().getTopicUriAsString()); + } else { + throw new RuntimeException("Unexpected topic: " + tup.getTopicUri().getTopic()); + } + }); + + assertEquals(subscribedPartitionsPlaintext, subscribedPartitionsSecure); + } + @Test public void testNonExistingTopic() { final PscSubscriber subscriber = diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java index 6e6bc7b..99e2172 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java @@ -92,6 +92,13 @@ public static PscMetadataClient getMetadataClient() throws ConfigurationExceptio return new PscMetadataClient(PscConfigurationUtils.propertiesToPscConfiguration(props)); } + public static PscMetadataClient getSecureMetadataClient() throws ConfigurationException { + Properties props = new Properties(); + props.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "psc-source-test-env-metadata-client-secure"); + putDiscoveryProperties(props, brokerConnectionStrings, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_SECURE_PREFIX); + return new PscMetadataClient(PscConfigurationUtils.propertiesToPscConfiguration(props)); + } + public static PscConsumer getConsumer() throws ConfigurationException, ConsumerException { Properties props = new Properties(); props.putAll(standardPscConsumerConfiguration); diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java index 1fe75af..a8ef1c3 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java +++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java @@ -51,11 +51,14 @@ */ public abstract class PscTestEnvironmentWithKafkaAsPubSub { public static String PSC_TEST_TOPIC_URI_PREFIX = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:cloud_region1::cluster1:"; + public static String PSC_TEST_TOPIC_URI_SECURE_PREFIX = "secure:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:cloud_region1::cluster1:"; public static TopicUri PSC_TEST_CLUSTER_URI; + public static TopicUri PSC_TEST_CLUSTER_URI_SECURE; static { try { PSC_TEST_CLUSTER_URI = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_PREFIX); + PSC_TEST_CLUSTER_URI_SECURE = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_SECURE_PREFIX); } catch (TopicUriSyntaxException e) { throw new RuntimeException("Unable to validate clusterUri", e); } diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 231ac96..d841ebb 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -1,7 +1,6 @@ package com.pinterest.psc.metadata.client; import com.pinterest.psc.common.BaseTopicUri; -import com.pinterest.psc.common.MessageId; import com.pinterest.psc.common.TestUtils; import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; @@ -16,7 +15,7 @@ import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import com.pinterest.psc.integration.KafkaCluster; import com.pinterest.psc.metadata.MetadataUtils; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; import com.pinterest.psc.serde.IntegerDeserializer; @@ -135,38 +134,38 @@ public void testListTopicRns() throws Exception { } /** - * Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, Duration)} returns the correct + * Tests that {@link PscMetadataClient#describeTopicUris(TopicUri, Collection, Duration)} returns the correct * metadata for the supplied topic RNs * * @throws Exception */ @Test - public void testDescribeTopicRns() throws Exception { + public void testDescribeTopicUris() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); - Map topicRnDescriptionMap = client.describeTopicRns( + Map topicUriDescriptionMap = client.describeTopicUris( BaseTopicUri.validate(clusterUriStr), - new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)), + new HashSet<>(Arrays.asList(topic1Uri, topic2Uri, topic3Uri)), Duration.ofMillis(10000) ); - assertEquals(3, topicRnDescriptionMap.size()); + assertEquals(3, topicUriDescriptionMap.size()); - assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicRn()); - assertEquals(partitions1, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().size()); + assertEquals(topic1Uri, topicUriDescriptionMap.get(topic1Uri).getTopicUri()); + assertEquals(partitions1, topicUriDescriptionMap.get(topic1Uri).getTopicUriPartitions().size()); for (int i = 0; i < partitions1; i++) { - assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); - assertEquals(i, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getPartition()); + assertEquals(topic1Uri, topicUriDescriptionMap.get(topic1Uri).getTopicUriPartitions().get(i).getTopicUri()); + assertEquals(i, topicUriDescriptionMap.get(topic1Uri).getTopicUriPartitions().get(i).getPartition()); } - assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicRn()); - assertEquals(partitions2, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().size()); + assertEquals(topic2Uri, topicUriDescriptionMap.get(topic2Uri).getTopicUri()); + assertEquals(partitions2, topicUriDescriptionMap.get(topic2Uri).getTopicUriPartitions().size()); for (int i = 0; i < partitions2; i++) { - assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); - assertEquals(i, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getPartition()); + assertEquals(topic2Uri, topicUriDescriptionMap.get(topic2Uri).getTopicUriPartitions().get(i).getTopicUri()); + assertEquals(i, topicUriDescriptionMap.get(topic2Uri).getTopicUriPartitions().get(i).getPartition()); } - assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicRn()); - assertEquals(partitions3, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().size()); + assertEquals(topic3Uri, topicUriDescriptionMap.get(topic3Uri).getTopicUri()); + assertEquals(partitions3, topicUriDescriptionMap.get(topic3Uri).getTopicUriPartitions().size()); for (int i = 0; i < partitions3; i++) { - assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); - assertEquals(i, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getPartition()); + assertEquals(topic3Uri, topicUriDescriptionMap.get(topic3Uri).getTopicUriPartitions().get(i).getTopicUri()); + assertEquals(i, topicUriDescriptionMap.get(topic3Uri).getTopicUriPartitions().get(i).getPartition()); } client.close(); } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java index 2cf9a23..a94aec9 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java @@ -1,8 +1,10 @@ package com.pinterest.psc.metadata; import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; /** * Utility class for common metadata logic @@ -24,4 +26,12 @@ public static TopicRn createTopicRn(TopicUri topicUri, String topicName) { topicName ); } + + public static TopicUri createTopicUri(String topic, TopicRn clusterRn, String protocol) { + try { + return BaseTopicUri.validate(protocol + ":" + TopicUri.SEPARATOR + clusterRn.getTopicRnPrefixString() + topic); + } catch (TopicUriSyntaxException e) { + throw new RuntimeException("Failed to create topic URI for topic " + topic, e); + } + } } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java b/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java deleted file mode 100644 index c818264..0000000 --- a/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.pinterest.psc.metadata; - -import com.pinterest.psc.common.TopicRn; -import com.pinterest.psc.common.TopicUriPartition; - -import java.util.List; - -/** - * Metadata for a {@link TopicRn}, including the list of its partitions - */ -public class TopicRnMetadata { - - private final TopicRn topicRn; - private final List topicUriPartitions; - - public TopicRnMetadata(TopicRn topicRn, List topicUriPartitions) { - this.topicRn = topicRn; - this.topicUriPartitions = topicUriPartitions; - } - - public TopicRn getTopicRn() { - return topicRn; - } - - public List getTopicUriPartitions() { - return topicUriPartitions; - } -} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/TopicUriMetadata.java b/psc/src/main/java/com/pinterest/psc/metadata/TopicUriMetadata.java new file mode 100644 index 0000000..0e6154f --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/TopicUriMetadata.java @@ -0,0 +1,28 @@ +package com.pinterest.psc.metadata; + +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; + +import java.util.List; + +/** + * Metadata for a {@link TopicUri}, including the list of its partitions + */ +public class TopicUriMetadata { + + private final TopicUri topicUri; + private final List topicUriPartitions; + + public TopicUriMetadata(TopicUri topicUri, List topicUriPartitions) { + this.topicUri = topicUri; + this.topicUriPartitions = topicUriPartitions; + } + + public TopicUri getTopicUri() { + return topicUri; + } + + public List getTopicUriPartitions() { + return topicUriPartitions; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java index 684ab03..5059f05 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java @@ -10,7 +10,7 @@ import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.logging.PscLogger; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import java.io.IOException; import java.time.Duration; @@ -43,8 +43,8 @@ public void initialize(TopicUri topicUri, Environment env, PscConfigurationInter public abstract List listTopicRns(Duration duration) throws ExecutionException, InterruptedException, TimeoutException; - public abstract Map describeTopicRns( - Collection topicRns, + public abstract Map describeTopicUris( + Collection topicUris, Duration duration ) throws ExecutionException, InterruptedException, TimeoutException; diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java index a0af4a1..0c4964a 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -8,22 +8,17 @@ import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.environment.Environment; -import com.pinterest.psc.exception.ExceptionMessage; -import com.pinterest.psc.exception.producer.ProducerException; import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.exception.startup.PscStartupException; import com.pinterest.psc.exception.startup.TopicUriSyntaxException; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; -import com.pinterest.psc.producer.creation.PscBackendProducerCreator; import java.io.IOException; import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeoutException; @@ -86,19 +81,20 @@ public List listTopicRns(TopicUri clusterUri, Duration duration) throws } /** - * Describe the metadata for the given {@link TopicRn}'s in the cluster. + * Describe the metadata for the given {@link TopicUri}'s in the cluster. The returned map will preserve the protocol + * of the supplied TopicUri's. * * @param clusterUri - * @param topicRns + * @param topicUris * @param duration - * @return a map of {@link TopicRn} to {@link TopicRnMetadata} + * @return a map of {@link TopicUri} to {@link TopicUriMetadata} * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException */ - public Map describeTopicRns(TopicUri clusterUri, Set topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException { + public Map describeTopicUris(TopicUri clusterUri, Collection topicUris, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.describeTopicRns(topicRns, duration); + return backendMetadataClient.describeTopicUris(topicUris, duration); } /** diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java index 59ca9ea..3d9702b 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -11,7 +11,7 @@ import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.logging.PscLogger; import com.pinterest.psc.metadata.MetadataUtils; -import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.TopicUriMetadata; import com.pinterest.psc.metadata.client.PscBackendMetadataClient; import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.kafka.clients.admin.AdminClient; @@ -70,24 +70,33 @@ public List listTopicRns(Duration duration) } @Override - public Map describeTopicRns( - Collection topicRns, + public Map describeTopicUris( + Collection topicUris, Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { - Collection topicNames = topicRns.stream().map(TopicRn::getTopic).collect(Collectors.toSet()); + Collection topicNames = topicUris.stream().map(TopicUri::getTopic).collect(Collectors.toSet()); + if (topicNames.size() != topicUris.size()) { + throw new IllegalArgumentException("Supplied topicUris must have unique topic names: " + topicUris); + } + Map topicToProtocolMap = topicUris.stream().collect(Collectors.toMap(TopicUri::getTopic, TopicUri::getProtocol)); Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(duration.toMillis(), TimeUnit.MILLISECONDS); - Map result = new HashMap<>(); + Map result = new HashMap<>(); for (Map.Entry entry : topicMetadata.entrySet()) { String topicName = entry.getKey(); TopicDescription description = entry.getValue(); - TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topicName); + String protocol = topicToProtocolMap.get(topicName); + if (protocol == null) { + // we don't expect this to happen + throw new RuntimeException("No protocol found for topic: " + topicName); + } + TopicUri topicUri = MetadataUtils.createTopicUri(topicName, this.topicUri.getTopicRn(), protocol); List topicUriPartitions = new ArrayList<>(); for (TopicPartitionInfo partitionInfo : description.partitions()) { topicUriPartitions.add( - createKafkaTopicUriPartition(topicRn, partitionInfo.partition()) + createKafkaTopicUriPartition(topicUri, partitionInfo.partition()) ); } - result.put(topicRn, new TopicRnMetadata(topicRn, topicUriPartitions)); + result.put(topicUri, new TopicUriMetadata(topicUri, topicUriPartitions)); } return result; } @@ -180,6 +189,9 @@ private TopicUriPartition createKafkaTopicUriPartition(TopicRn topicRn, int part return new TopicUriPartition(new KafkaTopicUri(new BaseTopicUri(topicUri.getProtocol(), topicRn)), partition); } + private TopicUriPartition createKafkaTopicUriPartition(TopicUri topicUri, int partition) { + return new TopicUriPartition(new KafkaTopicUri(topicUri), partition); + } @Override public void close() throws IOException { if (kafkaAdminClient != null)