Skip to content

Commit

Permalink
Make metadataClient always convert to plaintext protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 29, 2024
1 parent 0a1bded commit 46054b5
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.environment.Environment;
import com.pinterest.psc.exception.ExceptionMessage;
import com.pinterest.psc.exception.producer.ProducerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.PscStartupException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator;
import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager;
import com.pinterest.psc.producer.creation.PscBackendProducerCreator;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -149,7 +145,7 @@ public Map<TopicUriPartition, Long> listOffsetsForTimestamps(TopicUri clusterUri

@VisibleForTesting
protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) throws TopicUriSyntaxException {
TopicUri convertedClusterUri = validateTopicUri(clusterUri);
TopicUri convertedClusterUri = convertTopicUri(clusterUri);
String topicUriPrefix = convertedClusterUri.getTopicUriPrefix();
pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> {
PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(convertedClusterUri.getBackend());
Expand All @@ -162,7 +158,8 @@ protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri)
return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix);
}

private TopicUri validateTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
@VisibleForTesting
protected TopicUri convertTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
if (topicUri == null)
throw new IllegalArgumentException("Null topic URI was passed to the producer API.");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.pinterest.psc.metadata.creation;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.PscUtils;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.kafka.KafkaTopicUri;
Expand Down Expand Up @@ -28,6 +29,11 @@ public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal p

@Override
public TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
return KafkaTopicUri.validate(topicUri);
String topicUriStr = topicUri.getTopicUriAsString();
if (topicUri.getProtocol().equals(KafkaTopicUri.SECURE_PROTOCOL)) {
// always use PLAINTEXT for metadata requests
topicUriStr = topicUriStr.replace(KafkaTopicUri.SECURE_PROTOCOL + ":" + TopicUri.SEPARATOR, KafkaTopicUri.PLAINTEXT_PROTOCOL + ":" + TopicUri.SEPARATOR);
}
return KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr));
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.pinterest.psc.metadata.client;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.kafka.KafkaTopicUri;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.discovery.DiscoveryUtil;
import com.pinterest.psc.exception.startup.ConfigurationException;
Expand All @@ -21,6 +23,7 @@
public class TestPscMetadataClient {

protected static final String testKafkaTopic1 = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1";
protected static final String testKafkaTopicSecure = "secure:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1";

/**
* Ensure that {@link TopicRn} creation is correct, and that equality is implemented correctly
Expand All @@ -44,4 +47,16 @@ void testCreateTopicRn() throws TopicUriSyntaxException {
assertEquals(topic1Rn.getCluster(), topic2Rn.getCluster());
assertEquals("topic2", topic2Rn.getTopic());
}

@Test
void testConvertTopicUri() throws ConfigurationException, TopicUriSyntaxException, IOException {
PscConfiguration config = new PscConfiguration();
config.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "test");
PscMetadataClient metadataClient = new PscMetadataClient(config);
TopicUri converted = metadataClient.convertTopicUri(BaseTopicUri.validate(testKafkaTopicSecure));
assertEquals(testKafkaTopic1, converted.getTopicUriAsString());
assertTrue(converted.getProtocol().equals(KafkaTopicUri.PLAINTEXT_PROTOCOL));
assertTrue(converted instanceof KafkaTopicUri);
metadataClient.close();
}
}

0 comments on commit 46054b5

Please sign in to comment.