From b9a0d20781330c054892300a5f2eb48ed6a69303 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 25 Sep 2023 12:42:27 -0400 Subject: [PATCH 01/12] README update --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b2591fe..94d8bc2 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Some highlights of PSC include: ***PSC is currently under active development.*** -PSC currently supports [Apache Kafka](https://github.com/apache/kafka) and [MemQ](https://github.com/pinterest/memq) PubSub systems in Java, with support for more languages and PubSub systems coming soon. +PSC currently supports [Apache Kafka](https://github.com/apache/kafka) and [MemQ](https://github.com/pinterest/memq) PubSub systems in Java, with support for more languages coming soon. Contributions to adding support for other PubSub systems is welcome! ## Compatibility Matrix From 0ce712617652c1a39bca4663a834168955564139 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Wed, 25 Oct 2023 11:54:19 -0700 Subject: [PATCH 02/12] Remove IMDSv1 usage --- .../com/pinterest/psc/common/PscUtils.java | 64 +++++++++++-------- 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/common/PscUtils.java b/psc/src/main/java/com/pinterest/psc/common/PscUtils.java index 1cecee7..85c9325 100644 --- a/psc/src/main/java/com/pinterest/psc/common/PscUtils.java +++ b/psc/src/main/java/com/pinterest/psc/common/PscUtils.java @@ -1,16 +1,13 @@ package com.pinterest.psc.common; import com.pinterest.psc.exception.startup.ConfigurationException; -import com.pinterest.psc.exception.startup.ServiceDiscoveryException; import com.pinterest.psc.logging.PscLogger; -import software.amazon.awssdk.core.SdkSystemSetting; +import java.io.BufferedReader; +import java.io.FileReader; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; -import java.net.ConnectException; -import java.net.HttpURLConnection; -import java.net.URL; import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; @@ -35,35 +32,36 @@ public static T instantiateFromClass(String fqdn, Class targetClass) thro } public static boolean isEc2Host() { + return doesEc2MetadataExist() || isSysVendorAws() || IsAwsOsDetected(); + } + + protected static boolean doesEc2MetadataExist() { try { - String hostAddressForEC2MetadataService = SdkSystemSetting.AWS_EC2_METADATA_SERVICE_ENDPOINT.getStringValueOrThrow(); - if (hostAddressForEC2MetadataService == null) - return false; - URL url = new URL(hostAddressForEC2MetadataService + "/latest/dynamic/instance-identity/document"); - HttpURLConnection con = (HttpURLConnection) url.openConnection(); - con.setRequestMethod("GET"); - con.setConnectTimeout(1000); - con.setReadTimeout(1000); - con.connect(); - con.disconnect(); - return con.getResponseCode() == 200; - } catch (ConnectException connectException) { - return isEc2HostAlternate(); - } catch (Exception exception) { - logger.warn("Error occurred when determining the host type.", new ServiceDiscoveryException(exception)); + ProcessBuilder processBuilder = new ProcessBuilder("ec2metadata"); + processBuilder.redirectErrorStream(true); + Process process = processBuilder.start(); + return process.waitFor() == 0; + } catch (Exception e) { + logger.info("Could not detect if host is EC2 from ec2metadata.", e); return false; } } - protected static boolean isEc2HostAlternate() { - ProcessBuilder processBuilder = new ProcessBuilder("ec2metadata"); - processBuilder.redirectErrorStream(true); + protected static boolean isSysVendorAws() { + try { + return getFileContent("/sys/devices/virtual/dmi/id/sys_vendor").trim().equals("Amazon EC2"); + } catch (Exception e) { + logger.info("Could not detect if host is EC2 from sys vendor.", e); + return false; + } + } + + protected static boolean IsAwsOsDetected() { try { - Process process = processBuilder.start(); - return process.waitFor() == 0; - } catch (IOException | InterruptedException e) { - logger.info("Error occurred when running the `ec2metadata` command. Will check OS version as last resort."); return System.getProperty("os.version").contains("aws"); + } catch (Exception e) { + logger.info("Could not detect if host is EC2 from os version.", e); + return false; } } @@ -89,4 +87,14 @@ public static String getStackTraceAsString(Exception exception) { exception.printStackTrace(printWriter); return stringWriter.toString(); } -} \ No newline at end of file + + public static String getFileContent(String path) throws IOException { + StringBuilder content = new StringBuilder(); + BufferedReader reader = new BufferedReader(new FileReader(path)); + String line; + while ((line = reader.readLine()) != null) { + content.append(line).append(System.lineSeparator()); + } + return content.toString(); + } +} From 3a580f12d2b88298e738e9a44eb51688e286ab6e Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Wed, 25 Oct 2023 12:11:52 -0700 Subject: [PATCH 03/12] Add code owners --- .github/CODEOWNERS | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 .github/CODEOWNERS diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..82d4cfe --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,2 @@ +# All owners +* @pinterest/logging From a99e04e6da822d08a4aa28d597181ce483a8873b Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Wed, 25 Oct 2023 13:22:54 -0700 Subject: [PATCH 04/12] Update method name --- psc/src/main/java/com/pinterest/psc/common/PscUtils.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/common/PscUtils.java b/psc/src/main/java/com/pinterest/psc/common/PscUtils.java index 85c9325..bcd57cc 100644 --- a/psc/src/main/java/com/pinterest/psc/common/PscUtils.java +++ b/psc/src/main/java/com/pinterest/psc/common/PscUtils.java @@ -32,7 +32,7 @@ public static T instantiateFromClass(String fqdn, Class targetClass) thro } public static boolean isEc2Host() { - return doesEc2MetadataExist() || isSysVendorAws() || IsAwsOsDetected(); + return doesEc2MetadataExist() || isSysVendorAws() || isAwsOsDetected(); } protected static boolean doesEc2MetadataExist() { @@ -56,7 +56,7 @@ protected static boolean isSysVendorAws() { } } - protected static boolean IsAwsOsDetected() { + protected static boolean isAwsOsDetected() { try { return System.getProperty("os.version").contains("aws"); } catch (Exception e) { From 2a9f0d20c737f77cfee6a9d315ae681e078cf3e8 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 15:51:18 -0500 Subject: [PATCH 05/12] Disable SSL reset by default --- .../java/com/pinterest/psc/config/PscConfiguration.java | 5 +++++ .../pinterest/psc/config/PscConfigurationInternal.java | 9 +++++++++ .../pinterest/psc/consumer/kafka/PscKafkaConsumer.java | 6 ++++++ 3 files changed, 20 insertions(+) diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java index 59f9dec..62e5e88 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfiguration.java @@ -89,6 +89,11 @@ public class PscConfiguration extends PropertiesConfiguration { */ public final static String PCS_AUTO_RESOLUTION_RETRY_COUNT = "psc.auto.resolution.retry.count"; + /** + * Whether to proactively reset consumer or producer based on approaching SSL certificate expiry + */ + public final static String PSC_PROACTIVE_SSL_RESET_ENABLED = "psc.proactive.ssl.reset.enabled"; + private final static String PSC_CLIENT_TYPE = "psc.client.type"; public final static String PSC_CLIENT_TYPE_CONSUMER = "consumer"; public final static String PSC_CLIENT_TYPE_PRODUCER = "producer"; diff --git a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java index ed4384f..7086c16 100644 --- a/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java +++ b/psc/src/main/java/com/pinterest/psc/config/PscConfigurationInternal.java @@ -53,6 +53,7 @@ public class PscConfigurationInternal { private boolean autoResolutionEnabled; private int autoResolutionRetryCount; private MetricsReporterConfiguration metricsReporterConfiguration; + private boolean proactiveSslResetEnabled; public PscConfigurationInternal() { } @@ -206,6 +207,10 @@ private void validateGenericConfiguration(Map invalidConfigs) Integer autoResolutionRetryCount = verifyConfigHasValue(pscConfiguration, PscConfiguration.PCS_AUTO_RESOLUTION_RETRY_COUNT, Integer.class, invalidConfigs); this.autoResolutionRetryCount = autoResolutionRetryCount != null ? autoResolutionRetryCount : 5; } + + // SSL reset + Boolean proactiveSslResetEnabled = verifyConfigHasValue(pscConfiguration, PscConfiguration.PSC_PROACTIVE_SSL_RESET_ENABLED, Boolean.class, invalidConfigs); + this.proactiveSslResetEnabled = proactiveSslResetEnabled != null ? proactiveSslResetEnabled : false; // false by default } public void logConfiguration() { @@ -733,6 +738,10 @@ public boolean isAutoResolutionEnabled() { return autoResolutionEnabled; } + public boolean isProactiveSslResetEnabled() { + return proactiveSslResetEnabled; + } + public int getAutoResolutionRetryCount() { return autoResolutionRetryCount; } diff --git a/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java b/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java index 388e50b..ca43310 100644 --- a/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java +++ b/psc/src/main/java/com/pinterest/psc/consumer/kafka/PscKafkaConsumer.java @@ -95,6 +95,7 @@ public void initializeBackend(ServiceDiscoveryConfig discoveryConfig, TopicUri t properties, pscConfigurationInternal, Collections.singleton(topicUri)); logger.info("Initialized PscKafkaConsumer with SSL cert expiry time at " + sslCertificateExpiryTimeInMillis); } + logger.info("Proactive SSL reset enabled: {}", pscConfigurationInternal.isProactiveSslResetEnabled()); } @Override @@ -1042,6 +1043,11 @@ protected void maybeResetBackendClient() throws ConsumerException { // reset if SSL enabled && cert is expired if (isSslEnabledInAnyActiveSusbcriptionOrAssignment && (System.currentTimeMillis() >= sslCertificateExpiryTimeInMillis)) { + if (!pscConfigurationInternal.isProactiveSslResetEnabled()) { + logger.info("Skipping reset of client even though SSL certificate is approaching expiry at {}" + + " because proactive reset is disabled", sslCertificateExpiryTimeInMillis); + return; + } if (KafkaSslUtils.keyStoresExist(properties)) { logger.info("Resetting backend Kafka client due to cert expiry at " + sslCertificateExpiryTimeInMillis); From 4d9a99bd03f679665d4c3d5a0ad2ab012a2485b2 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 15:58:47 -0500 Subject: [PATCH 06/12] Swallow ConcurrentModificationException when stack trace matches Kafka JmxReporter in consumer poll --- .../com/pinterest/psc/common/kafka/KafkaErrors.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java b/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java index 417cde5..c6a2329 100644 --- a/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java +++ b/psc/src/main/java/com/pinterest/psc/common/kafka/KafkaErrors.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.WakeupException; +import java.util.ConcurrentModificationException; import java.util.LinkedHashMap; import java.util.Map; @@ -165,6 +166,17 @@ ImmutableMap., Map(1) {{ + put( + "org.apache.kafka.common.metrics.JmxReporter.getMBeanName", // known case of CME - we will swallow it + new PscErrorHandler.ConsumerAction(PscErrorHandler.ActionType.NONE, ConsumerException.class) + ); + }} + ) + .build(); /** From c266781a5432d2e59a47988a7a6107ad63dda202 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 16:10:16 -0500 Subject: [PATCH 07/12] Add config into psc.conf --- psc/src/main/resources/psc.conf | 1 + 1 file changed, 1 insertion(+) diff --git a/psc/src/main/resources/psc.conf b/psc/src/main/resources/psc.conf index 724b8a1..fb4ffda 100644 --- a/psc/src/main/resources/psc.conf +++ b/psc/src/main/resources/psc.conf @@ -6,6 +6,7 @@ psc.config.topic.uri= psc.auto.resolution.enabled=true psc.auto.resolution.retry.count=5 +psc.proactive.ssl.reset.enabled=false #psc.metrics #valid options com.pinterest.psc.metrics.NullMetricsReporter, com.pinterest.psc.metrics.OpenTSDBMetricsReporter From fccbf7fdd96d543c13ea85d19f777675cf52fd71 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 17:36:02 -0500 Subject: [PATCH 08/12] Add build action --- .github/workflows/maven.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .github/workflows/maven.yml diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml new file mode 100644 index 0000000..827cdfb --- /dev/null +++ b/.github/workflows/maven.yml @@ -0,0 +1,16 @@ +name: PSC-Java Build + +on: [pull_request] + +jobs: + build: + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Build with Maven + run: mvn -B package --file pom.xml \ No newline at end of file From 9195a718d06387ed1c974c11a933f63db8592523 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 17:38:56 -0500 Subject: [PATCH 09/12] Revert "Add build action" This reverts commit fccbf7fdd96d543c13ea85d19f777675cf52fd71. --- .github/workflows/maven.yml | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 .github/workflows/maven.yml diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml deleted file mode 100644 index 827cdfb..0000000 --- a/.github/workflows/maven.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: PSC-Java Build - -on: [pull_request] - -jobs: - build: - - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v1 - - name: Set up JDK 1.8 - uses: actions/setup-java@v1 - with: - java-version: 1.8 - - name: Build with Maven - run: mvn -B package --file pom.xml \ No newline at end of file From 359d12307b4c9ef8c9c581ac8153d47c9ea4f6c3 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 17:40:12 -0500 Subject: [PATCH 10/12] Add build action upon PR --- .github/workflows/maven.yml | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 .github/workflows/maven.yml diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml new file mode 100644 index 0000000..827cdfb --- /dev/null +++ b/.github/workflows/maven.yml @@ -0,0 +1,16 @@ +name: PSC-Java Build + +on: [pull_request] + +jobs: + build: + + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v1 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Build with Maven + run: mvn -B package --file pom.xml \ No newline at end of file From 5fcc4bb0a6ee8a9e28988d99da2cf6e4d4160b9f Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Mon, 6 Nov 2023 20:10:09 -0500 Subject: [PATCH 11/12] Remove duplicate metrics reporting for psc.producer.produce.messages --- .../main/java/com/pinterest/psc/producer/PscProducer.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java b/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java index dc55288..8ff1fcc 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java +++ b/psc/src/main/java/com/pinterest/psc/producer/PscProducer.java @@ -316,13 +316,6 @@ public Future send(PscProducerMessage pscProducerMessage, Callb Future future = backendProducer.send(pscProducerMessage, callback); - PscMetricRegistryManager.getInstance().incrementCounterMetric( - pscProducerMessage.getTopicUriPartition().getTopicUri(), - pscProducerMessage.getPartition(), - PscMetrics.PSC_PRODUCER_PRODUCE_MESSAGES_METRIC, - pscConfigurationInternal - ); - return future; } From e6b4a15575855fced4591f53444dac367ef42f5c Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Thu, 7 Dec 2023 11:58:54 -0500 Subject: [PATCH 12/12] Try to fix NPE in checkpoint recovery by deprecating KafkaProducerTransactionalProperties --- .../connectors/psc/FlinkPscProducer.java | 1 - .../producer/TestOneKafkaBackend.java | 8 ++++---- .../KafkaProducerTransactionalProperties.java | 4 ++++ .../psc/producer/kafka/PscKafkaProducer.java | 19 ++++--------------- 4 files changed, 12 insertions(+), 20 deletions(-) diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java index 8d3b91c..68f9b6b 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java @@ -39,7 +39,6 @@ import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; import com.pinterest.psc.producer.PscProducerTransactionalProperties; -import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties; import com.pinterest.psc.serde.ByteArraySerializer; import org.apache.commons.lang3.StringUtils; import org.apache.flink.annotation.Internal; diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java index f9da346..5591598 100644 --- a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java +++ b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java @@ -21,9 +21,9 @@ import com.pinterest.psc.producer.PscBackendProducer; import com.pinterest.psc.producer.PscProducer; import com.pinterest.psc.producer.PscProducerMessage; +import com.pinterest.psc.producer.PscProducerTransactionalProperties; import com.pinterest.psc.producer.PscProducerUtils; import com.pinterest.psc.producer.creation.PscKafkaProducerCreator; -import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties; import com.pinterest.psc.serde.ByteArraySerializer; import com.pinterest.psc.serde.IntegerDeserializer; import com.pinterest.psc.serde.IntegerSerializer; @@ -434,7 +434,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept Collection backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer); assertEquals(1, backendProducers.size()); PscBackendProducer backendProducer = backendProducers.iterator().next(); - KafkaProducerTransactionalProperties transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + PscProducerTransactionalProperties transactionalProperties = backendProducer.getTransactionalProperties(); long producerId = transactionalProperties.getProducerId(); assertEquals(0, transactionalProperties.getEpoch()); @@ -450,7 +450,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer2); assertEquals(1, backendProducers.size()); backendProducer = backendProducers.iterator().next(); - transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + transactionalProperties = backendProducer.getTransactionalProperties(); // it should bump the epoch each time for the same producer id assertEquals(producerId, transactionalProperties.getProducerId()); assertEquals(1, transactionalProperties.getEpoch()); @@ -463,7 +463,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer3); assertEquals(1, backendProducers.size()); backendProducer = backendProducers.iterator().next(); - transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties(); + transactionalProperties = backendProducer.getTransactionalProperties(); assertEquals(producerId, transactionalProperties.getProducerId()); assertEquals(2, transactionalProperties.getEpoch()); pscProducer3.abortTransaction(); diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java index 8b8b41e..6cfdd0b 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java +++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java @@ -2,6 +2,10 @@ import com.pinterest.psc.producer.PscProducerTransactionalProperties; +/** + * Moved to PscProducerTransactionalProperties + */ +@Deprecated public class KafkaProducerTransactionalProperties extends PscProducerTransactionalProperties { public KafkaProducerTransactionalProperties(long producerId, short epoch) { diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java index 3632cb9..f666104 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java +++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java @@ -716,7 +716,7 @@ public void resumeTransaction(PscBackendProducer otherBackendProducer) throws Pr ); } - resumeTransaction(new KafkaProducerTransactionalProperties(producerId, epoch)); + resumeTransaction(new PscProducerTransactionalProperties(producerId, epoch)); } @Override @@ -724,17 +724,6 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran if (kafkaProducer == null) handleUninitializedKafkaProducer("resumeTransaction()"); - if (!(pscProducerTransactionalProperties instanceof KafkaProducerTransactionalProperties)) { - handleException( - new BackendProducerException( - "[Kafka] Unexpected producer transaction state type: " + pscProducerTransactionalProperties.getClass().getCanonicalName(), - PscUtils.BACKEND_TYPE_KAFKA - ), true - ); - } - - KafkaProducerTransactionalProperties kafkaProducerTransactionalProperties = (KafkaProducerTransactionalProperties) pscProducerTransactionalProperties; - try { Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager"); synchronized (kafkaProducer) { @@ -752,8 +741,8 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran PscCommon.invoke(topicPartitionBookkeeper, "reset"); Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch"); - PscCommon.setField(producerIdAndEpoch, "producerId", kafkaProducerTransactionalProperties.getProducerId()); - PscCommon.setField(producerIdAndEpoch, "epoch", kafkaProducerTransactionalProperties.getEpoch()); + PscCommon.setField(producerIdAndEpoch, "producerId", pscProducerTransactionalProperties.getProducerId()); + PscCommon.setField(producerIdAndEpoch, "epoch", pscProducerTransactionalProperties.getEpoch()); PscCommon.invoke( transactionManager, @@ -785,7 +774,7 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager"); Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch"); - return new KafkaProducerTransactionalProperties( + return new PscProducerTransactionalProperties( (long) PscCommon.getField(producerIdAndEpoch, "producerId"), (short) PscCommon.getField(producerIdAndEpoch, "epoch") );