diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java
index 9824c3d..636ef3b 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/PscFlinkConfiguration.java
@@ -7,7 +7,7 @@ import java.util.Properties;
 public class PscFlinkConfiguration {
-    public static final String CLUSTER_URI_CONFIG = "psc.producer.cluster.uri";
+    public static final String CLUSTER_URI_CONFIG = "psc.cluster.uri";
     public static TopicUri validateAndGetBaseClusterUri(Properties properties) throws TopicUriSyntaxException {
         if (!properties.containsKey(CLUSTER_URI_CONFIG)) {
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java
index f10cb69..c3b7577 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReader.java
@@ -91,7 +91,8 @@ protected void onSplitFinished(Map finis
                     if (splitState.getCurrentOffset() >= 0) {
                         offsetsOfFinishedSplits.put(
                                 splitState.getTopicUriPartition(),
-                                new MessageId(splitState.getTopicUriPartition(), splitState.getCurrentOffset()));
+                                // last processed offset. split.getCurrentOffset() is the next offset to be processed
+                                new MessageId(splitState.getTopicUriPartition(), splitState.getCurrentOffset() - 1));
                     }
                 });
     }
@@ -115,7 +116,8 @@ public List snapshotState(long checkpointId) {
             if (split.getStartingOffset() >= 0) {
                 offsetsMap.put(
                         split.getTopicUriPartition(),
-                        new MessageId(split.getTopicUriPartition(), split.getStartingOffset()));
+                        // last processed offset. split.getStartingOffset() is the next offset to be processed
+                        new MessageId(split.getTopicUriPartition(), split.getStartingOffset() - 1));
             }
         }
         // Put offsets of all the finished splits.
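Note on the offset arithmetic in the PscSourceReader hunks above and in the notifyCheckpointComplete hunk that follows: the split state tracks the next offset to be processed, the MessageId placed into snapshot/commit state now carries the last processed offset (hence the "- 1"), and the committed-offset metric adds one back to report the usual "next offset" position. A minimal, self-contained sketch of that convention (class and method names here are illustrative only, not part of the patch):

// Illustrative only: the "next offset vs. last processed offset" bookkeeping
// that the surrounding hunks rely on.
final class OffsetConventionSketch {
    // currentOffset is the NEXT offset the reader would process.
    static long lastProcessedOffset(long currentOffset) {
        return currentOffset - 1;           // value stored in the MessageId on snapshot/commit
    }

    // messageIdOffset is the last processed offset read back from a committed MessageId.
    static long reportedCommittedPosition(long messageIdOffset) {
        return messageIdOffset + 1;         // value reported via recordCommittedOffset
    }

    public static void main(String[] args) {
        long nextToProcess = 100;                                  // e.g. NUM_RECORDS_PER_SPLIT
        long stored = lastProcessedOffset(nextToProcess);          // 99
        long reported = reportedCommittedPosition(stored);         // 100
        System.out.println(stored + " -> " + reported);
    }
}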
@@ -162,7 +164,7 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
         committedPartitions.forEach(
                 (tp, offset) ->
                         pscSourceReaderMetrics.recordCommittedOffset(
-                                tp, offset.getOffset()));
+                                tp, offset.getOffset() + 1)); // offset.getOffset() is the last processed offset
         offsetsOfFinishedSplits
                 .entrySet()
                 .removeIf(
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java
index 2923b43..67a8bf8 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/source/reader/PscSourceReaderTest.java
@@ -178,7 +178,7 @@ void testCommitOffsetsWithoutAliveFetchers() throws Exception {
             assertThat(committedOffsets).hasSize(1);
             assertThat(committedOffsets.values())
                     .extracting(OffsetAndMetadata::offset)
-                    .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT + 1);
+                    .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT);
         }
     }
@@ -249,7 +249,7 @@ void testOffsetCommitOnCheckpointComplete() throws Exception {
             assertThat(committedOffsets).hasSize(numSplits);
             assertThat(committedOffsets.values())
                     .extracting(OffsetAndMetadata::offset)
-                    .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT + 1);
+                    .allMatch(offset -> offset == NUM_RECORDS_PER_SPLIT);
         }
     }
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/BasicPscConsumerTest.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/BasicPscConsumerTest.java
index 54d7e70..4add435 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/BasicPscConsumerTest.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/BasicPscConsumerTest.java
@@ -27,7 +27,7 @@ public void testSimpleConsumptionCount() throws Exception {
         int numPartitions = parallelism;
         String topicName = "test_simple_count";
-        createTestTopic(topicName, numPartitions, 1);
+        createTestTopic(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topicName, numPartitions, 1);
         pscTestEnvWithKafka.produceToKafka(topicName, recordsInEachPartition, numPartitions, "hello");
         Properties readProps = new Properties();
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscInternalProducerITCase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscInternalProducerITCase.java
index f8ed1e3..0b0faf2 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscInternalProducerITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscInternalProducerITCase.java
@@ -191,7 +191,7 @@ public void testFlushAfterClosed() throws ProducerException, ConfigurationExcept
     public void testProducerWhenCommitEmptyPartitionsToOutdatedTxnCoordinator() throws Exception {
         String topic = "flink-kafka-producer-txn-coordinator-changed";
         String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
-        createTestTopic(topic, 1, 2);
+        createTestTopic(topicUri, 1, 2);
         PscProducer pscProducer = new FlinkPscInternalProducer<>(extraProducerProperties);
         try {
             // a simple transactional call to launch a backend producer
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java
index f117a07..8a23555 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscConsumerTestBaseWithKafkaAsPubSub.java
@@ -18,6 +18,9 @@ package com.pinterest.flink.streaming.connectors.psc;
+import com.pinterest.flink.connector.psc.source.PscSource;
+import com.pinterest.flink.connector.psc.source.PscSourceBuilder;
+import com.pinterest.flink.connector.psc.source.enumerator.initializer.OffsetsInitializer;
 import com.pinterest.flink.streaming.connectors.psc.config.StartupMode;
 import com.pinterest.flink.streaming.connectors.psc.internals.PscDeserializationSchemaWrapper;
 import com.pinterest.flink.streaming.connectors.psc.internals.PscTopicUriPartition;
@@ -28,8 +31,10 @@ import com.pinterest.flink.streaming.connectors.psc.testutils.ThrottledMapper;
 import com.pinterest.flink.streaming.connectors.psc.testutils.Tuple2FlinkPartitioner;
 import com.pinterest.flink.streaming.connectors.psc.testutils.ValidatingExactlyOnceSink;
+import com.pinterest.flink.streaming.util.serialization.psc.KeyedSerializationSchema;
 import com.pinterest.flink.streaming.util.serialization.psc.TypeInformationKeyValueSerializationSchema;
 import com.pinterest.psc.common.TopicUri;
+import com.pinterest.psc.common.TopicUriPartition;
 import com.pinterest.psc.common.kafka.KafkaTopicUri;
 import com.pinterest.psc.config.PscConfiguration;
 import com.pinterest.psc.consumer.PscConsumerMessage;
@@ -39,6 +44,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
@@ -54,7 +60,6 @@ import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.core.memory.DataInputView;
@@ -80,7 +85,6 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
-import com.pinterest.flink.streaming.util.serialization.psc.KeyedSerializationSchema;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.testutils.junit.RetryOnException;
 import org.apache.flink.testutils.junit.RetryRule;
@@ -188,12 +192,7 @@ public void runFailOnNoBrokerTest() throws Exception {
             pscConsumerConfiguration.setProperty("psc.discovery.connection.urls", "localhost:80");
             pscConsumerConfiguration.setProperty("psc.discovery.security.protocols", "plaintext");
             pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
-            FlinkPscConsumerBase source = pscTestEnvWithKafka.getPscConsumer(
-                    PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "doesntexist",
-                    new SimpleStringSchema(),
-                    pscConsumerConfiguration
-            );
-            DataStream stream = see.addSource(source);
+            DataStream stream = getStream(see, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "doesntexist", new SimpleStringSchema(), pscConsumerConfiguration);
             stream.print();
             see.execute("No broker test");
         } catch (JobExecutionException jee) {
@@ -244,8 +243,7 @@ public void runCommitOffsetsToKafka() throws Exception {
         pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        DataStream stream = env
-                .addSource(pscTestEnvWithKafka.getPscConsumer(topicUri.getTopicUriAsString(), new SimpleStringSchema(), pscConsumerConfiguration));
+        DataStream stream = getStream(env, topicUri.getTopicUriAsString(), new SimpleStringSchema(), pscConsumerConfiguration);
         stream.addSink(new DiscardingSink());
         final AtomicReference errorRef = new AtomicReference<>();
@@ -336,8 +334,7 @@ public void runAutoOffsetRetrievalAndCommitToKafka() throws Exception {
                 PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_LATEST
         ); // set to reset to latest, so that partitions are initially not read
-        DataStream stream = env
-                .addSource(pscTestEnvWithKafka.getPscConsumer(topicUri.getTopicUriAsString(), new SimpleStringSchema(), readConfiguration));
+        DataStream stream = getStream(env, topicUri.getTopicUriAsString(), new SimpleStringSchema(), readConfiguration);
         stream.addSink(new DiscardingSink());
         final AtomicReference errorRef = new AtomicReference<>();
@@ -483,13 +480,23 @@ public void runStartFromLatestOffsets() throws Exception {
                 PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST
         ); // this should be ignored
-        // TODO: this test needs to take into account useNewSource
-
-        FlinkPscConsumerBase> latestReadingConsumer = pscTestEnvWithKafka
-                .getPscConsumer(topicUri.getTopicUriAsString(), deserSchema, readConfiguration);
-        latestReadingConsumer.setStartFromLatest();
+        DataStreamSource> stream;
+        if (useNewSource) {
+            PscSource> source =
+                    pscTestEnvWithKafka
+                            .getSourceBuilder(topicUri.getTopicUriAsString(), deserSchema, readConfiguration)
+                            .setStartingOffsets(OffsetsInitializer.latest())
+                            .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
+                            .build();
+            stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "KafkaSource");
+        } else {
+            FlinkPscConsumerBase> latestReadingConsumer = pscTestEnvWithKafka
+                    .getPscConsumer(topicUri.getTopicUriAsString(), deserSchema, readConfiguration);
+            latestReadingConsumer.setStartFromLatest();
+            stream = env.addSource(latestReadingConsumer);
+        }
-        env.addSource(latestReadingConsumer).setParallelism(parallelism)
+        stream.setParallelism(parallelism)
                 .flatMap(new FlatMapFunction, Object>() {
                     @Override
                     public void flatMap(Tuple2 value,
@@ -788,7 +795,7 @@ public void runSimpleConcurrentProducerConsumerTopology() throws Exception {
         final int elementsPerPartition = 100;
         final int totalElements = parallelism * elementsPerPartition;
-        createTestTopic(topic, parallelism, 2);
+        createTestTopic(topicUri, parallelism, 2);
         createTestTopic(additionalEmptyTopic, parallelism, 1); // create an empty topic which will
                                                                // remain empty all the time
@@ -849,11 +856,8 @@ public void cancel() {
         pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        FlinkPscConsumerBase> source = pscTestEnvWithKafka.getPscConsumer(topicUris,
-                sourceSchema, pscConsumerConfiguration);
-        DataStreamSource> consuming = env.addSource(source)
-                .setParallelism(parallelism);
+        DataStreamSource> consuming = getStream(env, topicUris, sourceSchema, pscConsumerConfiguration).setParallelism(parallelism);
         consuming.addSink(new RichSinkFunction>() {
@@ -944,7 +948,8 @@ public void runOneToOneExactlyOnceTest() throws Exception {
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
         FlinkPscConsumerBase kafkaSource = pscTestEnvWithKafka.getPscConsumer(topicUri, schema, pscConsumerConfiguration);
-        env.addSource(kafkaSource).map(new PartitionValidatingMapper(parallelism, 1))
+        getStream(env, topicUri, schema, pscConsumerConfiguration)
+                .map(new PartitionValidatingMapper(parallelism, 1))
                 .map(new FailingIdentityMapper(failAfterElements))
                 .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
@@ -990,9 +995,10 @@ public void runOneSourceMultiplePartitionsExactlyOnceTest() throws Exception {
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
         FlinkPscConsumerBase kafkaSource = pscTestEnvWithKafka.getPscConsumer(topicUri, schema, pscConsumerConfiguration);
-        env.addSource(kafkaSource).map(new PartitionValidatingMapper(numPartitions, 3))
-                .map(new FailingIdentityMapper(failAfterElements))
-                .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+        getStream(env, topicUri, schema, pscConsumerConfiguration)
+                .map(new PartitionValidatingMapper(numPartitions, 3))
+                .map(new FailingIdentityMapper(failAfterElements))
+                .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
         FailingIdentityMapper.failedBefore = false;
         tryExecute(env, "One-source-multi-partitions exactly once test");
@@ -1039,9 +1045,10 @@ public void runMultipleSourcesOnePartitionExactlyOnceTest() throws Exception {
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
         FlinkPscConsumerBase kafkaSource = pscTestEnvWithKafka.getPscConsumer(topicUri, schema, pscConsumerConfiguration);
-        env.addSource(kafkaSource).map(new PartitionValidatingMapper(numPartitions, 1))
-                .map(new FailingIdentityMapper(failAfterElements))
-                .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
+        getStream(env, topicUri, schema, pscConsumerConfiguration)
+                .map(new PartitionValidatingMapper(numPartitions, 1))
+                .map(new FailingIdentityMapper(failAfterElements))
+                .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
         FailingIdentityMapper.failedBefore = false;
         tryExecute(env, "multi-source-one-partitions exactly once test");
@@ -1076,10 +1083,9 @@ public void runCancelingOnFullInputTest() throws Exception {
         pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        FlinkPscConsumerBase source = pscTestEnvWithKafka.getPscConsumer(topicUri, new SimpleStringSchema(),
-                pscConsumerConfiguration);
-        env.addSource(source).addSink(new DiscardingSink());
+        getStream(env, topicUri, new SimpleStringSchema(), pscConsumerConfiguration)
+                .addSink(new DiscardingSink());
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         final JobID jobId = jobGraph.getJobID();
@@ -1138,7 +1144,7 @@ public void runCancelingOnEmptyInputTest() throws Exception {
         final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
         final int parallelism = 3;
-        createTestTopic(topic, parallelism, 1);
+        createTestTopic(topicUri, parallelism, 1);
         final AtomicReference error = new AtomicReference<>();
@@ -1149,10 +1155,9 @@ public void runCancelingOnEmptyInputTest() throws Exception {
         Properties pscConsumerConfiguration = new Properties();
         pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
-        FlinkPscConsumerBase source = pscTestEnvWithKafka.getPscConsumer(topicUri, new SimpleStringSchema(),
-                pscConsumerConfiguration);
-        env.addSource(source).addSink(new DiscardingSink());
+        getStream(env, topicUri, new SimpleStringSchema(), pscConsumerConfiguration)
+                .addSink(new DiscardingSink());
         JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
         final JobID jobId = jobGraph.getJobID();
@@ -1270,10 +1275,10 @@ public void cancel() {
         pscConsumerProperties.putAll(pscDiscoveryConfiguration);
         if (useLegacySchema) {
             Tuple2WithTopicSchema schema = new Tuple2WithTopicSchema(env.getConfig());
-            stream = env.addSource(pscTestEnvWithKafka.getPscConsumer(topicUris, schema, pscConsumerProperties));
+            stream = getStream(env, topicUris, schema, pscConsumerProperties);
         } else {
             TestDeserializer schema = new TestDeserializer(env.getConfig());
-            stream = env.addSource(pscTestEnvWithKafka.getPscConsumer(topicUris, schema, pscConsumerProperties));
+            stream = getStream(env, topicUris, schema, pscConsumerProperties);
         }
         stream.flatMap(new FlatMapFunction, Integer>() {
@@ -1352,9 +1357,7 @@ public void runBigRecordTestTopology() throws Exception {
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        FlinkPscConsumerBase> source = pscTestEnvWithKafka.getPscConsumer(topicUri, serSchema,
-                pscConsumerConfiguration);
-        DataStreamSource> consuming = env.addSource(source);
+        DataStreamSource> consuming = getStream(env, topicUri, serSchema, pscConsumerConfiguration);
         consuming.addSink(new SinkFunction>() {
@@ -1462,9 +1465,9 @@ public void runBrokerFailureTest() throws Exception {
         pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        FlinkPscConsumerBase kafkaSource = pscTestEnvWithKafka.getPscConsumer(topicUri, schema, pscConsumerConfiguration);
-        env.addSource(kafkaSource).map(new PartitionValidatingMapper(parallelism, 1))
+        getStream(env, topicUri, schema, pscConsumerConfiguration)
+                .map(new PartitionValidatingMapper(parallelism, 1))
                 .map(new BrokerKillingMapper(leaderId, failAfterElements))
                 .addSink(new ValidatingExactlyOnceSink(totalElements)).setParallelism(1);
@@ -1530,8 +1533,7 @@ public void cancel() {
         pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        DataStream> fromKafka = env
-                .addSource(pscTestEnvWithKafka.getPscConsumer(topicUri, readSchema, pscConsumerConfiguration));
+        DataStream> fromKafka = getStream(env, topicUri, readSchema, pscConsumerConfiguration);
         fromKafka.flatMap(new RichFlatMapFunction, Object>() {
             long counter = 0;
@@ -1574,7 +1576,7 @@ public PojoValue() {
     public void runAllDeletesTest() throws Exception {
         final String topic = "alldeletestest";
         final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
-        createTestTopic(topic, 1, 1);
+        createTestTopic(topicUri, 1, 1);
         final int elementCount = 300;
         // ----------- Write some data into Kafka -------------------
@@ -1621,8 +1623,7 @@ public void cancel() {
         pscConsumerConfiguration.putAll(standardPscConsumerConfiguration);
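The test-base substitutions above all reduce to one pattern: instead of wiring the legacy consumer with env.addSource, each test asks a getStream helper (added near the end of this file's diff) for its input stream, and that helper switches on useNewSource between the legacy FlinkPscConsumerBase and the new FLIP-27 style PscSource. A condensed restatement of that switch, assuming the test-base fields and builder methods that appear elsewhere in this patch (type parameters are omitted here just as they are in the surrounding extracted text):

    // Condensed sketch of the getStream helper introduced later in this diff.
    // Assumes the surrounding test-base fields (useNewSource, pscTestEnvWithKafka)
    // and the PscSource/PscSourceBuilder API referenced by the other hunks.
    private DataStreamSource getStream(
            StreamExecutionEnvironment env,
            List topicUris,
            DeserializationSchema schema,
            Properties props) {
        if (useNewSource) {
            // New unified source: built once, then handed to the environment together
            // with an explicit watermark strategy and a source name.
            PscSource pscSource =
                    pscTestEnvWithKafka.getSourceBuilder(topicUris, schema, props)
                            .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
                            .build();
            return env.fromSource(pscSource, WatermarkStrategy.noWatermarks(), "PscSource");
        } else {
            // Legacy SourceFunction-based consumer.
            FlinkPscConsumerBase flinkPscConsumer =
                    pscTestEnvWithKafka.getPscConsumer(topicUris, schema, props);
            return env.addSource(flinkPscConsumer);
        }
    }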
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        DataStream> fromKafka = env
-                .addSource(pscTestEnvWithKafka.getPscConsumer(topicUri, schema, pscConsumerConfiguration));
+        DataStream> fromKafka = getStream(env, topicUri, schema, pscConsumerConfiguration);
         fromKafka.flatMap(new RichFlatMapFunction, Object>() {
             long counter = 0;
@@ -1666,8 +1667,8 @@ public void runEndOfStreamTest() throws Exception {
         pscConsumerConfiguration.putAll(securePscConsumerConfiguration);
         pscConsumerConfiguration.putAll(pscDiscoveryConfiguration);
-        DataStream> fromKafka = env1.addSource(
-                pscTestEnvWithKafka.getPscConsumer(topicUri.getTopicUriAsString(), new FixedNumberDeserializationSchema(elementCount), pscConsumerConfiguration));
+        DataStream> fromKafka =
+                getStream(env1, topicUri.getTopicUriAsString(), new FixedNumberDeserializationSchema(elementCount), pscConsumerConfiguration);
         fromKafka.flatMap(new FlatMapFunction, Void>() {
             @Override
             public void flatMap(Tuple2 value, Collector out) throws Exception {
@@ -1766,8 +1767,7 @@ public void runMetricsTest() throws Throwable {
         Properties properties = new Properties();
         properties.putAll(standardPscConsumerConfiguration);
         properties.putAll(pscDiscoveryConfiguration);
-        DataStream> fromKafka = env1
-                .addSource(pscTestEnvWithKafka.getPscConsumer(topicUri, schema, properties));
+        DataStream> fromKafka = getStream(env1, topicUri, schema, properties);
         fromKafka.flatMap(new FlatMapFunction, Void>() {
             @Override
             public void flatMap(Tuple2 value, Collector out) throws Exception {// no
@@ -1981,12 +1981,32 @@ protected void readSequence(final StreamExecutionEnvironment env,
         // cc.setProperty("psc.discovery.topic.uri.prefixes", PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
         // cc.setProperty("psc.discovery.connection.urls", pscTestEnvWithKafka.getBrokerConnectionString().split(",")[0]);
         // cc.setProperty("psc.discovery.security.protocols", "plaintext");
-        FlinkPscConsumerBase> consumer = pscTestEnvWithKafka.getPscConsumer(topicUri,
-                deser, cc);
-        setPscConsumerOffset(startupMode, consumer, specificStartupOffsets, startupTimestamp);
+        DataStreamSource> source;
+        if (useNewSource) {
+            PscSourceBuilder> sourceBuilder =
+                    pscTestEnvWithKafka.getSourceBuilder(topicUri, deser, cc)
+                            .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
+            Map startOffsets = new HashMap<>();
+            if (specificStartupOffsets != null) {
+                specificStartupOffsets.forEach(
+                        (ktp, offset) ->
+                                startOffsets.put(
+                                        new TopicUriPartition(ktp.getTopicUri(), ktp.getPartition()),
+                                        offset));
+            }
+            setPscSourceOffset(startupMode, sourceBuilder, startOffsets, startupTimestamp);
+            source =
+                    env.fromSource(
+                            sourceBuilder.build(), WatermarkStrategy.noWatermarks(), "PscSource");
+        } else {
+            FlinkPscConsumerBase> consumer = pscTestEnvWithKafka.getPscConsumer(topicUri,
+                    deser, cc);
+            setPscConsumerOffset(startupMode, consumer, specificStartupOffsets, startupTimestamp);
+            source = env.addSource(consumer);
+        }
-        DataStream> source = env.addSource(consumer)
-                .setParallelism(sourceParallelism).map(new ThrottledMapper>(20))
+        source.setParallelism(sourceParallelism)
+                .map(new ThrottledMapper>(20))
                 .setParallelism(sourceParallelism);
         // verify data
@@ -2071,6 +2091,30 @@ protected void readSequence(final StreamExecutionEnvironment env,
                 partitionsToValuesCountAndStartOffset);
     }
+    protected void setPscSourceOffset(
+            final StartupMode startupMode,
+            final PscSourceBuilder pscSourceBuilder,
+            final Map specificStartupOffsets,
+            final Long startupTimestamp) {
+        switch (startupMode) {
+            case EARLIEST:
+                pscSourceBuilder.setStartingOffsets(OffsetsInitializer.earliest());
+                break;
+            case LATEST:
+                pscSourceBuilder.setStartingOffsets(OffsetsInitializer.latest());
+                break;
+            case SPECIFIC_OFFSETS:
+                pscSourceBuilder.setStartingOffsets(OffsetsInitializer.offsets(specificStartupOffsets));
+                break;
+            case GROUP_OFFSETS:
+                pscSourceBuilder.setStartingOffsets(OffsetsInitializer.committedOffsets(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST));
+                break;
+            case TIMESTAMP:
+                pscSourceBuilder.setStartingOffsets(OffsetsInitializer.timestamp(startupTimestamp));
+                break;
+        }
+    }
+
     protected void setPscConsumerOffset(final StartupMode startupMode,
                                         final FlinkPscConsumerBase> consumer,
                                         final Map specificStartupOffsets,
@@ -2282,14 +2326,26 @@ private boolean validateSequence(String topicUri,
         readConfiguration.setProperty(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "flink-tests-validator-consumer");
         readConfiguration.putAll(securePscConsumerConfiguration);
         readConfiguration.putAll(pscDiscoveryConfiguration);
-        FlinkPscConsumerBase> pscConsumer = pscTestEnvWithKafka.getPscConsumer(
-                topicUri,
-                deserSchema,
-                readConfiguration
-        );
-        pscConsumer.setStartFromEarliest();
-        readEnv.addSource(pscConsumer)
+        DataStreamSource> dataStreamSource;
+        if (useNewSource) {
+            PscSource> source =
+                    pscTestEnvWithKafka.getSourceBuilder(topicUri, deserSchema, readConfiguration)
+                            .setStartingOffsets(OffsetsInitializer.earliest())
+                            .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
+                            .build();
+            dataStreamSource = readEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "PscSource");
+        } else {
+            FlinkPscConsumerBase> pscConsumer = pscTestEnvWithKafka.getPscConsumer(
+                    topicUri,
+                    deserSchema,
+                    readConfiguration
+            );
+            pscConsumer.setStartFromEarliest();
+            dataStreamSource = readEnv.addSource(pscConsumer);
+        }
+
+        dataStreamSource
                 .map(new RichMapFunction, Tuple2>() {
                     private final int totalCount = parallelism * totalNumElements;
@@ -2351,6 +2407,58 @@ public Tuple2 map(Tuple2 value) throws Excep
         return success;
     }
+    private DataStreamSource getStream(
+            StreamExecutionEnvironment env,
+            String topicUri,
+            DeserializationSchema schema,
+            Properties props) {
+        return getStream(env, Collections.singletonList(topicUri), schema, props);
+    }
+
+    private DataStreamSource getStream(
+            StreamExecutionEnvironment env,
+            String topicUri,
+            PscDeserializationSchema schema,
+            Properties props) {
+        return getStream(env, Collections.singletonList(topicUri), schema, props);
+    }
+
+    private DataStreamSource getStream(
+            StreamExecutionEnvironment env,
+            List topicUris,
+            DeserializationSchema schema,
+            Properties props) {
+        if (useNewSource) {
+            PscSource pscSource =
+                    pscTestEnvWithKafka.getSourceBuilder(topicUris, schema, props)
+                            .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
+                            .build();
+            return env.fromSource(pscSource, WatermarkStrategy.noWatermarks(), "PscSource");
+        } else {
+            FlinkPscConsumerBase flinkPscConsumer =
+                    pscTestEnvWithKafka.getPscConsumer(topicUris, schema, props);
+            return env.addSource(flinkPscConsumer);
+        }
+    }
+
+    private DataStreamSource getStream(
+            StreamExecutionEnvironment env,
+            List topicUris,
+            PscDeserializationSchema schema,
+            Properties props) {
+        if (useNewSource) {
+            PscSource pscSource =
+                    pscTestEnvWithKafka.getSourceBuilder(topicUris, schema, props)
+                            .setClusterUri(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)
+                            .build();
+            return env.fromSource(pscSource, WatermarkStrategy.noWatermarks(), "PscSource");
+        } else {
+            FlinkPscConsumerBase flinkPscConsumer =
+                    pscTestEnvWithKafka.getPscConsumer(topicUris, schema, props);
+            return env.addSource(flinkPscConsumer);
+        }
+    }
+
     // ------------------------------------------------------------------------
     // Debugging utilities
     // ------------------------------------------------------------------------
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscShortRetentionTestBase.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscShortRetentionTestBase.java
index c06e57d..f9735be 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscShortRetentionTestBase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscShortRetentionTestBase.java
@@ -136,7 +136,7 @@ public void runAutoOffsetResetTest() throws Exception {
         Properties tprops = new Properties();
         tprops.setProperty("retention.ms", "250");
-        pscTestEnvWithKafka.createTestTopic(topic, parallelism, 1, tprops);
+        pscTestEnvWithKafka.createTestTopic(topicUri, parallelism, 1, tprops);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(parallelism);
@@ -234,7 +234,7 @@ public void runFailOnAutoOffsetResetNone() throws Exception {
         final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
         final int parallelism = 1;
-        pscTestEnvWithKafka.createTestTopic(topic, parallelism, 1);
+        pscTestEnvWithKafka.createTestTopic(topicUri, parallelism, 1);
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(parallelism);
@@ -272,7 +272,7 @@ public void runFailOnAutoOffsetResetNoneEager() throws Exception {
         final String topicUri = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
         final int parallelism = 1;
-        pscTestEnvWithKafka.createTestTopic(topic, parallelism, 1);
+        pscTestEnvWithKafka.createTestTopic(topicUri, parallelism, 1);
         // ----------- add consumer ----------
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java
index c1478fd..5c4087a 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSubImpl.java
@@ -31,6 +31,7 @@ import com.pinterest.psc.exception.consumer.ConsumerException;
 import com.pinterest.psc.exception.producer.ProducerException;
 import com.pinterest.psc.exception.startup.ConfigurationException;
+import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
 import com.pinterest.psc.metrics.NullMetricsReporter;
 import com.pinterest.psc.producer.PscProducer;
 import com.pinterest.psc.producer.PscProducerMessage;
@@ -238,8 +239,18 @@ public void createTestTopic(String topicUriString,
             NewTopic topicObj = new NewTopic(BaseTopicUri.validate(topicUriString).getTopic(), numberOfPartitions, (short) replicationFactor);
             adminClient.createTopics(Collections.singleton(topicObj)).all().get();
         } catch (Exception e) {
-            e.printStackTrace();
-            fail("Create test topic : " + topicUriString + " failed, " + e.getMessage());
+            // try to create it assuming that it's not a topicUriString
+            if (e instanceof TopicUriSyntaxException) {
+                LOG.warn("Trying to create assuming that {} is just the topicName", topicUriString);
+                try (AdminClient adminClient = AdminClient.create(getStandardKafkaProperties())) {
+                    NewTopic topicObj = new NewTopic(topicUriString, numberOfPartitions, (short) replicationFactor);
+                    adminClient.createTopics(Collections.singleton(topicObj)).all().get();
+                } catch (Exception e2) {
+                    fail("Create test topic : " + topicUriString + " failed, " + e.getMessage());
+                }
+            } else {
+                fail("Create test topic : " + topicUriString + " failed, " + e.getMessage());
+            }
         }
     }
diff --git a/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java b/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java
index 08f3d25..4668635 100644
--- a/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java
+++ b/psc/src/main/java/com/pinterest/psc/common/BaseTopicUri.java
@@ -4,6 +4,7 @@ import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
 import com.pinterest.psc.logging.PscLogger;
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -12,7 +13,7 @@ // base topic URI only parses the URI down to topic RN; :[topicRN]
-public class BaseTopicUri implements TopicUri {
+public class BaseTopicUri implements TopicUri, Serializable {
     private static final PscLogger logger = PscLogger.getLogger(BaseTopicUri.class);
     private static final String DEFAULT_PROTOCOL = "plaintext";
     private static final String[] SCHEMAS = {
@@ -23,6 +24,7 @@ public class BaseTopicUri implements TopicUri {
             "((?[a-zA-Z0-9-_.]+):%s)?(?[a-zA-Z0-9-_.:]+)", SEPARATOR
             ))
     };
+    private static final long serialVersionUID = 6728709313941136158L;
     private final String topicUriAsString;
     protected final String protocol;
diff --git a/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java b/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java
index 0d5fc38..39b8a1c 100644
--- a/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java
+++ b/psc/src/main/java/com/pinterest/psc/common/TopicUriPartition.java
@@ -2,10 +2,13 @@ import com.pinterest.psc.exception.startup.TopicUriSyntaxException;
+import java.io.Serializable;
+
 /**
  * A topic URI and partition pair to identify the most granular pub/sub concept plus the access protocol.
  */
-public class TopicUriPartition implements Comparable {
+public class TopicUriPartition implements Comparable, Serializable {
+    private static final long serialVersionUID = -7784054113828809322L;
     private final String topicUriStr;
     private final int partition;
     private TopicUri backendTopicUri;
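The final two hunks make BaseTopicUri and TopicUriPartition implement java.io.Serializable with pinned serialVersionUID values, so these types can travel through Java serialization inside Flink state, source splits, and the specificStartupOffsets map handled earlier in this patch. A generic round-trip sketch of what that enables, using only JDK classes (the SerializablePartition type below is a stand-in for illustration, not a PSC class):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for a value such as TopicUriPartition after this patch.
class SerializablePartition implements Serializable {
    private static final long serialVersionUID = 1L; // pinned, as in the patch
    final String topicUriStr;
    final int partition;

    SerializablePartition(String topicUriStr, int partition) {
        this.topicUriStr = topicUriStr;
        this.partition = partition;
    }
}

public class SerializationRoundTrip {
    public static void main(String[] args) throws Exception {
        SerializablePartition in = new SerializablePartition("some-topic-uri", 3);

        // Serialize (roughly what happens when such a value is captured in checkpointed state)...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(in);
        }

        // ...and deserialize it again on restore.
        try (ObjectInputStream oin = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            SerializablePartition back = (SerializablePartition) oin.readObject();
            System.out.println(back.topicUriStr + " / " + back.partition);
        }
    }
}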