Skip to content

Commit

Permalink
Merge branch 'txn_manager_utils' into full_1_15_upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Aug 26, 2024
2 parents 5986f99 + ae4b690 commit baba790
Show file tree
Hide file tree
Showing 16 changed files with 639 additions and 186 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-java-oss</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<packaging>pom</packaging>
<name>psc-java-oss</name>
<modules>
Expand Down
2 changes: 1 addition & 1 deletion psc-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
4 changes: 2 additions & 2 deletions psc-examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>psc-examples</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<name>psc-examples</name>

<properties>
Expand Down
4 changes: 2 additions & 2 deletions psc-flink-logging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>psc-flink-logging</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>

<dependencies>
<dependency>
Expand Down
2 changes: 1 addition & 1 deletion psc-flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>com.pinterest.psc</groupId>
<artifactId>psc-java-oss</artifactId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>psc-flink</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.pinterest.psc.producer.PscProducer;
import com.pinterest.psc.producer.PscProducerMessage;
import com.pinterest.psc.producer.PscProducerTransactionalProperties;
import com.pinterest.psc.producer.transaction.TransactionManagerUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
Expand Down Expand Up @@ -165,22 +166,20 @@ public String getTransactionalId() {

public long getProducerId(PscProducerMessage pscProducerMessage) throws ProducerException {
Object transactionManager = super.getTransactionManager(pscProducerMessage);
Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
return (long) getField(producerIdAndEpoch, "producerId");
return TransactionManagerUtils.getProducerId(transactionManager);
}

public short getEpoch(PscProducerMessage pscProducerMessage) throws ProducerException {
Object transactionManager = super.getTransactionManager(pscProducerMessage);
Object producerIdAndEpoch = getField(transactionManager, "producerIdAndEpoch");
return (short) getField(producerIdAndEpoch, "epoch");
return TransactionManagerUtils.getEpoch(transactionManager);
}

@VisibleForTesting
public Set<Integer> getTransactionCoordinatorIds() throws ProducerException {
Set<Integer> coordinatorIds = new HashSet<>();
super.getTransactionManagers().forEach(transactionManager ->
coordinatorIds.add(
((Node) invoke(transactionManager, "coordinator", FindCoordinatorRequest.CoordinatorType.TRANSACTION)).id()
TransactionManagerUtils.getTransactionCoordinatorId(transactionManager)
)
);
return coordinatorIds;
Expand All @@ -200,9 +199,15 @@ private void ensureNotClosed() {
*/
private void flushNewPartitions() throws ProducerException {
LOG.info("Flushing new partitions");
Set<TransactionalRequestResult> results = enqueueNewPartitions();
Set<Future<Boolean>> results = enqueueNewPartitions();
super.wakeup();
results.forEach(TransactionalRequestResult::await);
results.forEach(future -> {
try {
future.get();
} catch (Exception e) {
throw new RuntimeException("Error while flushing new partitions", e);
}
});
}

/**
Expand All @@ -212,97 +217,17 @@ private void flushNewPartitions() throws ProducerException {
* <p>If there are no new transactions we return a {@link TransactionalRequestResult} that is
* already done.
*/
private Set<TransactionalRequestResult> enqueueNewPartitions() throws ProducerException {
Set<TransactionalRequestResult> transactionalRequestResults = new HashSet<>();
private Set<Future<Boolean>> enqueueNewPartitions() throws ProducerException {
Set<Future<Boolean>> transactionalRequestResults = new HashSet<>();
Set<Object> transactionManagers = super.getTransactionManagers();
for (Object transactionManager : transactionManagers) {
synchronized (transactionManager) {
Object newPartitionsInTransaction = getField(transactionManager, "newPartitionsInTransaction");
Object newPartitionsInTransactionIsEmpty = invoke(newPartitionsInTransaction, "isEmpty");
TransactionalRequestResult result;
if (newPartitionsInTransactionIsEmpty instanceof Boolean && !((Boolean) newPartitionsInTransactionIsEmpty)) {
Object txnRequestHandler = invoke(transactionManager, "addPartitionsToTransactionHandler");
invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
result = (TransactionalRequestResult) getField(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
} else {
// we don't have an operation but this operation string is also used in
// addPartitionsToTransactionHandler.
result = new TransactionalRequestResult("AddPartitionsToTxn");
result.done();
}
transactionalRequestResults.add(result);
Future<Boolean> transactionalRequestResultFuture =
TransactionManagerUtils.enqueueInFlightTransactions(transactionManager);
transactionalRequestResults.add(transactionalRequestResultFuture);
}
}
return transactionalRequestResults;
}

protected static Enum<?> getEnum(String enumFullName) {
String[] x = enumFullName.split("\\.(?=[^\\.]+$)");
if (x.length == 2) {
String enumClassName = x[0];
String enumName = x[1];
try {
Class<Enum> cl = (Class<Enum>) Class.forName(enumClassName);
return Enum.valueOf(cl, enumName);
} catch (ClassNotFoundException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}
return null;
}

protected static Object invoke(Object object, String methodName, Object... args) {
Class<?>[] argTypes = new Class[args.length];
for (int i = 0; i < args.length; i++) {
argTypes[i] = args[i].getClass();
}
return invoke(object, methodName, argTypes, args);
}

private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
try {
Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
method.setAccessible(true);
return method.invoke(object, args);
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
throw new RuntimeException("Incompatible PscProducer version", e);
}
}

/**
* Gets and returns the field {@code fieldName} from the given Object {@code object} using
* reflection.
*/
protected static Object getField(Object object, String fieldName) {
return getField(object, object.getClass(), fieldName);
}

/**
* Gets and returns the field {@code fieldName} from the given Object {@code object} using
* reflection.
*/
private static Object getField(Object object, Class<?> clazz, String fieldName) {
try {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
return field.get(object);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}

/**
* Sets the field {@code fieldName} on the given Object {@code object} to {@code value} using
* reflection.
*/
protected static void setField(Object object, String fieldName, Object value) {
try {
Field field = object.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(object, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException("Incompatible KafkaProducer version", e);
}
}

}
2 changes: 1 addition & 1 deletion psc-integration-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ public class TestMultiKafkaClusterBackends {
private static final int partitions1 = 12;
private static final String topic2 = "topic2";
private static final int partitions2 = 24;
private static final String topic3 = "topic3";
private static final int partitions3 = 36;
private KafkaCluster kafkaCluster1, kafkaCluster2;
private String topicUriStr1, topicUriStr2;
private String topicUriStr1, topicUriStr2, topicUriStr3;

/**
* Initializes two Kafka clusters that are commonly used by all tests, and creates a single topic on each.
Expand All @@ -61,10 +63,14 @@ public void setup() throws IOException, InterruptedException {

kafkaCluster2 = new KafkaCluster("plaintext", "region2", "cluster2", 9092);
topicUriStr2 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s",
kafkaCluster2.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster2.getRegion(), kafkaCluster2.getCluster(), topic1);
kafkaCluster2.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster2.getRegion(), kafkaCluster2.getCluster(), topic2);

topicUriStr3 = String.format("%s:%s%s:kafka:env:cloud_%s::%s:%s",
kafkaCluster1.getTransport(), TopicUri.SEPARATOR, TopicUri.STANDARD, kafkaCluster1.getRegion(), kafkaCluster1.getCluster(), topic3);

PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic1, partitions1);
PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic2, partitions2);
PscTestUtils.createTopicAndVerify(sharedKafkaTestResource1, topic3, partitions3);
}

/**
Expand All @@ -78,12 +84,16 @@ public void setup() throws IOException, InterruptedException {
public void tearDown() throws ExecutionException, InterruptedException {
PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic1);
PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic2);
PscTestUtils.deleteTopicAndVerify(sharedKafkaTestResource1, topic3);
Thread.sleep(1000);
}

/**
* Verifies that backend producers each have their own transactional states that could be different at times.
*
* Also, verifies that the PscProducer throws the appropriate exception when trying to send messages via a
* new backend producer while the PscProducer is already transactional.
*
* @throws ConfigurationException
* @throws ProducerException
*/
Expand All @@ -100,19 +110,43 @@ public void testTransactionalProducersStates() throws ConfigurationException, Pr
PscBackendProducer<Integer, Integer> backendProducer1 = pscProducer.getBackendProducer(topicUriStr1);
assertEquals(PscProducer.TransactionalState.BEGUN, pscProducer.getBackendProducerState(backendProducer1));

Exception e = assertThrows(ProducerException.class, () -> pscProducer.beginTransaction());
assertEquals("Invalid transaction state: consecutive calls to beginTransaction().", e.getMessage());
PscProducerMessage<Integer, Integer> producerMessageTopic1 = new PscProducerMessage<>(topicUriStr1, 0);
pscProducer.send(producerMessageTopic1);
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));

PscProducerMessage<Integer, Integer> producerMessageTopic3 = new PscProducerMessage<>(topicUriStr3, 1);
pscProducer.send(producerMessageTopic3);
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));

assertEquals(1, pscProducer.getBackendProducers().size()); // topic1 and topic3 belong to same cluster so there should only be one backend producer at this point
assertEquals(backendProducer1, pscProducer.getBackendProducers().iterator().next());

assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));
assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());

pscProducer.commitTransaction();

assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.READY, pscProducer.getBackendProducerState(backendProducer1));

pscProducer.beginTransaction();

assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.BEGUN, pscProducer.getBackendProducerState(backendProducer1));

PscProducerMessage<Integer, Integer> producerMessage = new PscProducerMessage<>(topicUriStr2, 0);
pscProducer.send(producerMessage);
PscProducerMessage<Integer, Integer> producerMessageTopic2 = new PscProducerMessage<>(topicUriStr2, 0);
Exception e = assertThrows(ProducerException.class, () -> pscProducer.send(producerMessageTopic2));
assertEquals("Invalid call to send() which would have created a new backend producer. This is not allowed when the PscProducer is already transactional.", e.getMessage());

assertEquals(2, pscProducer.getBackendProducers().size());
assertEquals(1, pscProducer.getBackendProducers().size());

PscBackendProducer<Integer, Integer> backendProducer2 = pscProducer.getBackendProducer(topicUriStr2);
assertNotEquals(backendProducer1, backendProducer2);
pscProducer.send(producerMessageTopic1); // this should go through
assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer1));

pscProducer.commitTransaction();
assertEquals(PscProducer.TransactionalState.INIT_AND_BEGUN, pscProducer.getTransactionalState());
assertEquals(PscProducer.TransactionalState.READY, pscProducer.getBackendProducerState(backendProducer1));
assertEquals(PscProducer.TransactionalState.IN_TRANSACTION, pscProducer.getBackendProducerState(backendProducer2));

pscProducer.close();
}
Expand Down
2 changes: 1 addition & 1 deletion psc-logging/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion psc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>psc-java-oss</artifactId>
<groupId>com.pinterest.psc</groupId>
<version>3.2.0</version>
<version>3.2.1-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
Expand Down
Loading

0 comments on commit baba790

Please sign in to comment.