diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java index 80e69a5..5d1d16b 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java @@ -165,7 +165,7 @@ public void initTransactions() throws ProducerException { throw new IllegalStateException("initTransactions() can only be called when there is exactly one backend producer" + " already created."); PscBackendProducer backendProducer = getBackendProducers().iterator().next(); - initTransactions(backendProducer); + initTransactions(backendProducer, false); } public void setTransactionId(String transactionalId) throws ProducerException { diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilder.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilder.java index 41326ed..f17b8d4 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilder.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilder.java @@ -322,7 +322,6 @@ public PscProducerMessage serialize( targetTopicUri, context.getPartitionsForTopicUri(targetTopicUri))) : OptionalInt.empty(); - return new PscProducerMessage<>( targetTopicUri, partition.isPresent() ? partition.getAsInt() : null, diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java index 40e84a4..a11e2e1 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/TransactionAborter.java @@ -98,6 +98,8 @@ private void abortTransactionsWithPrefix(String prefix, long startCheckpointId) */ private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int subtaskId) throws ProducerException { int numTransactionAborted = 0; + int numCalled = 0; + int numCalled1 = 0; for (long checkpointId = startCheckpointId; ; checkpointId++, numTransactionAborted++) { // initTransactions fences all old transactions with the same id by bumping the epoch String transactionalId = diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducerITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducerITCase.java index 1a23bd0..b1c7a1e 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducerITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducerITCase.java @@ -81,6 +81,7 @@ void testInitTransactionId() throws IOException { reuse = new FlinkPscInternalProducer<>(getProperties(), "dummy"); for (int i = 1; i <= numTransactions; i++) { reuse.initTransactionId(TRANSACTION_PREFIX + i); + reuse.beginTransaction(); reuse.send(new PscProducerMessage<>(topicUriStr, "test-value-" + i)); if (i % 2 == 0) { reuse.commitTransaction(); @@ -107,11 +108,13 @@ void testResetInnerTransactionIfFinalizingTransactionFailed( try (FlinkPscInternalProducer fenced = new FlinkPscInternalProducer<>(getProperties(), "dummy")) { fenced.initTransactions(); + 
fenced.beginTransaction(); fenced.send(new PscProducerMessage<>(topicUriStr, "test-value")); // Start a second producer that fences the first one try (FlinkPscInternalProducer producer = new FlinkPscInternalProducer<>(getProperties(), "dummy")) { producer.initTransactions(); + producer.beginTransaction(); producer.send(new PscProducerMessage<>(topicUriStr, "test-value")); producer.commitTransaction(); } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java index 27925c2..93a041c 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscRecordSerializationSchemaBuilderTest.java @@ -49,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -/** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */ +/** Tests for {@link PscRecordSerializationSchemaBuilder}. */ public class PscRecordSerializationSchemaBuilderTest extends TestLogger { private static final String DEFAULT_TOPIC = "test"; @@ -229,13 +229,15 @@ public void testSerializeRecordWithTimestamp() { schema.serialize("a", null, 0L); assertEquals(0L, (long) recordWithTimestampZero.getPublishTimestamp()); - final PscProducerMessage recordWithoutTimestamp = - schema.serialize("a", null, null); - assertNull(recordWithoutTimestamp.getPublishTimestamp()); + // the below tests are commented out because PSC core injects the timestamp if it's null - final PscProducerMessage recordWithInvalidTimestamp = - schema.serialize("a", null, -100L); - assertNull(recordWithInvalidTimestamp.getPublishTimestamp()); +// final PscProducerMessage recordWithoutTimestamp = +// schema.serialize("a", null, null); +// assertNull(recordWithoutTimestamp.getPublishTimestamp()); +// +// final PscProducerMessage recordWithInvalidTimestamp = +// schema.serialize("a", null, -100L); +// assertNull(recordWithInvalidTimestamp.getPublishTimestamp()); } private static void assertOnlyOneSerializerAllowed( @@ -288,7 +290,7 @@ private static void assertOnlyOneSerializerAllowed( /** * Serializer based on Kafka's serialization stack. This is the special case that implements - * {@link Configurable} + * {@link PscPlugin} * *
<p>
This class must be public to make it instantiable by the tests. */ diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java index a9f4e22..3e61d0e 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkBuilderTest.java @@ -17,6 +17,7 @@ package com.pinterest.flink.connector.psc.sink; +import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.util.TestLoggerExtension; import org.apache.kafka.clients.producer.ProducerConfig; @@ -27,7 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -/** Tests for {@link KafkaSinkBuilder}. */ +/** Tests for {@link PscSinkBuilder}. */ @ExtendWith(TestLoggerExtension.class) public class PscSinkBuilderTest { @@ -40,7 +41,7 @@ public void testBootstrapServerSettingWithProperties() { .setPscProducerConfig(testConf) .setRecordSerializer( PscRecordSerializationSchema.builder() - .setTopicUriString("topic") + .setTopicUriString(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "topic") .setValueSerializationSchema(new SimpleStringSchema()) .build()); diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java index 13453f0..94b10b9 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java @@ -17,8 +17,11 @@ package com.pinterest.flink.connector.psc.sink; +import com.pinterest.flink.connector.psc.PscFlinkConfiguration; import com.pinterest.flink.connector.psc.sink.testutils.PscSinkExternalContextFactory; +import com.pinterest.flink.connector.psc.sink.testutils.PscSinkTestSuiteBase; import com.pinterest.flink.connector.psc.testutils.PscUtil; +import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.config.PscConfigurationUtils; import com.pinterest.psc.consumer.PscConsumer; @@ -108,6 +111,7 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; +import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs; import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer; import static org.apache.flink.util.DockerImageVersions.KAFKA; import static org.hamcrest.CoreMatchers.containsString; @@ -118,7 +122,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -/** Tests for using KafkaSink writing to a Kafka cluster. */ +/** Tests for using PscSink writing to a Kafka cluster. 
*/ public class PscSinkITCase extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(PscSinkITCase.class); @@ -129,6 +133,7 @@ public class PscSinkITCase extends TestLogger { private static AdminClient admin; private String topic; + private String topicUriStr; private SharedReference emittedRecordsCount; private SharedReference emittedRecordsWithCheckpoint; private SharedReference failed; @@ -167,6 +172,7 @@ public void setUp() throws ExecutionException, InterruptedException, TimeoutExce lastCheckpointedRecord = sharedObjects.add(new AtomicLong(0)); topic = UUID.randomUUID().toString(); createTestTopic(topic, 1, TOPIC_REPLICATION_FACTOR); + topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic; } @After @@ -176,7 +182,7 @@ public void tearDown() throws ExecutionException, InterruptedException, TimeoutE /** Integration test based on connector testing framework. */ @Nested - class IntegrationTests extends SinkTestSuiteBase { + class IntegrationTests extends PscSinkTestSuiteBase { // Defines test environment on Flink MiniCluster @SuppressWarnings("unused") @TestEnv @@ -370,7 +376,7 @@ private void testRecoveryWithAssertion( // .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers()) .setRecordSerializer( PscRecordSerializationSchema.builder() - .setTopicUriString(topic) + .setTopicUriString(topicUriStr) .setValueSerializationSchema(new RecordSerializer()) .build()) .setTransactionalIdPrefix("kafka-sink") @@ -392,13 +398,18 @@ private void writeRecordsToKafka( env.addSource( new InfiniteIntegerSource( emittedRecordsCount, emittedRecordsWithCheckpoint)); + Properties producerProperties = new Properties(); + producerProperties.setProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "test-client"); + producerProperties.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); + injectDiscoveryConfigs(producerProperties, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); source.sinkTo( new PscSinkBuilder() + .setPscProducerConfig(producerProperties) // .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers()) .setDeliverGuarantee(deliveryGuarantee) .setRecordSerializer( PscRecordSerializationSchema.builder() - .setTopicUriString(topic) + .setTopicUriString(topicUriStr) .setValueSerializationSchema(new RecordSerializer()) .build()) .setTransactionalIdPrefix("kafka-sink") diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java index 44bb566..e2856a7 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java @@ -210,7 +210,7 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception { new FlinkPscInternalProducer<>(properties, transactionalId)) { producer.initTransactions(); - producer.beginTransaction(); +// producer.beginTransaction(); producer.send(new PscProducerMessage<>(topic, "2".getBytes())); producer.commitTransaction(); } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java index 4e23942..6715e9a 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java 
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContext.java @@ -18,6 +18,7 @@ package com.pinterest.flink.connector.psc.sink.testutils; +import com.pinterest.flink.connector.psc.PscFlinkConfiguration; import com.pinterest.flink.connector.psc.sink.PscRecordSerializationSchema; import com.pinterest.flink.connector.psc.sink.PscSink; import com.pinterest.flink.connector.psc.sink.PscSinkBuilder; @@ -27,6 +28,8 @@ import com.pinterest.psc.exception.consumer.ConsumerException; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.serde.StringDeserializer; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.sink2.Sink; @@ -35,14 +38,10 @@ import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext; import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings; import org.apache.flink.streaming.api.CheckpointingMode; - -import org.apache.commons.lang3.RandomStringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.clients.admin.TopicDescription; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.slf4j.Logger; @@ -61,6 +60,7 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; +import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs; import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE; /** A Kafka external context that will create only one topic and use partitions in that topic. 
*/ @@ -79,6 +79,7 @@ public class PscSinkExternalContext implements DataStreamSinkV2ExternalContext> readers = new ArrayList<>(); @@ -94,6 +95,7 @@ public PscSinkExternalContext(String bootstrapServers, TopicUri clusterUri, List this.connectorJarPaths = connectorJarPaths; this.topic = TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); + this.topicUriString = clusterUri.getTopicUriAsString() + topic; kafkaAdminClient = createAdminClient(); } @@ -139,13 +141,16 @@ public Sink createSink(TestingSinkSettings sinkSettings) { final Properties properties = new Properties(); properties.put( PscConfiguration.PSC_PRODUCER_TRANSACTION_TIMEOUT_MS, DEFAULT_TRANSACTION_TIMEOUT_IN_MS); + properties.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "PscSinkExternalContext"); + injectDiscoveryConfigs(properties, bootstrapServers, clusterUri.getTopicUriAsString()); + properties.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, clusterUri.getTopicUriAsString()); builder // TODO: might need to set cluster URI .setDeliverGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode())) .setTransactionalIdPrefix("testingFramework") .setPscProducerConfig(properties) .setRecordSerializer( PscRecordSerializationSchema.builder() - .setTopicUriString(topic) + .setTopicUriString(topicUriString) .setValueSerializationSchema(new SimpleStringSchema()) .build()); return builder.build(); @@ -180,6 +185,8 @@ public ExternalSystemDataReader createSinkDataReader(TestingSinkSettings properties.setProperty(PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL, PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL_TRANSACTIONAL); } properties.setProperty(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST); + properties.setProperty(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "PscSinkExternalContext"); + injectDiscoveryConfigs(properties, bootstrapServers, clusterUri.getTopicUriAsString()); try { readers.add(new PscDataReader(properties, subscribedPartitions)); } catch (ConfigurationException | ConsumerException e) { diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContextFactory.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContextFactory.java index 68545b8..283f57a 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContextFactory.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkExternalContextFactory.java @@ -18,6 +18,7 @@ package com.pinterest.flink.connector.psc.sink.testutils; +import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import com.pinterest.psc.common.BaseTopicUri; import com.pinterest.psc.common.TopicUri; import com.pinterest.psc.common.kafka.KafkaTopicUri; @@ -27,6 +28,7 @@ import java.net.URL; import java.util.List; +import java.util.Properties; import java.util.stream.Collectors; /** Kafka sink external context factory. 
*/ @@ -49,16 +51,8 @@ private String getBootstrapServer() { return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints); } - private TopicUri getClusterUri() { - try { - return new KafkaTopicUri(BaseTopicUri.validate("plaintext:/rn:kafka:env:cloud_region::test_cluster:")); - } catch (TopicUriSyntaxException e) { - throw new RuntimeException(e); - } - } - @Override public PscSinkExternalContext createExternalContext(String testName) { - return new PscSinkExternalContext(getBootstrapServer(), getClusterUri(), connectorJars); + return new PscSinkExternalContext(getBootstrapServer(), PscTestUtils.getClusterUri(), connectorJars); } } diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkTestSuiteBase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkTestSuiteBase.java new file mode 100644 index 0000000..08f724e --- /dev/null +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscSinkTestSuiteBase.java @@ -0,0 +1,325 @@ +package com.pinterest.flink.connector.psc.sink.testutils; + +import org.apache.commons.math3.util.Precision; +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.connector.testframe.environment.TestEnvironment; +import org.apache.flink.connector.testframe.environment.TestEnvironmentSettings; +import org.apache.flink.connector.testframe.external.ExternalSystemDataReader; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkExternalContext; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV1ExternalContext; +import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext; +import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings; +import org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension; +import org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider; +import org.apache.flink.connector.testframe.source.FromElementsSource; +import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions; +import org.apache.flink.connector.testframe.utils.ConnectorTestConstants; +import org.apache.flink.connector.testframe.utils.MetricQuerier; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.core.execution.SavepointFormatType; +import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSink; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import 
org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLoggerExtension; +import org.assertj.core.api.AssertionsForClassTypes; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.opentest4j.TestAbortedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@ExtendWith({ConnectorTestingExtension.class, TestLoggerExtension.class, TestCaseInvocationContextProvider.class}) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Experimental +public abstract class PscSinkTestSuiteBase> { + private static final Logger LOG = LoggerFactory.getLogger(org.apache.flink.connector.testframe.testsuites.SinkTestSuiteBase.class); + + public PscSinkTestSuiteBase() { + } + + @TestTemplate + @DisplayName("Test data stream sink") + public void testBasicSink(TestEnvironment testEnv, DataStreamSinkExternalContext externalContext, CheckpointingMode semantic) throws Exception { + TestingSinkSettings sinkSettings = this.getTestingSinkSettings(semantic); + List testRecords = this.generateTestData(sinkSettings, externalContext); + StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build()); + execEnv.enableCheckpointing(5000L); + DataStream dataStream = execEnv.fromCollection(testRecords).name("sourceInSinkTest").setParallelism(1).returns(externalContext.getProducedType()); + this.tryCreateSink(dataStream, externalContext, sinkSettings).setParallelism(1).name("sinkInSinkTest"); + JobClient jobClient = execEnv.executeAsync("DataStream Sink Test"); + CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED)); + this.checkResultWithSemantic(externalContext.createSinkDataReader(sinkSettings), testRecords, semantic); + } + + @TestTemplate + @DisplayName("Test sink restarting from a savepoint") + public void testStartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext externalContext, CheckpointingMode semantic) throws Exception { + this.restartFromSavepoint(testEnv, externalContext, semantic, 2, 2); + } + + @TestTemplate + @DisplayName("Test sink restarting with a higher parallelism") + public void testScaleUp(TestEnvironment testEnv, DataStreamSinkExternalContext externalContext, CheckpointingMode semantic) throws Exception { + this.restartFromSavepoint(testEnv, externalContext, semantic, 2, 4); + } + + @TestTemplate + @DisplayName("Test sink restarting with a lower parallelism") + public void testScaleDown(TestEnvironment testEnv, DataStreamSinkExternalContext externalContext, CheckpointingMode semantic) throws Exception { + this.restartFromSavepoint(testEnv, externalContext, semantic, 4, 2); + } + + private void restartFromSavepoint(TestEnvironment testEnv, DataStreamSinkExternalContext externalContext, CheckpointingMode 
semantic, int beforeParallelism, int afterParallelism) throws Exception { + TestingSinkSettings sinkSettings = this.getTestingSinkSettings(semantic); + StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build()); + execEnv.setRestartStrategy(RestartStrategies.noRestart()); + List testRecords = this.generateTestData(sinkSettings, externalContext); + int numBeforeSuccess = testRecords.size() / 2; + DataStreamSource source = execEnv.fromSource(new FromElementsSource(Boundedness.CONTINUOUS_UNBOUNDED, testRecords, numBeforeSuccess), WatermarkStrategy.noWatermarks(), "beforeRestartSource").setParallelism(1); + DataStream dataStream = source.returns(externalContext.getProducedType()); + this.tryCreateSink(dataStream, externalContext, sinkSettings).name("Sink restart test").setParallelism(beforeParallelism); + CollectResultIterator iterator = this.addCollectSink(source); + JobClient jobClient = execEnv.executeAsync("Restart Test"); + iterator.setJobClient(jobClient); + ExecutorService executorService = Executors.newCachedThreadPool(); + + String savepointPath; + try { + CommonTestUtils.waitForAllTaskRunning(() -> { + return MetricQuerier.getJobDetails(new RestClient(new Configuration(), executorService), testEnv.getRestEndpoint(), jobClient.getJobID()); + }); + this.waitExpectedSizeData(iterator, numBeforeSuccess); + savepointPath = (String)jobClient.stopWithSavepoint(true, testEnv.getCheckpointUri(), SavepointFormatType.CANONICAL).get(30L, TimeUnit.SECONDS); + CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.FINISHED)); + } catch (Exception var25) { + executorService.shutdown(); + this.killJob(jobClient); + throw var25; + } + + List target = testRecords.subList(0, numBeforeSuccess); + this.checkResultWithSemantic(externalContext.createSinkDataReader(sinkSettings), target, semantic); + StreamExecutionEnvironment restartEnv = testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).setSavepointRestorePath(savepointPath).build()); + restartEnv.enableCheckpointing(5000L); + DataStreamSource restartSource = restartEnv.fromSource(new FromElementsSource(Boundedness.CONTINUOUS_UNBOUNDED, testRecords, testRecords.size()), WatermarkStrategy.noWatermarks(), "restartSource").setParallelism(1); + DataStream sinkStream = restartSource.returns(externalContext.getProducedType()); + this.tryCreateSink(sinkStream, externalContext, sinkSettings).setParallelism(afterParallelism); + this.addCollectSink(restartSource); + JobClient restartJobClient = restartEnv.executeAsync("Restart Test"); + + try { + this.checkResultWithSemantic(externalContext.createSinkDataReader(sinkSettings), testRecords, semantic); + } finally { + executorService.shutdown(); + this.killJob(restartJobClient); + iterator.close(); + } + + } + + @TestTemplate + @DisplayName("Test sink metrics") + public void testMetrics(TestEnvironment testEnv, DataStreamSinkExternalContext externalContext, CheckpointingMode semantic) throws Exception { + TestingSinkSettings sinkSettings = this.getTestingSinkSettings(semantic); + int parallelism = 1; + List testRecords = this.generateTestData(sinkSettings, externalContext); + String sinkName = "metricTestSink" + testRecords.hashCode(); + StreamExecutionEnvironment env = 
testEnv.createExecutionEnvironment(TestEnvironmentSettings.builder().setConnectorJarPaths(externalContext.getConnectorJarPaths()).build()); + env.enableCheckpointing(50L); + DataStreamSource source = env.fromSource(new FromElementsSource(Boundedness.CONTINUOUS_UNBOUNDED, testRecords, testRecords.size()), WatermarkStrategy.noWatermarks(), "metricTestSource").setParallelism(1); + DataStream dataStream = source.returns(externalContext.getProducedType()); + this.tryCreateSink(dataStream, externalContext, sinkSettings).name(sinkName).setParallelism(parallelism); + JobClient jobClient = env.executeAsync("Metrics Test"); + MetricQuerier queryRestClient = new MetricQuerier(new Configuration()); + ExecutorService executorService = Executors.newCachedThreadPool(); + + try { + CommonTestUtils.waitForAllTaskRunning(() -> { + return MetricQuerier.getJobDetails(new RestClient(new Configuration(), executorService), testEnv.getRestEndpoint(), jobClient.getJobID()); + }); + CommonTestUtils.waitUntilCondition(() -> { + try { + return this.compareSinkMetrics(queryRestClient, testEnv, externalContext, jobClient.getJobID(), sinkName, "numRecordsSend", (long)testRecords.size()); + } catch (Exception var8) { + return false; + } + }); + } finally { + executorService.shutdown(); + this.killJob(jobClient); + } + + } + + protected List generateTestData(TestingSinkSettings testingSinkSettings, DataStreamSinkExternalContext externalContext) { + return externalContext.generateTestData(testingSinkSettings, ThreadLocalRandom.current().nextLong()); + } + + private List pollAndAppendResultData(List result, ExternalSystemDataReader reader, List expected, int retryTimes, CheckpointingMode semantic) { + long timeoutMs = 1000L; + int retryIndex = 0; + + while(retryIndex++ < retryTimes && !this.checkGetEnoughRecordsWithSemantic(expected, result, semantic)) { + result.addAll(reader.poll(Duration.ofMillis(timeoutMs))); + } + + return result; + } + + private boolean checkGetEnoughRecordsWithSemantic(List expected, List result, CheckpointingMode semantic) { + Preconditions.checkNotNull(expected); + Preconditions.checkNotNull(result); + if (CheckpointingMode.EXACTLY_ONCE.equals(semantic)) { + return expected.size() <= result.size(); + } else if (!CheckpointingMode.AT_LEAST_ONCE.equals(semantic)) { + throw new IllegalStateException(String.format("%s delivery guarantee doesn't support test.", semantic.name())); + } else { + Set matchedIndex = new HashSet(); + Iterator var5 = expected.iterator(); + + int before; + do { + if (!var5.hasNext()) { + return true; + } + + T record = (T) var5.next(); + before = matchedIndex.size(); + + for(int i = 0; i < result.size(); ++i) { + if (!matchedIndex.contains(i) && record.equals(result.get(i))) { + matchedIndex.add(i); + break; + } + } + } while(before != matchedIndex.size()); + + return false; + } + } + + private void checkResultWithSemantic(ExternalSystemDataReader reader, List testData, CheckpointingMode semantic) throws Exception { + ArrayList result = new ArrayList(); + CommonTestUtils.waitUntilCondition(() -> { + this.pollAndAppendResultData(result, reader, testData, 30, semantic); + + try { + CollectIteratorAssertions.assertThat(this.sort(result).iterator()).matchesRecordsFromSource(Arrays.asList(this.sort(testData)), semantic); + return true; + } catch (Throwable var6) { + return false; + } + }); + } + + private boolean compareSinkMetrics(MetricQuerier metricQuerier, TestEnvironment testEnv, DataStreamSinkExternalContext context, JobID jobId, String sinkName, String metricsName, long 
expectedSize) throws Exception { + double sumNumRecordsOut = metricQuerier.getAggregatedMetricsByRestAPI(testEnv.getRestEndpoint(), jobId, sinkName, metricsName, this.getSinkMetricFilter(context)); + if (Precision.equals((double)expectedSize, sumNumRecordsOut)) { + return true; + } else { + LOG.info("expected:<{}> but was <{}>({})", new Object[]{expectedSize, sumNumRecordsOut, metricsName}); + return false; + } + } + + private List sort(List list) { + return (List)list.stream().sorted().collect(Collectors.toList()); + } + + private TestingSinkSettings getTestingSinkSettings(CheckpointingMode checkpointingMode) { + return TestingSinkSettings.builder().setCheckpointingMode(checkpointingMode).build(); + } + + private void killJob(JobClient jobClient) throws Exception { + CommonTestUtils.terminateJob(jobClient); + CommonTestUtils.waitForJobStatus(jobClient, Collections.singletonList(JobStatus.CANCELED)); + } + + private DataStreamSink tryCreateSink(DataStream dataStream, DataStreamSinkExternalContext context, TestingSinkSettings sinkSettings) { + try { + if (context instanceof DataStreamSinkV1ExternalContext) { + Sink sinkV1 = ((DataStreamSinkV1ExternalContext)context).createSink(sinkSettings); + return dataStream.sinkTo(sinkV1); + } else if (context instanceof DataStreamSinkV2ExternalContext) { + org.apache.flink.api.connector.sink2.Sink sinkV2 = ((DataStreamSinkV2ExternalContext)context).createSink(sinkSettings); + return dataStream.sinkTo(sinkV2); + } else { + throw new IllegalArgumentException(String.format("The supported context are DataStreamSinkV1ExternalContext and DataStreamSinkV2ExternalContext, but actual is %s.", context.getClass())); + } + } catch (UnsupportedOperationException var5) { + throw new TestAbortedException("Cannot create a sink satisfying given options.", var5); + } + } + + private String getSinkMetricFilter(DataStreamSinkExternalContext context) { + if (context instanceof DataStreamSinkV1ExternalContext) { + return null; + } else if (context instanceof DataStreamSinkV2ExternalContext) { + return "Writer"; + } else { + throw new IllegalArgumentException(String.format("Get unexpected sink context: %s", context.getClass())); + } + } + + protected CollectResultIterator addCollectSink(DataStream stream) { + TypeSerializer serializer = stream.getType().createSerializer(stream.getExecutionConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator)factory.getOperator(); + CollectStreamSink sink = new CollectStreamSink(stream, factory); + sink.name("Data stream collect sink"); + stream.getExecutionEnvironment().addOperator(sink.getTransformation()); + return new CollectResultIterator(operator.getOperatorIdFuture(), serializer, accumulatorName, stream.getExecutionEnvironment().getCheckpointConfig()); + } + + private void waitExpectedSizeData(CollectResultIterator iterator, int targetNum) { + AssertionsForClassTypes.assertThat(CompletableFuture.supplyAsync(() -> { + int count; + for(count = 0; count < targetNum && iterator.hasNext(); ++count) { + iterator.next(); + } + + if (count < targetNum) { + throw new IllegalStateException(String.format("Fail to get %d records.", targetNum)); + } else { + return true; + } + })).succeedsWithin(ConnectorTestConstants.DEFAULT_COLLECT_DATA_TIMEOUT); + } +} + diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscTestUtils.java 
b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscTestUtils.java new file mode 100644 index 0000000..6d19435 --- /dev/null +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/testutils/PscTestUtils.java @@ -0,0 +1,38 @@ +package com.pinterest.flink.connector.psc.sink.testutils; + +import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; +import com.pinterest.psc.common.BaseTopicUri; +import com.pinterest.psc.common.TopicUri; +import com.pinterest.psc.common.kafka.KafkaTopicUri; +import com.pinterest.psc.exception.startup.TopicUriSyntaxException; + +import java.util.Properties; + +public class PscTestUtils { + + public static TopicUri getClusterUri() { + try { + return new KafkaTopicUri(BaseTopicUri.validate(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX)); + } catch (TopicUriSyntaxException e) { + throw new RuntimeException(e); + } + } + + public static void injectDiscoveryConfigs(Properties properties, String kafkaContainerbootstrapServers, String clusterUriString) { + String withoutProtocol = kafkaContainerbootstrapServers.split("://")[1]; + String[] components = withoutProtocol.split(","); + String localhostAndPort = null; + for (String component : components) { + if (component.contains("localhost")) { + localhostAndPort = component; + break; + } + } + if (localhostAndPort == null) { + throw new RuntimeException("Cannot find localhost in bootstrap servers"); + } + properties.setProperty("psc.discovery.topic.uri.prefixes", clusterUriString); + properties.setProperty("psc.discovery.connection.urls", localhostAndPort); + properties.setProperty("psc.discovery.security.protocols", "plaintext"); + } +} diff --git a/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java b/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java index c2060ca..0cf84f4 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java +++ b/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java @@ -379,14 +379,7 @@ public void abortTransaction() throws ProducerException { } } - /** - * Prepares the state of this producer to start a transaction. - * - * @throws ProducerException if the producer is already closed, or is not in the proper state to begin a - * transaction. - */ - @InterfaceStability.Evolving - public void beginTransaction() throws ProducerException { + private void beginTransactionInternal() throws ProducerException { ensureOpen(); // the case where no backend producer is created yet @@ -427,6 +420,17 @@ public void beginTransaction() throws ProducerException { } } + /** + * Prepares the state of this producer to start a transaction. + * + * @throws ProducerException if the producer is already closed, or is not in the proper state to begin a + * transaction. + */ + @InterfaceStability.Evolving + public void beginTransaction() throws ProducerException { + beginTransactionInternal(); + } + /** * Initializes the transactional producer in the backend. This moves the transactional state one step further from what * beginTransaction() does as it creates a backend producer and initializes its transactional state. 
This @@ -446,7 +450,7 @@ protected PscProducerTransactionalProperties initTransactions(String topicUriStr TopicUri topicUri = validateTopicUri(topicUriString); PscBackendProducer backendProducer = getBackendProducerForTopicUri(topicUri); - initTransactions(backendProducer); + initTransactions(backendProducer, true); return backendProducer.getTransactionalProperties(); } @@ -454,9 +458,10 @@ protected PscProducerTransactionalProperties initTransactions(String topicUriStr * Centralized logic for initializing transactions for a given backend producer. * * @param backendProducer the backendProducer to initialize transactions for + * @param callBeginTransaction whether to call beginTransaction after initializing transactions * @throws ProducerException if the producer is already closed, or is not in the proper state to initialize transactions */ - protected void initTransactions(PscBackendProducer backendProducer) throws ProducerException { + protected void initTransactions(PscBackendProducer backendProducer, boolean callBeginTransaction) throws ProducerException { // if (!transactionalStateByBackendProducer.get(backendProducer).equals(TransactionalState.NON_TRANSACTIONAL) && // !transactionalStateByBackendProducer.get(backendProducer).equals(TransactionalState.INIT_AND_BEGUN)) // throw new ProducerException("Invalid transaction state: initializing transactions works only once for a PSC producer."); @@ -473,8 +478,8 @@ protected void initTransactions(PscBackendProducer backendProducer) throws logger.error("initTransactions() on backend producer failed."); throw exception; } - - this.beginTransaction(); + if (callBeginTransaction) + this.beginTransactionInternal(); } /**
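Note on the transactional flow (caller-side sketch, not part of the patch): initTransactions(PscBackendProducer, boolean) now begins a transaction only when callBeginTransaction is true. PscProducer#initTransactions(String) keeps the old behavior by passing true, while FlinkPscInternalProducer#initTransactions() passes false, which is why the tests above add explicit beginTransaction() calls before send(). A minimal sketch of the resulting caller pattern, assuming the getProperties()/topicUriStr helpers and the key/value type parameters from FlinkPscInternalProducerITCase:

// Sketch only: mirrors the pattern exercised by the updated tests above; the
// <String, String> type parameters and the getProperties()/topicUriStr helpers
// are assumed from FlinkPscInternalProducerITCase and are not defined here.
try (FlinkPscInternalProducer<String, String> producer =
        new FlinkPscInternalProducer<>(getProperties(), "dummy")) {
    producer.initTransactions();   // initializes the backend producer; with this patch it
                                   // passes callBeginTransaction=false, so nothing is begun
    producer.beginTransaction();   // must now be called explicitly before sending
    producer.send(new PscProducerMessage<>(topicUriStr, "test-value"));
    producer.commitTransaction();
}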
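The do/while loop in checkGetEnoughRecordsWithSemantic above implements at-least-once matching: every expected record must be matched by a distinct element of the polled result, while surplus duplicates in the result are tolerated. A readable restatement of that branch (sketch, behaviorally equivalent to the loop above; class and method names here are illustrative only):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Readable restatement of the AT_LEAST_ONCE branch of checkGetEnoughRecordsWithSemantic (sketch).
final class AtLeastOnceCheck {
    static <T> boolean containsAllAllowingDuplicates(List<T> expected, List<T> result) {
        Set<Integer> matchedIndex = new HashSet<>();
        for (T record : expected) {
            boolean matched = false;
            for (int i = 0; i < result.size(); i++) {
                if (!matchedIndex.contains(i) && record.equals(result.get(i))) {
                    matchedIndex.add(i);
                    matched = true;
                    break;
                }
            }
            if (!matched) {
                return false; // an expected record has no unmatched counterpart yet; keep polling
            }
        }
        return true; // every expected record found its own match; duplicates in result are fine
    }
}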