From b31fd21e5e0ff537c305a06f0d57969d497f9d97 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 7 Dec 2023 11:58:54 -0500 Subject: [PATCH] Try to fix NPE in checkpoint recovery by deprecating KafkaProducerTransactionalProperties --- .../connectors/psc/FlinkPscProducer.java | 1 - .../producer/TestOneKafkaBackend.java | 8 ++++---- .../KafkaProducerTransactionalProperties.java | 4 ++++ .../psc/producer/kafka/PscKafkaProducer.java | 19 ++++--------------- 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java index 8d3b91c..68f9b6b 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java @@ -39,7 +39,6 @@ import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; import com.pinterest.psc.producer.PscProducerTransactionalProperties; -import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties; import com.pinterest.psc.serde.ByteArraySerializer; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java index f9da346..5591598 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java @@ -21,9 +21,9 @@ import com.pinterest.psc.producer.PscBackendProducer; import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; +import com.pinterest.psc.producer.PscProducerTransactionalProperties; import com.pinterest.psc.producer.PscProducerUtils; import com.pinterest.psc.producer.creation.PscKafkaProducerCreator; -import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties; import com.pinterest.psc.serde.ByteArraySerializer; import com.pinterest.psc.serde.IntegerDeserializer; import com.pinterest.psc.serde.IntegerSerializer; @@ -434,7 +434,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept Collection backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer); assertEquals(1, backendProducers.size()); PscBackendProducer backendProducer = backendProducers.iterator().next(); - KafkaProducerTransactionalProperties transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + PscProducerTransactionalProperties transactionalProperties = backendProducer.getTransactionalProperties(); long producerId = transactionalProperties.getProducerId(); assertEquals(0, transactionalProperties.getEpoch()); @@ -450,7 +450,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer2); assertEquals(1, backendProducers.size()); backendProducer = backendProducers.iterator().next(); - transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + transactionalProperties = backendProducer.getTransactionalProperties(); // it should bump the epoch each time for the same producer id assertEquals(producerId, transactionalProperties.getProducerId()); assertEquals(1, transactionalProperties.getEpoch()); @@ -463,7 +463,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer3); assertEquals(1, backendProducers.size()); backendProducer = backendProducers.iterator().next(); - transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + transactionalProperties = backendProducer.getTransactionalProperties(); assertEquals(producerId, transactionalProperties.getProducerId()); assertEquals(2, transactionalProperties.getEpoch()); pscProducer3.abortTransaction(); diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java index 8b8b41e..6cfdd0b 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java +++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java @@ -2,6 +2,10 @@ import com.pinterest.psc.producer.PscProducerTransactionalProperties; +/** + * Moved to PscProducerTransactionalProperties + */ +@Deprecated public class KafkaProducerTransactionalProperties extends PscProducerTransactionalProperties { public KafkaProducerTransactionalProperties(long producerId, short epoch) { diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java index 3632cb9..f666104 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java +++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java @@ -716,7 +716,7 @@ public void resumeTransaction(PscBackendProducer otherBackendProducer) throws Pr ); } - resumeTransaction(new KafkaProducerTransactionalProperties(producerId, epoch)); + resumeTransaction(new PscProducerTransactionalProperties(producerId, epoch)); } @Override @@ -724,17 +724,6 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran if (kafkaProducer == null) handleUninitializedKafkaProducer("resumeTransaction()"); - if (!(pscProducerTransactionalProperties instanceof KafkaProducerTransactionalProperties)) { - handleException( - new BackendProducerException( - "[Kafka] Unexpected producer transaction state type: " + pscProducerTransactionalProperties.getClass().getCanonicalName(), - PscUtils.BACKEND_TYPE_KAFKA - ), true - ); - } - - KafkaProducerTransactionalProperties kafkaProducerTransactionalProperties = (KafkaProducerTransactionalProperties) pscProducerTransactionalProperties; - try { Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager"); synchronized (kafkaProducer) { @@ -752,8 +741,8 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran PscCommon.invoke(topicPartitionBookkeeper, "reset"); Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch"); - PscCommon.setField(producerIdAndEpoch, "producerId", kafkaProducerTransactionalProperties.getProducerId()); - PscCommon.setField(producerIdAndEpoch, "epoch", kafkaProducerTransactionalProperties.getEpoch()); + PscCommon.setField(producerIdAndEpoch, "producerId", pscProducerTransactionalProperties.getProducerId()); + PscCommon.setField(producerIdAndEpoch, "epoch", pscProducerTransactionalProperties.getEpoch()); PscCommon.invoke( transactionManager, @@ -785,7 +774,7 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager"); Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch"); - return new KafkaProducerTransactionalProperties( + return new PscProducerTransactionalProperties( (long) PscCommon.getField(producerIdAndEpoch, "producerId"), (short) PscCommon.getField(producerIdAndEpoch, "epoch") );