From 0a1bded706b9ede7cea780d9b12e72707f8541a6 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Tue, 29 Oct 2024 16:13:00 -0400 Subject: [PATCH] Make PscMetadataClient convert BaseTopicUri to backend-specific topicUri --- .../enumerator/PscSourceEnumerator.java | 6 ++++ .../psc/testutils/PscSourceTestEnv.java | 3 +- .../client/TestPscMetadataClient.java | 8 ++--- .../metadata/client/PscMetadataClient.java | 36 ++++++++++++++----- .../PscBackendMetadataClientCreator.java | 2 ++ .../PscKafkaMetadataClientCreator.java | 7 ++++ 6 files changed, 48 insertions(+), 14 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java index db290d2..dc28b10 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java @@ -530,6 +530,8 @@ public Map committedOffsets(Collection listOffsets( + topicPartitionOffsets + " due to", e); + } catch (TopicUriSyntaxException e) { + throw new FlinkRuntimeException(e); } } @@ -579,6 +583,8 @@ private Map listOffsetsForTimestamps( + topicUriPartitionsAndTimestamps + " due to", e); + } catch (TopicUriSyntaxException e) { + throw new FlinkRuntimeException(e); } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java index be6be07..6e6bc7b 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceTestEnv.java @@ -28,6 +28,7 @@ import com.pinterest.psc.consumer.PscConsumer; import com.pinterest.psc.exception.consumer.ConsumerException; import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import com.pinterest.psc.metadata.client.PscMetadataClient; import com.pinterest.psc.producer.PscProducerMessage; import com.pinterest.psc.serde.IntegerDeserializer; @@ -255,7 +256,7 @@ public static void setupEarliestOffsets(List partitions) thro } public static void setupCommittedOffsets(String topicUriString) - throws ExecutionException, InterruptedException, ConfigurationException, ConsumerException, TimeoutException { + throws ExecutionException, InterruptedException, ConfigurationException, ConsumerException, TimeoutException, TopicUriSyntaxException { List partitions = getPartitionsForTopic(topicUriString); Map committedOffsets = getCommittedOffsets(partitions); List toCommit = new ArrayList<>(); diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java index bc9e294..231ac96 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/metadata/client/TestPscMetadataClient.java @@ -128,7 +128,7 @@ public void tearDown() throws ExecutionException, InterruptedException { @Test public void testListTopicRns() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); - List topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), Duration.ofMillis(10000)); + List topicRnList = client.listTopicRns(BaseTopicUri.validate(clusterUriStr), Duration.ofMillis(10000)); List expectedTopicRnList = Arrays.asList(topic1Rn, topic2Rn, topic3Rn); assertEquals(expectedTopicRnList, topicRnList); client.close(); @@ -144,7 +144,7 @@ public void testListTopicRns() throws Exception { public void testDescribeTopicRns() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); Map topicRnDescriptionMap = client.describeTopicRns( - KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), + BaseTopicUri.validate(clusterUriStr), new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)), Duration.ofMillis(10000) ); @@ -183,7 +183,7 @@ public void testListOffsets() throws Exception { Map topicUriPartitionsAndOptions = new HashMap<>(); topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST); topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST); - TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); + TopicUri clusterUri = BaseTopicUri.validate(clusterUriStr); Map offsets = client.listOffsets( clusterUri, topicUriPartitionsAndOptions, @@ -254,7 +254,7 @@ public void testListOffsets() throws Exception { @Test public void testListOffsetsForConsumerGroup() throws Exception { PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration); - TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)); + TopicUri clusterUri = BaseTopicUri.validate(clusterUriStr); String consumerGroupId = "test-psc-consumer-group"; PscProducer pscProducer = getPscProducer(); diff --git a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java index 2212654..a0af4a1 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/client/PscMetadataClient.java @@ -8,10 +8,15 @@ import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.environment.Environment; +import com.pinterest.psc.exception.ExceptionMessage; +import com.pinterest.psc.exception.producer.ProducerException; import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.PscStartupException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import com.pinterest.psc.metadata.TopicRnMetadata; import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator; import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager; +import com.pinterest.psc.producer.creation.PscBackendProducerCreator; import java.io.IOException; import java.time.Duration; @@ -75,7 +80,7 @@ private void initialize() { * @throws InterruptedException * @throws TimeoutException */ - public List listTopicRns(TopicUri clusterUri, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { + public List listTopicRns(TopicUri clusterUri, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.listTopicRns(duration); } @@ -91,7 +96,7 @@ public List listTopicRns(TopicUri clusterUri, Duration duration) throws * @throws InterruptedException * @throws TimeoutException */ - public Map describeTopicRns(TopicUri clusterUri, Set topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { + public Map describeTopicRns(TopicUri clusterUri, Set topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.describeTopicRns(topicRns, duration); } @@ -114,7 +119,7 @@ public Map describeTopicRns(TopicUri clusterUri, Set listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsets(TopicUri clusterUri, Map topicRnsAndOptions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.listOffsets(topicRnsAndOptions, duration); } @@ -132,23 +137,24 @@ public Map listOffsets(TopicUri clusterUri, Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection topicUriPartitions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, duration); } - public Map listOffsetsForTimestamps(TopicUri clusterUri, Map topicUriPartitionsAndTimes, Duration duration) throws ExecutionException, InterruptedException, TimeoutException { + public Map listOffsetsForTimestamps(TopicUri clusterUri, Map topicUriPartitionsAndTimes, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException { PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri); return backendMetadataClient.listOffsetsForTimestamps(topicUriPartitionsAndTimes, duration); } @VisibleForTesting - protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) { - String topicUriPrefix = clusterUri.getTopicUriPrefix(); + protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) throws TopicUriSyntaxException { + TopicUri convertedClusterUri = validateTopicUri(clusterUri); + String topicUriPrefix = convertedClusterUri.getTopicUriPrefix(); pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> { - PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(clusterUri.getBackend()); + PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(convertedClusterUri.getBackend()); try { - return backendMetadataClientCreator.create(environment, pscConfigurationInternal, clusterUri); + return backendMetadataClientCreator.create(environment, pscConfigurationInternal, convertedClusterUri); } catch (ConfigurationException e) { throw new RuntimeException(e); } @@ -156,6 +162,18 @@ protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix); } + private TopicUri validateTopicUri(TopicUri topicUri) throws TopicUriSyntaxException { + if (topicUri == null) + throw new IllegalArgumentException("Null topic URI was passed to the producer API."); + + Map backendCreators = creatorManager.getBackendCreators(); + String backend = topicUri.getBackend(); + if (!backendCreators.containsKey(backend)) + throw new IllegalArgumentException("Invalid backend type: " + backend); + topicUri = backendCreators.get(backend).validateBackendTopicUri(topicUri); + return topicUri; + } + @Override public void close() throws IOException { for (PscBackendMetadataClient client : pscBackendMetadataClientByTopicUriPrefix.values()) { diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java index 12987c2..82e16ab 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscBackendMetadataClientCreator.java @@ -4,6 +4,7 @@ import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import com.pinterest.psc.metadata.client.PscBackendMetadataClient; /** @@ -12,5 +13,6 @@ public abstract class PscBackendMetadataClientCreator { public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException; + public abstract TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException; } diff --git a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java index 74515f6..774e534 100644 --- a/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java +++ b/psc/src/main/java/com/pinterest/psc/metadata/creation/PscKafkaMetadataClientCreator.java @@ -2,9 +2,11 @@ import com.pinterest.psc.common.PscUtils; import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.kafka.KafkaTopicUri; import com.pinterest.psc.config.PscConfigurationInternal; import com.pinterest.psc.environment.Environment; import com.pinterest.psc.exception.startup.ConfigurationException; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient; /** @@ -23,4 +25,9 @@ public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal p ); return pscKafkaMetadataClient; } + + @Override + public TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException { + return KafkaTopicUri.validate(topicUri); + } }