Skip to content

Commit

Permalink
Make catch block non terminal
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Dec 7, 2023
1 parent 661d3e6 commit 01030ad
Showing 1 changed file with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1062,25 +1062,28 @@ protected void recoverAndCommit(FlinkPscProducer.PscTransactionState transaction
producer.resumeTransaction(pscProducerTransactionalProperties, topicUris);
commitTransaction(producer);
} catch (FlinkPscException | ProducerException e) {
// That means we may have committed this transaction before.
if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) {
LOG.warn(
"Unable to commit recovered transaction ({}) because it's in an invalid state. "
+ "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
transaction,
e);
}
if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) {
LOG.warn(
"Unable to commit recovered transaction ({}) because its producer is already fenced."
+ " This means that you either have a different producer with the same '{}' or"
+ " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+ " please consult the Flink documentation for more details.",
transaction,
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
producerConfig.getProperty("psc.producer.transaction.timeout.ms"),
e);
LOG.warn("Encountered exception during recoverAndCommit()", e);
if (e.getCause() != null) {
// That means we may have committed this transaction before.
if (e.getCause().getClass().isAssignableFrom(InvalidTxnStateException.class)) {
LOG.warn(
"Unable to commit recovered transaction ({}) because it's in an invalid state. "
+ "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
transaction,
e);
}
if (e.getCause().getClass().isAssignableFrom(ProducerFencedException.class)) {
LOG.warn(
"Unable to commit recovered transaction ({}) because its producer is already fenced."
+ " This means that you either have a different producer with the same '{}' or"
+ " recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
+ " please consult the Flink documentation for more details.",
transaction,
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
producerConfig.getProperty("psc.producer.transaction.timeout.ms"),
e);
}
}
} finally {
if (producer != null) {
Expand Down

0 comments on commit 01030ad

Please sign in to comment.