diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java index 3bd5eec..9b6c7ee 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/PscSourceITCase.java @@ -18,10 +18,12 @@ package com.pinterest.flink.connector.psc.source; +import com.pinterest.flink.connector.psc.PscFlinkConfiguration; import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer; import com.pinterest.flink.connector.psc.source.reader.deserializer.PscRecordDeserializationSchema; import com.pinterest.flink.connector.psc.testutils.PscSourceExternalContextFactory; import com.pinterest.flink.connector.psc.testutils.PscSourceTestEnv; +import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import com.pinterest.psc.common.TopicUriPartition; import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.exception.consumer.DeserializerException; @@ -58,6 +60,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @@ -75,19 +78,23 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import static com.pinterest.flink.connector.psc.testutils.PscSourceExternalContext.SplitMappingMode.PARTITION; import static com.pinterest.flink.connector.psc.testutils.PscSourceExternalContext.SplitMappingMode.TOPIC; +import static com.pinterest.flink.connector.psc.testutils.PscTestUtils.putDiscoveryProperties; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; /** Unite test class for {@link PscSource}. */ public class PscSourceITCase { private static final String TOPIC1 = "topic1"; + private static final String TOPIC_URI1 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC1; private static final String TOPIC2 = "topic2"; + private static final String TOPIC_URI2 = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + TOPIC2; @Nested @TestInstance(Lifecycle.PER_CLASS) @@ -96,9 +103,9 @@ class KafkaSpecificTests { public void setup() throws Throwable { PscSourceTestEnv.setup(); PscSourceTestEnv.setupTopic( - TOPIC1, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp); + TOPIC_URI1, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp); PscSourceTestEnv.setupTopic( - TOPIC2, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp); + TOPIC_URI2, true, true, PscSourceTestEnv::getRecordsForTopicWithoutTimestamp); } @AfterAll @@ -109,21 +116,25 @@ public void tearDown() throws Exception { @ParameterizedTest(name = "Object reuse in deserializer = {arguments}") @ValueSource(booleans = {false, true}) public void testTimestamp(boolean enableObjectReuse) throws Throwable { - final String topic = - "testTimestamp-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE); + final String topicUri = + PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + + "testTimestamp-" + ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE); final long currentTimestamp = System.currentTimeMillis(); - PscSourceTestEnv.createTestTopic(topic, 1, 1); + PscSourceTestEnv.createTestTopic(topicUri, 1, 1); PscSourceTestEnv.produceMessages( Arrays.asList( - new PscProducerMessage<>(topic, 0, "key0", 0,currentTimestamp + 1L), - new PscProducerMessage<>(topic, 0, "key1", 1, currentTimestamp + 2L), - new PscProducerMessage<>(topic, 0, "key2", 2, currentTimestamp + 3L))); - + new PscProducerMessage<>(topicUri, 0, "key0", 0,currentTimestamp + 1L), + new PscProducerMessage<>(topicUri, 0, "key1", 1, currentTimestamp + 2L), + new PscProducerMessage<>(topicUri, 0, "key2", 2, currentTimestamp + 3L))); + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() // .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings) .setGroupId("testTimestampAndWatermark") - .setTopicUris(topic) + .setTopicUris(topicUri) + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer( new TestingPscRecordDeserializationSchema(enableObjectReuse)) .setStartingOffsets(OffsetsInitializer.earliest()) @@ -152,11 +163,15 @@ public void testTimestamp(boolean enableObjectReuse) throws Throwable { @ParameterizedTest(name = "Object reuse in deserializer = {arguments}") @ValueSource(booleans = {false, true}) public void testBasicRead(boolean enableObjectReuse) throws Exception { + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() // .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings) .setGroupId("testBasicRead") - .setTopicUris(Arrays.asList(TOPIC1, TOPIC2)) + .setTopicUris(Arrays.asList(TOPIC_URI1, TOPIC_URI2)) + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer( new TestingPscRecordDeserializationSchema(enableObjectReuse)) .setStartingOffsets(OffsetsInitializer.earliest()) @@ -172,11 +187,15 @@ public void testBasicRead(boolean enableObjectReuse) throws Exception { @Test public void testValueOnlyDeserializer() throws Exception { + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() // .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings) .setGroupId("testValueOnlyDeserializer") - .setTopicUris(Arrays.asList(TOPIC1, TOPIC2)) + .setTopicUris(Arrays.asList(TOPIC_URI1, TOPIC_URI2)) + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer( PscRecordDeserializationSchema.valueOnly( IntegerDeserializer.class)) @@ -221,11 +240,15 @@ public void testValueOnlyDeserializer() throws Exception { @ParameterizedTest(name = "Object reuse in deserializer = {arguments}") @ValueSource(booleans = {false, true}) public void testRedundantParallelism(boolean enableObjectReuse) throws Exception { + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() // .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings) .setGroupId("testRedundantParallelism") - .setTopicUris(Collections.singletonList(TOPIC1)) + .setTopicUris(Collections.singletonList(TOPIC_URI1)) + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer( new TestingPscRecordDeserializationSchema(enableObjectReuse)) .setStartingOffsets(OffsetsInitializer.earliest()) @@ -246,11 +269,16 @@ public void testRedundantParallelism(boolean enableObjectReuse) throws Exception @ParameterizedTest(name = "Object reuse in deserializer = {arguments}") @ValueSource(booleans = {false, true}) + @Disabled("Disabled because PscConsumer does not support instantiation without group id") public void testBasicReadWithoutGroupId(boolean enableObjectReuse) throws Exception { + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() // .setBootstrapServers(PscSourceTestEnv.brokerConnectionStrings) - .setTopicUris(Arrays.asList(TOPIC1, TOPIC2)) + .setTopicUris(Arrays.asList(TOPIC_URI1, TOPIC_URI2)) + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer( new TestingPscRecordDeserializationSchema(enableObjectReuse)) .setStartingOffsets(OffsetsInitializer.earliest()) @@ -269,7 +297,7 @@ public void testBasicReadWithoutGroupId(boolean enableObjectReuse) throws Except @Test public void testPerPartitionWatermark() throws Throwable { - String watermarkTopic = "watermarkTestTopic-" + UUID.randomUUID(); + String watermarkTopic = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "watermarkTestTopic-" + UUID.randomUUID(); PscSourceTestEnv.createTestTopic(watermarkTopic, 2, 1); List> records = Arrays.asList( @@ -280,11 +308,15 @@ public void testPerPartitionWatermark() throws Throwable { new PscProducerMessage<>(watermarkTopic, 1, null, 250, 250L), new PscProducerMessage<>(watermarkTopic, 1, null, 350, 350L)); PscSourceTestEnv.produceMessages(records); + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() // .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) .setTopicUris(watermarkTopic) .setGroupId("watermark-test") + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer(new TestingPscRecordDeserializationSchema(false)) .setStartingOffsets(OffsetsInitializer.earliest()) .setBounded(OffsetsInitializer.latest()) @@ -316,13 +348,17 @@ public void processElement( @Test public void testConsumingEmptyTopic() throws Throwable { - String emptyTopic = "emptyTopic-" + UUID.randomUUID(); + String emptyTopic = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "emptyTopic-" + UUID.randomUUID(); PscSourceTestEnv.createTestTopic(emptyTopic, 3, 1); + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); PscSource source = PscSource.builder() // .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) .setTopicUris(emptyTopic) .setGroupId("empty-topic-test") + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer(new TestingPscRecordDeserializationSchema(false)) .setStartingOffsets(OffsetsInitializer.earliest()) .setBounded(OffsetsInitializer.latest()) @@ -341,7 +377,7 @@ public void testConsumingEmptyTopic() throws Throwable { @Test public void testConsumingTopicWithEmptyPartitions() throws Throwable { - String topicWithEmptyPartitions = "topicWithEmptyPartitions-" + UUID.randomUUID(); + String topicWithEmptyPartitions = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "topicWithEmptyPartitions-" + UUID.randomUUID(); PscSourceTestEnv.createTestTopic( topicWithEmptyPartitions, PscSourceTestEnv.NUM_PARTITIONS, 1); List> records = @@ -354,11 +390,16 @@ public void testConsumingTopicWithEmptyPartitions() throws Throwable { Collections.singletonList( new TopicUriPartition(topicWithEmptyPartitions, partitionWithRecords))); + Properties props = new Properties(); + putDiscoveryProperties(props, PscSourceTestEnv.getBrokerConnectionStrings(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); + PscSource source = PscSource.builder() // .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings) .setTopicUris(topicWithEmptyPartitions) .setGroupId("topic-with-empty-partition-test") + .setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX) + .setProperties(props) .setDeserializer(new TestingPscRecordDeserializationSchema(false)) .setStartingOffsets(OffsetsInitializer.earliest()) .setBounded(OffsetsInitializer.latest()) @@ -419,7 +460,7 @@ private static class PartitionAndValue implements Serializable { public PartitionAndValue() {} private PartitionAndValue(TopicUriPartition tp, int value) { - this.tp = tp.toString(); + this.tp = tp.getTopicUri().getTopic() + "-" + tp.getPartition(); this.value = value; } } @@ -444,7 +485,8 @@ public void deserialize( } if (enableObjectReuse) { - reuse.tp = record.getMessageId().getTopicUriPartition().toString(); + reuse.tp = record.getMessageId().getTopicUriPartition().getTopicUri().getTopic() + + "-" + record.getMessageId().getTopicUriPartition().getPartition(); reuse.value = deserializer.deserialize(record.getValue()); collector.collect(reuse); } else { diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java index e412189..94cf4cf 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscTopicUriPartitionSplitReaderTest.java @@ -26,7 +26,6 @@ import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.exception.ClientException; -import com.pinterest.psc.exception.consumer.ConsumerException; import com.pinterest.psc.exception.consumer.DeserializerException; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.serde.ByteArrayDeserializer; @@ -45,8 +44,8 @@ import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; - import org.apache.flink.util.ExceptionUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.hamcrest.CoreMatchers; import org.hamcrest.MatcherAssert; @@ -206,7 +205,8 @@ public void testNumBytesInCounter() throws Exception { @ParameterizedTest @EmptySource @ValueSource(strings = {"_underscore.period-minus"}) - @Disabled("This test is flaky due to metric reporting interval") + @Disabled("This test is flaky due to records-lag metric not present, instead we use records-lag-max in a 30 second window." + + " Concurrency of validations and metric updates in native KafkaConsumer causes flakiness.") public void testPendingRecordsGauge(String topicSuffix) throws Throwable { final String topic1UriStr = TOPIC_URI1 + topicSuffix; final String topic2UriStr = TOPIC_URI2 + topicSuffix; @@ -220,6 +220,7 @@ public void testPendingRecordsGauge(String topicSuffix) throws Throwable { final Properties props = new Properties(); props.setProperty(PscConfiguration.PSC_CONSUMER_POLL_MESSAGES_MAX, "1"); props.setProperty(PscConfiguration.PSC_METRICS_FREQUENCY_MS, "100"); + props.setProperty("psc.consumer." + ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG, "500"); PscTopicUriPartitionSplitReader reader = createReader( props, diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java index 27017ff..812edee 100644 --- a/psc/src/main/java/com/pinterest/psc/common/TopicRn.java +++ b/psc/src/main/java/com/pinterest/psc/common/TopicRn.java @@ -4,6 +4,7 @@ import com.pinterest.psc.logging.PscLogger; import java.io.IOException; +import java.io.Serializable; import java.util.Objects; import java.util.Properties; import java.util.regex.Matcher; @@ -26,8 +27,9 @@ * * An example of topic RN would be rn:kafka:env:aws_us-west-1::kafkacluster01:topic01. */ -public class TopicRn { +public class TopicRn implements Serializable { private static final PscLogger logger = PscLogger.getLogger(BaseTopicUri.class); + private static final long serialVersionUID = -6081489815985829052L; protected static byte CURRENT_VERSION = 0; public static final String STANDARD = getTopicRnStandard(); diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java index 03aaff0..d0442c7 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java @@ -6,6 +6,8 @@ import com.pinterest.psc.logging.PscLogger; import org.apache.commons.configuration2.PropertiesConfiguration; +import java.io.Serializable; + /** * Various configurations for PSC client are defined here. There are a few configuration categories: *
    @@ -46,7 +48,7 @@ * {@value PSC_CONSUMER_INTERCEPTORS_RAW_CLASSES}. Therefore, to avoid misconfiguration of any single-value config do * not use a comma in the config value. */ -public class PscConfiguration extends PropertiesConfiguration { +public class PscConfiguration extends PropertiesConfiguration implements Serializable { private static final PscLogger logger = PscLogger.getLogger(PscConfiguration.class); public final static String PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST = "earliest"; @@ -570,6 +572,7 @@ public class PscConfiguration extends PropertiesConfiguration { * is provided in the file discovery.json.example. */ public static final String PSC_DISCOVERY_FALLBACK_FILE = PSC_DISCOVERY + "." + FALLBACK_FILE; + private static final long serialVersionUID = 5394130837651422684L; public PscConfiguration() { super();