diff --git a/pom.xml b/pom.xml
index 48ce35c..ed6cec8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.pinterest.psc</groupId>
    <artifactId>psc-java-oss</artifactId>
-   <version>3.1.1</version>
+   <version>3.1.2-SNAPSHOT</version>
    <packaging>pom</packaging>
    <name>psc-java-oss</name>
diff --git a/psc-common/pom.xml b/psc-common/pom.xml
index 6eb9ce0..494ff0c 100644
--- a/psc-common/pom.xml
+++ b/psc-common/pom.xml
@@ -5,7 +5,7 @@
        <artifactId>psc-java-oss</artifactId>
        <groupId>com.pinterest.psc</groupId>
-       <version>3.1.1</version>
+       <version>3.1.2-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
diff --git a/psc-examples/pom.xml b/psc-examples/pom.xml
index b7a1963..5629709 100644
--- a/psc-examples/pom.xml
+++ b/psc-examples/pom.xml
@@ -5,13 +5,13 @@
        <artifactId>psc-java-oss</artifactId>
        <groupId>com.pinterest.psc</groupId>
-       <version>3.1.1</version>
+       <version>3.1.2-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>psc-examples</artifactId>
-   <version>3.1.1</version>
+   <version>3.1.2-SNAPSHOT</version>
    <name>psc-examples</name>
diff --git a/psc-flink-logging/pom.xml b/psc-flink-logging/pom.xml
index 2b2b0cf..f9c6604 100644
--- a/psc-flink-logging/pom.xml
+++ b/psc-flink-logging/pom.xml
@@ -5,13 +5,13 @@
        <artifactId>psc-java-oss</artifactId>
        <groupId>com.pinterest.psc</groupId>
-       <version>3.1.1</version>
+       <version>3.1.2-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>psc-flink-logging</artifactId>
-   <version>3.1.1</version>
+   <version>3.1.2-SNAPSHOT</version>
diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml
index a7f42b4..a874345 100644
--- a/psc-flink/pom.xml
+++ b/psc-flink/pom.xml
@@ -5,7 +5,7 @@
        <groupId>com.pinterest.psc</groupId>
        <artifactId>psc-java-oss</artifactId>
-       <version>3.1.1</version>
+       <version>3.1.2-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <artifactId>psc-flink</artifactId>
diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java
index ba87d2f..4a81933 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java
@@ -39,7 +39,6 @@
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
-import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties;
import com.pinterest.psc.serde.ByteArraySerializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
@@ -1063,25 +1062,28 @@ protected void recoverAndCommit(FlinkPscProducer.PscTransactionState transaction
producer.resumeTransaction(pscProducerTransactionalProperties, topicUris);
commitTransaction(producer);
} catch (FlinkPscException | ProducerException e) {
- // That means we may have committed this transaction before.
- if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) {
- LOG.warn(
- "Unable to commit recovered transaction ({}) because it's in an invalid state. "
- + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
- transaction,
- e);
- }
- if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) {
- LOG.warn(
- "Unable to commit recovered transaction ({}) because its producer is already fenced."
- + " This means that you either have a different producer with the same '{}' or"
- + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
- + " please consult the Flink documentation for more details.",
- transaction,
- ProducerConfig.TRANSACTIONAL_ID_CONFIG,
- ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
- producerConfig.getProperty("psc.producer.transaction.timeout.ms"),
- e);
+ LOG.warn("Encountered exception during recoverAndCommit()", e);
+ if (e.getCause() != null) {
+ // That means we may have committed this transaction before.
+ if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) {
+ LOG.warn(
+ "Unable to commit recovered transaction ({}) because it's in an invalid state. "
+ + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
+ transaction,
+ e);
+ }
+ if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) {
+ LOG.warn(
+ "Unable to commit recovered transaction ({}) because its producer is already fenced."
+ + " This means that you either have a different producer with the same '{}' or"
+ + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+ + " please consult the Flink documentation for more details.",
+ transaction,
+ ProducerConfig.TRANSACTIONAL_ID_CONFIG,
+ ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
+ producerConfig.getProperty("psc.producer.transaction.timeout.ms"),
+ e);
+ }
}
} finally {
if (producer != null) {
diff --git a/psc-integration-test/pom.xml b/psc-integration-test/pom.xml
index 3bcf40e..9c11190 100644
--- a/psc-integration-test/pom.xml
+++ b/psc-integration-test/pom.xml
@@ -5,7 +5,7 @@
        <artifactId>psc-java-oss</artifactId>
        <groupId>com.pinterest.psc</groupId>
-       <version>3.1.1</version>
+       <version>3.1.2-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java
index 11506f1..331fe8e 100644
--- a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java
+++ b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java
@@ -21,9 +21,9 @@
import com.pinterest.psc.producer.PscBackendProducer;
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
+import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import com.pinterest.psc.producer.PscProducerUtils;
import com.pinterest.psc.producer.creation.PscKafkaProducerCreator;
-import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties;
import com.pinterest.psc.serde.ByteArraySerializer;
import com.pinterest.psc.serde.IntegerDeserializer;
import com.pinterest.psc.serde.IntegerSerializer;
@@ -434,7 +434,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
Collection backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer);
assertEquals(1, backendProducers.size());
PscBackendProducer backendProducer = backendProducers.iterator().next();
- KafkaProducerTransactionalProperties transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
+ PscProducerTransactionalProperties transactionalProperties = backendProducer.getTransactionalProperties();
long producerId = transactionalProperties.getProducerId();
assertEquals(0, transactionalProperties.getEpoch());
@@ -450,7 +450,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer2);
assertEquals(1, backendProducers.size());
backendProducer = backendProducers.iterator().next();
- transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
+ transactionalProperties = backendProducer.getTransactionalProperties();
// it should bump the epoch each time for the same producer id
assertEquals(producerId, transactionalProperties.getProducerId());
assertEquals(1, transactionalProperties.getEpoch());
@@ -463,7 +463,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer3);
assertEquals(1, backendProducers.size());
backendProducer = backendProducers.iterator().next();
- transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
+ transactionalProperties = backendProducer.getTransactionalProperties();
assertEquals(producerId, transactionalProperties.getProducerId());
assertEquals(2, transactionalProperties.getEpoch());
pscProducer3.abortTransaction();
diff --git a/psc-logging/pom.xml b/psc-logging/pom.xml
index f64fc00..6906f04 100644
--- a/psc-logging/pom.xml
+++ b/psc-logging/pom.xml
@@ -5,7 +5,7 @@
        <artifactId>psc-java-oss</artifactId>
        <groupId>com.pinterest.psc</groupId>
-       <version>3.1.1</version>
+       <version>3.1.2-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
diff --git a/psc/pom.xml b/psc/pom.xml
index 0770f9c..32f966f 100644
--- a/psc/pom.xml
+++ b/psc/pom.xml
@@ -5,7 +5,7 @@
        <artifactId>psc-java-oss</artifactId>
        <groupId>com.pinterest.psc</groupId>
-       <version>3.1.1</version>
+       <version>3.1.2-SNAPSHOT</version>
        <relativePath>../pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>
diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java
index 8b8b41e..6cfdd0b 100644
--- a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java
+++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java
@@ -2,6 +2,10 @@
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
+/**
+ * @deprecated Moved to {@link PscProducerTransactionalProperties}.
+ */
+@Deprecated
public class KafkaProducerTransactionalProperties extends PscProducerTransactionalProperties {
public KafkaProducerTransactionalProperties(long producerId, short epoch) {
diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java
index ec1c627..9bbe30c 100644
--- a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java
+++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java
@@ -722,7 +722,7 @@ public void resumeTransaction(PscBackendProducer otherBackendProducer) throws Pr
);
}
- resumeTransaction(new KafkaProducerTransactionalProperties(producerId, epoch));
+ resumeTransaction(new PscProducerTransactionalProperties(producerId, epoch));
}
@Override
@@ -730,17 +730,6 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran
if (kafkaProducer == null)
handleUninitializedKafkaProducer("resumeTransaction()");
- if (!(pscProducerTransactionalProperties instanceof KafkaProducerTransactionalProperties)) {
- handleException(
- new BackendProducerException(
- "[Kafka] Unexpected producer transaction state type: " + pscProducerTransactionalProperties.getClass().getCanonicalName(),
- PscUtils.BACKEND_TYPE_KAFKA
- ), true
- );
- }
-
- KafkaProducerTransactionalProperties kafkaProducerTransactionalProperties = (KafkaProducerTransactionalProperties) pscProducerTransactionalProperties;
-
try {
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
synchronized (kafkaProducer) {
@@ -758,8 +747,8 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran
PscCommon.invoke(topicPartitionBookkeeper, "reset");
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
- PscCommon.setField(producerIdAndEpoch, "producerId", kafkaProducerTransactionalProperties.getProducerId());
- PscCommon.setField(producerIdAndEpoch, "epoch", kafkaProducerTransactionalProperties.getEpoch());
+ PscCommon.setField(producerIdAndEpoch, "producerId", pscProducerTransactionalProperties.getProducerId());
+ PscCommon.setField(producerIdAndEpoch, "epoch", pscProducerTransactionalProperties.getEpoch());
PscCommon.invoke(
transactionManager,
@@ -791,7 +780,7 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr
Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
- return new KafkaProducerTransactionalProperties(
+ return new PscProducerTransactionalProperties(
(long) PscCommon.getField(producerIdAndEpoch, "producerId"),
(short) PscCommon.getField(producerIdAndEpoch, "epoch")
);
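For reference, a minimal sketch (not part of the patch) of how callers interact with the transactional-properties API after this change: PscBackendProducer.getTransactionalProperties() is consumed directly as the backend-agnostic PscProducerTransactionalProperties, with no cast to the now-deprecated Kafka-specific subclass. Class and method names are taken from the hunks above; the surrounding class and method are illustrative only.

// Illustrative only -- mirrors the updated usage in TestOneKafkaBackend and FlinkPscProducer above.
import com.pinterest.psc.producer.PscBackendProducer;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;

class TransactionalPropertiesUsageSketch {
    static void logTransactionalState(PscBackendProducer backendProducer) throws Exception {
        // Before: required a cast to the Kafka-specific KafkaProducerTransactionalProperties.
        // After: the backend-agnostic type carries the producer id and epoch directly.
        PscProducerTransactionalProperties props = backendProducer.getTransactionalProperties();
        System.out.println("producerId=" + props.getProducerId() + ", epoch=" + props.getEpoch());
    }
}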