Skip to content

Commit

Permalink
WIP refactored FlinkPscInternalProducer (sink) to use abstracted logic
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Aug 13, 2024
1 parent 82342af commit 5986f99
Showing 1 changed file with 17 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,15 @@
import com.pinterest.psc.exception.producer.ProducerException;
import com.pinterest.psc.exception.startup.ConfigurationException;
import com.pinterest.psc.producer.PscProducer;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import com.pinterest.psc.producer.transaction.TransactionManagerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Future;

import static org.apache.flink.util.Preconditions.checkState;

Expand All @@ -47,9 +43,6 @@
class FlinkPscInternalProducer<K, V> extends PscProducer<K, V> {

private static final Logger LOG = LoggerFactory.getLogger(FlinkPscInternalProducer.class);
private static final String TRANSACTION_STATE_ENUM =
"com.pinterest.psc.producer.PscProducer$TransactionalState";
private static final String KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
@Nullable private String transactionalId;
private volatile boolean inTransaction;
private volatile boolean closed;
Expand Down Expand Up @@ -142,27 +135,12 @@ public String getTransactionalId() {
return transactionalId;
}

private static Object getProducerIdAndEpoch(Object transactionManager) {
return getField(transactionManager, KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME);
}

private static void verifyTransactionManagerCompatibility(Object transactionManager) throws ProducerException {
if (!(transactionManager instanceof TransactionManager)) {
// only Kafka transaction managers are supported at the moment
throw new ProducerException("Unsupported transaction manager: " + transactionManager);
}
}

public short getEpoch() throws ProducerException {
Object transactionManager = getTransactionManager();
Object producerIdAndEpoch = getProducerIdAndEpoch(transactionManager);
return (short) getField(producerIdAndEpoch, "epoch");
return TransactionManagerUtils.getEpoch(getTransactionManager());
}

public long getProducerId() throws ProducerException {
Object transactionManager = getTransactionManager();
Object producerIdAndEpoch = getProducerIdAndEpoch(transactionManager);
return (long) getField(producerIdAndEpoch, "producerId");
return TransactionManagerUtils.getProducerId(getTransactionManager());
}

public void initTransactionId(String transactionalId) throws ProducerException {
Expand All @@ -180,11 +158,7 @@ public void setTransactionId(String transactionalId) throws ProducerException {
LOG.debug("Change transaction id from {} to {}", this.transactionalId, transactionalId);
Object transactionManager = getTransactionManager();
synchronized (transactionManager) {
setField(transactionManager, "transactionalId", transactionalId);
setField(
transactionManager,
"currentState",
getTransactionManagerState("UNINITIALIZED"));
TransactionManagerUtils.setTransactionId(transactionManager, transactionalId);
this.transactionalId = transactionalId;
}
}
Expand All @@ -198,89 +172,15 @@ public void setTransactionId(String transactionalId) throws ProducerException {
*/
private void flushNewPartitions() throws ProducerException {
LOG.info("Flushing new partitions");
TransactionalRequestResult result = enqueueNewPartitions();
Future<Boolean> future = TransactionManagerUtils.enqueueInFlightTransactions(getTransactionManager());
super.wakeup();
result.await();
}

/**
* Enqueues new transactions at the transaction manager and returns a {@link
* TransactionalRequestResult} that allows waiting on them.
*
* <p>If there are no new transactions we return a {@link TransactionalRequestResult} that is
* already done.
*/
private TransactionalRequestResult enqueueNewPartitions() throws ProducerException {
Object transactionManager = getTransactionManager();
synchronized (transactionManager) {
Object newPartitionsInTransaction =
getField(transactionManager, "newPartitionsInTransaction");
Object newPartitionsInTransactionIsEmpty =
invoke(newPartitionsInTransaction, "isEmpty");
TransactionalRequestResult result;
if (newPartitionsInTransactionIsEmpty instanceof Boolean
&& !((Boolean) newPartitionsInTransactionIsEmpty)) {
Object txnRequestHandler =
invoke(transactionManager, "addPartitionsToTransactionHandler");
invoke(
transactionManager,
"enqueueRequest",
new Class[] {txnRequestHandler.getClass().getSuperclass()},
new Object[] {txnRequestHandler});
result =
(TransactionalRequestResult)
getField(
txnRequestHandler,
txnRequestHandler.getClass().getSuperclass(),
"result");
} else {
// we don't have an operation but this operation string is also used in
// addPartitionsToTransactionHandler.
result = new TransactionalRequestResult("AddPartitionsToTxn");
result.done();
}
return result;
}
}

private static Object invoke(Object object, String methodName, Object... args) {
Class<?>[] argTypes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
argTypes[i] = args[i].getClass();
}
return invoke(object, methodName, argTypes, args);
}

private static Object invoke(
Object object, String methodName, Class<?>[] argTypes, Object[] args) {
try {
Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
method.setAccessible(true);
return method.invoke(object, args);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Incompatible PscProducer version", e);
}
}

/**
* Gets and returns the field {@code fieldName} from the given Object {@code object} using
* reflection.
*/
private static Object getField(Object object, String fieldName) {
return getField(object, object.getClass(), fieldName);
}

/**
* Gets and returns the field {@code fieldName} from the given Object {@code object} using
* reflection.
*/
private static Object getField(Object object, Class<?> clazz, String fieldName) {
try {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(object);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible PscProducer version", e);
boolean isSuccessful = future.get();
if (!isSuccessful) {
throw new ProducerException("Flushing in-flight transactions failed");
}
} catch (Exception e) {
throw new ProducerException("Failed to flush new partitions", e);
}
}

Expand All @@ -304,84 +204,16 @@ public void resumeTransaction(long producerId, short epoch) throws ProducerExcep

Object transactionManager = getTransactionManager();
synchronized (transactionManager) {
Object topicPartitionBookkeeper =
getField(transactionManager, "topicPartitionBookkeeper");

transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
invoke(topicPartitionBookkeeper, "reset");

setField(
TransactionManagerUtils.resumeTransaction(
transactionManager,
KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME,
createProducerIdAndEpoch(producerId, epoch));

transitionTransactionManagerStateTo(transactionManager, "READY");

transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
setField(transactionManager, "transactionStarted", true);
new PscProducerTransactionalProperties(producerId, epoch)
);
this.inTransaction = true;
}
}

private static Object createProducerIdAndEpoch(long producerId, short epoch) {
try {
Field field =
TransactionManager.class.getDeclaredField(KAFKA_TXN_MANAGER_PRODUCER_ID_AND_EPOCH_FIELD_NAME);
Class<?> clazz = field.getType();
Constructor<?> constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
constructor.setAccessible(true);
return constructor.newInstance(producerId, epoch);
} catch (InvocationTargetException
| InstantiationException
| IllegalAccessException
| NoSuchFieldException
| NoSuchMethodException e) {
throw new RuntimeException("Incompatible PscProducer version", e);
}
}

/**
* Sets the field {@code fieldName} on the given Object {@code object} to {@code value} using
* reflection.
*/
private static void setField(Object object, String fieldName, Object value) {
setField(object, object.getClass(), fieldName, value);
}

private static void setField(Object object, Class<?> clazz, String fieldName, Object value) {
try {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(object, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible PscProducer version", e);
}
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static Enum<?> getTransactionManagerState(String enumName) {
try {
Class<Enum> cl = (Class<Enum>) Class.forName(TRANSACTION_STATE_ENUM);
return Enum.valueOf(cl, enumName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Incompatible PscProducer version", e);
}
}

private Object getTransactionManager() throws ProducerException {
Set<Object> txnManagers = super.getTransactionManagers();
if (txnManagers.size() != 1) {
// This should never happen unless the same PscProducer spun up multiple BackendProducers
throw new ProducerException("Expected exactly one transaction manager, but found " + txnManagers.size());
}
Object txnManager = txnManagers.iterator().next();
verifyTransactionManagerCompatibility(txnManager);
return txnManager;
}

private static void transitionTransactionManagerStateTo(
Object transactionManager, String state) {
invoke(transactionManager, "transitionTo", getTransactionManagerState(state));
return super.getExactlyOneTransactionManager();
}

@Override
Expand Down

0 comments on commit 5986f99

Please sign in to comment.