Skip to content

Commit

Permalink
Revert "Make metadataClient always convert to plaintext protocol"
Browse files Browse the repository at this point in the history
This reverts commit 46054b5.
  • Loading branch information
jeffxiang committed Oct 30, 2024
1 parent e982187 commit 560ebf4
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.environment.Environment;
import com.pinterest.psc.exception.ExceptionMessage;
import com.pinterest.psc.exception.producer.ProducerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.PscStartupException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator;
import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager;
import com.pinterest.psc.producer.creation.PscBackendProducerCreator;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -145,7 +149,7 @@ public Map<TopicUriPartition, Long> listOffsetsForTimestamps(TopicUri clusterUri

@VisibleForTesting
protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) throws TopicUriSyntaxException {
TopicUri convertedClusterUri = convertTopicUri(clusterUri);
TopicUri convertedClusterUri = validateTopicUri(clusterUri);
String topicUriPrefix = convertedClusterUri.getTopicUriPrefix();
pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> {
PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(convertedClusterUri.getBackend());
Expand All @@ -158,8 +162,7 @@ protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri)
return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix);
}

@VisibleForTesting
protected TopicUri convertTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
private TopicUri validateTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
if (topicUri == null)
throw new IllegalArgumentException("Null topic URI was passed to the producer API.");

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.pinterest.psc.metadata.creation;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.PscUtils;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.kafka.KafkaTopicUri;
Expand Down Expand Up @@ -33,11 +32,6 @@ public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal p

@Override
public TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
String topicUriStr = topicUri.getTopicUriAsString();
if (topicUri.getProtocol().equals(KafkaTopicUri.SECURE_PROTOCOL)) {
// always use PLAINTEXT for metadata requests
topicUriStr = topicUriStr.replace(KafkaTopicUri.SECURE_PROTOCOL + ":" + TopicUri.SEPARATOR, KafkaTopicUri.PLAINTEXT_PROTOCOL + ":" + TopicUri.SEPARATOR);
}
return KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr));
return KafkaTopicUri.validate(topicUri);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package com.pinterest.psc.metadata.client;

import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.kafka.KafkaTopicUri;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.discovery.DiscoveryUtil;
import com.pinterest.psc.exception.startup.ConfigurationException;
Expand All @@ -23,7 +21,6 @@
public class TestPscMetadataClient {

protected static final String testKafkaTopic1 = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1";
protected static final String testKafkaTopicSecure = "secure:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1";

/**
* Ensure that {@link TopicRn} creation is correct, and that equality is implemented correctly
Expand All @@ -47,16 +44,4 @@ void testCreateTopicRn() throws TopicUriSyntaxException {
assertEquals(topic1Rn.getCluster(), topic2Rn.getCluster());
assertEquals("topic2", topic2Rn.getTopic());
}

@Test
void testConvertTopicUri() throws ConfigurationException, TopicUriSyntaxException, IOException {
PscConfiguration config = new PscConfiguration();
config.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "test");
PscMetadataClient metadataClient = new PscMetadataClient(config);
TopicUri converted = metadataClient.convertTopicUri(BaseTopicUri.validate(testKafkaTopicSecure));
assertEquals(testKafkaTopic1, converted.getTopicUriAsString());
assertTrue(converted.getProtocol().equals(KafkaTopicUri.PLAINTEXT_PROTOCOL));
assertTrue(converted instanceof KafkaTopicUri);
metadataClient.close();
}
}

0 comments on commit 560ebf4

Please sign in to comment.