Skip to content

Commit

Permalink
Make PscMetadataClient convert BaseTopicUri to backend-specific topicUri
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Oct 29, 2024
1 parent e42732c commit 0a1bded
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ public Map<TopicUriPartition, Long> committedOffsets(Collection<TopicUriPartitio
+ groupId
+ " due to",
e);
} catch (TopicUriSyntaxException e) {
throw new FlinkRuntimeException(e);
}
}

Expand Down Expand Up @@ -559,6 +561,8 @@ private Map<TopicUriPartition, Long> listOffsets(
+ topicPartitionOffsets
+ " due to",
e);
} catch (TopicUriSyntaxException e) {
throw new FlinkRuntimeException(e);
}
}

Expand All @@ -579,6 +583,8 @@ private Map<TopicUriPartition, Long> listOffsetsForTimestamps(
+ topicUriPartitionsAndTimestamps
+ " due to",
e);
} catch (TopicUriSyntaxException e) {
throw new FlinkRuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.pinterest.psc.consumer.PscConsumer;
import com.pinterest.psc.exception.consumer.ConsumerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metadata.client.PscMetadataClient;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.serde.IntegerDeserializer;
Expand Down Expand Up @@ -255,7 +256,7 @@ public static void setupEarliestOffsets(List<TopicUriPartition> partitions) thro
}

public static void setupCommittedOffsets(String topicUriString)
throws ExecutionException, InterruptedException, ConfigurationException, ConsumerException, TimeoutException {
throws ExecutionException, InterruptedException, ConfigurationException, ConsumerException, TimeoutException, TopicUriSyntaxException {
List<TopicUriPartition> partitions = getPartitionsForTopic(topicUriString);
Map<TopicUriPartition, Long> committedOffsets = getCommittedOffsets(partitions);
List<MessageId> toCommit = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void tearDown() throws ExecutionException, InterruptedException {
@Test
public void testListTopicRns() throws Exception {
PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
List<TopicRn> topicRnList = client.listTopicRns(KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)), Duration.ofMillis(10000));
List<TopicRn> topicRnList = client.listTopicRns(BaseTopicUri.validate(clusterUriStr), Duration.ofMillis(10000));
List<TopicRn> expectedTopicRnList = Arrays.asList(topic1Rn, topic2Rn, topic3Rn);
assertEquals(expectedTopicRnList, topicRnList);
client.close();
Expand All @@ -144,7 +144,7 @@ public void testListTopicRns() throws Exception {
public void testDescribeTopicRns() throws Exception {
PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
Map<TopicRn, TopicRnMetadata> topicRnDescriptionMap = client.describeTopicRns(
KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr)),
BaseTopicUri.validate(clusterUriStr),
new HashSet<>(Arrays.asList(topic1Rn, topic2Rn, topic3Rn)),
Duration.ofMillis(10000)
);
Expand Down Expand Up @@ -183,7 +183,7 @@ public void testListOffsets() throws Exception {
Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicUriPartitionsAndOptions = new HashMap<>();
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic1Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_EARLIEST);
topicUriPartitionsAndOptions.put(new TopicUriPartition(topic2Uri, 0), PscMetadataClient.MetadataClientOption.OFFSET_SPEC_LATEST);
TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr));
TopicUri clusterUri = BaseTopicUri.validate(clusterUriStr);
Map<TopicUriPartition, Long> offsets = client.listOffsets(
clusterUri,
topicUriPartitionsAndOptions,
Expand Down Expand Up @@ -254,7 +254,7 @@ public void testListOffsets() throws Exception {
@Test
public void testListOffsetsForConsumerGroup() throws Exception {
PscMetadataClient client = new PscMetadataClient(metadataClientConfiguration);
TopicUri clusterUri = KafkaTopicUri.validate(BaseTopicUri.validate(clusterUriStr));
TopicUri clusterUri = BaseTopicUri.validate(clusterUriStr);

String consumerGroupId = "test-psc-consumer-group";
PscProducer<Integer, Integer> pscProducer = getPscProducer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.environment.Environment;
import com.pinterest.psc.exception.ExceptionMessage;
import com.pinterest.psc.exception.producer.ProducerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.PscStartupException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metadata.TopicRnMetadata;
import com.pinterest.psc.metadata.creation.PscBackendMetadataClientCreator;
import com.pinterest.psc.metadata.creation.PscMetadataClientCreatorManager;
import com.pinterest.psc.producer.creation.PscBackendProducerCreator;

import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -75,7 +80,7 @@ private void initialize() {
* @throws InterruptedException
* @throws TimeoutException
*/
public List<TopicRn> listTopicRns(TopicUri clusterUri, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
public List<TopicRn> listTopicRns(TopicUri clusterUri, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listTopicRns(duration);
}
Expand All @@ -91,7 +96,7 @@ public List<TopicRn> listTopicRns(TopicUri clusterUri, Duration duration) throws
* @throws InterruptedException
* @throws TimeoutException
*/
public Map<TopicRn, TopicRnMetadata> describeTopicRns(TopicUri clusterUri, Set<TopicRn> topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
public Map<TopicRn, TopicRnMetadata> describeTopicRns(TopicUri clusterUri, Set<TopicRn> topicRns, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.describeTopicRns(topicRns, duration);
}
Expand All @@ -114,7 +119,7 @@ public Map<TopicRn, TopicRnMetadata> describeTopicRns(TopicUri clusterUri, Set<T
* @throws InterruptedException
* @throws TimeoutException
*/
public Map<TopicUriPartition, Long> listOffsets(TopicUri clusterUri, Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicRnsAndOptions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
public Map<TopicUriPartition, Long> listOffsets(TopicUri clusterUri, Map<TopicUriPartition, PscMetadataClient.MetadataClientOption> topicRnsAndOptions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listOffsets(topicRnsAndOptions, duration);
}
Expand All @@ -132,30 +137,43 @@ public Map<TopicUriPartition, Long> listOffsets(TopicUri clusterUri, Map<TopicUr
* @throws InterruptedException
* @throws TimeoutException
*/
public Map<TopicUriPartition, Long> listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection<TopicUriPartition> topicUriPartitions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
public Map<TopicUriPartition, Long> listOffsetsForConsumerGroup(TopicUri clusterUri, String consumerGroup, Collection<TopicUriPartition> topicUriPartitions, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listOffsetsForConsumerGroup(consumerGroup, topicUriPartitions, duration);
}

public Map<TopicUriPartition, Long> listOffsetsForTimestamps(TopicUri clusterUri, Map<TopicUriPartition, Long> topicUriPartitionsAndTimes, Duration duration) throws ExecutionException, InterruptedException, TimeoutException {
public Map<TopicUriPartition, Long> listOffsetsForTimestamps(TopicUri clusterUri, Map<TopicUriPartition, Long> topicUriPartitionsAndTimes, Duration duration) throws ExecutionException, InterruptedException, TimeoutException, TopicUriSyntaxException {
PscBackendMetadataClient backendMetadataClient = getBackendMetadataClient(clusterUri);
return backendMetadataClient.listOffsetsForTimestamps(topicUriPartitionsAndTimes, duration);
}

@VisibleForTesting
protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) {
String topicUriPrefix = clusterUri.getTopicUriPrefix();
protected PscBackendMetadataClient getBackendMetadataClient(TopicUri clusterUri) throws TopicUriSyntaxException {
TopicUri convertedClusterUri = validateTopicUri(clusterUri);
String topicUriPrefix = convertedClusterUri.getTopicUriPrefix();
pscBackendMetadataClientByTopicUriPrefix.computeIfAbsent(topicUriPrefix, k -> {
PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(clusterUri.getBackend());
PscBackendMetadataClientCreator backendMetadataClientCreator = creatorManager.getBackendCreators().get(convertedClusterUri.getBackend());
try {
return backendMetadataClientCreator.create(environment, pscConfigurationInternal, clusterUri);
return backendMetadataClientCreator.create(environment, pscConfigurationInternal, convertedClusterUri);
} catch (ConfigurationException e) {
throw new RuntimeException(e);
}
});
return pscBackendMetadataClientByTopicUriPrefix.get(topicUriPrefix);
}

private TopicUri validateTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
if (topicUri == null)
throw new IllegalArgumentException("Null topic URI was passed to the producer API.");

Map<String, PscBackendMetadataClientCreator> backendCreators = creatorManager.getBackendCreators();
String backend = topicUri.getBackend();
if (!backendCreators.containsKey(backend))
throw new IllegalArgumentException("Invalid backend type: " + backend);
topicUri = backendCreators.get(backend).validateBackendTopicUri(topicUri);
return topicUri;
}

@Override
public void close() throws IOException {
for (PscBackendMetadataClient client : pscBackendMetadataClientByTopicUriPrefix.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.environment.Environment;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metadata.client.PscBackendMetadataClient;

/**
Expand All @@ -12,5 +13,6 @@
public abstract class PscBackendMetadataClientCreator {

public abstract PscBackendMetadataClient create(Environment env, PscConfigurationInternal pscConfigurationInternal, TopicUri clusterUri) throws ConfigurationException;
public abstract TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import com.pinterest.psc.common.PscUtils;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.kafka.KafkaTopicUri;
import com.pinterest.psc.config.PscConfigurationInternal;
import com.pinterest.psc.environment.Environment;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
import com.pinterest.psc.metadata.client.kafka.PscKafkaMetadataClient;

/**
Expand All @@ -23,4 +25,9 @@ public PscKafkaMetadataClient create(Environment env, PscConfigurationInternal p
);
return pscKafkaMetadataClient;
}

@Override
public TopicUri validateBackendTopicUri(TopicUri topicUri) throws TopicUriSyntaxException {
return KafkaTopicUri.validate(topicUri);
}
}

0 comments on commit 0a1bded

Please sign in to comment.