
Commit

WIP fixed Test Sink with lower parallelism in PscSinkITCase integration tests
jeffxiang committed Sep 24, 2024
1 parent 85c68ba commit 864ea72
Showing 13 changed files with 430 additions and 43 deletions.
@@ -165,7 +165,7 @@ public void initTransactions() throws ProducerException {
throw new IllegalStateException("initTransactions() can only be called when there is exactly one backend producer" +
" already created.");
PscBackendProducer<K, V> backendProducer = getBackendProducers().iterator().next();
initTransactions(backendProducer);
initTransactions(backendProducer, false);
}

public void setTransactionId(String transactionalId) throws ProducerException {
@@ -322,7 +322,6 @@ public PscProducerMessage<byte[], byte[]> serialize(
targetTopicUri,
context.getPartitionsForTopicUri(targetTopicUri)))
: OptionalInt.empty();

return new PscProducerMessage<>(
targetTopicUri,
partition.isPresent() ? partition.getAsInt() : null,
@@ -98,6 +98,8 @@ private void abortTransactionsWithPrefix(String prefix, long startCheckpointId)
*/
private int abortTransactionOfSubtask(String prefix, long startCheckpointId, int subtaskId) throws ProducerException {
int numTransactionAborted = 0;
int numCalled = 0;
int numCalled1 = 0;
for (long checkpointId = startCheckpointId; ; checkpointId++, numTransactionAborted++) {
// initTransactions fences all old transactions with the same id by bumping the epoch
String transactionalId =
@@ -81,6 +81,7 @@ void testInitTransactionId() throws IOException {
reuse = new FlinkPscInternalProducer<>(getProperties(), "dummy");
for (int i = 1; i <= numTransactions; i++) {
reuse.initTransactionId(TRANSACTION_PREFIX + i);
reuse.beginTransaction();
reuse.send(new PscProducerMessage<>(topicUriStr, "test-value-" + i));
if (i % 2 == 0) {
reuse.commitTransaction();
@@ -107,11 +108,13 @@ void testResetInnerTransactionIfFinalizingTransactionFailed(
try (FlinkPscInternalProducer<String, String> fenced =
new FlinkPscInternalProducer<>(getProperties(), "dummy")) {
fenced.initTransactions();
fenced.beginTransaction();
fenced.send(new PscProducerMessage<>(topicUriStr, "test-value"));
// Start a second producer that fences the first one
try (FlinkPscInternalProducer<String, String> producer =
new FlinkPscInternalProducer<>(getProperties(), "dummy")) {
producer.initTransactions();
producer.beginTransaction();
producer.send(new PscProducerMessage<>(topicUriStr, "test-value"));
producer.commitTransaction();
}
@@ -49,7 +49,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/** Tests for {@link KafkaRecordSerializationSchemaBuilder}. */
/** Tests for {@link PscRecordSerializationSchemaBuilder}. */
public class PscRecordSerializationSchemaBuilderTest extends TestLogger {

private static final String DEFAULT_TOPIC = "test";
@@ -229,13 +229,15 @@ public void testSerializeRecordWithTimestamp() {
schema.serialize("a", null, 0L);
assertEquals(0L, (long) recordWithTimestampZero.getPublishTimestamp());

final PscProducerMessage<byte[], byte[]> recordWithoutTimestamp =
schema.serialize("a", null, null);
assertNull(recordWithoutTimestamp.getPublishTimestamp());
// the below tests are commented out because PSC core injects the timestamp if it's null

final PscProducerMessage<byte[], byte[]> recordWithInvalidTimestamp =
schema.serialize("a", null, -100L);
assertNull(recordWithInvalidTimestamp.getPublishTimestamp());
// final PscProducerMessage<byte[], byte[]> recordWithoutTimestamp =
// schema.serialize("a", null, null);
// assertNull(recordWithoutTimestamp.getPublishTimestamp());
//
// final PscProducerMessage<byte[], byte[]> recordWithInvalidTimestamp =
// schema.serialize("a", null, -100L);
// assertNull(recordWithInvalidTimestamp.getPublishTimestamp());
}

private static void assertOnlyOneSerializerAllowed(
@@ -288,7 +290,7 @@ private static void assertOnlyOneSerializerAllowed(

/**
* Serializer based on Kafka's serialization stack. This is the special case that implements
* {@link Configurable}
* {@link PscPlugin}
*
* <p>This class must be public to make it instantiable by the tests.
*/
@@ -17,6 +17,7 @@

package com.pinterest.flink.connector.psc.sink;

import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.kafka.clients.producer.ProducerConfig;
@@ -27,7 +28,7 @@

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;

/** Tests for {@link KafkaSinkBuilder}. */
/** Tests for {@link PscSinkBuilder}. */
@ExtendWith(TestLoggerExtension.class)
public class PscSinkBuilderTest {

@@ -40,7 +41,7 @@ public void testBootstrapServerSettingWithProperties() {
.setPscProducerConfig(testConf)
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString("topic")
.setTopicUriString(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + "topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build());

@@ -17,8 +17,11 @@

package com.pinterest.flink.connector.psc.sink;

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.sink.testutils.PscSinkExternalContextFactory;
import com.pinterest.flink.connector.psc.sink.testutils.PscSinkTestSuiteBase;
import com.pinterest.flink.connector.psc.testutils.PscUtil;
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.config.PscConfiguration;
import com.pinterest.psc.config.PscConfigurationUtils;
import com.pinterest.psc.consumer.PscConsumer;
@@ -108,6 +111,7 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs;
import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer;
import static org.apache.flink.util.DockerImageVersions.KAFKA;
import static org.hamcrest.CoreMatchers.containsString;
@@ -118,7 +122,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/** Tests for using KafkaSink writing to a Kafka cluster. */
/** Tests for using PscSink writing to a Kafka cluster. */
public class PscSinkITCase extends TestLogger {

private static final Logger LOG = LoggerFactory.getLogger(PscSinkITCase.class);
@@ -129,6 +133,7 @@ public class PscSinkITCase extends TestLogger {
private static AdminClient admin;

private String topic;
private String topicUriStr;
private SharedReference<AtomicLong> emittedRecordsCount;
private SharedReference<AtomicLong> emittedRecordsWithCheckpoint;
private SharedReference<AtomicBoolean> failed;
@@ -167,6 +172,7 @@ public void setUp() throws ExecutionException, InterruptedException, TimeoutExce
lastCheckpointedRecord = sharedObjects.add(new AtomicLong(0));
topic = UUID.randomUUID().toString();
createTestTopic(topic, 1, TOPIC_REPLICATION_FACTOR);
topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic;
}

@After
Expand All @@ -176,7 +182,7 @@ public void tearDown() throws ExecutionException, InterruptedException, TimeoutE

/** Integration test based on connector testing framework. */
@Nested
class IntegrationTests extends SinkTestSuiteBase<String> {
class IntegrationTests extends PscSinkTestSuiteBase<String> {
// Defines test environment on Flink MiniCluster
@SuppressWarnings("unused")
@TestEnv
@@ -370,7 +376,7 @@ private void testRecoveryWithAssertion(
// .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString(topic)
.setTopicUriString(topicUriStr)
.setValueSerializationSchema(new RecordSerializer())
.build())
.setTransactionalIdPrefix("kafka-sink")
@@ -392,13 +398,18 @@ private void writeRecordsToKafka(
env.addSource(
new InfiniteIntegerSource(
emittedRecordsCount, emittedRecordsWithCheckpoint));
Properties producerProperties = new Properties();
producerProperties.setProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "test-client");
producerProperties.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
injectDiscoveryConfigs(producerProperties, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
source.sinkTo(
new PscSinkBuilder<Long>()
.setPscProducerConfig(producerProperties)
// .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setDeliverGuarantee(deliveryGuarantee)
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString(topic)
.setTopicUriString(topicUriStr)
.setValueSerializationSchema(new RecordSerializer())
.build())
.setTransactionalIdPrefix("kafka-sink")
@@ -210,7 +210,7 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception {
new FlinkPscInternalProducer<>(properties, transactionalId)) {

producer.initTransactions();
producer.beginTransaction();
// producer.beginTransaction();
producer.send(new PscProducerMessage<>(topic, "2".getBytes()));
producer.commitTransaction();
}
@@ -18,6 +18,7 @@

package com.pinterest.flink.connector.psc.sink.testutils;

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.sink.PscRecordSerializationSchema;
import com.pinterest.flink.connector.psc.sink.PscSink;
import com.pinterest.flink.connector.psc.sink.PscSinkBuilder;
@@ -27,6 +28,8 @@
import com.pinterest.psc.exception.consumer.ConsumerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.serde.StringDeserializer;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
@@ -35,14 +38,10 @@
import org.apache.flink.connector.testframe.external.sink.DataStreamSinkV2ExternalContext;
import org.apache.flink.connector.testframe.external.sink.TestingSinkSettings;
import org.apache.flink.streaming.api.CheckpointingMode;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
@@ -61,6 +60,7 @@
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs;
import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;

/** A Kafka external context that will create only one topic and use partitions in that topic. */
@@ -79,6 +79,7 @@ public class PscSinkExternalContext implements DataStreamSinkV2ExternalContext<S
protected TopicUri clusterUri;

protected final String topic;
protected final String topicUriString;

private final List<ExternalSystemDataReader<String>> readers = new ArrayList<>();

@@ -94,6 +95,7 @@ public PscSinkExternalContext(String bootstrapServers, TopicUri clusterUri, List
this.connectorJarPaths = connectorJarPaths;
this.topic =
TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
this.topicUriString = clusterUri.getTopicUriAsString() + topic;
kafkaAdminClient = createAdminClient();
}

@@ -139,13 +141,16 @@ public Sink<String> createSink(TestingSinkSettings sinkSettings) {
final Properties properties = new Properties();
properties.put(
PscConfiguration.PSC_PRODUCER_TRANSACTION_TIMEOUT_MS, DEFAULT_TRANSACTION_TIMEOUT_IN_MS);
properties.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "PscSinkExternalContext");
injectDiscoveryConfigs(properties, bootstrapServers, clusterUri.getTopicUriAsString());
properties.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, clusterUri.getTopicUriAsString());
builder // TODO: might need to set cluster URI
.setDeliverGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode()))
.setTransactionalIdPrefix("testingFramework")
.setPscProducerConfig(properties)
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString(topic)
.setTopicUriString(topicUriString)
.setValueSerializationSchema(new SimpleStringSchema())
.build());
return builder.build();
@@ -180,6 +185,8 @@ public ExternalSystemDataReader<String> createSinkDataReader(TestingSinkSettings
properties.setProperty(PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL, PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL_TRANSACTIONAL);
}
properties.setProperty(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
properties.setProperty(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "PscSinkExternalContext");
injectDiscoveryConfigs(properties, bootstrapServers, clusterUri.getTopicUriAsString());
try {
readers.add(new PscDataReader(properties, subscribedPartitions));
} catch (ConfigurationException | ConsumerException e) {
@@ -18,6 +18,7 @@

package com.pinterest.flink.connector.psc.sink.testutils;

import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.common.BaseTopicUri;
import com.pinterest.psc.common.TopicUri;
import com.pinterest.psc.common.kafka.KafkaTopicUri;
@@ -27,6 +28,7 @@

import java.net.URL;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

/** Kafka sink external context factory. */
@@ -49,16 +51,8 @@ private String getBootstrapServer() {
return String.join(",", kafkaContainer.getBootstrapServers(), internalEndpoints);
}

private TopicUri getClusterUri() {
try {
return new KafkaTopicUri(BaseTopicUri.validate("plaintext:/rn:kafka:env:cloud_region::test_cluster:"));
} catch (TopicUriSyntaxException e) {
throw new RuntimeException(e);
}
}

@Override
public PscSinkExternalContext createExternalContext(String testName) {
return new PscSinkExternalContext(getBootstrapServer(), getClusterUri(), connectorJars);
return new PscSinkExternalContext(getBootstrapServer(), PscTestUtils.getClusterUri(), connectorJars);
}
}