diff --git a/pom.xml b/pom.xml index 48ce35c..ed6cec8 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.pinterest.psc psc-java-oss - 3.1.1 + 3.1.2-SNAPSHOT pom psc-java-oss diff --git a/psc-common/pom.xml b/psc-common/pom.xml index 6eb9ce0..494ff0c 100644 --- a/psc-common/pom.xml +++ b/psc-common/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 3.1.1 + 3.1.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/psc-examples/pom.xml b/psc-examples/pom.xml index b7a1963..5629709 100644 --- a/psc-examples/pom.xml +++ b/psc-examples/pom.xml @@ -5,13 +5,13 @@ psc-java-oss com.pinterest.psc - 3.1.1 + 3.1.2-SNAPSHOT ../pom.xml 4.0.0 psc-examples - 3.1.1 + 3.1.2-SNAPSHOT psc-examples diff --git a/psc-flink-logging/pom.xml b/psc-flink-logging/pom.xml index 2b2b0cf..f9c6604 100644 --- a/psc-flink-logging/pom.xml +++ b/psc-flink-logging/pom.xml @@ -5,13 +5,13 @@ psc-java-oss com.pinterest.psc - 3.1.1 + 3.1.2-SNAPSHOT ../pom.xml 4.0.0 psc-flink-logging - 3.1.1 + 3.1.2-SNAPSHOT diff --git a/psc-flink/pom.xml b/psc-flink/pom.xml index a7f42b4..a874345 100644 --- a/psc-flink/pom.xml +++ b/psc-flink/pom.xml @@ -5,7 +5,7 @@ com.pinterest.psc psc-java-oss - 3.1.1 + 3.1.2-SNAPSHOT ../pom.xml psc-flink diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java index ba87d2f..4a81933 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java @@ -39,7 +39,6 @@ import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; import com.pinterest.psc.producer.PscProducerTransactionalProperties; -import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties; import com.pinterest.psc.serde.ByteArraySerializer; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; @@ -1063,25 +1062,28 @@ protected void recoverAndCommit(FlinkPscProducer.PscTransactionState transaction producer.resumeTransaction(pscProducerTransactionalProperties, topicUris); commitTransaction(producer); } catch (FlinkPscException | ProducerException e) { - // That means we may have committed this transaction before. - if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) { - LOG.warn( - "Unable to commit recovered transaction ({}) because it's in an invalid state. " - + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", - transaction, - e); - } - if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) { - LOG.warn( - "Unable to commit recovered transaction ({}) because its producer is already fenced." - + " This means that you either have a different producer with the same '{}' or" - + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss," - + " please consult the Flink documentation for more details.", - transaction, - ProducerConfig.TRANSACTIONAL_ID_CONFIG, - ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, - producerConfig.getProperty("psc.producer.transaction.timeout.ms"), - e); + LOG.warn("Encountered exception during recoverAndCommit()", e); + if (e.getCause() != null) { + // That means we may have committed this transaction before. + if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) { + LOG.warn( + "Unable to commit recovered transaction ({}) because it's in an invalid state. " + + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", + transaction, + e); + } + if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) { + LOG.warn( + "Unable to commit recovered transaction ({}) because its producer is already fenced." + + " This means that you either have a different producer with the same '{}' or" + + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss," + + " please consult the Flink documentation for more details.", + transaction, + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, + producerConfig.getProperty("psc.producer.transaction.timeout.ms"), + e); + } } } finally { if (producer != null) { diff --git a/psc-integration-test/pom.xml b/psc-integration-test/pom.xml index 3bcf40e..9c11190 100644 --- a/psc-integration-test/pom.xml +++ b/psc-integration-test/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 3.1.1 + 3.1.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java index 11506f1..331fe8e 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java @@ -21,9 +21,9 @@ import com.pinterest.psc.producer.PscBackendProducer; import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; +import com.pinterest.psc.producer.PscProducerTransactionalProperties; import com.pinterest.psc.producer.PscProducerUtils; import com.pinterest.psc.producer.creation.PscKafkaProducerCreator; -import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties; import com.pinterest.psc.serde.ByteArraySerializer; import com.pinterest.psc.serde.IntegerDeserializer; import com.pinterest.psc.serde.IntegerSerializer; @@ -434,7 +434,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept Collection backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer); assertEquals(1, backendProducers.size()); PscBackendProducer backendProducer = backendProducers.iterator().next(); - KafkaProducerTransactionalProperties transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + PscProducerTransactionalProperties transactionalProperties = backendProducer.getTransactionalProperties(); long producerId = transactionalProperties.getProducerId(); assertEquals(0, transactionalProperties.getEpoch()); @@ -450,7 +450,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer2); assertEquals(1, backendProducers.size()); backendProducer = backendProducers.iterator().next(); - transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + transactionalProperties = backendProducer.getTransactionalProperties(); // it should bump the epoch each time for the same producer id assertEquals(producerId, transactionalProperties.getProducerId()); assertEquals(1, transactionalProperties.getEpoch()); @@ -463,7 +463,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer3); assertEquals(1, backendProducers.size()); backendProducer = backendProducers.iterator().next(); - transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + transactionalProperties = backendProducer.getTransactionalProperties(); assertEquals(producerId, transactionalProperties.getProducerId()); assertEquals(2, transactionalProperties.getEpoch()); pscProducer3.abortTransaction(); diff --git a/psc-logging/pom.xml b/psc-logging/pom.xml index f64fc00..6906f04 100644 --- a/psc-logging/pom.xml +++ b/psc-logging/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 3.1.1 + 3.1.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/psc/pom.xml b/psc/pom.xml index 0770f9c..32f966f 100644 --- a/psc/pom.xml +++ b/psc/pom.xml @@ -5,7 +5,7 @@ psc-java-oss com.pinterest.psc - 3.1.1 + 3.1.2-SNAPSHOT ../pom.xml 4.0.0 diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java index 8b8b41e..6cfdd0b 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java +++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java @@ -2,6 +2,10 @@ import com.pinterest.psc.producer.PscProducerTransactionalProperties; +/** + * Moved to PscProducerTransactionalProperties + */ +@Deprecated public class KafkaProducerTransactionalProperties extends PscProducerTransactionalProperties { public KafkaProducerTransactionalProperties(long producerId, short epoch) { diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java index ec1c627..9bbe30c 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java +++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java @@ -722,7 +722,7 @@ public void resumeTransaction(PscBackendProducer otherBackendProducer) throws Pr ); } - resumeTransaction(new KafkaProducerTransactionalProperties(producerId, epoch)); + resumeTransaction(new PscProducerTransactionalProperties(producerId, epoch)); } @Override @@ -730,17 +730,6 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran if (kafkaProducer == null) handleUninitializedKafkaProducer("resumeTransaction()"); - if (!(pscProducerTransactionalProperties instanceof KafkaProducerTransactionalProperties)) { - handleException( - new BackendProducerException( - "[Kafka] Unexpected producer transaction state type: " + pscProducerTransactionalProperties.getClass().getCanonicalName(), - PscUtils.BACKEND_TYPE_KAFKA - ), true - ); - } - - KafkaProducerTransactionalProperties kafkaProducerTransactionalProperties = (KafkaProducerTransactionalProperties) pscProducerTransactionalProperties; - try { Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager"); synchronized (kafkaProducer) { @@ -758,8 +747,8 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran PscCommon.invoke(topicPartitionBookkeeper, "reset"); Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch"); - PscCommon.setField(producerIdAndEpoch, "producerId", kafkaProducerTransactionalProperties.getProducerId()); - PscCommon.setField(producerIdAndEpoch, "epoch", kafkaProducerTransactionalProperties.getEpoch()); + PscCommon.setField(producerIdAndEpoch, "producerId", pscProducerTransactionalProperties.getProducerId()); + PscCommon.setField(producerIdAndEpoch, "epoch", pscProducerTransactionalProperties.getEpoch()); PscCommon.invoke( transactionManager, @@ -791,7 +780,7 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager"); Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch"); - return new KafkaProducerTransactionalProperties( + return new PscProducerTransactionalProperties( (long) PscCommon.getField(producerIdAndEpoch, "producerId"), (short) PscCommon.getField(producerIdAndEpoch, "epoch") );