WIP finish PscTransactionLogITCase
jeffxiang committed Sep 26, 2024
1 parent 602c345 commit a2cfd2e
Showing 3 changed files with 72 additions and 44 deletions.
PscTransactionLogITCase.java

@@ -17,15 +17,18 @@
 
 package com.pinterest.flink.connector.psc.sink;
 
+import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
+import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
+import com.pinterest.psc.config.PscConfiguration;
+import com.pinterest.psc.config.PscConfigurationUtils;
 import com.pinterest.psc.exception.consumer.ConsumerException;
+import com.pinterest.psc.exception.producer.ProducerException;
 import com.pinterest.psc.exception.startup.ConfigurationException;
+import com.pinterest.psc.producer.PscProducer;
+import com.pinterest.psc.producer.PscProducerMessage;
+import com.pinterest.psc.serde.ByteArraySerializer;
+import com.pinterest.psc.serde.IntegerSerializer;
 import org.apache.flink.util.TestLogger;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Test;

@@ -44,6 +47,7 @@
 import static com.pinterest.flink.connector.psc.sink.PscTransactionLog.TransactionState.Ongoing;
 import static com.pinterest.flink.connector.psc.sink.PscTransactionLog.TransactionState.PrepareAbort;
 import static com.pinterest.flink.connector.psc.sink.PscTransactionLog.TransactionState.PrepareCommit;
+import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs;
 import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.hamcrest.Matchers.containsInAnyOrder;

@@ -53,29 +57,35 @@
 public class PscTransactionLogITCase extends TestLogger {
 
     private static final Logger LOG = LoggerFactory.getLogger(PscSinkITCase.class);
-    private static final String TOPIC_NAME = "kafkaTransactionLogTest";
-    private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log";
+    private static final String TOPIC_URI_STR = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString() + "kafkaTransactionLogTest";
+    private static final String TRANSACTIONAL_ID_PREFIX = "psc-log";
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
 
-    private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
+    private final List<PscProducer<byte[], Integer>> openProducers = new ArrayList<>();
 
     @After
     public void tearDown() {
-        openProducers.forEach(Producer::close);
+        openProducers.forEach(p -> {
+            try {
+                p.close();
+            } catch (Exception e) {
+                LOG.warn("Error closing producer", e);
+            }
+        });
     }
 
     @Test
-    public void testGetTransactions() throws ConfigurationException, ConsumerException {
+    public void testGetTransactions() throws ConfigurationException, ConsumerException, ProducerException {
         committedTransaction(1);
         abortedTransaction(2);
         lingeringTransaction(3);
         lingeringTransaction(4);
 
         final PscTransactionLog transactionLog =
-                new PscTransactionLog(null, getKafkaClientConfiguration()); // TODO: clusterUriStr needs to be replaced
+                new PscTransactionLog(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString(), getKafkaClientConfiguration());
         final List<PscTransactionLog.TransactionRecord> transactions = transactionLog.getTransactions();
         assertThat(
                 transactions,

@@ -94,45 +104,54 @@ public void testGetTransactions() throws ConfigurationException, ConsumerException
                         new PscTransactionLog.TransactionRecord(buildTransactionalId(4), Ongoing)));
     }
 
-    private void committedTransaction(long id) {
+    private void committedTransaction(long id) throws ConfigurationException, ProducerException {
         submitTransaction(
                 id,
                 producer -> {
-                    producer.initTransactions();
-                    producer.beginTransaction();
-                    producer.send(new ProducerRecord<>(TOPIC_NAME, 0, null, null, 1));
-                    producer.flush();
-                    producer.commitTransaction();
-                    producer.flush();
+                    try {
+                        producer.beginTransaction();
+                        producer.send(new PscProducerMessage<>(TOPIC_URI_STR, 0, null, 1, null));
+                        producer.flush();
+                        producer.commitTransaction();
+                        producer.flush();
+                    } catch (ProducerException | ConfigurationException e) {
+                        throw new RuntimeException(e);
+                    }
                 });
     }
 
-    private void lingeringTransaction(long id) {
+    private void lingeringTransaction(long id) throws ConfigurationException, ProducerException {
         submitTransaction(
                 id,
                 producer -> {
-                    producer.initTransactions();
-                    producer.beginTransaction();
-                    producer.send(new ProducerRecord<>(TOPIC_NAME, 0, null, null, 1));
-                    producer.flush();
+                    try {
+                        producer.beginTransaction();
+                        producer.send(new PscProducerMessage<>(TOPIC_URI_STR, 0, null, 1, null));
+                        producer.flush();
+                    } catch (ConfigurationException | ProducerException e) {
+                        throw new RuntimeException(e);
+                    }
                 });
     }
 
-    private void abortedTransaction(long id) {
+    private void abortedTransaction(long id) throws ConfigurationException, ProducerException {
         submitTransaction(
                 id,
                 producer -> {
-                    producer.initTransactions();
-                    producer.beginTransaction();
-                    producer.send(new ProducerRecord<>(TOPIC_NAME, 0, null, null, 1));
-                    producer.flush();
-                    producer.abortTransaction();
-                    producer.flush();
+                    try {
+                        producer.beginTransaction();
+                        producer.send(new PscProducerMessage<>(TOPIC_URI_STR, 0, null, 1, null));
+                        producer.flush();
+                        producer.abortTransaction();
+                        producer.flush();
+                    } catch (ProducerException | ConfigurationException e) {
+                        throw new RuntimeException(e);
+                    }
                 });
     }
 
-    private void submitTransaction(long id, Consumer<Producer<byte[], Integer>> producerAction) {
-        Producer<byte[], Integer> producer = createProducer(buildTransactionalId(id));
+    private void submitTransaction(long id, Consumer<PscProducer<byte[], Integer>> producerAction) throws ConfigurationException, ProducerException {
+        PscProducer<byte[], Integer> producer = createProducer(buildTransactionalId(id));
         openProducers.add(producer);
         producerAction.accept(producer);
         // don't close here for lingering transactions

@@ -142,23 +161,31 @@ private static String buildTransactionalId(long id) {
         return TRANSACTIONAL_ID_PREFIX + id;
     }
 
-    private static Producer<byte[], Integer> createProducer(String transactionalId) {
+    private static PscProducer<byte[], Integer> createProducer(String transactionalId) throws ConfigurationException, ProducerException {
        final Properties producerProperties = getKafkaClientConfiguration();
        producerProperties.put(
-                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+                PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
        producerProperties.put(
-                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        return new KafkaProducer<>(producerProperties);
+                PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, IntegerSerializer.class.getName());
+        producerProperties.put(PscConfiguration.PSC_PRODUCER_TRANSACTIONAL_ID, transactionalId);
+        producerProperties.put(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString());
+        return new PscProducer<>(PscConfigurationUtils.propertiesToPscConfiguration(producerProperties));
     }
 
     private static Properties getKafkaClientConfiguration() {
         final Properties standardProps = new Properties();
-        standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
-        standardProps.put("group.id", "flink-tests");
-        standardProps.put("enable.auto.commit", false);
-        standardProps.put("auto.id.reset", "earliest");
-        standardProps.put("max.partition.fetch.bytes", 256);
+        // standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
+        standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "flink-tests");
+        standardProps.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false);
+        standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
+        standardProps.put(PscConfiguration.PSC_CONSUMER_PARTITION_FETCH_MAX_BYTES, 256);
+        standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "flink-tests");
+        injectDiscoveryConfigs(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString());
+
+        // standardProps.put("group.id", "flink-tests");
+        // standardProps.put("enable.auto.commit", false);
+        // standardProps.put("auto.id.reset", "earliest");
+        // standardProps.put("max.partition.fetch.bytes", 256);
         return standardProps;
     }
 }
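
Taken together, this file now drives transactions through PscProducer instead of KafkaProducer, and the lambdas wrap the checked ProducerException/ConfigurationException in RuntimeException, presumably because a java.util.function.Consumer cannot declare them. Below is a minimal sketch of the committed-transaction path pulled out of the JUnit scaffolding; it is not the test itself. The helper name commitOneAndReadLog is hypothetical, clusterUriStr/topicUriStr are assumed to point at the test cluster, and props is assumed to already carry the discovery configs that injectDiscoveryConfigs(...) sets up in getKafkaClientConfiguration() above.

    // Hypothetical sketch: run one committed transaction, then read the
    // transaction log back through the same cluster URI.
    static List<PscTransactionLog.TransactionRecord> commitOneAndReadLog(
            Properties props, String clusterUriStr, String topicUriStr)
            throws ConfigurationException, ProducerException, ConsumerException {
        props.put(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
        props.put(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, IntegerSerializer.class.getName());
        props.put(PscConfiguration.PSC_PRODUCER_TRANSACTIONAL_ID, "psc-log-example");
        props.put(PscFlinkConfiguration.CLUSTER_URI_CONFIG, clusterUriStr);

        PscProducer<byte[], Integer> producer =
                new PscProducer<>(PscConfigurationUtils.propertiesToPscConfiguration(props));
        try {
            producer.beginTransaction();
            producer.send(new PscProducerMessage<>(topicUriStr, 0, null, 1, null));
            producer.flush();
            producer.commitTransaction(); // a lingering transaction would stop before this line
        } finally {
            try {
                producer.close();
            } catch (Exception e) {
                // best-effort close, mirroring tearDown() above
            }
        }
        // One TransactionRecord per transactional id, with its state
        // (Ongoing, PrepareCommit, PrepareAbort, CompleteCommit, CompleteAbort, ...).
        return new PscTransactionLog(clusterUriStr, props).getTransactions();
    }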
@@ -123,6 +123,7 @@ public static List<PscConsumerMessage<byte[], byte[]>> drainAllRecordsFromTopic(
         consumerConfig.put(
                 PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL,
                 committed ? PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL_TRANSACTIONAL : PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL_NON_TRANSACTIONAL);
+        consumerConfig.put(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "psc-test-consumer");
         return drainAllRecordsFromTopic(topicUriStr, consumerConfig);
     }
 
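
This second change gives the draining consumer an explicit client id alongside the isolation level it already sets. A hypothetical caller might look like the sketch below, assuming the (topicUriStr, config) overload visible in this hunk takes a Properties object and that discovery configs are wired up as in the other test utilities:

    // Drain only committed records: transactional isolation skips records
    // from aborted and still-open transactions.
    Properties consumerConfig = new Properties();
    consumerConfig.put(PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL,
            PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL_TRANSACTIONAL);
    consumerConfig.put(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "psc-test-consumer");
    List<PscConsumerMessage<byte[], byte[]>> committedOnly =
            drainAllRecordsFromTopic(topicUriStr, consumerConfig);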
PscTestEnvironmentWithKafkaAsPubSub.java

@@ -55,7 +55,7 @@ public abstract class PscTestEnvironmentWithKafkaAsPubSub {
 
     static {
         try {
-            PSC_TEST_CLUSTER_URI = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_PREFIX + "cluster1");
+            PSC_TEST_CLUSTER_URI = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_PREFIX);
         } catch (TopicUriSyntaxException e) {
             throw new RuntimeException("Unable to validate clusterUri", e);
         }
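
This last change drops the hard-coded "cluster1" segment, so PSC_TEST_CLUSTER_URI is now just the validated URI prefix, and callers compose full topic URIs by appending a topic name, exactly as TOPIC_URI_STR is built in PscTransactionLogITCase above:

    // Topic URI = cluster URI + topic name, matching the test's TOPIC_URI_STR.
    String clusterUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString();
    String topicUriStr = clusterUriStr + "kafkaTransactionLogTest";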
