WIP finish PscSinkITCase
jeffxiang committed Sep 26, 2024
1 parent 424b901 commit 602c345
Showing 5 changed files with 48 additions and 41 deletions.
@@ -34,6 +34,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;

@@ -51,12 +52,13 @@ class FlinkPscInternalProducer<K, V> extends PscProducer<K, V> {
@Nullable private String transactionalId;
private volatile boolean inTransaction;
private volatile boolean closed;
private TopicUri clusterUri = null;

public FlinkPscInternalProducer(Properties properties, @Nullable String transactionalId) throws ConfigurationException, ProducerException, TopicUriSyntaxException {
super(PscConfigurationUtils.propertiesToPscConfiguration(withTransactionalId(properties, transactionalId)));
if (transactionalId != null) {
// Producer is transactional, so the backend producer should be immediately initialized given the ClusterUri in the properties
TopicUri clusterUri = PscFlinkConfiguration.validateAndGetBaseClusterUri(properties);
this.clusterUri = PscFlinkConfiguration.validateAndGetBaseClusterUri(properties);
getBackendProducerForTopicUri(validateTopicUri(clusterUri.getTopicUriAsString()));
}
this.transactionalId = transactionalId;
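The constructor now caches the validated cluster URI in the new clusterUri field instead of a local variable, so it can be reused when a transaction is later resumed. A minimal construction sketch, assuming illustrative values (the client id, transactional id, and the omitted discovery settings are not from this commit):

// Sketch only: building a transactional FlinkPscInternalProducer.
// Discovery/bootstrap settings (see injectDiscoveryConfigs in the tests below) are omitted.
static FlinkPscInternalProducer<byte[], byte[]> newTransactionalProducer()
        throws ConfigurationException, ProducerException, TopicUriSyntaxException {
    Properties props = new Properties();
    props.setProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "example-client");  // illustrative
    props.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,
            PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);         // illustrative
    // Non-null transactionalId: the constructor caches clusterUri and eagerly creates the backend producer.
    return new FlinkPscInternalProducer<>(props, "example-transactional-id");       // illustrative id
}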
@@ -115,7 +117,7 @@ public void close() throws IOException {
// If this producer is still in transaction, it should be committing.
// However, at this point, we cannot decide that and we shouldn't prolong cancellation.
// So hard kill this producer with all resources.
super.close();
super.close(Duration.ZERO);
} else {
// If this is outside of a transaction, we should be able to cleanly shutdown.
super.close(Duration.ofHours(1));
@@ -209,6 +211,7 @@ private void flushNewPartitions() throws ProducerException {
public void resumeTransaction(long producerId, short epoch) throws ProducerException {
ensureOnlyOneBackendProducer();
checkState(!inTransaction, "Already in transaction %s", transactionalId);
checkState(clusterUri != null, "ClusterUri is not set");
checkState(
producerId >= 0 && epoch >= 0,
"Incorrect values for producerId %s and epoch %s",
@@ -220,18 +223,9 @@ public void resumeTransaction(long producerId, short epoch) throws ProducerException {
producerId,
epoch);

Object transactionManager = getTransactionManager();
PscProducerTransactionalProperties pscProducerTransactionalProperties = new PscProducerTransactionalProperties(producerId, epoch);
synchronized (transactionManager) {
TransactionManagerUtils.resumeTransaction(
transactionManager,
pscProducerTransactionalProperties
);
PscBackendProducer<K, V> backendProducer = getBackendProducers().iterator().next();
setBackendProducerTransactionalState(backendProducer, TransactionalState.IN_TRANSACTION);
setTransactionalState(TransactionalState.INIT_AND_BEGUN);
this.inTransaction = true;
}
super.resumeTransaction(pscProducerTransactionalProperties, Collections.singleton(clusterUri.getTopicUriAsString()));
this.inTransaction = true;
}

private void ensureOnlyOneBackendProducer() {
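With the cluster URI cached, resumeTransaction above now delegates to the parent PscProducer.resumeTransaction for that single cluster instead of driving the transaction manager and backend producer directly. A hedged sketch of a recovery-side caller, assuming producerId and epoch come from persisted committable state (the helper below is hypothetical, not part of this commit):

// Hypothetical helper: resuming a transaction recovered from a checkpoint.
static void resumeForRecovery(
        FlinkPscInternalProducer<byte[], byte[]> producer, long producerId, short epoch)
        throws ProducerException {
    // Requires a producer constructed with a non-null transactionalId so clusterUri is set.
    producer.resumeTransaction(producerId, epoch);
    // Committing or aborting the resumed transaction would follow, depending on the recovery decision.
}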
@@ -63,7 +63,7 @@ public void commit(Collection<CommitRequest<PscCommittable>> requests)
for (CommitRequest<PscCommittable> request : requests) {
final PscCommittable committable = request.getCommittable();
final String transactionalId = committable.getTransactionalId();
LOG.debug("Committing Kafka transaction {}", transactionalId);
LOG.debug("Committing transaction {}", transactionalId);
Optional<Recyclable<? extends FlinkPscInternalProducer<?, ?>>> recyclable =
committable.getProducer();
FlinkPscInternalProducer<?, ?> producer;
@@ -83,6 +83,7 @@
recyclable.ifPresent(Recyclable::close);
} catch (Exception ex) {
// TODO: make exception handling backend-agnostic
LOG.warn("Caught exception while committing transaction {}", transactionalId, ex);
Throwable cause = ex.getCause();
try {
if (cause instanceof Exception)
@@ -52,6 +52,7 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
@@ -244,7 +244,10 @@ public void testRecoveryWithExactlyOnceGuarantee() throws Exception {
contains(
LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
.boxed()
.toArray())));
.toArray()
)
)
);
}

@Test
@@ -291,7 +294,7 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Exception {
executeWithMapper(
new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix");
final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, true);
drainAllRecordsFromTopic(topicUriStr, true);
assertThat(
deserializeValues(collectedRecords),
contains(
@@ -312,15 +315,15 @@ public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exception {
e.getCause().getCause().getMessage(),
containsString("Exceeded checkpoint tolerable failure"));
}
assertTrue(deserializeValues(drainAllRecordsFromTopic(topic, true)).isEmpty());
assertTrue(deserializeValues(drainAllRecordsFromTopic(topicUriStr, true)).isEmpty());

// Second job aborts all transactions from previous runs with higher parallelism
config.set(CoreOptions.DEFAULT_PARALLELISM, 1);
failed.get().set(true);
executeWithMapper(
new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, null);
final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, true);
drainAllRecordsFromTopic(topicUriStr, true);
assertThat(
deserializeValues(collectedRecords),
contains(
@@ -342,14 +345,14 @@ private void executeWithMapper(
final PscSinkBuilder<Long> builder =
new PscSinkBuilder<Long>()
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
// .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setPscProducerConfig(getPscClientConfiguration())
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString(topic)
.setTopicUriString(topicUriStr)
.setValueSerializationSchema(new RecordSerializer())
.build());
if (transactionalIdPrefix == null) {
transactionalIdPrefix = "kafka-sink";
transactionalIdPrefix = "psc-sink";
}
builder.setTransactionalIdPrefix(transactionalIdPrefix);
stream.sinkTo(builder.build());
@@ -368,22 +371,21 @@ private void testRecoveryWithAssertion(
DataStreamSource<Long> source = env.fromSequence(1, 10);
DataStream<Long> stream =
source.map(new FailingCheckpointMapper(failed, lastCheckpointedRecord));

stream.sinkTo(
new PscSinkBuilder<Long>()
.setDeliverGuarantee(guarantee)
// .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setPscProducerConfig(getPscClientConfiguration())
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString(topicUriStr)
.setValueSerializationSchema(new RecordSerializer())
.build())
.setTransactionalIdPrefix("kafka-sink")
.setTransactionalIdPrefix("psc-sink")
.build());
env.execute();

final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(topic, guarantee == DeliveryGuarantee.EXACTLY_ONCE);
drainAllRecordsFromTopic(topicUriStr, guarantee == DeliveryGuarantee.EXACTLY_ONCE);
recordsAssertion.accept(deserializeValues(collectedRecords));
checkProducerLeak();
}
@@ -404,20 +406,19 @@ private void writeRecordsToKafka(
source.sinkTo(
new PscSinkBuilder<Long>()
.setPscProducerConfig(producerProperties)
// .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
.setDeliverGuarantee(deliveryGuarantee)
.setRecordSerializer(
PscRecordSerializationSchema.builder()
.setTopicUriString(topicUriStr)
.setValueSerializationSchema(new RecordSerializer())
.build())
.setTransactionalIdPrefix("kafka-sink")
.setTransactionalIdPrefix("psc-sink")
.build());
env.execute();

final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
drainAllRecordsFromTopic(
topic, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
topicUriStr, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
final long recordsCount = expectedRecords.get().get();
assertEquals(collectedRecords.size(), recordsCount);
assertThat(
@@ -439,16 +440,18 @@ record -> {
.collect(Collectors.toList());
}

private static Properties getKafkaClientConfiguration() {
final Properties standardProps = new Properties();
standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
standardProps.put("group.id", UUID.randomUUID().toString());
standardProps.put("enable.auto.commit", false);
standardProps.put("auto.offset.reset", "earliest");
standardProps.put("max.partition.fetch.bytes", 256);
standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
return standardProps;
private static Properties getPscClientConfiguration() {
Properties properties = new Properties();
properties.setProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "PscSinkITCase");
properties.setProperty(PscConfiguration.PSC_PRODUCER_BATCH_DURATION_MAX_MS, "0");
properties.setProperty(PscConfiguration.PSC_PRODUCER_BUFFER_SEND_BYTES, "131072");
properties.setProperty(PscConfiguration.PSC_PRODUCER_BUFFER_RECEIVE_BYTES, "32768");
properties.setProperty(PscConfiguration.PSC_PRODUCER_RETRIES, "2147483647");
properties.setProperty(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "PscSinkITCase");
properties.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "PscSinkITCase");
properties.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
injectDiscoveryConfigs(properties, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
return properties;
}
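The new getPscClientConfiguration replaces the raw Kafka client properties with PSC producer/consumer settings plus the cluster URI key that the producer-side validation reads. A small sketch of that relationship, with exception handling simplified (the helper is hypothetical, not part of this commit):

// Hypothetical sketch: the cluster URI carried in these properties is what
// PscFlinkConfiguration.validateAndGetBaseClusterUri resolves in FlinkPscInternalProducer.
// Checked exception types are collapsed to Exception here.
private static TopicUri resolveClusterUri() throws Exception {
    Properties props = getPscClientConfiguration();
    return PscFlinkConfiguration.validateAndGetBaseClusterUri(props);
}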

private static PscConsumer<byte[], byte[]> createTestConsumer(
@@ -479,9 +482,9 @@ private void deleteTestTopic(String topic)
}

private List<PscConsumerMessage<byte[], byte[]>> drainAllRecordsFromTopic(
String topic, boolean committed) throws ConfigurationException, ConsumerException {
Properties properties = getKafkaClientConfiguration();
return PscUtil.drainAllRecordsFromTopic(topic, properties, committed);
String topicUriStr, boolean committed) throws ConfigurationException, ConsumerException {
Properties properties = getPscClientConfiguration();
return PscUtil.drainAllRecordsFromTopic(topicUriStr, properties, committed);
}

private static class RecordSerializer implements SerializationSchema<Long> {
@@ -286,4 +286,12 @@ private void transitionTransactionManagerStateTo(
Object transactionManager, String state) {
PscCommon.invoke(transactionManager, "transitionTo", getTransactionManagerState(state));
}

private String getCurrentTransactionManagerState(Object transactionManager) {
return PscCommon.getField(transactionManager, "currentState").toString();
}

private String getCurrentTransactionalId(Object transactionManager) {
return PscCommon.getField(transactionManager, "transactionalId").toString();
}
}
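The two new accessors expose the backend transaction manager's current state and transactional id via reflection. A hedged sketch of how they might be combined inside this class for diagnostics (the helper below is hypothetical, not part of this commit):

// Hypothetical helper: summarize transaction manager state for debugging.
private String describeTransactionManager(Object transactionManager) {
    return "transactionalId=" + getCurrentTransactionalId(transactionManager)
            + ", currentState=" + getCurrentTransactionManagerState(transactionManager);
}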
