WIP fixing PscSourceITCase integration tests
jeffxiang committed Oct 10, 2024
1 parent 4a3a57b commit 19c20c4
Showing 4 changed files with 72 additions and 24 deletions.
com/pinterest/flink/connector/psc/source/PscSourceITCase.java
@@ -18,10 +18,12 @@

package com.pinterest.flink.connector.psc.source;

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer;
import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema;
import com.pinterest.flink.connector.psc.testutils.PscSourceExternalContextFactory;
import com.pinterest.flink.connector.psc.testutils.PscSourceTestEnv;
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.common.TopicUriPartition;
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.exception.consumer.DeserializerException;
@@ -58,6 +60,7 @@

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -75,19 +78,23 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

import static com.pinterest.flink.connector.psc.testutils.PscSourceExternalContext.SplitMappingMode.PARTITION;
import static com.pinterest.flink.connector.psc.testutils.PscSourceExternalContext.SplitMappingMode.TOPIC;
import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.putDiscoveryProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

/** Unit test class for {@link PscSource}. */
public class PscSourceITCase {
private static final String TOPIC1 = "topic1";
private static final String TOPIC_URI1 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC1;
private static final String TOPIC2 = "topic2";
private static final String TOPIC_URI2 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC2;

@Nested
@TestInstance(Lifecycle.PER_CLASS)
@@ -96,9 +103,9 @@ class KafkaSpecificTests {
public void setup() throws Throwable {
PscSourceTestEnv.setup();
PscSourceTestEnv.setupTopic(
TOPIC1, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp);
TOPIC_URI1, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp);
PscSourceTestEnv.setupTopic(
TOPIC2, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp);
TOPIC_URI2, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp);
}

@AfterAll
@@ -109,21 +116,25 @@ public void tearDown() throws Exception {
@ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
@ValueSource(booleans = {false, true})
public void testTimestamp(boolean enableObjectReuse) throws Throwable {
final String topic =
"testTimestamp-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
final String topicUri =
PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX +
"testTimestamp-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE);
final long currentTimestamp = System.currentTimeMillis();
PscSourceTestEnv.createTestTopic(topic, 1, 1);
PscSourceTestEnv.createTestTopic(topicUri, 1, 1);
PscSourceTestEnv.produceMessages(
Arrays.asList(
new PscProducerMessage<>(topic, 0, "key0", 0,currentTimestamp + 1L),
new PscProducerMessage<>(topic, 0, "key1", 1, currentTimestamp + 2L),
new PscProducerMessage<>(topic, 0, "key2", 2, currentTimestamp + 3L)));

new PscProducerMessage<>(topicUri, 0, "key0", 0,currentTimestamp + 1L),
new PscProducerMessage<>(topicUri, 0, "key1", 1, currentTimestamp + 2L),
new PscProducerMessage<>(topicUri, 0, "key2", 2, currentTimestamp + 3L)));
Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource<PartitionAndValue> source =
PscSource.<PartitionAndValue>builder()
// .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings)
.setGroupId("testTimestampAndWatermark")
.setTopicUris(topic)
.setTopicUris(topicUri)
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(
new TestingPscRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
@@ -152,11 +163,15 @@ public void testTimestamp(boolean enableObjectReuse) throws Throwable {
@ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
@ValueSource(booleans = {false, true})
public void testBasicRead(boolean enableObjectReuse) throws Exception {
Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource<PartitionAndValue> source =
PscSource.<PartitionAndValue>builder()
// .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings)
.setGroupId("testBasicRead")
.setTopicUris(Arrays.asList(TOPIC1, TOPIC2))
.setTopicUris(Arrays.asList(TOPIC_URI1, TOPIC_URI2))
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(
new TestingPscRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
@@ -172,11 +187,15 @@ public void testBasicRead(boolean enableObjectReuse) throws Exception {

@Test
public void testValueOnlyDeserializer() throws Exception {
Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource<Integer> source =
PscSource.<Integer>builder()
// .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings)
.setGroupId("testValueOnlyDeserializer")
.setTopicUris(Arrays.asList(TOPIC1, TOPIC2))
.setTopicUris(Arrays.asList(TOPIC_URI1, TOPIC_URI2))
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(
PscRecordDeserializationSchema.valueOnly(
IntegerDeserializer.class))
@@ -221,11 +240,15 @@ public void testValueOnlyDeserializer() throws Exception {
@ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
@ValueSource(booleans = {false, true})
public void testRedundantParallelism(boolean enableObjectReuse) throws Exception {
Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource<PartitionAndValue> source =
PscSource.<PartitionAndValue>builder()
// .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings)
.setGroupId("testRedundantParallelism")
.setTopicUris(Collections.singletonList(TOPIC1))
.setTopicUris(Collections.singletonList(TOPIC_URI1))
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(
new TestingPscRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
@@ -246,11 +269,16 @@ public void testRedundantParallelism(boolean enableObjectReuse) throws Exception

@ParameterizedTest(name = "Object reuse in deserializer = {arguments}")
@ValueSource(booleans = {false, true})
@Disabled("Disabled because PscConsumer does not support instantiation without group id")
public void testBasicReadWithoutGroupId(boolean enableObjectReuse) throws Exception {
Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource<PartitionAndValue> source =
PscSource.<PartitionAndValue>builder()
// .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings)
.setTopicUris(Arrays.asList(TOPIC1, TOPIC2))
.setTopicUris(Arrays.asList(TOPIC_URI1, TOPIC_URI2))
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(
new TestingPscRecordDeserializationSchema(enableObjectReuse))
.setStartingOffsets(OffsetsInitializer.earliest())
@@ -269,7 +297,7 @@ public void testBasicReadWithoutGroupId(boolean enableObjectReuse) throws Except

@Test
public void testPerPartitionWatermark() throws Throwable {
String watermarkTopic = "watermarkTestTopic-" + UUID.randomUUID();
String watermarkTopic = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "watermarkTestTopic-" + UUID.randomUUID();
PscSourceTestEnv.createTestTopic(watermarkTopic, 2, 1);
List<PscProducerMessage<String, Integer>> records =
Arrays.asList(
@@ -280,11 +308,15 @@ public void testPerPartitionWatermark() throws Throwable {
new PscProducerMessage<>(watermarkTopic, 1, null, 250, 250L),
new PscProducerMessage<>(watermarkTopic, 1, null, 350, 350L));
PscSourceTestEnv.produceMessages(records);
Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource<PartitionAndValue> source =
PscSource.<PartitionAndValue>builder()
// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopicUris(watermarkTopic)
.setGroupId("watermark-test")
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(new TestingPscRecordDeserializationSchema(false))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
@@ -316,13 +348,17 @@ public void processElement(

@Test
public void testConsumingEmptyTopic() throws Throwable {
String emptyTopic = "emptyTopic-" + UUID.randomUUID();
String emptyTopic = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "emptyTopic-" + UUID.randomUUID();
PscSourceTestEnv.createTestTopic(emptyTopic, 3, 1);
Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
PscSource<PartitionAndValue> source =
PscSource.<PartitionAndValue>builder()
// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopicUris(emptyTopic)
.setGroupId("empty-topic-test")
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(new TestingPscRecordDeserializationSchema(false))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
@@ -341,7 +377,7 @@ public void testConsumingEmptyTopic() throws Throwable {

@Test
public void testConsumingTopicWithEmptyPartitions() throws Throwable {
String topicWithEmptyPartitions = "topicWithEmptyPartitions-" + UUID.randomUUID();
String topicWithEmptyPartitions = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "topicWithEmptyPartitions-" + UUID.randomUUID();
PscSourceTestEnv.createTestTopic(
topicWithEmptyPartitions, PscSourceTestEnv.NUM_PARTITIONS, 1);
List<PscProducerMessage<String, Integer>> records =
@@ -354,11 +390,16 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable {
Collections.singletonList(
new TopicUriPartition(topicWithEmptyPartitions, partitionWithRecords)));

Properties props = new Properties();
putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);

PscSource<PartitionAndValue> source =
PscSource.<PartitionAndValue>builder()
// .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
.setTopicUris(topicWithEmptyPartitions)
.setGroupId("topic-with-empty-partition-test")
.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
.setProperties(props)
.setDeserializer(new TestingPscRecordDeserializationSchema(false))
.setStartingOffsets(OffsetsInitializer.earliest())
.setBounded(OffsetsInitializer.latest())
@@ -419,7 +460,7 @@ private static class PartitionAndValue implements Serializable {
public PartitionAndValue() {}

private PartitionAndValue(TopicUriPartition tp, int value) {
this.tp = tp.toString();
this.tp = tp.getTopicUri().getTopic() + "-" + tp.getPartition();
this.value = value;
}
}
@@ -444,7 +485,8 @@ public void deserialize(
}

if (enableObjectReuse) {
reuse.tp = record.getMessageId().getTopicUriPartition().toString();
reuse.tp = record.getMessageId().getTopicUriPartition().getTopicUri().getTopic() +
"-" + record.getMessageId().getTopicUriPartition().getPartition();
reuse.value = deserializer.deserialize(record.getValue());
collector.collect(reuse);
} else {
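
Taken together, the hunks above converge on one setup pattern: each test now addresses topics by full PSC topic URI, supplies Kafka discovery properties via putDiscoveryProperties, and sets CLUSTER_URI_CONFIG on the builder. Below is a minimal sketch of that pattern assembled from the diff; the group id and topic name are illustrative placeholders, and getBrokerConnectionStrings() resolves to the test cluster's endpoints.

    // Sketch only: names taken from the hunks above, values illustrative.
    String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "myTopic";

    Properties props = new Properties();
    putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(),
            PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);

    PscSource<PartitionAndValue> source =
            PscSource.<PartitionAndValue>builder()
                    .setGroupId("example-group")
                    .setTopicUris(topicUri)
                    .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,
                            PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
                    .setProperties(props)
                    .setDeserializer(new TestingPscRecordDeserializationSchema(false))
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setBounded(OffsetsInitializer.latest())
                    .build();
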
com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java
@@ -26,7 +26,6 @@
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.consumer.PscConsumerMessage;
import com.pinterest.psc.exception.ClientException;
import com.pinterest.psc.exception.consumer.ConsumerException;
import com.pinterest.psc.exception.consumer.DeserializerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.serde.ByteArrayDeserializer;
@@ -45,8 +44,8 @@
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;

import org.apache.flink.util.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
@@ -206,7 +205,8 @@ public void testNumBytesInCounter() throws Exception {
@ParameterizedTest
@EmptySource
@ValueSource(strings = {"_underscore.period-minus"})
@Disabled("This test is flaky due to metric reporting interval")
@Disabled("This test is flaky due to records-lag metric not present, instead we use records-lag-max in a 30 second window." +
" Concurrency of validations and metric updates in native KafkaConsumer causes flakiness.")
public void testPendingRecordsGauge(String topicSuffix) throws Throwable {
final String topic1UriStr = TOPIC_URI1 + topicSuffix;
final String topic2UriStr = TOPIC_URI2 + topicSuffix;
@@ -220,6 +220,7 @@ public void testPendingRecordsGauge(String topicSuffix) throws Throwable {
final Properties props = new Properties();
props.setProperty(PscConfiguration.PSC_CONSUMER_POLL_MESSAGES_MAX, "1");
props.setProperty(PscConfiguration.PSC_METRICS_FREQUENCY_MS, "100");
props.setProperty("psc.consumer." + ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "500");
PscTopicUriPartitionSplitReader reader =
createReader(
props,
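
The new property line above forwards a native Kafka consumer setting through PSC's "psc.consumer." prefix, shrinking Kafka's metrics sample window from its 30-second default so records-lag-max reflects recent polls quickly enough for the test. A short sketch of the resulting consumer configuration, assuming the prefix convention shown in the hunk:

    Properties props = new Properties();
    props.setProperty(PscConfiguration.PSC_CONSUMER_POLL_MESSAGES_MAX, "1");  // poll one message at a time
    props.setProperty(PscConfiguration.PSC_METRICS_FREQUENCY_MS, "100");      // report PSC metrics every 100 ms
    // Pass-through to the underlying KafkaConsumer: metrics.sample.window.ms = 500
    props.setProperty("psc.consumer." + ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "500");
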
4 changes: 3 additions & 1 deletion psc/src/main/java/com/pinterest/psc/common/TopicRn.java
@@ -4,6 +4,7 @@
import com.pinterest.psc.logging.PscLogger;

import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Matcher;
@@ -26,8 +27,9 @@
* </ul>
* An example of topic RN would be <code>rn:kafka:env:aws_us-west-1::kafkacluster01:topic01</code>.
*/
public class TopicRn {
public class TopicRn implements Serializable {
private static final PscLogger logger = PscLogger.getLogger(BaseTopicUri.class);
private static final long serialVersionUID = -6081489815985829052L;
protected static byte CURRENT_VERSION = 0;
public static final String STANDARD = getTopicRnStandard();

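
Making TopicRn serializable, with a pinned serialVersionUID, lets instances cross Java-serialization boundaries (Flink, for example, requires objects captured in its closures to be Serializable; without the interface, writeObject fails with NotSerializableException). Below is a self-contained sketch of the round trip this change enables; the Rn class is a hypothetical stand-in, since TopicRn's constructors do not appear in this diff.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    public class SerializationRoundTrip {
        // Hypothetical stand-in for TopicRn, which now implements Serializable.
        static class Rn implements Serializable {
            private static final long serialVersionUID = 1L; // pinned UID, as TopicRn now pins -6081489815985829052L
            final String value;
            Rn(String value) { this.value = value; }
        }

        public static void main(String[] args) throws Exception {
            Rn original = new Rn("rn:kafka:env:aws_us-west-1::kafkacluster01:topic01");
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original); // would throw NotSerializableException without the interface
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                Rn copy = (Rn) in.readObject();
                System.out.println(copy.value); // rn:kafka:env:aws_us-west-1::kafkacluster01:topic01
            }
        }
    }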