diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java index ab2409a..4a81933 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java @@ -1062,25 +1062,28 @@ protected void recoverAndCommit(FlinkPscProducer.PscTransactionState transaction producer.resumeTransaction(pscProducerTransactionalProperties, topicUris); commitTransaction(producer); } catch (FlinkPscException | ProducerException e) { - // That means we may have committed this transaction before. - if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) { - LOG.warn( - "Unable to commit recovered transaction ({}) because it's in an invalid state. " - + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", - transaction, - e); - } - if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) { - LOG.warn( - "Unable to commit recovered transaction ({}) because its producer is already fenced." - + " This means that you either have a different producer with the same '{}' or" - + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss," - + " please consult the Flink documentation for more details.", - transaction, - ProducerConfig.TRANSACTIONAL_ID_CONFIG, - ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, - producerConfig.getProperty("psc.producer.transaction.timeout.ms"), - e); + LOG.warn("Encountered exception during recoverAndCommit()", e); + if (e.getCause() != null) { + // That means we may have committed this transaction before. + if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) { + LOG.warn( + "Unable to commit recovered transaction ({}) because it's in an invalid state. " + + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.", + transaction, + e); + } + if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) { + LOG.warn( + "Unable to commit recovered transaction ({}) because its producer is already fenced." + + " This means that you either have a different producer with the same '{}' or" + + " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss," + + " please consult the Flink documentation for more details.", + transaction, + ProducerConfig.TRANSACTIONAL_ID_CONFIG, + ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, + producerConfig.getProperty("psc.producer.transaction.timeout.ms"), + e); + } } } finally { if (producer != null) {