From cd5a9306b7a61e2f622dae533b5c907a5b9c89b5 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 10 Oct 2024 18:53:20 -0400 Subject: [PATCH] WIP finish PscSourceITCase, need to look into BaseTopicUri.equals() and whether we can introduce logic in validate() to build the correct subclass of TopicUri --- .../enumerator/PscSourceEnumerator.java | 1 - .../PscTopicUriPartitionSplitReader.java | 6 ++--- .../testutils/PscSourceExternalContext.java | 23 +++++++++++++++---- .../pinterest/psc/common/BaseTopicUri.java | 6 ++++- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java index e69c0aa..db290d2 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/enumerator/PscSourceEnumerator.java @@ -396,7 +396,6 @@ PartitionChange getPartitionChange(Set fetchedPartitions) { removedPartitions.add(tp); } }; - assignedPartitions.forEach(dedupOrMarkAsRemoved); pendingPartitionSplitAssignment.forEach( (reader, splits) -> diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java index 71e65e9..c8e60bd 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReader.java @@ -311,17 +311,17 @@ private void seekToStartingOffsets( Map partitionsStartingFromSpecifiedOffsets) throws ConsumerException { if (!partitionsStartingFromEarliest.isEmpty()) { - LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest); + LOG.info("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest); consumer.seekToBeginning(partitionsStartingFromEarliest); } if (!partitionsStartingFromLatest.isEmpty()) { - LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest); + LOG.info("Seeking starting offsets to end: {}", partitionsStartingFromLatest); consumer.seekToEnd(partitionsStartingFromLatest); } if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) { - LOG.trace( + LOG.info( "Seeking starting offsets to specified offsets: {}", partitionsStartingFromSpecifiedOffsets); partitionsStartingFromSpecifiedOffsets.forEach((tup, offset) -> { diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java index c9ad063..a8aab90 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscSourceExternalContext.java @@ -22,6 +22,7 @@ import com.pinterest.flink.connector.psc.source.PscSourceBuilder; import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer; import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema; +import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.serde.ByteArraySerializer; @@ -52,9 +53,10 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.regex.Pattern; +import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.injectDiscoveryConfigs; import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; -/** External context for testing {@link KafkaSource}. */ +/** External context for testing {@link PscSource}. */ public class PscSourceExternalContext implements DataStreamSourceExternalContext { private static final Logger LOG = LoggerFactory.getLogger(PscSourceExternalContext.class); @@ -67,6 +69,7 @@ public class PscSourceExternalContext implements DataStreamSourceExternalContext private final List connectorJarPaths; private final String bootstrapServers; private final String topicName; + private final String topicUriStr; private final SplitMappingMode splitMappingMode; private final AdminClient adminClient; private final List writers = new ArrayList<>(); @@ -78,6 +81,7 @@ protected PscSourceExternalContext( this.connectorJarPaths = connectorJarPaths; this.bootstrapServers = bootstrapServers; this.topicName = randomize(TOPIC_NAME_PREFIX); + this.topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + this.topicName; this.splitMappingMode = splitMappingMode; this.adminClient = createAdminClient(); } @@ -91,8 +95,14 @@ public List getConnectorJarPaths() { public Source createSource(TestingSourceSettings sourceSettings) { final PscSourceBuilder builder = PscSource.builder(); + Properties props = new Properties(); + props.setProperty(PscConfiguration.PSC_AUTO_RESOLUTION_ENABLED, "false"); + injectDiscoveryConfigs(props, bootstrapServers, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); + builder // .setBootstrapServers(bootstrapServers) + .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setTopicUriPattern(TOPIC_NAME_PATTERN) .setGroupId(randomize(GROUP_ID_PREFIX)) .setDeserializer( @@ -182,16 +192,18 @@ private AdminClient createAdminClient() { private PscTopicUriPartitionDataWriter createSinglePartitionTopic(int topicIndex) throws Exception { String newTopicName = topicName + "-" + topicIndex; + String newTopicUriStr = topicUriStr + "-" + topicIndex; LOG.info("Creating topic '{}'", newTopicName); adminClient .createTopics(Collections.singletonList(new NewTopic(newTopicName, 1, (short) 1))) .all() .get(); return new PscTopicUriPartitionDataWriter( - getPscProducerProperties(topicIndex), new TopicUriPartition(newTopicName, 0)); + getPscProducerProperties(topicIndex), new TopicUriPartition(newTopicUriStr, 0)); } private PscTopicUriPartitionDataWriter scaleOutTopic(String topicName) throws Exception { + final String topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topicName; final Set topics = adminClient.listTopics().names().get(); if (topics.contains(topicName)) { final Map topicDescriptions = @@ -206,7 +218,7 @@ private PscTopicUriPartitionDataWriter scaleOutTopic(String topicName) throws Ex .get(); return new PscTopicUriPartitionDataWriter( getPscProducerProperties(numPartitions), - new TopicUriPartition(topicName, numPartitions)); + new TopicUriPartition(topicUriStr, numPartitions)); } else { LOG.info("Creating topic '{}'", topicName); adminClient @@ -214,7 +226,7 @@ private PscTopicUriPartitionDataWriter scaleOutTopic(String topicName) throws Ex .all() .get(); return new PscTopicUriPartitionDataWriter( - getPscProducerProperties(0), new TopicUriPartition(topicName, 0)); + getPscProducerProperties(0), new TopicUriPartition(topicUriStr, 0)); } } @@ -226,13 +238,14 @@ private Properties getPscProducerProperties(int producerId) { PscConfiguration.PSC_PRODUCER_CLIENT_ID, String.join( "-", - "flink-kafka-split-writer", + "flink-psc-split-writer", Integer.toString(producerId), Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)))); pscProducerProperties.setProperty( PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName()); pscProducerProperties.setProperty( PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName()); + injectDiscoveryConfigs(pscProducerProperties, bootstrapServers, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); return pscProducerProperties; } diff --git a/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java b/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java index 64a111e..08f3d25 100644 --- a/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java +++ b/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java @@ -130,9 +130,13 @@ public boolean equals(Object other) { if (this == other) { return true; } - if (other == null || getClass() != other.getClass()) { + if (other == null) { return false; } + if (getClass() != other.getClass()) { + if (!(other instanceof TopicUri)) // this allows for comparison with other implementations of TopicUri + return false; + } BaseTopicUri otherBaseTopicUri = (BaseTopicUri) other; return PscCommon.equals(topicUriAsString, otherBaseTopicUri.topicUriAsString) && PscCommon.equals(protocol, otherBaseTopicUri.protocol) &&