From a2cfd2e52bdb8006d40cff810d5e6dd1049e2213 Mon Sep 17 00:00:00 2001
From: Jeff Xiang
Date: Thu, 26 Sep 2024 14:05:59 -0400
Subject: [PATCH] WIP finish PscTransactionLogITCase

---
 .../psc/sink/PscTransactionLogITCase.java     | 113 +++++++++++-------
 .../connector/psc/testutils/PscUtil.java      |   1 +
 .../PscTestEnvironmentWithKafkaAsPubSub.java  |   2 +-
 3 files changed, 72 insertions(+), 44 deletions(-)

diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java
index 8612738..73217f2 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java
@@ -17,15 +17,18 @@
 package com.pinterest.flink.connector.psc.sink;
 
+import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
+import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
+import com.pinterest.psc.config.PscConfiguration;
+import com.pinterest.psc.config.PscConfigurationUtils;
 import com.pinterest.psc.exception.consumer.ConsumerException;
+import com.pinterest.psc.exception.producer.ProducerException;
 import com.pinterest.psc.exception.startup.ConfigurationException;
+import com.pinterest.psc.producer.PscProducer;
+import com.pinterest.psc.producer.PscProducerMessage;
+import com.pinterest.psc.serde.ByteArraySerializer;
+import com.pinterest.psc.serde.IntegerSerializer;
 import org.apache.flink.util.TestLogger;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.junit.After;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -44,6 +47,7 @@
 import static com.pinterest.flink.connector.psc.sink.PscTransactionLog.TransactionState.Ongoing;
 import static com.pinterest.flink.connector.psc.sink.PscTransactionLog.TransactionState.PrepareAbort;
 import static com.pinterest.flink.connector.psc.sink.PscTransactionLog.TransactionState.PrepareCommit;
+import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs;
 import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer;
 import static org.apache.flink.util.DockerImageVersions.KAFKA;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -53,29 +57,35 @@
 public class PscTransactionLogITCase extends TestLogger {
 
     private static final Logger LOG = LoggerFactory.getLogger(PscSinkITCase.class);
-    private static final String TOPIC_NAME = "kafkaTransactionLogTest";
-    private static final String TRANSACTIONAL_ID_PREFIX = "kafka-log";
+    private static final String TOPIC_URI_STR = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString() + "kafkaTransactionLogTest";
+    private static final String TRANSACTIONAL_ID_PREFIX = "psc-log";
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
             createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
 
-    private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
+    private final List<PscProducer<byte[], Integer>> openProducers = new ArrayList<>();
 
     @After
     public void tearDown() {
-        openProducers.forEach(Producer::close);
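+        // PscProducer#close may throw, unlike the Kafka Producer#close used
+        // before, so close each producer defensively and only log failures.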
+        openProducers.forEach(p -> {
+            try {
+                p.close();
+            } catch (Exception e) {
+                LOG.warn("Error closing producer", e);
+            }
+        });
     }
 
     @Test
-    public void testGetTransactions() throws ConfigurationException, ConsumerException {
+    public void testGetTransactions() throws ConfigurationException, ConsumerException, ProducerException {
         committedTransaction(1);
         abortedTransaction(2);
         lingeringTransaction(3);
         lingeringTransaction(4);
 
         final PscTransactionLog transactionLog =
-                new PscTransactionLog(null, getKafkaClientConfiguration()); // TODO: clusterUriStr needs to be replaced
+                new PscTransactionLog(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString(), getKafkaClientConfiguration());
         final List<PscTransactionLog.TransactionRecord> transactions = transactionLog.getTransactions();
         assertThat(
                 transactions,
@@ -94,45 +104,54 @@ public void testGetTransactions() throws ConfigurationException, ConsumerExcepti
                         new PscTransactionLog.TransactionRecord(buildTransactionalId(4), Ongoing)));
     }
 
-    private void committedTransaction(long id) {
+    private void committedTransaction(long id) throws ConfigurationException, ProducerException {
         submitTransaction(
                 id,
                 producer -> {
-                    producer.initTransactions();
-                    producer.beginTransaction();
-                    producer.send(new ProducerRecord<>(TOPIC_NAME, 0, null, null, 1));
-                    producer.flush();
-                    producer.commitTransaction();
-                    producer.flush();
+                    try {
+                        producer.beginTransaction();
+                        producer.send(new PscProducerMessage<>(TOPIC_URI_STR, 0, null, 1, null));
+                        producer.flush();
+                        producer.commitTransaction();
+                        producer.flush();
+                    } catch (ProducerException | ConfigurationException e) {
+                        throw new RuntimeException(e);
+                    }
                 });
     }
 
-    private void lingeringTransaction(long id) {
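+    // A lingering transaction is begun and written to but never committed or
+    // aborted, so the transaction log must report it as Ongoing.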
+    private void lingeringTransaction(long id) throws ConfigurationException, ProducerException {
         submitTransaction(
                 id,
                 producer -> {
-                    producer.initTransactions();
-                    producer.beginTransaction();
-                    producer.send(new ProducerRecord<>(TOPIC_NAME, 0, null, null, 1));
-                    producer.flush();
+                    try {
+                        producer.beginTransaction();
+                        producer.send(new PscProducerMessage<>(TOPIC_URI_STR, 0, null, 1, null));
+                        producer.flush();
+                    } catch (ConfigurationException | ProducerException e) {
+                        throw new RuntimeException(e);
+                    }
                 });
     }
 
-    private void abortedTransaction(long id) {
+    private void abortedTransaction(long id) throws ConfigurationException, ProducerException {
         submitTransaction(
                 id,
                 producer -> {
-                    producer.initTransactions();
-                    producer.beginTransaction();
-                    producer.send(new ProducerRecord<>(TOPIC_NAME, 0, null, null, 1));
-                    producer.flush();
-                    producer.abortTransaction();
-                    producer.flush();
+                    try {
+                        producer.beginTransaction();
+                        producer.send(new PscProducerMessage<>(TOPIC_URI_STR, 0, null, 1, null));
+                        producer.flush();
+                        producer.abortTransaction();
+                        producer.flush();
+                    } catch (ProducerException | ConfigurationException e) {
+                        throw new RuntimeException(e);
+                    }
                 });
     }
 
-    private void submitTransaction(long id, Consumer<Producer<byte[], Integer>> producerAction) {
-        Producer<byte[], Integer> producer = createProducer(buildTransactionalId(id));
+    private void submitTransaction(long id, Consumer<PscProducer<byte[], Integer>> producerAction) throws ConfigurationException, ProducerException {
+        PscProducer<byte[], Integer> producer = createProducer(buildTransactionalId(id));
         openProducers.add(producer);
         producerAction.accept(producer);
         // don't close here for lingering transactions
     }
 
@@ -142,23 +161,31 @@ private static String buildTransactionalId(long id) {
         return TRANSACTIONAL_ID_PREFIX + id;
     }
 
-    private static Producer<byte[], Integer> createProducer(String transactionalId) {
+    private static PscProducer<byte[], Integer> createProducer(String transactionalId) throws ConfigurationException, ProducerException {
         final Properties producerProperties = getKafkaClientConfiguration();
         producerProperties.put(
-                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+                PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName());
         producerProperties.put(
-                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
-        producerProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
-        return new KafkaProducer<>(producerProperties);
+                PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, IntegerSerializer.class.getName());
+        producerProperties.put(PscConfiguration.PSC_PRODUCER_TRANSACTIONAL_ID, transactionalId);
+        producerProperties.put(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString());
+        return new PscProducer<>(PscConfigurationUtils.propertiesToPscConfiguration(producerProperties));
     }
 
     private static Properties getKafkaClientConfiguration() {
         final Properties standardProps = new Properties();
-        standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
-        standardProps.put("group.id", "flink-tests");
-        standardProps.put("enable.auto.commit", false);
-        standardProps.put("auto.id.reset", "earliest");
-        standardProps.put("max.partition.fetch.bytes", 256);
+        standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "flink-tests");
+        standardProps.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false);
+        standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST);
+        standardProps.put(PscConfiguration.PSC_CONSUMER_PARTITION_FETCH_MAX_BYTES, 256);
+        standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "flink-tests");
+        injectDiscoveryConfigs(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString());
         return standardProps;
     }
 }
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscUtil.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscUtil.java
index cbabb05..ee6988b 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscUtil.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/testutils/PscUtil.java
@@ -123,6 +123,7 @@ public static List<PscConsumerMessage<byte[], byte[]>> drainAllRecordsFromTopic(
         consumerConfig.put(
                 PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL,
                 committed ? PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL_TRANSACTIONAL
                         : PscConfiguration.PSC_CONSUMER_ISOLATION_LEVEL_NON_TRANSACTIONAL);
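+        // Identify the drain consumer to PSC with an explicit client ID.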
+        consumerConfig.put(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "psc-test-consumer");
         return drainAllRecordsFromTopic(topicUriStr, consumerConfig);
     }
diff --git a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java
index 3541f53..1fe75af 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/streaming/connectors/psc/PscTestEnvironmentWithKafkaAsPubSub.java
@@ -55,7 +55,7 @@ public abstract class PscTestEnvironmentWithKafkaAsPubSub {
 
     static {
         try {
-            PSC_TEST_CLUSTER_URI = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_PREFIX + "cluster1");
+            PSC_TEST_CLUSTER_URI = KafkaTopicUri.validate(PSC_TEST_TOPIC_URI_PREFIX);
         } catch (TopicUriSyntaxException e) {
             throw new RuntimeException("Unable to validate clusterUri", e);
         }