diff --git a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java
index ba87d2f..ab2409a 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/streaming/connectors/psc/FlinkPscProducer.java
@@ -39,7 +39,6 @@
 import com.pinterest.psc.producer.PscProducer;
 import com.pinterest.psc.producer.PscProducerMessage;
 import com.pinterest.psc.producer.PscProducerTransactionalProperties;
-import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties;
 import com.pinterest.psc.serde.ByteArraySerializer;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
diff --git a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java
index 11506f1..331fe8e 100644
--- a/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java
+++ b/psc-integration-test/src/test/java/com/pinterest/psc/integration/producer/TestOneKafkaBackend.java
@@ -21,9 +21,9 @@
 import com.pinterest.psc.producer.PscBackendProducer;
 import com.pinterest.psc.producer.PscProducer;
 import com.pinterest.psc.producer.PscProducerMessage;
+import com.pinterest.psc.producer.PscProducerTransactionalProperties;
 import com.pinterest.psc.producer.PscProducerUtils;
 import com.pinterest.psc.producer.creation.PscKafkaProducerCreator;
-import com.pinterest.psc.producer.kafka.KafkaProducerTransactionalProperties;
 import com.pinterest.psc.serde.ByteArraySerializer;
 import com.pinterest.psc.serde.IntegerDeserializer;
 import com.pinterest.psc.serde.IntegerSerializer;
@@ -434,7 +434,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
         Collection<PscBackendProducer> backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer);
         assertEquals(1, backendProducers.size());
         PscBackendProducer backendProducer = backendProducers.iterator().next();
-        KafkaProducerTransactionalProperties transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
+        PscProducerTransactionalProperties transactionalProperties = backendProducer.getTransactionalProperties();
         long producerId = transactionalProperties.getProducerId();
         assertEquals(0, transactionalProperties.getEpoch());
 
@@ -450,7 +450,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
         backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer2);
         assertEquals(1, backendProducers.size());
         backendProducer = backendProducers.iterator().next();
-        transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
+        transactionalProperties = backendProducer.getTransactionalProperties();
         // it should bump the epoch each time for the same producer id
         assertEquals(producerId, transactionalProperties.getProducerId());
         assertEquals(1, transactionalProperties.getEpoch());
@@ -463,7 +463,7 @@ public void testInitTransactions() throws ConfigurationException, ProducerExcept
         backendProducers = PscProducerUtils.getBackendProducersOf(pscProducer3);
         assertEquals(1, backendProducers.size());
         backendProducer = backendProducers.iterator().next();
-        transactionalProperties = (KafkaProducerTransactionalProperties) backendProducer.getTransactionalProperties();
+        transactionalProperties = backendProducer.getTransactionalProperties();
         assertEquals(producerId, transactionalProperties.getProducerId());
         assertEquals(2, transactionalProperties.getEpoch());
         pscProducer3.abortTransaction();
diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java
index 8b8b41e..6cfdd0b 100644
--- a/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java
+++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/KafkaProducerTransactionalProperties.java
@@ -2,6 +2,10 @@
 
 import com.pinterest.psc.producer.PscProducerTransactionalProperties;
 
+/**
+ * Moved to PscProducerTransactionalProperties
+ */
+@Deprecated
 public class KafkaProducerTransactionalProperties extends PscProducerTransactionalProperties {
 
     public KafkaProducerTransactionalProperties(long producerId, short epoch) {
diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java
index ec1c627..9bbe30c 100644
--- a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java
+++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java
@@ -722,7 +722,7 @@ public void resumeTransaction(PscBackendProducer otherBackendProducer) throws Pr
             );
         }
 
-        resumeTransaction(new KafkaProducerTransactionalProperties(producerId, epoch));
+        resumeTransaction(new PscProducerTransactionalProperties(producerId, epoch));
     }
 
     @Override
@@ -730,17 +730,6 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran
         if (kafkaProducer == null)
             handleUninitializedKafkaProducer("resumeTransaction()");
 
-        if (!(pscProducerTransactionalProperties instanceof KafkaProducerTransactionalProperties)) {
-            handleException(
-                    new BackendProducerException(
-                            "[Kafka] Unexpected producer transaction state type: " + pscProducerTransactionalProperties.getClass().getCanonicalName(),
-                            PscUtils.BACKEND_TYPE_KAFKA
-                    ), true
-            );
-        }
-
-        KafkaProducerTransactionalProperties kafkaProducerTransactionalProperties = (KafkaProducerTransactionalProperties) pscProducerTransactionalProperties;
-
         try {
             Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
             synchronized (kafkaProducer) {
@@ -758,8 +747,8 @@ public void resumeTransaction(PscProducerTransactionalProperties pscProducerTran
                 PscCommon.invoke(topicPartitionBookkeeper, "reset");
 
                 Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
-                PscCommon.setField(producerIdAndEpoch, "producerId", kafkaProducerTransactionalProperties.getProducerId());
-                PscCommon.setField(producerIdAndEpoch, "epoch", kafkaProducerTransactionalProperties.getEpoch());
+                PscCommon.setField(producerIdAndEpoch, "producerId", pscProducerTransactionalProperties.getProducerId());
+                PscCommon.setField(producerIdAndEpoch, "epoch", pscProducerTransactionalProperties.getEpoch());
 
                 PscCommon.invoke(
                         transactionManager,
@@ -791,7 +780,7 @@ public PscProducerTransactionalProperties getTransactionalProperties() throws Pr
 
         Object transactionManager = PscCommon.getField(kafkaProducer, "transactionManager");
         Object producerIdAndEpoch = PscCommon.getField(transactionManager, "producerIdAndEpoch");
-        return new KafkaProducerTransactionalProperties(
+        return new PscProducerTransactionalProperties(
                 (long) PscCommon.getField(producerIdAndEpoch, "producerId"),
                 (short) PscCommon.getField(producerIdAndEpoch, "epoch")
         );