diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java index 8ce6ba3..a35862b 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java @@ -22,19 +22,15 @@ import com.pinterest.psc.exception.producer.ProducerException; import com.pinterest.psc.exception.startup.ConfigurationException; import com.pinterest.psc.producer.PscProducer; -import org.apache.kafka.clients.producer.internals.TransactionManager; -import org.apache.kafka.clients.producer.internals.TransactionalRequestResult; +import com.pinterest.psc.producer.PscProducerTransactionalProperties; +import com.pinterest.psc.producer.transaction.TransactionManagerUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; -import java.lang.reflect.Constructor; -import java.lang.reflect.Field; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.time.Duration; import java.util.Properties; -import java.util.Set; +import java.util.concurrent.Future; import static org.apache.flink.util.Preconditions.checkState; @@ -47,9 +43,6 @@ class FlinkPscInternalProducer extends PscProducer { private static final Logger LOG = LoggerFactory.getLogger(FlinkPscInternalProducer.class); - private static final String TRANSACTION_STATE_ENUM = - "com.pinterest.psc.producer.PscProducer$TransactionalState"; - private static final String KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch"; @Nullable private String transactionalId; private volatile boolean inTransaction; private volatile boolean closed; @@ -142,27 +135,12 @@ public String getTransactionalId() { return transactionalId; } - private static Object getProducerIdAndEpoch(Object transactionManager) { - return getField(transactionManager, KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME); - } - - private static void verifyTransactionManagerCompatibility(Object transactionManager) throws ProducerException { - if (!(transactionManager instanceof TransactionManager)) { - // only Kafka transaction managers are supported at the moment - throw new ProducerException("Unsupported transaction manager: " + transactionManager); - } - } - public short getEpoch() throws ProducerException { - Object transactionManager = getTransactionManager(); - Object producerIdAndEpoch = getProducerIdAndEpoch(transactionManager); - return (short) getField(producerIdAndEpoch, "epoch"); + return TransactionManagerUtils.getEpoch(getTransactionManager()); } public long getProducerId() throws ProducerException { - Object transactionManager = getTransactionManager(); - Object producerIdAndEpoch = getProducerIdAndEpoch(transactionManager); - return (long) getField(producerIdAndEpoch, "producerId"); + return TransactionManagerUtils.getProducerId(getTransactionManager()); } public void initTransactionId(String transactionalId) throws ProducerException { @@ -180,11 +158,7 @@ public void setTransactionId(String transactionalId) throws ProducerException { LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId); Object transactionManager = getTransactionManager(); synchronized (transactionManager) { - setField(transactionManager, "transactionalId", transactionalId); - setField( - transactionManager, - "currentState", - getTransactionManagerState("UNINITIALIZED")); + TransactionManagerUtils.setTransactionId(transactionManager, transactionalId); this.transactionalId = transactionalId; } } @@ -198,89 +172,15 @@ public void setTransactionId(String transactionalId) throws ProducerException { */ private void flushNewPartitions() throws ProducerException { LOG.info("Flushing new partitions"); - TransactionalRequestResult result = enqueueNewPartitions(); + Future future = TransactionManagerUtils.enqueueInFlightTransactions(getTransactionManager()); super.wakeup(); - result.await(); - } - - /** - * Enqueues new transactions at the transaction manager and returns a {@link - * TransactionalRequestResult} that allows waiting on them. - * - *

If there are no new transactions we return a {@link TransactionalRequestResult} that is - * already done. - */ - private TransactionalRequestResult enqueueNewPartitions() throws ProducerException { - Object transactionManager = getTransactionManager(); - synchronized (transactionManager) { - Object newPartitionsInTransaction = - getField(transactionManager, "newPartitionsInTransaction"); - Object newPartitionsInTransactionIsEmpty = - invoke(newPartitionsInTransaction, "isEmpty"); - TransactionalRequestResult result; - if (newPartitionsInTransactionIsEmpty instanceof Boolean - && !((Boolean) newPartitionsInTransactionIsEmpty)) { - Object txnRequestHandler = - invoke(transactionManager, "addPartitionsToTransactionHandler"); - invoke( - transactionManager, - "enqueueRequest", - new Class[] {txnRequestHandler.getClass().getSuperclass()}, - new Object[] {txnRequestHandler}); - result = - (TransactionalRequestResult) - getField( - txnRequestHandler, - txnRequestHandler.getClass().getSuperclass(), - "result"); - } else { - // we don't have an operation but this operation string is also used in - // addPartitionsToTransactionHandler. - result = new TransactionalRequestResult("AddPartitionsToTxn"); - result.done(); - } - return result; - } - } - - private static Object invoke(Object object, String methodName, Object... args) { - Class[] argTypes = new Class[args.length]; - for (int i = 0; i < args.length; i++) { - argTypes[i] = args[i].getClass(); - } - return invoke(object, methodName, argTypes, args); - } - - private static Object invoke( - Object object, String methodName, Class[] argTypes, Object[] args) { - try { - Method method = object.getClass().getDeclaredMethod(methodName, argTypes); - method.setAccessible(true); - return method.invoke(object, args); - } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { - throw new RuntimeException("Incompatible PscProducer version", e); - } - } - - /** - * Gets and returns the field {@code fieldName} from the given Object {@code object} using - * reflection. - */ - private static Object getField(Object object, String fieldName) { - return getField(object, object.getClass(), fieldName); - } - - /** - * Gets and returns the field {@code fieldName} from the given Object {@code object} using - * reflection. - */ - private static Object getField(Object object, Class clazz, String fieldName) { try { - Field field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - return field.get(object); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Incompatible PscProducer version", e); + boolean isSuccessful = future.get(); + if (!isSuccessful) { + throw new ProducerException("Flushing in-flight transactions failed"); + } + } catch (Exception e) { + throw new ProducerException("Failed to flush new partitions", e); } } @@ -304,84 +204,16 @@ public void resumeTransaction(long producerId, short epoch) throws ProducerExcep Object transactionManager = getTransactionManager(); synchronized (transactionManager) { - Object topicPartitionBookkeeper = - getField(transactionManager, "topicPartitionBookkeeper"); - - transitionTransactionManagerStateTo(transactionManager, "INITIALIZING"); - invoke(topicPartitionBookkeeper, "reset"); - - setField( + TransactionManagerUtils.resumeTransaction( transactionManager, - KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME, - createProducerIdAndEpoch(producerId, epoch)); - - transitionTransactionManagerStateTo(transactionManager, "READY"); - - transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION"); - setField(transactionManager, "transactionStarted", true); + new PscProducerTransactionalProperties(producerId, epoch) + ); this.inTransaction = true; } } - private static Object createProducerIdAndEpoch(long producerId, short epoch) { - try { - Field field = - TransactionManager.class.getDeclaredField(KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME); - Class clazz = field.getType(); - Constructor constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE); - constructor.setAccessible(true); - return constructor.newInstance(producerId, epoch); - } catch (InvocationTargetException - | InstantiationException - | IllegalAccessException - | NoSuchFieldException - | NoSuchMethodException e) { - throw new RuntimeException("Incompatible PscProducer version", e); - } - } - - /** - * Sets the field {@code fieldName} on the given Object {@code object} to {@code value} using - * reflection. - */ - private static void setField(Object object, String fieldName, Object value) { - setField(object, object.getClass(), fieldName, value); - } - - private static void setField(Object object, Class clazz, String fieldName, Object value) { - try { - Field field = clazz.getDeclaredField(fieldName); - field.setAccessible(true); - field.set(object, value); - } catch (NoSuchFieldException | IllegalAccessException e) { - throw new RuntimeException("Incompatible PscProducer version", e); - } - } - - @SuppressWarnings({"unchecked", "rawtypes"}) - private static Enum getTransactionManagerState(String enumName) { - try { - Class cl = (Class) Class.forName(TRANSACTION_STATE_ENUM); - return Enum.valueOf(cl, enumName); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Incompatible PscProducer version", e); - } - } - private Object getTransactionManager() throws ProducerException { - Set txnManagers = super.getTransactionManagers(); - if (txnManagers.size() != 1) { - // This should never happen unless the same PscProducer spun up multiple BackendProducers - throw new ProducerException("Expected exactly one transaction manager, but found " + txnManagers.size()); - } - Object txnManager = txnManagers.iterator().next(); - verifyTransactionManagerCompatibility(txnManager); - return txnManager; - } - - private static void transitionTransactionManagerStateTo( - Object transactionManager, String state) { - invoke(transactionManager, "transitionTo", getTransactionManagerState(state)); + return super.getExactlyOneTransactionManager(); } @Override