From 46054b59f0d7b101f4c79d04ff0572c5e8be39ec Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Tue, 29 Oct 2024 16:50:12 -0400 Subject: [PATCH] Make metadataClient always convert to plaintext protocol --- .../psc/metadata/client/PscMetadataClient.java | 9 +++------ .../creation/PscKafkaMetadataClientCreator.java | 8 +++++++- .../metadata/client/TestPscMetadataClient.java | 15 +++++++++++++++ 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java index a0af4a1..cba240c 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -8,15 +8,11 @@ import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.environment.Environment; -import com.pinterest.psc.exception.ExceptionMessage; -import com.pinterest.psc.exception.producer.ProducerException; import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.exception.startup.PscStartupException; import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import com.pinterest.psc.metadata.TopicRnMetadata; import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; -import com.pinterest.psc.producer.creation.PscBackendProducerCreator; import java.io.IOException; import java.time.Duration; @@ -149,7 +145,7 @@ public Map listOffsetsForTimestamps(TopicUri clusterUri @VisibleForTesting protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) throws TopicUriSyntaxException { - TopicUri convertedClusterUri = validateTopicUri(clusterUri); + TopicUri convertedClusterUri = convertTopicUri(clusterUri); String topicUriPrefix = convertedClusterUri.getTopicUriPrefix(); pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> { PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(convertedClusterUri.getBackend()); @@ -162,7 +158,8 @@ protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix); } - private TopicUri validateTopicUri(TopicUri topicUri) throws TopicUriSyntaxException { + @VisibleForTesting + protected TopicUri convertTopicUri(TopicUri topicUri) throws TopicUriSyntaxException { if (topicUri == null) throw new IllegalArgumentException("Null topic URI was passed to the producer API."); diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java index 774e534..b9d7516 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java @@ -1,5 +1,6 @@ package com.pinterest.psc.metadata.creation; +import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.PscUtils; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.kafka.KafkaTopicUri; @@ -28,6 +29,11 @@ public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal p @Override public TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException { - return KafkaTopicUri.validate(topicUri); + String topicUriStr = topicUri.getTopicUriAsString(); + if (topicUri.getProtocol().equals(KafkaTopicUri.SECURE_PROTOCOL)) { + // always use PLAINTEXT for metadata requests + topicUriStr = topicUriStr.replace(KafkaTopicUri.SECURE_PROTOCOL + ":" + TopicUri.SEPARATOR, KafkaTopicUri.PLAINTEXT_PROTOCOL + ":" + TopicUri.SEPARATOR); + } + return KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr)); } } diff --git a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 802bae0..0e8cc32 100644 --- a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -1,7 +1,9 @@ package com.pinterest.psc.metadata.client; +import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.kafka.KafkaTopicUri; import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.discovery.DiscoveryUtil; import com.pinterest.psc.exception.startup.ConfigurationException; @@ -21,6 +23,7 @@ public class TestPscMetadataClient { protected static final String testKafkaTopic1 = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1"; + protected static final String testKafkaTopicSecure = "secure:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1"; /** * Ensure that {@link TopicRn} creation is correct, and that equality is implemented correctly @@ -44,4 +47,16 @@ void testCreateTopicRn() throws TopicUriSyntaxException { assertEquals(topic1Rn.getCluster(), topic2Rn.getCluster()); assertEquals("topic2", topic2Rn.getTopic()); } + + @Test + void testConvertTopicUri() throws ConfigurationException, TopicUriSyntaxException, IOException { + PscConfiguration config = new PscConfiguration(); + config.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "test"); + PscMetadataClient metadataClient = new PscMetadataClient(config); + TopicUri converted = metadataClient.convertTopicUri(BaseTopicUri.validate(testKafkaTopicSecure)); + assertEquals(testKafkaTopic1, converted.getTopicUriAsString()); + assertTrue(converted.getProtocol().equals(KafkaTopicUri.PLAINTEXT_PROTOCOL)); + assertTrue(converted instanceof KafkaTopicUri); + metadataClient.close(); + } }