From 961db589ae52b04360c1f84e5f4f1eb9c886aef8 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 28 Aug 2024 18:37:08 -0400 Subject: [PATCH] WIP metadataClient API impl; finished listOffsets --- .../client/TestPscMetadataClient.java | 229 ++++++++++++++++++ .../com/pinterest/psc/common/TopicRn.java | 30 +++ .../psc/common/TopicUriPartition.java | 2 +- .../psc/config/PscConfiguration.java | 6 + .../psc/config/PscConfigurationInternal.java | 24 +- .../pinterest/psc/metadata/MetadataUtils.java | 24 ++ .../metadata/PscBackendMetadataClient.java | 53 ---- .../psc/metadata/PscMetadataClient.java | 61 ----- .../psc/metadata/TopicRnMetadata.java | 25 ++ .../client/PscBackendMetadataClient.java | 52 ++++ .../metadata/client/PscMetadataClient.java | 84 +++++++ .../client/kafka/PscKafkaMetadataClient.java | 123 ++++++++++ .../PscBackendMetadataClientCreator.java | 4 +- .../PscKafkaMetadataClientCreator.java | 9 +- .../kafka/PscKafkaMetadataClient.java | 42 ---- .../{ => client}/TestPscMetadataClient.java | 22 +- 16 files changed, 615 insertions(+), 175 deletions(-) create mode 100644 psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java delete mode 100644 psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java delete mode 100644 psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java delete mode 100644 psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java rename psc/src/test/java/com/pinterest/psc/metadata/{ => client}/TestPscMetadataClient.java (70%) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java new file mode 100644 index 0000000..7bb1c5b --- /dev/null +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -0,0 +1,229 @@ +package com.pinterest.psc.metadata.client; + +import com.pinterest.psc.common.BaseTopicUri; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.TestUtils; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.kafka.KafkaTopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.exception.producer.ProducerException; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; +import com.pinterest.psc.integration.KafkaCluster; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.producer.PscProducer; +import com.pinterest.psc.producer.PscProducerMessage; +import com.pinterest.psc.serde.IntegerSerializer; +import com.pinterest.psc.utils.PscTestUtils; +import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; +import com.salesforce.kafka.test.listeners.PlainListener; +import 
org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class TestPscMetadataClient { + + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource1 = new SharedKafkaTestResource() + .withBrokers(1).registerListener(new PlainListener().onPorts(9092)); + private static final PscConfiguration metadataClientConfiguration = new PscConfiguration(); + private static String clientId; + private static final String topic1 = "topic1"; + private static final int partitions1 = 12; + private static final String topic2 = "topic2"; + private static final int partitions2 = 24; + private static final String topic3 = "topic3"; + private static final int partitions3 = 36; + private KafkaCluster kafkaCluster1; + private String topicUriStr1, topicUriStr2, topicUriStr3, clusterUriStr; + private TopicRn topic1Rn, topic2Rn, topic3Rn; + + /** + * Initializes one Kafka cluster that is commonly used by all tests, and creates three topics on it. + * + * @throws IOException + */ + @BeforeEach + public void setup() throws IOException, InterruptedException, TopicUriSyntaxException { + clientId = this.getClass().getSimpleName() + "-psc-metadata-client"; + metadataClientConfiguration.clear(); + metadataClientConfiguration.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, clientId); + metadataClientConfiguration.setProperty(PscConfiguration.PSC_METRICS_REPORTER_CLASS, TestUtils.DEFAULT_METRICS_REPORTER); + metadataClientConfiguration.setProperty(PscConfiguration.PSC_CONFIG_LOGGING_ENABLED, "false"); + kafkaCluster1 = new KafkaCluster("plaintext", "region", "cluster", 9092); + topicUriStr1 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic1); + + topicUriStr2 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic2); + + topicUriStr3 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic3); + + clusterUriStr = String.format("%s:%s%s:kafka:env:cloud_%s::%s:", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster() + ); + + PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic1, partitions1); + PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic2, partitions2); + PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic3, partitions3); + + topic1Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), topic1); + topic2Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), topic2); + topic3Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr3)), topic3); + } + + /** + * Deletes the topics that were created in setup().
Also, adds a slight delay to make sure cleanup is complete + * when tests run consecutively. + * + * @throws ExecutionException + * @throws InterruptedException + */ + @AfterEach + public void tearDown() throws ExecutionException, InterruptedException { + PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic1); + PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic2); + PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic3); + Thread.sleep(1000); + } + + @Test + public void testListTopicRns() throws Exception { + PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); + List topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), 10000, TimeUnit.MILLISECONDS); + List expectedTopicRnList = Arrays.asList(topic1Rn, topic2Rn, topic3Rn); + assertEquals(expectedTopicRnList, topicRnList); + client.close(); + } + + @Test + public void testDescribeTopicRns() throws Exception { + PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); + Map topicRnDescriptionMap = client.describeTopicRns( + KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), + new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)), + 10000, + TimeUnit.MILLISECONDS + ); + assertEquals(3, topicRnDescriptionMap.size()); + + assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicRn()); + assertEquals(partitions1, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().size()); + for (int i = 0; i < partitions1; i++) { + assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); + assertEquals(i, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getPartition()); + } + assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicRn()); + assertEquals(partitions2, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().size()); + for (int i = 0; i < partitions2; i++) { + assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); + assertEquals(i, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getPartition()); + } + assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicRn()); + assertEquals(partitions3, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().size()); + for (int i = 0; i < partitions3; i++) { + assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); + assertEquals(i, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getPartition()); + } + client.close(); + } + + @Test + public void testListOffsets() throws Exception { + PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); + Map topicUriPartitionsAndOptions = new HashMap<>(); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); + Map offsets = client.listOffsets( + clusterUri, + topicUriPartitionsAndOptions, + 10000, + TimeUnit.MILLISECONDS + ); + assertEquals(2, offsets.size()); + + // ensure that the offsets are 0 when the topic is empty + 
assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); + assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); + + // send one message to partition 0 for both topics + PscProducer pscProducer = getPscProducer(); + pscProducer.send(new PscProducerMessage(topicUriStr1, 0,0,0)); + pscProducer.send(new PscProducerMessage(topicUriStr2, 0,0,0)); + pscProducer.flush(); + + offsets = client.listOffsets( + clusterUri, + topicUriPartitionsAndOptions, + 10000, + TimeUnit.MILLISECONDS + ); + + // ensure sent offsets are captured in next metadataClient call + assertEquals(2, offsets.size()); + assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); // earliest offset + assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); // latest offset + + // now change the spec to latest for both topics, and add topic1 partitions 5 and 10 to the query + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + + // send 2 messages to topic1 partition 0 - now the latest offset should be 3 + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0, 0, 0)); + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0, 0, 0)); + // send 1 message to topic1 partition 5 - now the latest offset should be 1 + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 5, 0, 0)); + // send 1 message to topic2 partition 0 - now the latest offset should be 2 + pscProducer.send(new PscProducerMessage<>(topicUriStr2, 0, 0, 0)); + pscProducer.flush(); + + offsets = client.listOffsets( + clusterUri, + topicUriPartitionsAndOptions, + 10000, + TimeUnit.MILLISECONDS + ); + + // ensure sent offsets are captured in next metadataClient call + assertEquals(4, offsets.size()); + assertEquals(3, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); + assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5)).getOffset()); + assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10)).getOffset()); + assertEquals(2, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); + + client.close(); + pscProducer.close(); + } + + private static PscProducer getPscProducer() throws ConfigurationException, ProducerException { + PscConfiguration producerConfiguration = new PscConfiguration(); + String baseProducerId = "psc-producer-client"; + producerConfiguration.setProperty(PscConfiguration.PSC_METRICS_REPORTER_CLASS, TestUtils.DEFAULT_METRICS_REPORTER); + 
producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID, baseProducerId + "-" + UUID.randomUUID()); + producerConfiguration.setProperty(PscConfiguration.PSC_CONFIG_LOGGING_ENABLED, "false"); + producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, IntegerSerializer.class.getName()); + producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, IntegerSerializer.class.getName()); + + return new PscProducer<>(producerConfiguration); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java index af4a084..f2ff3d2 100644 --- a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java +++ b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java @@ -4,6 +4,7 @@ import com.pinterest.psc.logging.PscLogger; import java.io.IOException; +import java.util.Objects; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -107,6 +108,10 @@ public TopicRn( ); } + public String getTopicRnString() { + return topicRnString; + } + public String getTopicRnPrefixString() { return topicRnPrefixString; } @@ -171,6 +176,31 @@ private static String upgradeTopicRnToCurrentVersion(String topicRnAsStr, byte s throw new TopicRnSyntaxException(String.format("Unsupported topic RN version %d", serializedVersion)); } + public void setTopic(String topic) { + this.topic = topic; + if (this.topicRnString.endsWith(":")) { + this.topicRnString = this.topicRnString + topic; + } else { + this.topicRnString = this.topicRnString + ":" + topic; + } + } + + @Override + public int hashCode() { + return Objects.hash( + topicRnString, + topicRnPrefixString, + standard, + service, + environment, + cloud, + region, + classifier, + cluster, + topic + ); + } + @Override public boolean equals(Object other) { if (this == other) { diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java b/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java index 969a714..0d5fc38 100644 --- a/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java +++ b/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java @@ -42,7 +42,7 @@ public TopicUriPartition(String topicUriStr, int partition) { } } - protected TopicUriPartition(TopicUri topicUri, int partition) { + public TopicUriPartition(TopicUri topicUri, int partition) { this.backendTopicUri = topicUri; this.topicUriStr = topicUri.getTopicUriAsString(); this.partition = partition; diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java index 62e5e88..03aaff0 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java @@ -481,6 +481,12 @@ public class PscConfiguration extends PropertiesConfiguration { public static final String PSC_PRODUCER_SSL_TRUSTSTORE_TYPE = PSC_PRODUCER + "." + SSL_TRUSTSTORE_TYPE; */ + // ********************** + // MetadataClient Configuration + // ********************** + + protected static final String PSC_METADATA = "psc.metadata"; + public static final String PSC_METADATA_CLIENT_ID = PSC_METADATA + "." 
+ CLIENT_ID; // ********************** // Metrics Configuration diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java index 929e37c..22891e7 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java @@ -142,7 +142,7 @@ protected void validate(boolean isLenient, boolean isLogConfiguration) throws Co validateProducerConfiguration(isLenient, isLogConfiguration); break; case PSC_CLIENT_TYPE_METADATA: - // no-op, we only need environment for metadata client + validateMetadataClientConfiguration(isLenient, isLogConfiguration); break; default: throw new ConfigurationException("Valid client type expected: " + String.join(", ", PSC_VALID_CLIENT_TYPES)); @@ -487,6 +487,24 @@ private T verifyConfigHasValue( return configuration.get(expectedType, configKey); } + private void validateMetadataClientConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException { + PscConfiguration metadataConfiguration = new PscConfiguration(); + metadataConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_METADATA)); + Map invalidConfigs = new HashMap<>(); + verifyConfigHasValue(metadataConfiguration, PscConfiguration.CLIENT_ID, String.class, invalidConfigs); + if (isLogConfiguration) + logConfiguration(); + + if (invalidConfigs.isEmpty() || isLenient) + return; + + StringBuilder stringBuilder = new StringBuilder(); + invalidConfigs.forEach((error, exception) -> + stringBuilder.append(String.format("\t%s: %s\n", error, exception == null ? "" : exception.getMessage())) + ); + throw new ConfigurationException("Invalid metadataClient configuration\n" + stringBuilder.toString()); + } + private void validateProducerConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException { PscConfiguration producerConfiguration = new PscConfiguration(); producerConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_PRODUCER)); @@ -753,4 +771,8 @@ public int getAutoResolutionRetryCount() { public MetricsReporterConfiguration getMetricsReporterConfiguration() { return metricsReporterConfiguration; } + + public String getMetadataClientId() { + return pscConfiguration.getString(PscConfiguration.PSC_METADATA_CLIENT_ID); + } } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java new file mode 100644 index 0000000..cebc229 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java @@ -0,0 +1,24 @@ +package com.pinterest.psc.metadata; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; + +public class MetadataUtils { + + @VisibleForTesting + public static TopicRn createTopicRn(TopicUri topicUri, String topicName) { + return new TopicRn( + topicUri.getTopicRn().getTopicRnPrefixString() + topicName, + topicUri.getTopicRn().getTopicRnPrefixString(), + topicUri.getTopicRn().getStandard(), + topicUri.getTopicRn().getService(), + topicUri.getTopicRn().getEnvironment(), + topicUri.getTopicRn().getCloud(), + topicUri.getTopicRn().getRegion(), + topicUri.getTopicRn().getClassifier(), + topicUri.getTopicRn().getCluster(), + topicName + ); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java 
b/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java deleted file mode 100644 index 63fcece..0000000 --- a/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.pinterest.psc.metadata; - -import com.google.common.annotations.VisibleForTesting; -import com.pinterest.psc.common.ServiceDiscoveryConfig; -import com.pinterest.psc.common.TopicRn; -import com.pinterest.psc.common.TopicUri; - -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public abstract class PscBackendMetadataClient implements AutoCloseable { - - private TopicUri topicUri; - private ServiceDiscoveryConfig serviceDiscoveryConfig; - - public void initialize(TopicUri topicUri, ServiceDiscoveryConfig discoveryConfig) { - this.topicUri = topicUri; - this.serviceDiscoveryConfig = discoveryConfig; - } - - public abstract List getTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; - - public abstract void close() throws Exception; - - @VisibleForTesting - protected TopicRn getTopicRn(String topicName) { - String topicRnPrefix = topicUri.getTopicRn().getTopicRnPrefixString(); - return new TopicRn( - topicRnPrefix, - topicRnPrefix, - topicUri.getTopicRn().getStandard(), - topicUri.getTopicRn().getService(), - topicUri.getTopicRn().getEnvironment(), - topicUri.getTopicRn().getCloud(), - topicUri.getTopicRn().getRegion(), - topicUri.getTopicRn().getClassifier(), - topicUri.getTopicRn().getCluster(), - topicName - ); - } - - @VisibleForTesting - protected TopicUri getTopicUri() { - return topicUri; - } - - @VisibleForTesting - protected ServiceDiscoveryConfig getServiceDiscoveryConfig() { - return serviceDiscoveryConfig; - } -} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java deleted file mode 100644 index efc734c..0000000 --- a/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.pinterest.psc.metadata; - -import com.google.common.annotations.VisibleForTesting; -import com.pinterest.psc.common.PscUtils; -import com.pinterest.psc.common.TopicUri; -import com.pinterest.psc.config.PscConfiguration; -import com.pinterest.psc.config.PscConfigurationInternal; -import com.pinterest.psc.environment.Environment; -import com.pinterest.psc.environment.EnvironmentProvider; -import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; -import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class PscMetadataClient implements AutoCloseable { - - private PscMetadataClientCreatorManager creatorManager; - private Environment environment; - private final PscConfigurationInternal pscConfigurationInternal; - private Map pscBackendMetadataClientByTopicUriPrefix = new ConcurrentHashMap<>(); - - public PscMetadataClient(PscConfiguration pscConfiguration) throws ConfigurationException { - this.pscConfigurationInternal = new PscConfigurationInternal( - pscConfiguration, - PscConfigurationInternal.PSC_CLIENT_TYPE_METADATA - ); - initialize(); - } - - private void initialize() { - creatorManager = new PscMetadataClientCreatorManager(); - 
environment = pscConfigurationInternal.getEnvironment(); - } - - public List getTopicUris(TopicUri topicUri) { - PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(topicUri); - return null; - } - - @VisibleForTesting - protected PscBackendMetadataClient getBackendMetadataClient(TopicUri topicUri) { - String topicUriPrefix = topicUri.getTopicUriPrefix(); - pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> { - PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(topicUri.getBackend()); - try { - return backendMetadataClientCreator.create(environment, pscConfigurationInternal, topicUri); - } catch (ConfigurationException e) { - throw new RuntimeException(e); - } - }); - return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix); - } - - @Override - public void close() throws Exception { - - } -} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java b/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java new file mode 100644 index 0000000..ab00c5f --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java @@ -0,0 +1,25 @@ +package com.pinterest.psc.metadata; + +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUriPartition; + +import java.util.List; + +public class TopicRnMetadata { + + private final TopicRn topicRn; + private final List topicUriPartitions; + + public TopicRnMetadata(TopicRn topicRn, List topicUriPartitions) { + this.topicRn = topicRn; + this.topicUriPartitions = topicUriPartitions; + } + + public TopicRn getTopicRn() { + return topicRn; + } + + public List getTopicUriPartitions() { + return topicUriPartitions; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java new file mode 100644 index 0000000..39ba99e --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java @@ -0,0 +1,52 @@ +package com.pinterest.psc.metadata.client; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.ServiceDiscoveryConfig; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.discovery.ServiceDiscoveryManager; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.metadata.TopicRnMetadata; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class PscBackendMetadataClient implements AutoCloseable { + + protected TopicUri topicUri; + protected PscConfigurationInternal pscConfigurationInternal; + protected ServiceDiscoveryConfig discoveryConfig; + + public void initialize(TopicUri topicUri, Environment env, PscConfigurationInternal pscConfigurationInternal) throws ConfigurationException { + this.topicUri = topicUri; + this.pscConfigurationInternal = pscConfigurationInternal; + this.discoveryConfig = + ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri); + } + + 
public abstract List listTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract Map describeTopicRns(Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract Map listOffsets(Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract void close() throws Exception; + + @VisibleForTesting + protected TopicUri getTopicUri() { + return topicUri; + } + + @VisibleForTesting + protected ServiceDiscoveryConfig getDiscoveryConfig() { + return discoveryConfig; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java new file mode 100644 index 0000000..e066acd --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -0,0 +1,84 @@ +package com.pinterest.psc.metadata.client; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; +import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class PscMetadataClient implements AutoCloseable { + private PscMetadataClientCreatorManager creatorManager; + private Environment environment; + private final PscConfigurationInternal pscConfigurationInternal; + private final Map pscBackendMetadataClientByTopicUriPrefix = new ConcurrentHashMap<>(); + + public PscMetadataClient(PscConfiguration pscConfiguration) throws ConfigurationException { + this.pscConfigurationInternal = new PscConfigurationInternal( + pscConfiguration, + PscConfigurationInternal.PSC_CLIENT_TYPE_METADATA + ); + initialize(); + } + + private void initialize() { + creatorManager = new PscMetadataClientCreatorManager(); + environment = pscConfigurationInternal.getEnvironment(); + } + + public List listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); + return backendMetadataClient.listTopicRns(timeout, timeUnit); + } + + public Map describeTopicRns(TopicUri clusterUri, Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); + return backendMetadataClient.describeTopicRns(topicRns, timeout, timeUnit); + } + + public Map listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + 
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); + return backendMetadataClient.listOffsets(topicRnsAndOptions, timeout, timeUnit); + } + + @VisibleForTesting + protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) { + String topicUriPrefix = clusterUri.getTopicUriPrefix(); + pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> { + PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(clusterUri.getBackend()); + try { + return backendMetadataClientCreator.create(environment, pscConfigurationInternal, clusterUri); + } catch (ConfigurationException e) { + throw new RuntimeException(e); + } + }); + return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix); + } + + @Override + public void close() throws Exception { + for (PscBackendMetadataClient client : pscBackendMetadataClientByTopicUriPrefix.values()) { + client.close(); + } + pscBackendMetadataClientByTopicUriPrefix.clear(); + } + + public enum MetadataClientOption { + OFFSET_SPEC_EARLIEST, + OFFSET_SPEC_LATEST + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java new file mode 100644 index 0000000..a289795 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -0,0 +1,123 @@ +package com.pinterest.psc.metadata.client.kafka; + +import com.pinterest.psc.common.BaseTopicUri; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.ServiceDiscoveryConfig; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.kafka.KafkaTopicUri; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.config.PscConfigurationUtils; +import com.pinterest.psc.discovery.ServiceDiscoveryManager; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.logging.PscLogger; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.client.PscBackendMetadataClient; +import com.pinterest.psc.metadata.client.PscMetadataClient; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class PscKafkaMetadataClient extends PscBackendMetadataClient { + + private static final PscLogger logger = PscLogger.getLogger(PscKafkaMetadataClient.class); + private AdminClient kafkaAdminClient; + + @Override + public void initialize(TopicUri 
topicUri, Environment env, PscConfigurationInternal pscConfigurationInternal) throws ConfigurationException { + super.initialize(topicUri, env, pscConfigurationInternal); + Properties properties = PscConfigurationUtils.pscConfigurationInternalToProperties(pscConfigurationInternal); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, discoveryConfig.getConnect()); + properties.put(AdminClientConfig.CLIENT_ID_CONFIG, pscConfigurationInternal.getMetadataClientId()); + this.kafkaAdminClient = AdminClient.create(properties); + logger.info("Initialized Kafka AdminClient with properties: " + properties); + } + + @Override + public List listTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); + Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); + return topicListing.stream().map(tl -> MetadataUtils.createTopicRn(topicUri, tl.name())).collect(Collectors.toList()); + } + + @Override + public Map describeTopicRns(Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + Collection topicNames = topicRns.stream().map(TopicRn::getTopic).collect(Collectors.toSet()); + Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(timeout, timeUnit); + Map result = new HashMap<>(); + for (Map.Entry entry : topicMetadata.entrySet()) { + String topicName = entry.getKey(); + TopicDescription description = entry.getValue(); + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topicName); + List topicUriPartitions = new ArrayList<>(); + for (TopicPartitionInfo partitionInfo : description.partitions()) { + topicUriPartitions.add( + createKafkaTopicUriPartition(topicRn, partitionInfo.partition()) + ); + } + result.put(topicRn, new TopicRnMetadata(topicRn, topicUriPartitions)); + } + return result; + } + + @Override + public Map listOffsets(Map topicUriPartitionsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + Map topicPartitionOffsets = new HashMap<>(); + for (Map.Entry entry : topicUriPartitionsAndOptions.entrySet()) { + TopicUriPartition topicUriPartition = entry.getKey(); + PscMetadataClient.MetadataClientOption option = entry.getValue(); + OffsetSpec offsetSpec; + if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST) + offsetSpec = OffsetSpec.earliest(); + else if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) + offsetSpec = OffsetSpec.latest(); + else + throw new IllegalArgumentException("Unsupported MetadataClientOption for listOffsets(): " + option); + topicPartitionOffsets.put(new TopicPartition(topicUriPartition.getTopicUri().getTopic(), topicUriPartition.getPartition()), offsetSpec); + } + ListOffsetsResult listOffsetsResult = kafkaAdminClient.listOffsets(topicPartitionOffsets); + Map result = new HashMap<>(); + listOffsetsResult.all().get(timeout, timeUnit).entrySet().forEach(e -> { + TopicPartition tp = e.getKey(); + ListOffsetsResult.ListOffsetsResultInfo info = e.getValue(); + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); + result.put( + createKafkaTopicUriPartition(topicRn, tp.partition()), + new MessageId(createKafkaTopicUriPartition(topicRn, tp.partition()), info.offset(), info.timestamp()) + ); + }); + return result; + } + + private TopicUriPartition createKafkaTopicUriPartition(TopicRn topicRn, int partition) { + return new 
TopicUriPartition(new KafkaTopicUri(new BaseTopicUri(topicUri.getProtocol(), topicRn)), partition); + } + + @Override + public void close() throws Exception { + if (kafkaAdminClient != null) + kafkaAdminClient.close(); + logger.info("Closed PscKafkaMetadataClient"); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java index 5b4b696..69be429 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java @@ -4,10 +4,10 @@ import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.metadata.PscBackendMetadataClient; +import com.pinterest.psc.metadata.client.PscBackendMetadataClient; public abstract class PscBackendMetadataClientCreator { - public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) throws ConfigurationException; + public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException; } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java index 1dd8e60..a202d4c 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java @@ -6,17 +6,18 @@ import com.pinterest.psc.discovery.ServiceDiscoveryManager; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.metadata.kafka.PscKafkaMetadataClient; +import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient; @PscMetadataClientCreatorPlugin(backend = PscUtils.BACKEND_TYPE_KAFKA) public class PscKafkaMetadataClientCreator extends PscBackendMetadataClientCreator { @Override - public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) throws ConfigurationException { + public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException { PscKafkaMetadataClient pscKafkaMetadataClient = new PscKafkaMetadataClient(); pscKafkaMetadataClient.initialize( - topicUri, - ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri) + clusterUri, + env, + pscConfigurationInternal ); return pscKafkaMetadataClient; } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java deleted file mode 100644 index 19249ee..0000000 --- a/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.pinterest.psc.metadata.kafka; - -import com.google.common.annotations.VisibleForTesting; -import com.pinterest.psc.common.ServiceDiscoveryConfig; -import com.pinterest.psc.common.TopicRn; -import com.pinterest.psc.common.TopicUri; 
-import com.pinterest.psc.metadata.PscBackendMetadataClient; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.ListTopicsResult; -import org.apache.kafka.clients.admin.TopicListing; - -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -public class PscKafkaMetadataClient extends PscBackendMetadataClient { - - private AdminClient kafkaAdminClient; - - public void initialize(TopicUri topicUri, ServiceDiscoveryConfig discoveryConfig) { - super.initialize(topicUri, discoveryConfig); - Properties properties = new Properties(); - properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, discoveryConfig.getConnect()); - this.kafkaAdminClient = AdminClient.create(properties); - } - - @Override - public List getTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { - ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); - Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); - return topicListing.stream().map(tl -> getTopicRn(tl.name())).collect(Collectors.toList()); - } - - @Override - public void close() throws Exception { - } -} diff --git a/psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java similarity index 70% rename from psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java rename to psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 5c0e8cd..677c7b2 100644 --- a/psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java +++ b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -1,4 +1,4 @@ -package com.pinterest.psc.metadata; +package com.pinterest.psc.metadata.client; import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; @@ -6,12 +6,14 @@ import com.pinterest.psc.discovery.DiscoveryUtil; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.exception.startup.TopicUriSyntaxException; -import com.pinterest.psc.metadata.kafka.PscKafkaMetadataClient; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient; import org.junit.jupiter.api.Test; import java.io.IOException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestPscMetadataClient { @@ -22,23 +24,20 @@ void testGetBackendMetadataClient() throws ConfigurationException, TopicUriSynta String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); PscConfiguration pscConfiguration = new PscConfiguration(); pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); + pscConfiguration.addProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "test-metadata-client"); PscMetadataClient pscMetadataClient = new PscMetadataClient(pscConfiguration); PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); assertEquals(PscKafkaMetadataClient.class, backendMetadataClient.getClass()); - 
assertEquals("kafkacluster01001:9092,kafkacluster01002:9092", backendMetadataClient.getServiceDiscoveryConfig().getConnect()); + assertEquals("kafkacluster01001:9092,kafkacluster01002:9092", backendMetadataClient.getDiscoveryConfig().getConnect()); } @Test - void testGetTopicRn() throws IOException, ConfigurationException, TopicUriSyntaxException { - String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); - PscConfiguration pscConfiguration = new PscConfiguration(); - pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); - - PscMetadataClient pscMetadataClient = new PscMetadataClient(pscConfiguration); - PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); + void testCreateTopicRn() throws TopicUriSyntaxException { TopicRn topic1Rn = TopicUri.validate(testKafkaTopic1).getTopicRn(); - TopicRn topic2Rn = backendMetadataClient.getTopicRn("topic2"); + TopicRn topic1RnCreated = MetadataUtils.createTopicRn(TopicUri.validate(testKafkaTopic1), "topic1"); + assertTrue(topic1Rn.equals(topic1RnCreated)); + TopicRn topic2Rn = MetadataUtils.createTopicRn(TopicUri.validate(testKafkaTopic1), "topic2"); assertEquals(topic1Rn.getStandard(), topic2Rn.getStandard()); assertEquals(topic1Rn.getService(), topic2Rn.getService()); assertEquals(topic1Rn.getEnvironment(), topic2Rn.getEnvironment()); @@ -46,5 +45,6 @@ void testGetTopicRn() throws IOException, ConfigurationException, TopicUriSyntax assertEquals(topic1Rn.getRegion(), topic2Rn.getRegion()); assertEquals(topic1Rn.getClassifier(), topic2Rn.getClassifier()); assertEquals(topic1Rn.getCluster(), topic2Rn.getCluster()); + assertEquals("topic2", topic2Rn.getTopic()); } }
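Reviewer note: the following is a minimal, self-contained usage sketch of the PscMetadataClient API introduced in this patch, based only on the signatures added above. The client id, transport, region, cluster, and topic names are placeholders, and service discovery setup is elided for brevity.

import com.pinterest.psc.common.MessageId;
import com.pinterest.psc.common.TopicRn;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.client.PscMetadataClient;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PscMetadataClientUsageSketch {

    public static void main(String[] args) throws Exception {
        PscConfiguration config = new PscConfiguration();
        // PSC_METADATA_CLIENT_ID is the only metadata-specific key checked by
        // validateMetadataClientConfiguration(); service discovery must also be
        // configured (e.g. PSC_DISCOVERY_FALLBACK_FILE, as in the unit test).
        config.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "example-metadata-client");

        // Cluster URI without a topic suffix, mirroring clusterUriStr in the integration
        // test; the transport/region/cluster values here are placeholders.
        String clusterUriStr = String.format("plaintext:%s%s:kafka:env:cloud_region::cluster:",
                TopicUri.SEPARATOR, TopicUri.STANDARD);
        TopicUri clusterUri = TopicUri.validate(clusterUriStr);

        try (PscMetadataClient client = new PscMetadataClient(config)) {
            // List every topic RN in the cluster.
            List<TopicRn> topicRns = client.listTopicRns(clusterUri, 10000, TimeUnit.MILLISECONDS);

            // Describe them; each TopicRnMetadata carries the partition list.
            Map<TopicRn, TopicRnMetadata> described = client.describeTopicRns(
                    clusterUri, new HashSet<>(topicRns), 10000, TimeUnit.MILLISECONDS);
            described.forEach((rn, metadata) ->
                    System.out.println(rn.getTopicRnString() + " has "
                            + metadata.getTopicUriPartitions().size() + " partitions"));

            // Ask for the latest offset of partition 0 of a (placeholder) topic.
            TopicUri topicUri = TopicUri.validate(clusterUriStr + "topic1");
            Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> request = new HashMap<>();
            request.put(new TopicUriPartition(topicUri, 0),
                    PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
            Map<TopicUriPartition, MessageId> offsets =
                    client.listOffsets(clusterUri, request, 10000, TimeUnit.MILLISECONDS);
            offsets.forEach((tup, messageId) ->
                    System.out.println("partition " + tup.getPartition() + " -> offset " + messageId.getOffset()));
        }
    }
}

All three calls route through getBackendMetadataClient(clusterUri), which lazily creates and caches one backend metadata client (a Kafka AdminClient wrapper in this patch) per topic URI prefix; close() tears all cached backend clients down.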