From 1d34288b7ec178b957f44b1dd50005c8e21ed471 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Tue, 27 Aug 2024 17:10:44 -0400 Subject: [PATCH 1/7] WIP metadataClient impl --- .../com/pinterest/psc/common/TopicRn.java | 4 ++ .../psc/config/PscConfigurationInternal.java | 6 +- .../metadata/PscBackendMetadataClient.java | 53 ++++++++++++++++ .../psc/metadata/PscMetadataClient.java | 61 +++++++++++++++++++ .../PscBackendMetadataClientCreator.java | 13 ++++ .../PscKafkaMetadataClientCreator.java | 23 +++++++ .../PscMetadataClientCreatorManager.java | 47 ++++++++++++++ .../PscMetadataClientCreatorPlugin.java | 12 ++++ .../kafka/PscKafkaMetadataClient.java | 42 +++++++++++++ .../psc/metadata/TestPscMetadataClient.java | 50 +++++++++++++++ 10 files changed, 310 insertions(+), 1 deletion(-) create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorPlugin.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java create mode 100644 psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java index dfc15dd..af4a084 100644 --- a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java +++ b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java @@ -111,6 +111,10 @@ public String getTopicRnPrefixString() { return topicRnPrefixString; } + public String getStandard() { + return standard; + } + public String getService() { return service; } diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java index 7086c16..929e37c 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java @@ -37,7 +37,8 @@ public class PscConfigurationInternal { private final static String PSC_CLIENT_TYPE = "psc.client.type"; public final static String PSC_CLIENT_TYPE_CONSUMER = "consumer"; public final static String PSC_CLIENT_TYPE_PRODUCER = "producer"; - private final static String[] PSC_VALID_CLIENT_TYPES = {PSC_CLIENT_TYPE_CONSUMER, PSC_CLIENT_TYPE_PRODUCER}; + public final static String PSC_CLIENT_TYPE_METADATA = "metadata"; + private final static String[] PSC_VALID_CLIENT_TYPES = {PSC_CLIENT_TYPE_CONSUMER, PSC_CLIENT_TYPE_PRODUCER, PSC_CLIENT_TYPE_METADATA}; private PscConfiguration pscConfiguration; private Deserializer keyDeserializer, valueDeserializer; @@ -140,6 +141,9 @@ protected void validate(boolean isLenient, boolean isLogConfiguration) throws Co case PSC_CLIENT_TYPE_PRODUCER: validateProducerConfiguration(isLenient, isLogConfiguration); break; + case PSC_CLIENT_TYPE_METADATA: + // no-op, we only need environment for metadata client + break; default: throw new ConfigurationException("Valid client type expected: " + String.join(", ", PSC_VALID_CLIENT_TYPES)); } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java new file mode 100644 index 0000000..63fcece --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java @@ -0,0 +1,53 @@ +package com.pinterest.psc.metadata; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.ServiceDiscoveryConfig; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; + +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class PscBackendMetadataClient implements AutoCloseable { + + private TopicUri topicUri; + private ServiceDiscoveryConfig serviceDiscoveryConfig; + + public void initialize(TopicUri topicUri, ServiceDiscoveryConfig discoveryConfig) { + this.topicUri = topicUri; + this.serviceDiscoveryConfig = discoveryConfig; + } + + public abstract List getTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract void close() throws Exception; + + @VisibleForTesting + protected TopicRn getTopicRn(String topicName) { + String topicRnPrefix = topicUri.getTopicRn().getTopicRnPrefixString(); + return new TopicRn( + topicRnPrefix, + topicRnPrefix, + topicUri.getTopicRn().getStandard(), + topicUri.getTopicRn().getService(), + topicUri.getTopicRn().getEnvironment(), + topicUri.getTopicRn().getCloud(), + topicUri.getTopicRn().getRegion(), + topicUri.getTopicRn().getClassifier(), + topicUri.getTopicRn().getCluster(), + topicName + ); + } + + @VisibleForTesting + protected TopicUri getTopicUri() { + return topicUri; + } + + @VisibleForTesting + protected ServiceDiscoveryConfig getServiceDiscoveryConfig() { + return serviceDiscoveryConfig; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java new file mode 100644 index 0000000..efc734c --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java @@ -0,0 +1,61 @@ +package com.pinterest.psc.metadata; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.PscUtils; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.environment.EnvironmentProvider; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; +import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +public class PscMetadataClient implements AutoCloseable { + + private PscMetadataClientCreatorManager creatorManager; + private Environment environment; + private final PscConfigurationInternal pscConfigurationInternal; + private Map pscBackendMetadataClientByTopicUriPrefix = new ConcurrentHashMap<>(); + + public PscMetadataClient(PscConfiguration pscConfiguration) throws ConfigurationException { + this.pscConfigurationInternal = new PscConfigurationInternal( + pscConfiguration, + PscConfigurationInternal.PSC_CLIENT_TYPE_METADATA + ); + initialize(); + } + + private void initialize() { + creatorManager = new PscMetadataClientCreatorManager(); + environment = pscConfigurationInternal.getEnvironment(); + } + + public List getTopicUris(TopicUri topicUri) { + PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(topicUri); + return null; + } + + @VisibleForTesting + protected PscBackendMetadataClient getBackendMetadataClient(TopicUri topicUri) { + String topicUriPrefix = topicUri.getTopicUriPrefix(); + pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> { + PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(topicUri.getBackend()); + try { + return backendMetadataClientCreator.create(environment, pscConfigurationInternal, topicUri); + } catch (ConfigurationException e) { + throw new RuntimeException(e); + } + }); + return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix); + } + + @Override + public void close() throws Exception { + + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java new file mode 100644 index 0000000..5b4b696 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java @@ -0,0 +1,13 @@ +package com.pinterest.psc.metadata.creation; + +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.metadata.PscBackendMetadataClient; + +public abstract class PscBackendMetadataClientCreator { + + public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) throws ConfigurationException; + +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java new file mode 100644 index 0000000..1dd8e60 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java @@ -0,0 +1,23 @@ +package com.pinterest.psc.metadata.creation; + +import com.pinterest.psc.common.PscUtils; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.discovery.ServiceDiscoveryManager; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.metadata.kafka.PscKafkaMetadataClient; + +@PscMetadataClientCreatorPlugin(backend = PscUtils.BACKEND_TYPE_KAFKA) +public class PscKafkaMetadataClientCreator extends PscBackendMetadataClientCreator { + + @Override + public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) throws ConfigurationException { + PscKafkaMetadataClient pscKafkaMetadataClient = new PscKafkaMetadataClient(); + pscKafkaMetadataClient.initialize( + topicUri, + ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri) + ); + return pscKafkaMetadataClient; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java new file mode 100644 index 0000000..c89ed8a --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java @@ -0,0 +1,47 @@ +package com.pinterest.psc.metadata.creation; + +import com.pinterest.psc.logging.PscLogger; +import org.reflections.Reflections; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class PscMetadataClientCreatorManager { + + private static final PscLogger logger = PscLogger.getLogger(PscMetadataClientCreatorManager.class); + private final Map backendMetadataClientCreatorMap = + findAndRegisterMetadataClientCreators(PscMetadataClientCreatorManager.class.getPackage().getName()); + + private static Map findAndRegisterMetadataClientCreators(String packageName) { + Map backendCreatorRegistry = new HashMap<>(); + Reflections reflections = new Reflections(packageName.trim()); + Set> annotatedClasses = reflections.getTypesAnnotatedWith(PscMetadataClientCreatorPlugin.class); + for (Class annotatedClass : annotatedClasses) { + PscMetadataClientCreatorPlugin plugin = annotatedClass.getAnnotation(PscMetadataClientCreatorPlugin.class); + if (plugin == null) { + logger.error("Plugin info null: " + annotatedClass.getName()); + continue; + } + String backend = plugin.backend(); + if (backend.isEmpty()) { + logger.warn("Ignoring due to empty backend for plugin: " + annotatedClass.getName()); + continue; + } + if (backendCreatorRegistry.containsKey(backend)) { + logger.error("Output plugin alias '" + backend + "' already exists: " + annotatedClass.getName()); + System.exit(-1); + } + try { + backendCreatorRegistry.put(backend, (PscBackendMetadataClientCreator) annotatedClass.newInstance()); + } catch (IllegalAccessException | InstantiationException e) { + throw new RuntimeException("Failed to register PscBackendMetadataClientCreator", e); + } + } + return backendCreatorRegistry; + } + + public Map getBackendCreators() { + return backendMetadataClientCreatorMap; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorPlugin.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorPlugin.java new file mode 100644 index 0000000..41162f2 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorPlugin.java @@ -0,0 +1,12 @@ +package com.pinterest.psc.metadata.creation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface PscMetadataClientCreatorPlugin { + String backend(); +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java new file mode 100644 index 0000000..19249ee --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java @@ -0,0 +1,42 @@ +package com.pinterest.psc.metadata.kafka; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.ServiceDiscoveryConfig; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.metadata.PscBackendMetadataClient; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.TopicListing; + +import java.util.Collection; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class PscKafkaMetadataClient extends PscBackendMetadataClient { + + private AdminClient kafkaAdminClient; + + public void initialize(TopicUri topicUri, ServiceDiscoveryConfig discoveryConfig) { + super.initialize(topicUri, discoveryConfig); + Properties properties = new Properties(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, discoveryConfig.getConnect()); + this.kafkaAdminClient = AdminClient.create(properties); + } + + @Override + public List getTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); + Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); + return topicListing.stream().map(tl -> getTopicRn(tl.name())).collect(Collectors.toList()); + } + + @Override + public void close() throws Exception { + } +} diff --git a/psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java b/psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java new file mode 100644 index 0000000..5c0e8cd --- /dev/null +++ b/psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java @@ -0,0 +1,50 @@ +package com.pinterest.psc.metadata; + +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.discovery.DiscoveryUtil; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; +import com.pinterest.psc.metadata.kafka.PscKafkaMetadataClient; +import org.junit.jupiter.api.Test; + +import java.io.IOException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestPscMetadataClient { + + protected static final String testKafkaTopic1 = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1"; + + @Test + void testGetBackendMetadataClient() throws ConfigurationException, TopicUriSyntaxException, IOException { + String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); + PscConfiguration pscConfiguration = new PscConfiguration(); + pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); + + PscMetadataClient pscMetadataClient = new PscMetadataClient(pscConfiguration); + PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); + assertEquals(PscKafkaMetadataClient.class, backendMetadataClient.getClass()); + assertEquals("kafkacluster01001:9092,kafkacluster01002:9092", backendMetadataClient.getServiceDiscoveryConfig().getConnect()); + } + + @Test + void testGetTopicRn() throws IOException, ConfigurationException, TopicUriSyntaxException { + String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); + PscConfiguration pscConfiguration = new PscConfiguration(); + pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); + + PscMetadataClient pscMetadataClient = new PscMetadataClient(pscConfiguration); + PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); + TopicRn topic1Rn = TopicUri.validate(testKafkaTopic1).getTopicRn(); + TopicRn topic2Rn = backendMetadataClient.getTopicRn("topic2"); + assertEquals(topic1Rn.getStandard(), topic2Rn.getStandard()); + assertEquals(topic1Rn.getService(), topic2Rn.getService()); + assertEquals(topic1Rn.getEnvironment(), topic2Rn.getEnvironment()); + assertEquals(topic1Rn.getCloud(), topic2Rn.getCloud()); + assertEquals(topic1Rn.getRegion(), topic2Rn.getRegion()); + assertEquals(topic1Rn.getClassifier(), topic2Rn.getClassifier()); + assertEquals(topic1Rn.getCluster(), topic2Rn.getCluster()); + } +} From 961db589ae52b04360c1f84e5f4f1eb9c886aef8 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 28 Aug 2024 18:37:08 -0400 Subject: [PATCH 2/7] WIP metadataClient API impl; finished listOffsets --- .../client/TestPscMetadataClient.java | 229 ++++++++++++++++++ .../com/pinterest/psc/common/TopicRn.java | 30 +++ .../psc/common/TopicUriPartition.java | 2 +- .../psc/config/PscConfiguration.java | 6 + .../psc/config/PscConfigurationInternal.java | 24 +- .../pinterest/psc/metadata/MetadataUtils.java | 24 ++ .../metadata/PscBackendMetadataClient.java | 53 ---- .../psc/metadata/PscMetadataClient.java | 61 ----- .../psc/metadata/TopicRnMetadata.java | 25 ++ .../client/PscBackendMetadataClient.java | 52 ++++ .../metadata/client/PscMetadataClient.java | 84 +++++++ .../client/kafka/PscKafkaMetadataClient.java | 123 ++++++++++ .../PscBackendMetadataClientCreator.java | 4 +- .../PscKafkaMetadataClientCreator.java | 9 +- .../kafka/PscKafkaMetadataClient.java | 42 ---- .../{ => client}/TestPscMetadataClient.java | 22 +- 16 files changed, 615 insertions(+), 175 deletions(-) create mode 100644 psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java delete mode 100644 psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java delete mode 100644 psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java create mode 100644 psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java delete mode 100644 psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java rename psc/src/test/java/com/pinterest/psc/metadata/{ => client}/TestPscMetadataClient.java (70%) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java new file mode 100644 index 0000000..7bb1c5b --- /dev/null +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -0,0 +1,229 @@ +package com.pinterest.psc.metadata.client; + +import com.pinterest.psc.common.BaseTopicUri; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.TestUtils; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.kafka.KafkaTopicUri; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.exception.producer.ProducerException; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; +import com.pinterest.psc.integration.KafkaCluster; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.producer.PscProducer; +import com.pinterest.psc.producer.PscProducerMessage; +import com.pinterest.psc.serde.IntegerSerializer; +import com.pinterest.psc.utils.PscTestUtils; +import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; +import com.salesforce.kafka.test.listeners.PlainListener; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class TestPscMetadataClient { + + @RegisterExtension + public static final SharedKafkaTestResource sharedKafkaTestResource1 = new SharedKafkaTestResource() + .withBrokers(1).registerListener(new PlainListener().onPorts(9092)); + private static final PscConfiguration metadataClientConfiguration = new PscConfiguration(); + private static String clientId; + private static final String topic1 = "topic1"; + private static final int partitions1 = 12; + private static final String topic2 = "topic2"; + private static final int partitions2 = 24; + private static final String topic3 = "topic3"; + private static final int partitions3 = 36; + private KafkaCluster kafkaCluster1; + private String topicUriStr1, topicUriStr2, topicUriStr3, clusterUriStr; + private TopicRn topic1Rn, topic2Rn, topic3Rn; + + /** + * Initializes two Kafka clusters that are commonly used by all tests, and creates a single topic on each. + * + * @throws IOException + */ + @BeforeEach + public void setup() throws IOException, InterruptedException, TopicUriSyntaxException { + clientId = this.getClass().getSimpleName() + "-psc-metadata-client"; + metadataClientConfiguration.clear(); + metadataClientConfiguration.setProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, clientId); + metadataClientConfiguration.setProperty(PscConfiguration.PSC_METRICS_REPORTER_CLASS, TestUtils.DEFAULT_METRICS_REPORTER); + metadataClientConfiguration.setProperty(PscConfiguration.PSC_CONFIG_LOGGING_ENABLED, "false"); + kafkaCluster1 = new KafkaCluster("plaintext", "region", "cluster", 9092); + topicUriStr1 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic1); + + topicUriStr2 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic2); + + topicUriStr3 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic3); + + clusterUriStr = String.format("%s:%s%s:kafka:env:cloud_%s::%s:", + kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster() + ); + + PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic1, partitions1); + PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic2, partitions2); + PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic3, partitions3); + + topic1Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), topic1); + topic2Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), topic2); + topic3Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr3)), topic3); + } + + /** + * Deleted the topics that are created by default. Also, adds a slight delay to make sure cleanup is complete + * when tests run consecutively. + * + * @throws ExecutionException + * @throws InterruptedException + */ + @AfterEach + public void tearDown() throws ExecutionException, InterruptedException { + PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic1); + PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic2); + PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic3); + Thread.sleep(1000); + } + + @Test + public void testListTopicRns() throws Exception { + PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); + List topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), 10000, TimeUnit.MILLISECONDS); + List expectedTopicRnList = Arrays.asList(topic1Rn, topic2Rn, topic3Rn); + assertEquals(expectedTopicRnList, topicRnList); + client.close(); + } + + @Test + public void testDescribeTopicRns() throws Exception { + PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); + Map topicRnDescriptionMap = client.describeTopicRns( + KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), + new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)), + 10000, + TimeUnit.MILLISECONDS + ); + assertEquals(3, topicRnDescriptionMap.size()); + + assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicRn()); + assertEquals(partitions1, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().size()); + for (int i = 0; i < partitions1; i++) { + assertEquals(topic1Rn, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); + assertEquals(i, topicRnDescriptionMap.get(topic1Rn).getTopicUriPartitions().get(i).getPartition()); + } + assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicRn()); + assertEquals(partitions2, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().size()); + for (int i = 0; i < partitions2; i++) { + assertEquals(topic2Rn, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); + assertEquals(i, topicRnDescriptionMap.get(topic2Rn).getTopicUriPartitions().get(i).getPartition()); + } + assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicRn()); + assertEquals(partitions3, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().size()); + for (int i = 0; i < partitions3; i++) { + assertEquals(topic3Rn, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getTopicUri().getTopicRn()); + assertEquals(i, topicRnDescriptionMap.get(topic3Rn).getTopicUriPartitions().get(i).getPartition()); + } + client.close(); + } + + @Test + public void testListOffsets() throws Exception { + PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); + Map topicUriPartitionsAndOptions = new HashMap<>(); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); + Map offsets = client.listOffsets( + clusterUri, + topicUriPartitionsAndOptions, + 10000, + TimeUnit.MILLISECONDS + ); + assertEquals(2, offsets.size()); + + // ensure that the offsets are 0 when the topic is empty + assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); + assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); + + // send one message to partition 0 for both topics + PscProducer pscProducer = getPscProducer(); + pscProducer.send(new PscProducerMessage(topicUriStr1, 0,0,0)); + pscProducer.send(new PscProducerMessage(topicUriStr2, 0,0,0)); + pscProducer.flush(); + + offsets = client.listOffsets( + clusterUri, + topicUriPartitionsAndOptions, + 10000, + TimeUnit.MILLISECONDS + ); + + // ensure sent offsets are captured in next metadataClient call + assertEquals(2, offsets.size()); + assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); // earliest offset + assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); // latest offset + + // now change the spec to latest for both topics, and add topic1 partitions 5 and 10 to the query + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + + // send 2 messages to topic1 partition 0 - now the latest offset should be 3 + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0, 0, 0)); + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0, 0, 0)); + // send 1 message to topic1 partition 5 - now the latest offset should be 1 + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 5, 0, 0)); + // send 1 message to topic2 partition 0 - now the latest offset should be 2 + pscProducer.send(new PscProducerMessage<>(topicUriStr2, 0, 0, 0)); + pscProducer.flush(); + + offsets = client.listOffsets( + clusterUri, + topicUriPartitionsAndOptions, + 10000, + TimeUnit.MILLISECONDS + ); + + // ensure sent offsets are captured in next metadataClient call + assertEquals(4, offsets.size()); + assertEquals(3, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); + assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5)).getOffset()); + assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10)).getOffset()); + assertEquals(2, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); + + client.close(); + pscProducer.close(); + } + + private static PscProducer getPscProducer() throws ConfigurationException, ProducerException { + PscConfiguration producerConfiguration = new PscConfiguration(); + String baseProducerId = "psc-producer-client"; + producerConfiguration.setProperty(PscConfiguration.PSC_METRICS_REPORTER_CLASS, TestUtils.DEFAULT_METRICS_REPORTER); + producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID, baseProducerId + "-" + UUID.randomUUID()); + producerConfiguration.setProperty(PscConfiguration.PSC_CONFIG_LOGGING_ENABLED, "false"); + producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, IntegerSerializer.class.getName()); + producerConfiguration.setProperty(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, IntegerSerializer.class.getName()); + + return new PscProducer<>(producerConfiguration); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java index af4a084..f2ff3d2 100644 --- a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java +++ b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java @@ -4,6 +4,7 @@ import com.pinterest.psc.logging.PscLogger; import java.io.IOException; +import java.util.Objects; import java.util.Properties; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -107,6 +108,10 @@ public TopicRn( ); } + public String getTopicRnString() { + return topicRnString; + } + public String getTopicRnPrefixString() { return topicRnPrefixString; } @@ -171,6 +176,31 @@ private static String upgradeTopicRnToCurrentVersion(String topicRnAsStr, byte s throw new TopicRnSyntaxException(String.format("Unsupported topic RN version %d", serializedVersion)); } + public void setTopic(String topic) { + this.topic = topic; + if (this.topicRnString.endsWith(":")) { + this.topicRnString = this.topicRnString + topic; + } else { + this.topicRnString = this.topicRnString + ":" + topic; + } + } + + @Override + public int hashCode() { + return Objects.hash( + topicRnString, + topicRnPrefixString, + standard, + service, + environment, + cloud, + region, + classifier, + cluster, + topic + ); + } + @Override public boolean equals(Object other) { if (this == other) { diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java b/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java index 969a714..0d5fc38 100644 --- a/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java +++ b/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java @@ -42,7 +42,7 @@ public TopicUriPartition(String topicUriStr, int partition) { } } - protected TopicUriPartition(TopicUri topicUri, int partition) { + public TopicUriPartition(TopicUri topicUri, int partition) { this.backendTopicUri = topicUri; this.topicUriStr = topicUri.getTopicUriAsString(); this.partition = partition; diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java index 62e5e88..03aaff0 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java @@ -481,6 +481,12 @@ public class PscConfiguration extends PropertiesConfiguration { public static final String PSC_PRODUCER_SSL_TRUSTSTORE_TYPE = PSC_PRODUCER + "." + SSL_TRUSTSTORE_TYPE; */ + // ********************** + // MetadataClient Configuration + // ********************** + + protected static final String PSC_METADATA = "psc.metadata"; + public static final String PSC_METADATA_CLIENT_ID = PSC_METADATA + "." + CLIENT_ID; // ********************** // Metrics Configuration diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java index 929e37c..22891e7 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java @@ -142,7 +142,7 @@ protected void validate(boolean isLenient, boolean isLogConfiguration) throws Co validateProducerConfiguration(isLenient, isLogConfiguration); break; case PSC_CLIENT_TYPE_METADATA: - // no-op, we only need environment for metadata client + validateMetadataClientConfiguration(isLenient, isLogConfiguration); break; default: throw new ConfigurationException("Valid client type expected: " + String.join(", ", PSC_VALID_CLIENT_TYPES)); @@ -487,6 +487,24 @@ private T verifyConfigHasValue( return configuration.get(expectedType, configKey); } + private void validateMetadataClientConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException { + PscConfiguration metadataConfiguration = new PscConfiguration(); + metadataConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_METADATA)); + Map invalidConfigs = new HashMap<>(); + verifyConfigHasValue(metadataConfiguration, PscConfiguration.CLIENT_ID, String.class, invalidConfigs); + if (isLogConfiguration) + logConfiguration(); + + if (invalidConfigs.isEmpty() || isLenient) + return; + + StringBuilder stringBuilder = new StringBuilder(); + invalidConfigs.forEach((error, exception) -> + stringBuilder.append(String.format("\t%s: %s\n", error, exception == null ? "" : exception.getMessage())) + ); + throw new ConfigurationException("Invalid metadataClient configuration\n" + stringBuilder.toString()); + } + private void validateProducerConfiguration(boolean isLenient, boolean isLogConfiguration) throws ConfigurationException { PscConfiguration producerConfiguration = new PscConfiguration(); producerConfiguration.copy(pscConfiguration.subset(PscConfiguration.PSC_PRODUCER)); @@ -753,4 +771,8 @@ public int getAutoResolutionRetryCount() { public MetricsReporterConfiguration getMetricsReporterConfiguration() { return metricsReporterConfiguration; } + + public String getMetadataClientId() { + return pscConfiguration.getString(PscConfiguration.PSC_METADATA_CLIENT_ID); + } } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java new file mode 100644 index 0000000..cebc229 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java @@ -0,0 +1,24 @@ +package com.pinterest.psc.metadata; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; + +public class MetadataUtils { + + @VisibleForTesting + public static TopicRn createTopicRn(TopicUri topicUri, String topicName) { + return new TopicRn( + topicUri.getTopicRn().getTopicRnPrefixString() + topicName, + topicUri.getTopicRn().getTopicRnPrefixString(), + topicUri.getTopicRn().getStandard(), + topicUri.getTopicRn().getService(), + topicUri.getTopicRn().getEnvironment(), + topicUri.getTopicRn().getCloud(), + topicUri.getTopicRn().getRegion(), + topicUri.getTopicRn().getClassifier(), + topicUri.getTopicRn().getCluster(), + topicName + ); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java deleted file mode 100644 index 63fcece..0000000 --- a/psc/src/main/java/com/pinterest/psc/metadata/PscBackendMetadataClient.java +++ /dev/null @@ -1,53 +0,0 @@ -package com.pinterest.psc.metadata; - -import com.google.common.annotations.VisibleForTesting; -import com.pinterest.psc.common.ServiceDiscoveryConfig; -import com.pinterest.psc.common.TopicRn; -import com.pinterest.psc.common.TopicUri; - -import java.util.List; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -public abstract class PscBackendMetadataClient implements AutoCloseable { - - private TopicUri topicUri; - private ServiceDiscoveryConfig serviceDiscoveryConfig; - - public void initialize(TopicUri topicUri, ServiceDiscoveryConfig discoveryConfig) { - this.topicUri = topicUri; - this.serviceDiscoveryConfig = discoveryConfig; - } - - public abstract List getTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; - - public abstract void close() throws Exception; - - @VisibleForTesting - protected TopicRn getTopicRn(String topicName) { - String topicRnPrefix = topicUri.getTopicRn().getTopicRnPrefixString(); - return new TopicRn( - topicRnPrefix, - topicRnPrefix, - topicUri.getTopicRn().getStandard(), - topicUri.getTopicRn().getService(), - topicUri.getTopicRn().getEnvironment(), - topicUri.getTopicRn().getCloud(), - topicUri.getTopicRn().getRegion(), - topicUri.getTopicRn().getClassifier(), - topicUri.getTopicRn().getCluster(), - topicName - ); - } - - @VisibleForTesting - protected TopicUri getTopicUri() { - return topicUri; - } - - @VisibleForTesting - protected ServiceDiscoveryConfig getServiceDiscoveryConfig() { - return serviceDiscoveryConfig; - } -} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java deleted file mode 100644 index efc734c..0000000 --- a/psc/src/main/java/com/pinterest/psc/metadata/PscMetadataClient.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.pinterest.psc.metadata; - -import com.google.common.annotations.VisibleForTesting; -import com.pinterest.psc.common.PscUtils; -import com.pinterest.psc.common.TopicUri; -import com.pinterest.psc.config.PscConfiguration; -import com.pinterest.psc.config.PscConfigurationInternal; -import com.pinterest.psc.environment.Environment; -import com.pinterest.psc.environment.EnvironmentProvider; -import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; -import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -public class PscMetadataClient implements AutoCloseable { - - private PscMetadataClientCreatorManager creatorManager; - private Environment environment; - private final PscConfigurationInternal pscConfigurationInternal; - private Map pscBackendMetadataClientByTopicUriPrefix = new ConcurrentHashMap<>(); - - public PscMetadataClient(PscConfiguration pscConfiguration) throws ConfigurationException { - this.pscConfigurationInternal = new PscConfigurationInternal( - pscConfiguration, - PscConfigurationInternal.PSC_CLIENT_TYPE_METADATA - ); - initialize(); - } - - private void initialize() { - creatorManager = new PscMetadataClientCreatorManager(); - environment = pscConfigurationInternal.getEnvironment(); - } - - public List getTopicUris(TopicUri topicUri) { - PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(topicUri); - return null; - } - - @VisibleForTesting - protected PscBackendMetadataClient getBackendMetadataClient(TopicUri topicUri) { - String topicUriPrefix = topicUri.getTopicUriPrefix(); - pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> { - PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(topicUri.getBackend()); - try { - return backendMetadataClientCreator.create(environment, pscConfigurationInternal, topicUri); - } catch (ConfigurationException e) { - throw new RuntimeException(e); - } - }); - return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix); - } - - @Override - public void close() throws Exception { - - } -} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java b/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java new file mode 100644 index 0000000..ab00c5f --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java @@ -0,0 +1,25 @@ +package com.pinterest.psc.metadata; + +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUriPartition; + +import java.util.List; + +public class TopicRnMetadata { + + private final TopicRn topicRn; + private final List topicUriPartitions; + + public TopicRnMetadata(TopicRn topicRn, List topicUriPartitions) { + this.topicRn = topicRn; + this.topicUriPartitions = topicUriPartitions; + } + + public TopicRn getTopicRn() { + return topicRn; + } + + public List getTopicUriPartitions() { + return topicUriPartitions; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java new file mode 100644 index 0000000..39ba99e --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java @@ -0,0 +1,52 @@ +package com.pinterest.psc.metadata.client; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.ServiceDiscoveryConfig; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.discovery.ServiceDiscoveryManager; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.metadata.TopicRnMetadata; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public abstract class PscBackendMetadataClient implements AutoCloseable { + + protected TopicUri topicUri; + protected PscConfigurationInternal pscConfigurationInternal; + protected ServiceDiscoveryConfig discoveryConfig; + + public void initialize(TopicUri topicUri, Environment env, PscConfigurationInternal pscConfigurationInternal) throws ConfigurationException { + this.topicUri = topicUri; + this.pscConfigurationInternal = pscConfigurationInternal; + this.discoveryConfig = + ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri); + } + + public abstract List listTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract Map describeTopicRns(Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract Map listOffsets(Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract void close() throws Exception; + + @VisibleForTesting + protected TopicUri getTopicUri() { + return topicUri; + } + + @VisibleForTesting + protected ServiceDiscoveryConfig getDiscoveryConfig() { + return discoveryConfig; + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java new file mode 100644 index 0000000..e066acd --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -0,0 +1,84 @@ +package com.pinterest.psc.metadata.client; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; +import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class PscMetadataClient implements AutoCloseable { + private PscMetadataClientCreatorManager creatorManager; + private Environment environment; + private final PscConfigurationInternal pscConfigurationInternal; + private final Map pscBackendMetadataClientByTopicUriPrefix = new ConcurrentHashMap<>(); + + public PscMetadataClient(PscConfiguration pscConfiguration) throws ConfigurationException { + this.pscConfigurationInternal = new PscConfigurationInternal( + pscConfiguration, + PscConfigurationInternal.PSC_CLIENT_TYPE_METADATA + ); + initialize(); + } + + private void initialize() { + creatorManager = new PscMetadataClientCreatorManager(); + environment = pscConfigurationInternal.getEnvironment(); + } + + public List listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); + return backendMetadataClient.listTopicRns(timeout, timeUnit); + } + + public Map describeTopicRns(TopicUri clusterUri, Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); + return backendMetadataClient.describeTopicRns(topicRns, timeout, timeUnit); + } + + public Map listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); + return backendMetadataClient.listOffsets(topicRnsAndOptions, timeout, timeUnit); + } + + @VisibleForTesting + protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) { + String topicUriPrefix = clusterUri.getTopicUriPrefix(); + pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> { + PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(clusterUri.getBackend()); + try { + return backendMetadataClientCreator.create(environment, pscConfigurationInternal, clusterUri); + } catch (ConfigurationException e) { + throw new RuntimeException(e); + } + }); + return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix); + } + + @Override + public void close() throws Exception { + for (PscBackendMetadataClient client : pscBackendMetadataClientByTopicUriPrefix.values()) { + client.close(); + } + pscBackendMetadataClientByTopicUriPrefix.clear(); + } + + public enum MetadataClientOption { + OFFSET_SPEC_EARLIEST, + OFFSET_SPEC_LATEST + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java new file mode 100644 index 0000000..a289795 --- /dev/null +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -0,0 +1,123 @@ +package com.pinterest.psc.metadata.client.kafka; + +import com.pinterest.psc.common.BaseTopicUri; +import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.common.ServiceDiscoveryConfig; +import com.pinterest.psc.common.TopicRn; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.TopicUriPartition; +import com.pinterest.psc.common.kafka.KafkaTopicUri; +import com.pinterest.psc.config.PscConfigurationInternal; +import com.pinterest.psc.config.PscConfigurationUtils; +import com.pinterest.psc.discovery.ServiceDiscoveryManager; +import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.logging.PscLogger; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.TopicRnMetadata; +import com.pinterest.psc.metadata.client.PscBackendMetadataClient; +import com.pinterest.psc.metadata.client.PscMetadataClient; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +public class PscKafkaMetadataClient extends PscBackendMetadataClient { + + private static final PscLogger logger = PscLogger.getLogger(PscKafkaMetadataClient.class); + private AdminClient kafkaAdminClient; + + @Override + public void initialize(TopicUri topicUri, Environment env, PscConfigurationInternal pscConfigurationInternal) throws ConfigurationException { + super.initialize(topicUri, env, pscConfigurationInternal); + Properties properties = PscConfigurationUtils.pscConfigurationInternalToProperties(pscConfigurationInternal); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, discoveryConfig.getConnect()); + properties.put(AdminClientConfig.CLIENT_ID_CONFIG, pscConfigurationInternal.getMetadataClientId()); + this.kafkaAdminClient = AdminClient.create(properties); + logger.info("Initialized Kafka AdminClient with properties: " + properties); + } + + @Override + public List listTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); + Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); + return topicListing.stream().map(tl -> MetadataUtils.createTopicRn(topicUri, tl.name())).collect(Collectors.toList()); + } + + @Override + public Map describeTopicRns(Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + Collection topicNames = topicRns.stream().map(TopicRn::getTopic).collect(Collectors.toSet()); + Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(timeout, timeUnit); + Map result = new HashMap<>(); + for (Map.Entry entry : topicMetadata.entrySet()) { + String topicName = entry.getKey(); + TopicDescription description = entry.getValue(); + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, topicName); + List topicUriPartitions = new ArrayList<>(); + for (TopicPartitionInfo partitionInfo : description.partitions()) { + topicUriPartitions.add( + createKafkaTopicUriPartition(topicRn, partitionInfo.partition()) + ); + } + result.put(topicRn, new TopicRnMetadata(topicRn, topicUriPartitions)); + } + return result; + } + + @Override + public Map listOffsets(Map topicUriPartitionsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + Map topicPartitionOffsets = new HashMap<>(); + for (Map.Entry entry : topicUriPartitionsAndOptions.entrySet()) { + TopicUriPartition topicUriPartition = entry.getKey(); + PscMetadataClient.MetadataClientOption option = entry.getValue(); + OffsetSpec offsetSpec; + if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST) + offsetSpec = OffsetSpec.earliest(); + else if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) + offsetSpec = OffsetSpec.latest(); + else + throw new IllegalArgumentException("Unsupported MetadataClientOption for listOffsets(): " + option); + topicPartitionOffsets.put(new TopicPartition(topicUriPartition.getTopicUri().getTopic(), topicUriPartition.getPartition()), offsetSpec); + } + ListOffsetsResult listOffsetsResult = kafkaAdminClient.listOffsets(topicPartitionOffsets); + Map result = new HashMap<>(); + listOffsetsResult.all().get(timeout, timeUnit).entrySet().forEach(e -> { + TopicPartition tp = e.getKey(); + ListOffsetsResult.ListOffsetsResultInfo info = e.getValue(); + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); + result.put( + createKafkaTopicUriPartition(topicRn, tp.partition()), + new MessageId(createKafkaTopicUriPartition(topicRn, tp.partition()), info.offset(), info.timestamp()) + ); + }); + return result; + } + + private TopicUriPartition createKafkaTopicUriPartition(TopicRn topicRn, int partition) { + return new TopicUriPartition(new KafkaTopicUri(new BaseTopicUri(topicUri.getProtocol(), topicRn)), partition); + } + + @Override + public void close() throws Exception { + if (kafkaAdminClient != null) + kafkaAdminClient.close(); + logger.info("Closed PscKafkaMetadataClient"); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java index 5b4b696..69be429 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java @@ -4,10 +4,10 @@ import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.metadata.PscBackendMetadataClient; +import com.pinterest.psc.metadata.client.PscBackendMetadataClient; public abstract class PscBackendMetadataClientCreator { - public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) throws ConfigurationException; + public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException; } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java index 1dd8e60..a202d4c 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java @@ -6,17 +6,18 @@ import com.pinterest.psc.discovery.ServiceDiscoveryManager; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.metadata.kafka.PscKafkaMetadataClient; +import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient; @PscMetadataClientCreatorPlugin(backend = PscUtils.BACKEND_TYPE_KAFKA) public class PscKafkaMetadataClientCreator extends PscBackendMetadataClientCreator { @Override - public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri topicUri) throws ConfigurationException { + public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException { PscKafkaMetadataClient pscKafkaMetadataClient = new PscKafkaMetadataClient(); pscKafkaMetadataClient.initialize( - topicUri, - ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri) + clusterUri, + env, + pscConfigurationInternal ); return pscKafkaMetadataClient; } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java deleted file mode 100644 index 19249ee..0000000 --- a/psc/src/main/java/com/pinterest/psc/metadata/kafka/PscKafkaMetadataClient.java +++ /dev/null @@ -1,42 +0,0 @@ -package com.pinterest.psc.metadata.kafka; - -import com.google.common.annotations.VisibleForTesting; -import com.pinterest.psc.common.ServiceDiscoveryConfig; -import com.pinterest.psc.common.TopicRn; -import com.pinterest.psc.common.TopicUri; -import com.pinterest.psc.metadata.PscBackendMetadataClient; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.AdminClientConfig; -import org.apache.kafka.clients.admin.ListTopicsResult; -import org.apache.kafka.clients.admin.TopicListing; - -import java.util.Collection; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; - -public class PscKafkaMetadataClient extends PscBackendMetadataClient { - - private AdminClient kafkaAdminClient; - - public void initialize(TopicUri topicUri, ServiceDiscoveryConfig discoveryConfig) { - super.initialize(topicUri, discoveryConfig); - Properties properties = new Properties(); - properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, discoveryConfig.getConnect()); - this.kafkaAdminClient = AdminClient.create(properties); - } - - @Override - public List getTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { - ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); - Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); - return topicListing.stream().map(tl -> getTopicRn(tl.name())).collect(Collectors.toList()); - } - - @Override - public void close() throws Exception { - } -} diff --git a/psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java similarity index 70% rename from psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java rename to psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 5c0e8cd..677c7b2 100644 --- a/psc/src/test/java/com/pinterest/psc/metadata/TestPscMetadataClient.java +++ b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -1,4 +1,4 @@ -package com.pinterest.psc.metadata; +package com.pinterest.psc.metadata.client; import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; @@ -6,12 +6,14 @@ import com.pinterest.psc.discovery.DiscoveryUtil; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.exception.startup.TopicUriSyntaxException; -import com.pinterest.psc.metadata.kafka.PscKafkaMetadataClient; +import com.pinterest.psc.metadata.MetadataUtils; +import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient; import org.junit.jupiter.api.Test; import java.io.IOException; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; public class TestPscMetadataClient { @@ -22,23 +24,20 @@ void testGetBackendMetadataClient() throws ConfigurationException, TopicUriSynta String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); PscConfiguration pscConfiguration = new PscConfiguration(); pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); + pscConfiguration.addProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "test-metadata-client"); PscMetadataClient pscMetadataClient = new PscMetadataClient(pscConfiguration); PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); assertEquals(PscKafkaMetadataClient.class, backendMetadataClient.getClass()); - assertEquals("kafkacluster01001:9092,kafkacluster01002:9092", backendMetadataClient.getServiceDiscoveryConfig().getConnect()); + assertEquals("kafkacluster01001:9092,kafkacluster01002:9092", backendMetadataClient.getDiscoveryConfig().getConnect()); } @Test - void testGetTopicRn() throws IOException, ConfigurationException, TopicUriSyntaxException { - String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); - PscConfiguration pscConfiguration = new PscConfiguration(); - pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); - - PscMetadataClient pscMetadataClient = new PscMetadataClient(pscConfiguration); - PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); + void testCreateTopicRn() throws TopicUriSyntaxException { TopicRn topic1Rn = TopicUri.validate(testKafkaTopic1).getTopicRn(); - TopicRn topic2Rn = backendMetadataClient.getTopicRn("topic2"); + TopicRn topic1RnCreated = MetadataUtils.createTopicRn(TopicUri.validate(testKafkaTopic1), "topic1"); + assertTrue(topic1Rn.equals(topic1RnCreated)); + TopicRn topic2Rn = MetadataUtils.createTopicRn(TopicUri.validate(testKafkaTopic1), "topic2"); assertEquals(topic1Rn.getStandard(), topic2Rn.getStandard()); assertEquals(topic1Rn.getService(), topic2Rn.getService()); assertEquals(topic1Rn.getEnvironment(), topic2Rn.getEnvironment()); @@ -46,5 +45,6 @@ void testGetTopicRn() throws IOException, ConfigurationException, TopicUriSyntax assertEquals(topic1Rn.getRegion(), topic2Rn.getRegion()); assertEquals(topic1Rn.getClassifier(), topic2Rn.getClassifier()); assertEquals(topic1Rn.getCluster(), topic2Rn.getCluster()); + assertEquals("topic2", topic2Rn.getTopic()); } } From 00562d5d6b1fe5e3eb6b35da7db64d34c19c3cd2 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 28 Aug 2024 18:58:48 -0400 Subject: [PATCH 3/7] Minor code cleanups --- .../client/TestPscMetadataClient.java | 5 ++ .../client/PscBackendMetadataClient.java | 26 +++++++-- .../client/kafka/PscKafkaMetadataClient.java | 57 ++++++++++++++++--- .../PscKafkaMetadataClientCreator.java | 1 - .../client/TestPscMetadataClient.java | 8 ++- 5 files changed, 82 insertions(+), 15 deletions(-) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 7bb1c5b..f3d3b4f 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -215,6 +215,11 @@ public void testListOffsets() throws Exception { pscProducer.close(); } + @Test + public void testListOffsetsForConsumerGroup() { + + } + private static PscProducer getPscProducer() throws ConfigurationException, ProducerException { PscConfiguration producerConfiguration = new PscConfiguration(); String baseProducerId = "psc-producer-client"; diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java index 39ba99e..f5a077d 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java @@ -12,9 +12,9 @@ import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.metadata.TopicRnMetadata; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -32,11 +32,29 @@ public void initialize(TopicUri topicUri, Environment env, PscConfigurationInter ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri); } - public abstract List listTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + public abstract List listTopicRns( + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException; - public abstract Map describeTopicRns(Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + public abstract Map describeTopicRns( + Collection topicRns, + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException; - public abstract Map listOffsets(Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException; + public abstract Map listOffsets( + Map topicRnsAndOptions, + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException; + + public abstract Map listOffsetsForConsumerGroup( + String consumerGroupId, + Collection topicUriPartitions, + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException; public abstract void close() throws Exception; diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java index a289795..2413602 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -2,14 +2,12 @@ import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.MessageId; -import com.pinterest.psc.common.ServiceDiscoveryConfig; import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.common.kafka.KafkaTopicUri; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.config.PscConfigurationUtils; -import com.pinterest.psc.discovery.ServiceDiscoveryManager; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.logging.PscLogger; @@ -19,11 +17,13 @@ import com.pinterest.psc.metadata.client.PscMetadataClient; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions; import org.apache.kafka.clients.admin.ListOffsetsResult; import org.apache.kafka.clients.admin.ListTopicsResult; import org.apache.kafka.clients.admin.OffsetSpec; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -45,7 +44,11 @@ public class PscKafkaMetadataClient extends PscBackendMetadataClient { private AdminClient kafkaAdminClient; @Override - public void initialize(TopicUri topicUri, Environment env, PscConfigurationInternal pscConfigurationInternal) throws ConfigurationException { + public void initialize( + TopicUri topicUri, + Environment env, + PscConfigurationInternal pscConfigurationInternal + ) throws ConfigurationException { super.initialize(topicUri, env, pscConfigurationInternal); Properties properties = PscConfigurationUtils.pscConfigurationInternalToProperties(pscConfigurationInternal); properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, discoveryConfig.getConnect()); @@ -55,14 +58,21 @@ public void initialize(TopicUri topicUri, Environment env, PscConfigurationInter } @Override - public List listTopicRns(long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public List listTopicRns( + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException { ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); return topicListing.stream().map(tl -> MetadataUtils.createTopicRn(topicUri, tl.name())).collect(Collectors.toList()); } @Override - public Map describeTopicRns(Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map describeTopicRns( + Collection topicRns, + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException { Collection topicNames = topicRns.stream().map(TopicRn::getTopic).collect(Collectors.toSet()); Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(timeout, timeUnit); Map result = new HashMap<>(); @@ -82,7 +92,11 @@ public Map describeTopicRns(Set topicRns, lon } @Override - public Map listOffsets(Map topicUriPartitionsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsets( + Map topicUriPartitionsAndOptions, + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException { Map topicPartitionOffsets = new HashMap<>(); for (Map.Entry entry : topicUriPartitionsAndOptions.entrySet()) { TopicUriPartition topicUriPartition = entry.getKey(); @@ -94,7 +108,8 @@ else if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) offsetSpec = OffsetSpec.latest(); else throw new IllegalArgumentException("Unsupported MetadataClientOption for listOffsets(): " + option); - topicPartitionOffsets.put(new TopicPartition(topicUriPartition.getTopicUri().getTopic(), topicUriPartition.getPartition()), offsetSpec); + topicPartitionOffsets.put( + new TopicPartition(topicUriPartition.getTopicUri().getTopic(), topicUriPartition.getPartition()), offsetSpec); } ListOffsetsResult listOffsetsResult = kafkaAdminClient.listOffsets(topicPartitionOffsets); Map result = new HashMap<>(); @@ -110,6 +125,32 @@ else if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) return result; } + @Override + public Map listOffsetsForConsumerGroup( + String consumerGroupId, + Collection topicUriPartitions, + long timeout, + TimeUnit timeUnit + ) throws ExecutionException, InterruptedException, TimeoutException { + ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions(); + options.topicPartitions(topicUriPartitions.stream().map(tup -> + new TopicPartition(tup.getTopicUri().getTopic(), tup.getPartition())).collect(Collectors.toList())); + Map offsets = kafkaAdminClient + .listConsumerGroupOffsets(consumerGroupId, options) + .partitionsToOffsetAndMetadata() + .get(timeout, timeUnit); + Map result = new HashMap<>(); + offsets.forEach((tp, offsetAndMetadata) -> { + TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); + TopicUriPartition tup = createKafkaTopicUriPartition(topicRn, tp.partition()); + result.put( + tup, + new MessageId(tup, offsetAndMetadata.offset()) + ); + }); + return result; + } + private TopicUriPartition createKafkaTopicUriPartition(TopicRn topicRn, int partition) { return new TopicUriPartition(new KafkaTopicUri(new BaseTopicUri(topicUri.getProtocol(), topicRn)), partition); } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java index a202d4c..7182933 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java @@ -3,7 +3,6 @@ import com.pinterest.psc.common.PscUtils; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.config.PscConfigurationInternal; -import com.pinterest.psc.discovery.ServiceDiscoveryManager; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient; diff --git a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 677c7b2..30fef16 100644 --- a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -20,7 +20,7 @@ public class TestPscMetadataClient { protected static final String testKafkaTopic1 = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1"; @Test - void testGetBackendMetadataClient() throws ConfigurationException, TopicUriSyntaxException, IOException { + void testGetBackendMetadataClient() throws Exception { String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); PscConfiguration pscConfiguration = new PscConfiguration(); pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); @@ -30,14 +30,18 @@ void testGetBackendMetadataClient() throws ConfigurationException, TopicUriSynta PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); assertEquals(PscKafkaMetadataClient.class, backendMetadataClient.getClass()); assertEquals("kafkacluster01001:9092,kafkacluster01002:9092", backendMetadataClient.getDiscoveryConfig().getConnect()); + pscMetadataClient.close(); } @Test void testCreateTopicRn() throws TopicUriSyntaxException { TopicRn topic1Rn = TopicUri.validate(testKafkaTopic1).getTopicRn(); TopicRn topic1RnCreated = MetadataUtils.createTopicRn(TopicUri.validate(testKafkaTopic1), "topic1"); - assertTrue(topic1Rn.equals(topic1RnCreated)); + assertTrue(topic1Rn.equals(topic1RnCreated)); // ensure that equality is implemented correctly + TopicRn topic2Rn = MetadataUtils.createTopicRn(TopicUri.validate(testKafkaTopic1), "topic2"); + + // Ensure that the new topic name is the only difference assertEquals(topic1Rn.getStandard(), topic2Rn.getStandard()); assertEquals(topic1Rn.getService(), topic2Rn.getService()); assertEquals(topic1Rn.getEnvironment(), topic2Rn.getEnvironment()); From 7b83ee8f7d40ec325d82d69b54e7db23fa955380 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 29 Aug 2024 15:03:45 -0400 Subject: [PATCH 4/7] Add test for listConsumerGroupOffsets --- .../client/TestPscMetadataClient.java | 207 ++++++++++++++++-- .../metadata/client/PscMetadataClient.java | 6 + .../client/kafka/PscKafkaMetadataClient.java | 3 +- 3 files changed, 197 insertions(+), 19 deletions(-) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index f3d3b4f..fdc95f8 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -8,6 +8,9 @@ import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.common.kafka.KafkaTopicUri; import com.pinterest.psc.config.PscConfiguration; +import com.pinterest.psc.consumer.PscConsumer; +import com.pinterest.psc.consumer.PscConsumerPollMessageIterator; +import com.pinterest.psc.exception.consumer.ConsumerException; import com.pinterest.psc.exception.producer.ProducerException; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.exception.startup.TopicUriSyntaxException; @@ -16,6 +19,7 @@ import com.pinterest.psc.metadata.TopicRnMetadata; import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; +import com.pinterest.psc.serde.IntegerDeserializer; import com.pinterest.psc.serde.IntegerSerializer; import com.pinterest.psc.utils.PscTestUtils; import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; @@ -27,6 +31,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -36,6 +41,8 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestPscMetadataClient { @@ -53,6 +60,7 @@ public class TestPscMetadataClient { private KafkaCluster kafkaCluster1; private String topicUriStr1, topicUriStr2, topicUriStr3, clusterUriStr; private TopicRn topic1Rn, topic2Rn, topic3Rn; + private TopicUri topic1Uri, topic2Uri, topic3Uri; /** * Initializes two Kafka clusters that are commonly used by all tests, and creates a single topic on each. @@ -84,9 +92,13 @@ public void setup() throws IOException, InterruptedException, TopicUriSyntaxExce PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic2, partitions2); PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic3, partitions3); - topic1Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), topic1); - topic2Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), topic2); - topic3Rn = MetadataUtils.createTopicRn(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr3)), topic3); + topic1Uri = KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)); + topic2Uri = KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)); + topic3Uri = KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr3)); + + topic1Rn = MetadataUtils.createTopicRn(topic1Uri, topic1); + topic2Rn = MetadataUtils.createTopicRn(topic2Uri, topic2); + topic3Rn = MetadataUtils.createTopicRn(topic3Uri, topic3); } /** @@ -149,8 +161,8 @@ public void testDescribeTopicRns() throws Exception { public void testListOffsets() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); Map topicUriPartitionsAndOptions = new HashMap<>(); - topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST); - topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); Map offsets = client.listOffsets( clusterUri, @@ -161,8 +173,8 @@ public void testListOffsets() throws Exception { assertEquals(2, offsets.size()); // ensure that the offsets are 0 when the topic is empty - assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); - assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); + assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); + assertEquals(0, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); // send one message to partition 0 for both topics PscProducer pscProducer = getPscProducer(); @@ -179,14 +191,14 @@ public void testListOffsets() throws Exception { // ensure sent offsets are captured in next metadataClient call assertEquals(2, offsets.size()); - assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); // earliest offset - assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); // latest offset + assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); // earliest offset + assertEquals(1, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); // latest offset // now change the spec to latest for both topics, and add topic1 partitions 5 and 10 to the query - topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); - topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); - topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); - topicUriPartitionsAndOptions.put(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 5), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); + topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 10), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); // send 2 messages to topic1 partition 0 - now the latest offset should be 3 pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0, 0, 0)); @@ -206,18 +218,177 @@ public void testListOffsets() throws Exception { // ensure sent offsets are captured in next metadataClient call assertEquals(4, offsets.size()); - assertEquals(3, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 0)).getOffset()); - assertEquals(1, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 5)).getOffset()); - assertEquals(0, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr1)), 10)).getOffset()); - assertEquals(2, offsets.get(new TopicUriPartition(KafkaTopicUri.validate(BaseTopicUri.validate(topicUriStr2)), 0)).getOffset()); + assertEquals(3, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); + assertEquals(1, offsets.get(new TopicUriPartition(topic1Uri, 5)).getOffset()); + assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 10)).getOffset()); + assertEquals(2, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); client.close(); pscProducer.close(); } @Test - public void testListOffsetsForConsumerGroup() { + public void testListOffsetsForConsumerGroup() throws Exception { + PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); + TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); + + String consumerGroupId = "test-psc-consumer-group"; + PscProducer pscProducer = getPscProducer(); + PscConsumer pscConsumer = getPscConsumer(consumerGroupId); + + sendNMessages(pscProducer, topicUriStr1, 0, 1000); + sendNMessages(pscProducer, topicUriStr1, 1, 1000); + sendNMessages(pscProducer, topicUriStr2, 23, 1000); + sendNMessages(pscProducer, topicUriStr3, 0, 1000); + + TopicUriPartition t1p0 = new TopicUriPartition(topic1Uri, 0); + TopicUriPartition t1p1 = new TopicUriPartition(topic1Uri, 1); + TopicUriPartition t2p23 = new TopicUriPartition(topic2Uri, 23); + TopicUriPartition t3p0 = new TopicUriPartition(topic3Uri, 0); + + List topicUriPartitions = Arrays.asList( + t1p0, + t1p1, + t2p23, + t3p0 + ); + + // assign to t1p0, poll 1 message, commit + pscConsumer.assign(Collections.singleton(t1p0)); + pollNMessages(1, pscConsumer); + pscConsumer.commitSync(); + + Map offsets = client.listOffsetsForConsumerGroup( + clusterUri, + consumerGroupId, + topicUriPartitions, + 10000, + TimeUnit.MILLISECONDS + ); + + assertEquals(4, offsets.size()); + assertTrue(offsets.containsKey(t1p0)); + assertTrue(offsets.containsKey(t1p1)); + assertTrue(offsets.containsKey(t2p23)); + assertTrue(offsets.containsKey(t3p0)); + assertEquals(t1p0, offsets.get(t1p0).getTopicUriPartition()); + assertEquals(1, offsets.get(t1p0).getOffset()); + assertNull(offsets.get(t1p1)); + assertNull(offsets.get(t2p23)); + assertNull(offsets.get(t3p0)); + + // already assigned to t1p0, poll 900 messages, commit - this should set the offset to 901 + pollNMessages(900, pscConsumer); + pscConsumer.commitSync(); + + // query again + offsets = client.listOffsetsForConsumerGroup( + clusterUri, + consumerGroupId, + topicUriPartitions, + 10000, + TimeUnit.MILLISECONDS + ); + + assertEquals(901, offsets.get(t1p0).getOffset()); + assertNull(offsets.get(t1p1)); + assertNull(offsets.get(t2p23)); + assertNull(offsets.get(t3p0)); + + // assign to t1p1, poll 500 messages, commit - this should set the offset to 500 + pscConsumer.unassign(); + pscConsumer.assign(Collections.singleton(t1p1)); + pollNMessages(500, pscConsumer); + pscConsumer.commitSync(); + + // query again + offsets = client.listOffsetsForConsumerGroup( + clusterUri, + consumerGroupId, + topicUriPartitions, + 10000, + TimeUnit.MILLISECONDS + ); + + assertEquals(901, offsets.get(t1p0).getOffset()); + assertEquals(500, offsets.get(t1p1).getOffset()); + assertNull(offsets.get(t2p23)); + assertNull(offsets.get(t3p0)); + + // assign to t2p23, poll 1000 messages, commit - this should set the offset to 1000 + pscConsumer.unassign(); + pscConsumer.assign(Collections.singleton(t2p23)); + pollNMessages(1000, pscConsumer); + pscConsumer.commitSync(); + + // query again + offsets = client.listOffsetsForConsumerGroup( + clusterUri, + consumerGroupId, + topicUriPartitions, + 10000, + TimeUnit.MILLISECONDS + ); + + assertEquals(901, offsets.get(t1p0).getOffset()); + assertEquals(500, offsets.get(t1p1).getOffset()); + assertEquals(1000, offsets.get(t2p23).getOffset()); + assertNull(offsets.get(t3p0)); + + // assign to t3p0, poll 999 messages, commit - this should set the offset to 999 + pscConsumer.unassign(); + pscConsumer.assign(Collections.singleton(t3p0)); + pollNMessages(999, pscConsumer); + pscConsumer.commitSync(); + + // query again + offsets = client.listOffsetsForConsumerGroup( + clusterUri, + consumerGroupId, + topicUriPartitions, + 10000, + TimeUnit.MILLISECONDS + ); + + assertEquals(901, offsets.get(t1p0).getOffset()); + assertEquals(500, offsets.get(t1p1).getOffset()); + assertEquals(1000, offsets.get(t2p23).getOffset()); + assertEquals(999, offsets.get(t3p0).getOffset()); + + pscConsumer.close(); + pscProducer.close(); + client.close(); + } + + private static void pollNMessages(int numberOfMessages, PscConsumer pscConsumer) throws ConsumerException { + int messagesSeen = 0; + while (messagesSeen < numberOfMessages) { + PscConsumerPollMessageIterator it = pscConsumer.poll(); + while (it.hasNext()) { + it.next(); + messagesSeen++; + } + } + } + + private static void sendNMessages(PscProducer producer, String topicUriStr, int partition, int numberOfMessages) throws ConfigurationException, ProducerException { + for (int i = 0; i < numberOfMessages; i++) { + producer.send(new PscProducerMessage<>(topicUriStr, partition, i, i)); + } + producer.flush(); + } + private static PscConsumer getPscConsumer(String groupId) throws ConfigurationException, ConsumerException { + PscConfiguration consumerConfiguration = new PscConfiguration(); + consumerConfiguration.setProperty(PscConfiguration.PSC_METRICS_REPORTER_CLASS, TestUtils.DEFAULT_METRICS_REPORTER); + consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "test-psc-consumer"); + consumerConfiguration.setProperty(PscConfiguration.PSC_CONFIG_LOGGING_ENABLED, "false"); + consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, groupId); + consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST); + consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_KEY_DESERIALIZER, IntegerDeserializer.class.getName()); + consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_VALUE_DESERIALIZER, IntegerDeserializer.class.getName()); + consumerConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_POLL_MESSAGES_MAX, "1"); + return new PscConsumer<>(consumerConfiguration); } private static PscProducer getPscProducer() throws ConfigurationException, ProducerException { diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java index e066acd..32f86cd 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -13,6 +13,7 @@ import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -55,6 +56,11 @@ public Map listOffsets(TopicUri clusterUri, Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); + return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, timeout, timeUnit); + } + @VisibleForTesting protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) { String topicUriPrefix = clusterUri.getTopicUriPrefix(); diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java index 2413602..a8f4270 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -143,9 +143,10 @@ public Map listOffsetsForConsumerGroup( offsets.forEach((tp, offsetAndMetadata) -> { TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); TopicUriPartition tup = createKafkaTopicUriPartition(topicRn, tp.partition()); + MessageId messageId = offsetAndMetadata == null ? null : new MessageId(tup, offsetAndMetadata.offset()); result.put( tup, - new MessageId(tup, offsetAndMetadata.offset()) + messageId ); }); return result; From 201a417d44a986ff64f8d2b1049c9734cafa0c04 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 29 Aug 2024 15:14:39 -0400 Subject: [PATCH 5/7] Try to fix test --- .../metadata/client/TestPscMetadataClient.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 30fef16..8d3ea67 100644 --- a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -14,25 +14,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class TestPscMetadataClient { protected static final String testKafkaTopic1 = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1"; - @Test - void testGetBackendMetadataClient() throws Exception { - String fallbackDiscoveryFilename = DiscoveryUtil.createTempFallbackFile(); - PscConfiguration pscConfiguration = new PscConfiguration(); - pscConfiguration.addProperty(PscConfiguration.PSC_DISCOVERY_FALLBACK_FILE, fallbackDiscoveryFilename); - pscConfiguration.addProperty(PscConfiguration.PSC_METADATA_CLIENT_ID, "test-metadata-client"); - - PscMetadataClient pscMetadataClient = new PscMetadataClient(pscConfiguration); - PscBackendMetadataClient backendMetadataClient = pscMetadataClient.getBackendMetadataClient(TopicUri.validate(testKafkaTopic1)); - assertEquals(PscKafkaMetadataClient.class, backendMetadataClient.getClass()); - assertEquals("kafkacluster01001:9092,kafkacluster01002:9092", backendMetadataClient.getDiscoveryConfig().getConnect()); - pscMetadataClient.close(); - } - @Test void testCreateTopicRn() throws TopicUriSyntaxException { TopicRn topic1Rn = TopicUri.validate(testKafkaTopic1).getTopicRn(); From f044efe82f524bf946dfb99bec0a7fbf2b9bb998 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 29 Aug 2024 15:42:25 -0400 Subject: [PATCH 6/7] Add javadocs --- .../client/TestPscMetadataClient.java | 60 ++++++++++++++ .../com/pinterest/psc/common/TopicRn.java | 13 --- .../pinterest/psc/metadata/MetadataUtils.java | 3 + .../psc/metadata/TopicRnMetadata.java | 3 + .../client/PscBackendMetadataClient.java | 9 +- .../metadata/client/PscMetadataClient.java | 82 +++++++++++++++++++ .../client/kafka/PscKafkaMetadataClient.java | 3 + .../PscBackendMetadataClientCreator.java | 3 + .../PscKafkaMetadataClientCreator.java | 3 + .../PscMetadataClientCreatorManager.java | 8 ++ .../client/TestPscMetadataClient.java | 4 + 11 files changed, 173 insertions(+), 18 deletions(-) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index fdc95f8..212cee8 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -44,6 +45,9 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +/** + * Tests the functionality and correctness of {@link PscMetadataClient} + */ public class TestPscMetadataClient { @RegisterExtension @@ -116,6 +120,11 @@ public void tearDown() throws ExecutionException, InterruptedException { Thread.sleep(1000); } + /** + * Tests that {@link PscMetadataClient#listTopicRns(TopicUri, long, TimeUnit)} returns the correct list of topic RNs + * + * @throws Exception + */ @Test public void testListTopicRns() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); @@ -125,6 +134,12 @@ public void testListTopicRns() throws Exception { client.close(); } + /** + * Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, long, TimeUnit)} returns the correct + * metadata for the supplied topic RNs + * + * @throws Exception + */ @Test public void testDescribeTopicRns() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); @@ -157,6 +172,12 @@ public void testDescribeTopicRns() throws Exception { client.close(); } + /** + * Tests that {@link PscMetadataClient#listOffsets(TopicUri, Map, long, TimeUnit)} returns the correct offsets for the + * supplied topic partitions and specs + * + * @throws Exception + */ @Test public void testListOffsets() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); @@ -227,6 +248,13 @@ public void testListOffsets() throws Exception { pscProducer.close(); } + /** + * Tests that {@link PscMetadataClient#listOffsetsForConsumerGroup(TopicUri, String, Collection, long, TimeUnit)} returns + * the correct offsets for the supplied consumer group and topic partitions. Also tests correct behavior when supplied + * with edge case scenarios such as non-existent consumerGroupId, non-existent partitions, etc. + * + * @throws Exception + */ @Test public void testListOffsetsForConsumerGroup() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); @@ -355,6 +383,38 @@ public void testListOffsetsForConsumerGroup() throws Exception { assertEquals(1000, offsets.get(t2p23).getOffset()); assertEquals(999, offsets.get(t3p0).getOffset()); + // query a non-existent consumer group + offsets = client.listOffsetsForConsumerGroup( + clusterUri, + "non-existent-consumer-group", + topicUriPartitions, + 10000, + TimeUnit.MILLISECONDS + ); + + assertEquals(4, offsets.size()); + assertTrue(offsets.containsKey(t1p0)); + assertTrue(offsets.containsKey(t1p1)); + assertTrue(offsets.containsKey(t2p23)); + assertTrue(offsets.containsKey(t3p0)); + assertNull(offsets.get(t1p0)); + assertNull(offsets.get(t1p1)); + assertNull(offsets.get(t2p23)); + assertNull(offsets.get(t3p0)); + + // query a non-existent set of partitions + offsets = client.listOffsetsForConsumerGroup( + clusterUri, + consumerGroupId, + Collections.singleton(new TopicUriPartition(topic1Uri, 100)), + 10000, + TimeUnit.MILLISECONDS + ); + + assertEquals(1, offsets.size()); + assertTrue(offsets.containsKey(new TopicUriPartition(topic1Uri, 100))); + assertNull(offsets.get(new TopicUriPartition(topic1Uri, 100))); + pscConsumer.close(); pscProducer.close(); client.close(); diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java index f2ff3d2..27017ff 100644 --- a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java +++ b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java @@ -108,10 +108,6 @@ public TopicRn( ); } - public String getTopicRnString() { - return topicRnString; - } - public String getTopicRnPrefixString() { return topicRnPrefixString; } @@ -176,15 +172,6 @@ private static String upgradeTopicRnToCurrentVersion(String topicRnAsStr, byte s throw new TopicRnSyntaxException(String.format("Unsupported topic RN version %d", serializedVersion)); } - public void setTopic(String topic) { - this.topic = topic; - if (this.topicRnString.endsWith(":")) { - this.topicRnString = this.topicRnString + topic; - } else { - this.topicRnString = this.topicRnString + ":" + topic; - } - } - @Override public int hashCode() { return Objects.hash( diff --git a/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java index cebc229..2cf9a23 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/MetadataUtils.java @@ -4,6 +4,9 @@ import com.pinterest.psc.common.TopicRn; import com.pinterest.psc.common.TopicUri; +/** + * Utility class for common metadata logic + */ public class MetadataUtils { @VisibleForTesting diff --git a/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java b/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java index ab00c5f..c818264 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/TopicRnMetadata.java @@ -5,6 +5,9 @@ import java.util.List; +/** + * Metadata for a {@link TopicRn}, including the list of its partitions + */ public class TopicRnMetadata { private final TopicRn topicRn; diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java index f5a077d..7f9390d 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java @@ -19,6 +19,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +/** + * An abstract class that defines the interface for metadata queries and operations. Specific implementations + * of this class should be created for each backend, such as Kafka, MemQ, etc. + */ public abstract class PscBackendMetadataClient implements AutoCloseable { protected TopicUri topicUri; @@ -62,9 +66,4 @@ public abstract Map listOffsetsForConsumerGroup( protected TopicUri getTopicUri() { return topicUri; } - - @VisibleForTesting - protected ServiceDiscoveryConfig getDiscoveryConfig() { - return discoveryConfig; - } } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java index 32f86cd..3d8fec8 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -22,6 +22,29 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +/** + * A client for metadata queries and operations, similar to the KafkaAdminClient but this is backend-agnostic. + * + * This class is responsible for creating and managing the lifecycle of the backend-specific metadata clients, + * such as the {@link com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient}, all of which should extend + * {@link com.pinterest.psc.metadata.client.PscBackendMetadataClient}. + * + * Each backend-specific metadata client is responsible for handling the actual metadata queries and operations. + * + * The creation of {@link PscBackendMetadataClient} relies on the supplied {@link TopicUri} at the time of + * method call to determine which backend-specific metadata client to create. The creation logic resides in + * the {@link PscMetadataClientCreatorManager} and the {@link PscBackendMetadataClientCreator} implementations. + * This model follows the same pattern as how backend clients are created in {@link com.pinterest.psc.consumer.PscConsumer} + * and {@link com.pinterest.psc.producer.PscProducer}. + * + * As such, each API method in this class will delegate to the appropriate backend-specific metadata client, and must + * accept a {@link TopicUri} to determine which backend-specific metadata client to use. + * + * The {@link TopicUri} supplied to each method does not need to be a full URI including the topic name. Instead, + * it should only include the URI prefix (up to the cluster name), since connecting to the cluster is the only + * information needed to determine which backend-specific metadata client to use. Supplying a full URI including the + * topic name will not cause any issues, but it is unnecessary. + */ public class PscMetadataClient implements AutoCloseable { private PscMetadataClientCreatorManager creatorManager; private Environment environment; @@ -41,21 +64,77 @@ private void initialize() { environment = pscConfigurationInternal.getEnvironment(); } + /** + * List all the {@link TopicRn}'s in the cluster. + * + * @param clusterUri + * @param timeout + * @param timeUnit + * @return the list of {@link TopicRn}'s in the cluster + * @throws ExecutionException + * @throws InterruptedException + * @throws TimeoutException + */ public List listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.listTopicRns(timeout, timeUnit); } + /** + * Describe the metadata for the given {@link TopicRn}'s in the cluster. + * + * @param clusterUri + * @param topicRns + * @param timeout + * @param timeUnit + * @return a map of {@link TopicRn} to {@link TopicRnMetadata} + * @throws ExecutionException + * @throws InterruptedException + * @throws TimeoutException + */ public Map describeTopicRns(TopicUri clusterUri, Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.describeTopicRns(topicRns, timeout, timeUnit); } + /** + * List the offsets for the given {@link TopicUriPartition}'s in the cluster. + * + * For each {@link TopicUriPartition}, the user can specify an {@link MetadataClientOption} to specify whether to + * get the earliest or latest offset. + * + * For example, if the user specifies {@link MetadataClientOption#OFFSET_SPEC_EARLIEST}, the client will return the earliest + * offset for the partition. + * + * @param clusterUri + * @param topicRnsAndOptions + * @param timeout + * @param timeUnit + * @return a map of {@link TopicUriPartition} to {@link MessageId}. The {@link MessageId} will contain the offset but + * not necessarily the timestamp (timestamp might be null or unset) + * @throws ExecutionException + * @throws InterruptedException + * @throws TimeoutException + */ public Map listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.listOffsets(topicRnsAndOptions, timeout, timeUnit); } + /** + * List the offsets for the given {@link TopicUriPartition}'s in the cluster for the given consumer group ID. + * + * @param clusterUri + * @param consumerGroup + * @param topicUriPartitions + * @param timeout + * @param timeUnit + * @return a map of {@link TopicUriPartition} to {@link MessageId}. The {@link MessageId} will contain the offset but + * not necessarily the timestamp (timestamp might be null or unset) + * @throws ExecutionException + * @throws InterruptedException + * @throws TimeoutException + */ public Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, timeout, timeUnit); @@ -83,6 +162,9 @@ public void close() throws Exception { pscBackendMetadataClientByTopicUriPrefix.clear(); } + /** + * An enum to specify the options for the metadata client API's + */ public enum MetadataClientOption { OFFSET_SPEC_EARLIEST, OFFSET_SPEC_LATEST diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java index a8f4270..9965bf2 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -38,6 +38,9 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; +/** + * A Kafka-specific implementation of the {@link PscBackendMetadataClient}. + */ public class PscKafkaMetadataClient extends PscBackendMetadataClient { private static final PscLogger logger = PscLogger.getLogger(PscKafkaMetadataClient.class); diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java index 69be429..12987c2 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java @@ -6,6 +6,9 @@ import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.metadata.client.PscBackendMetadataClient; +/** + * An abstract class that defines the interface for creating a metadata client for a specific backend. + */ public abstract class PscBackendMetadataClientCreator { public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException; diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java index 7182933..74515f6 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java @@ -7,6 +7,9 @@ import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient; +/** + * A class that creates a {@link com.pinterest.psc.metadata.client.PscBackendMetadataClient} for Kafka. + */ @PscMetadataClientCreatorPlugin(backend = PscUtils.BACKEND_TYPE_KAFKA) public class PscKafkaMetadataClientCreator extends PscBackendMetadataClientCreator { diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java index c89ed8a..43adc39 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscMetadataClientCreatorManager.java @@ -7,6 +7,14 @@ import java.util.Map; import java.util.Set; +/** + * Manages the different {@link PscBackendMetadataClientCreator} implementations and provides a registry of them. + * + * This class is responsible for finding and registering the different {@link PscBackendMetadataClientCreator} implementations + * that are annotated with {@link PscMetadataClientCreatorPlugin}. Each backend can have at most one implementation of + * {@link PscBackendMetadataClientCreator} that is returned by this class. To access all the backend creators, keyed by + * the backend name, use {@link #getBackendCreators()}. + */ public class PscMetadataClientCreatorManager { private static final PscLogger logger = PscLogger.getLogger(PscMetadataClientCreatorManager.class); diff --git a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 8d3ea67..802bae0 100644 --- a/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -22,6 +22,10 @@ public class TestPscMetadataClient { protected static final String testKafkaTopic1 = "plaintext:" + TopicUri.SEPARATOR + TopicUri.STANDARD + ":kafka:env:aws_us-west-1::kafkacluster01:topic1"; + /** + * Ensure that {@link TopicRn} creation is correct, and that equality is implemented correctly + * @throws TopicUriSyntaxException + */ @Test void testCreateTopicRn() throws TopicUriSyntaxException { TopicRn topic1Rn = TopicUri.validate(testKafkaTopic1).getTopicRn(); From f03388655f42dc2e7b5c5f029328ae08db97283c Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Wed, 11 Sep 2024 14:30:00 -0400 Subject: [PATCH 7/7] Address comments --- .../client/TestPscMetadataClient.java | 92 ++++++++----------- .../client/PscBackendMetadataClient.java | 21 ++--- .../metadata/client/PscMetadataClient.java | 30 +++--- .../client/kafka/PscKafkaMetadataClient.java | 40 ++++---- 4 files changed, 79 insertions(+), 104 deletions(-) diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index 212cee8..507f372 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -30,6 +30,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -39,7 +40,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -121,21 +121,21 @@ public void tearDown() throws ExecutionException, InterruptedException { } /** - * Tests that {@link PscMetadataClient#listTopicRns(TopicUri, long, TimeUnit)} returns the correct list of topic RNs + * Tests that {@link PscMetadataClient#listTopicRns(TopicUri, Duration)} returns the correct list of topic RNs * * @throws Exception */ @Test public void testListTopicRns() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); - List topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), 10000, TimeUnit.MILLISECONDS); + List topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), Duration.ofMillis(10000)); List expectedTopicRnList = Arrays.asList(topic1Rn, topic2Rn, topic3Rn); assertEquals(expectedTopicRnList, topicRnList); client.close(); } /** - * Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, long, TimeUnit)} returns the correct + * Tests that {@link PscMetadataClient#describeTopicRns(TopicUri, java.util.Set, Duration)} returns the correct * metadata for the supplied topic RNs * * @throws Exception @@ -146,8 +146,7 @@ public void testDescribeTopicRns() throws Exception { Map topicRnDescriptionMap = client.describeTopicRns( KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)), - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(3, topicRnDescriptionMap.size()); @@ -173,7 +172,7 @@ public void testDescribeTopicRns() throws Exception { } /** - * Tests that {@link PscMetadataClient#listOffsets(TopicUri, Map, long, TimeUnit)} returns the correct offsets for the + * Tests that {@link PscMetadataClient#listOffsets(TopicUri, Map, Duration)} returns the correct offsets for the * supplied topic partitions and specs * * @throws Exception @@ -185,35 +184,33 @@ public void testListOffsets() throws Exception { topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST); topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); - Map offsets = client.listOffsets( + Map offsets = client.listOffsets( clusterUri, topicUriPartitionsAndOptions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(2, offsets.size()); // ensure that the offsets are 0 when the topic is empty - assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); - assertEquals(0, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 0))); + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic2Uri, 0))); // send one message to partition 0 for both topics PscProducer pscProducer = getPscProducer(); - pscProducer.send(new PscProducerMessage(topicUriStr1, 0,0,0)); - pscProducer.send(new PscProducerMessage(topicUriStr2, 0,0,0)); + pscProducer.send(new PscProducerMessage<>(topicUriStr1, 0,0,0)); + pscProducer.send(new PscProducerMessage<>(topicUriStr2, 0,0,0)); pscProducer.flush(); offsets = client.listOffsets( clusterUri, topicUriPartitionsAndOptions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); // ensure sent offsets are captured in next metadataClient call assertEquals(2, offsets.size()); - assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); // earliest offset - assertEquals(1, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); // latest offset + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 0))); // earliest offset + assertEquals(1, (long) offsets.get(new TopicUriPartition(topic2Uri, 0))); // latest offset // now change the spec to latest for both topics, and add topic1 partitions 5 and 10 to the query topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); @@ -233,23 +230,22 @@ public void testListOffsets() throws Exception { offsets = client.listOffsets( clusterUri, topicUriPartitionsAndOptions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); // ensure sent offsets are captured in next metadataClient call assertEquals(4, offsets.size()); - assertEquals(3, offsets.get(new TopicUriPartition(topic1Uri, 0)).getOffset()); - assertEquals(1, offsets.get(new TopicUriPartition(topic1Uri, 5)).getOffset()); - assertEquals(0, offsets.get(new TopicUriPartition(topic1Uri, 10)).getOffset()); - assertEquals(2, offsets.get(new TopicUriPartition(topic2Uri, 0)).getOffset()); + assertEquals(3, (long) offsets.get(new TopicUriPartition(topic1Uri, 0))); + assertEquals(1, (long) offsets.get(new TopicUriPartition(topic1Uri, 5))); + assertEquals(0, (long) offsets.get(new TopicUriPartition(topic1Uri, 10))); + assertEquals(2, (long) offsets.get(new TopicUriPartition(topic2Uri, 0))); client.close(); pscProducer.close(); } /** - * Tests that {@link PscMetadataClient#listOffsetsForConsumerGroup(TopicUri, String, Collection, long, TimeUnit)} returns + * Tests that {@link PscMetadataClient#listOffsetsForConsumerGroup(TopicUri, String, Collection, Duration)} returns * the correct offsets for the supplied consumer group and topic partitions. Also tests correct behavior when supplied * with edge case scenarios such as non-existent consumerGroupId, non-existent partitions, etc. * @@ -286,12 +282,11 @@ public void testListOffsetsForConsumerGroup() throws Exception { pollNMessages(1, pscConsumer); pscConsumer.commitSync(); - Map offsets = client.listOffsetsForConsumerGroup( + Map offsets = client.listOffsetsForConsumerGroup( clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(4, offsets.size()); @@ -299,8 +294,7 @@ public void testListOffsetsForConsumerGroup() throws Exception { assertTrue(offsets.containsKey(t1p1)); assertTrue(offsets.containsKey(t2p23)); assertTrue(offsets.containsKey(t3p0)); - assertEquals(t1p0, offsets.get(t1p0).getTopicUriPartition()); - assertEquals(1, offsets.get(t1p0).getOffset()); + assertEquals(1, (long) offsets.get(t1p0)); assertNull(offsets.get(t1p1)); assertNull(offsets.get(t2p23)); assertNull(offsets.get(t3p0)); @@ -314,11 +308,10 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); assertNull(offsets.get(t1p1)); assertNull(offsets.get(t2p23)); assertNull(offsets.get(t3p0)); @@ -334,12 +327,11 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); - assertEquals(500, offsets.get(t1p1).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); + assertEquals(500, (long) offsets.get(t1p1)); assertNull(offsets.get(t2p23)); assertNull(offsets.get(t3p0)); @@ -354,13 +346,12 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); - assertEquals(500, offsets.get(t1p1).getOffset()); - assertEquals(1000, offsets.get(t2p23).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); + assertEquals(500, (long) offsets.get(t1p1)); + assertEquals(1000, (long) offsets.get(t2p23)); assertNull(offsets.get(t3p0)); // assign to t3p0, poll 999 messages, commit - this should set the offset to 999 @@ -374,22 +365,20 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); - assertEquals(901, offsets.get(t1p0).getOffset()); - assertEquals(500, offsets.get(t1p1).getOffset()); - assertEquals(1000, offsets.get(t2p23).getOffset()); - assertEquals(999, offsets.get(t3p0).getOffset()); + assertEquals(901, (long) offsets.get(t1p0)); + assertEquals(500, (long) offsets.get(t1p1)); + assertEquals(1000, (long) offsets.get(t2p23)); + assertEquals(999, (long) offsets.get(t3p0)); // query a non-existent consumer group offsets = client.listOffsetsForConsumerGroup( clusterUri, "non-existent-consumer-group", topicUriPartitions, - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(4, offsets.size()); @@ -407,8 +396,7 @@ public void testListOffsetsForConsumerGroup() throws Exception { clusterUri, consumerGroupId, Collections.singleton(new TopicUriPartition(topic1Uri, 100)), - 10000, - TimeUnit.MILLISECONDS + Duration.ofMillis(10000) ); assertEquals(1, offsets.size()); diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java index 7f9390d..acb9d3e 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscBackendMetadataClient.java @@ -12,11 +12,11 @@ import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.metadata.TopicRnMetadata; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -36,28 +36,23 @@ public void initialize(TopicUri topicUri, Environment env, PscConfigurationInter ServiceDiscoveryManager.getServiceDiscoveryConfig(env, pscConfigurationInternal.getDiscoveryConfiguration(), topicUri); } - public abstract List listTopicRns( - long timeout, - TimeUnit timeUnit - ) throws ExecutionException, InterruptedException, TimeoutException; + public abstract List listTopicRns(Duration duration) + throws ExecutionException, InterruptedException, TimeoutException; public abstract Map describeTopicRns( Collection topicRns, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException; - public abstract Map listOffsets( + public abstract Map listOffsets( Map topicRnsAndOptions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException; - public abstract Map listOffsetsForConsumerGroup( + public abstract Map listOffsetsForConsumerGroup( String consumerGroupId, Collection topicUriPartitions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException; public abstract void close() throws Exception; diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java index 3d8fec8..0dcebc3 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -13,13 +13,13 @@ import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -68,16 +68,15 @@ private void initialize() { * List all the {@link TopicRn}'s in the cluster. * * @param clusterUri - * @param timeout - * @param timeUnit + * @param duration * @return the list of {@link TopicRn}'s in the cluster * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException */ - public List listTopicRns(TopicUri clusterUri, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public List listTopicRns(TopicUri clusterUri, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.listTopicRns(timeout, timeUnit); + return backendMetadataClient.listTopicRns(duration); } /** @@ -85,16 +84,15 @@ public List listTopicRns(TopicUri clusterUri, long timeout, TimeUnit ti * * @param clusterUri * @param topicRns - * @param timeout - * @param timeUnit + * @param duration * @return a map of {@link TopicRn} to {@link TopicRnMetadata} * @throws ExecutionException * @throws InterruptedException * @throws TimeoutException */ - public Map describeTopicRns(TopicUri clusterUri, Set topicRns, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map describeTopicRns(TopicUri clusterUri, Set topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.describeTopicRns(topicRns, timeout, timeUnit); + return backendMetadataClient.describeTopicRns(topicRns, duration); } /** @@ -108,17 +106,16 @@ public Map describeTopicRns(TopicUri clusterUri, Set listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.listOffsets(topicRnsAndOptions, timeout, timeUnit); + return backendMetadataClient.listOffsets(topicRnsAndOptions, duration); } /** @@ -127,17 +124,16 @@ public Map listOffsets(TopicUri clusterUri, Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, long timeout, TimeUnit timeUnit) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); - return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, timeout, timeUnit); + return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, duration); } @VisibleForTesting diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java index 9965bf2..2bfa0af 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/kafka/PscKafkaMetadataClient.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -61,23 +62,20 @@ public void initialize( } @Override - public List listTopicRns( - long timeout, - TimeUnit timeUnit - ) throws ExecutionException, InterruptedException, TimeoutException { + public List listTopicRns(Duration duration) + throws ExecutionException, InterruptedException, TimeoutException { ListTopicsResult listTopicsResult = kafkaAdminClient.listTopics(); - Collection topicListing = listTopicsResult.listings().get(timeout, timeUnit); + Collection topicListing = listTopicsResult.listings().get(duration.toMillis(), TimeUnit.MILLISECONDS); return topicListing.stream().map(tl -> MetadataUtils.createTopicRn(topicUri, tl.name())).collect(Collectors.toList()); } @Override public Map describeTopicRns( Collection topicRns, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { Collection topicNames = topicRns.stream().map(TopicRn::getTopic).collect(Collectors.toSet()); - Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(timeout, timeUnit); + Map topicMetadata = kafkaAdminClient.describeTopics(topicNames).all().get(duration.toMillis(), TimeUnit.MILLISECONDS); Map result = new HashMap<>(); for (Map.Entry entry : topicMetadata.entrySet()) { String topicName = entry.getKey(); @@ -95,10 +93,9 @@ public Map describeTopicRns( } @Override - public Map listOffsets( + public Map listOffsets( Map topicUriPartitionsAndOptions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { Map topicPartitionOffsets = new HashMap<>(); for (Map.Entry entry : topicUriPartitionsAndOptions.entrySet()) { @@ -115,25 +112,24 @@ else if (option == PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST) new TopicPartition(topicUriPartition.getTopicUri().getTopic(), topicUriPartition.getPartition()), offsetSpec); } ListOffsetsResult listOffsetsResult = kafkaAdminClient.listOffsets(topicPartitionOffsets); - Map result = new HashMap<>(); - listOffsetsResult.all().get(timeout, timeUnit).entrySet().forEach(e -> { + Map result = new HashMap<>(); + listOffsetsResult.all().get(duration.toMillis(), TimeUnit.MILLISECONDS).entrySet().forEach(e -> { TopicPartition tp = e.getKey(); ListOffsetsResult.ListOffsetsResultInfo info = e.getValue(); TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); result.put( createKafkaTopicUriPartition(topicRn, tp.partition()), - new MessageId(createKafkaTopicUriPartition(topicRn, tp.partition()), info.offset(), info.timestamp()) + info.offset() ); }); return result; } @Override - public Map listOffsetsForConsumerGroup( + public Map listOffsetsForConsumerGroup( String consumerGroupId, Collection topicUriPartitions, - long timeout, - TimeUnit timeUnit + Duration duration ) throws ExecutionException, InterruptedException, TimeoutException { ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions(); options.topicPartitions(topicUriPartitions.stream().map(tup -> @@ -141,15 +137,15 @@ public Map listOffsetsForConsumerGroup( Map offsets = kafkaAdminClient .listConsumerGroupOffsets(consumerGroupId, options) .partitionsToOffsetAndMetadata() - .get(timeout, timeUnit); - Map result = new HashMap<>(); + .get(duration.toMillis(), TimeUnit.MILLISECONDS); + Map result = new HashMap<>(); offsets.forEach((tp, offsetAndMetadata) -> { TopicRn topicRn = MetadataUtils.createTopicRn(topicUri, tp.topic()); TopicUriPartition tup = createKafkaTopicUriPartition(topicRn, tp.partition()); - MessageId messageId = offsetAndMetadata == null ? null : new MessageId(tup, offsetAndMetadata.offset()); + Long offset = offsetAndMetadata == null ? null : offsetAndMetadata.offset(); result.put( - tup, - messageId + createKafkaTopicUriPartition(topicRn, tp.partition()), + offset ); }); return result;