From 602c3456270fddc684b932e8b8813f57148ee6ef Mon Sep 17 00:00:00 2001
From: Jeff Xiang
Date: Wed, 25 Sep 2024 22:48:39 -0400
Subject: [PATCH] WIP finish PscSinkITCase

---
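Notes (anything between the "---" marker above and the diffstat below is
dropped by "git am"; this is review commentary, not part of the change):

* FlinkPscInternalProducer#resumeTransaction no longer reflects into the
  backend transaction manager itself; it delegates to
  PscProducer#resumeTransaction with the singleton set of topic URIs derived
  from the producer's cluster URI, which is why the constructor now stores
  the validated cluster URI in a field.
* PscSinkITCase now routes all client configuration through PSC property
  keys (getPscClientConfiguration) instead of raw Kafka properties, and
  drains records by topic URI string rather than by topic name.

A minimal sketch of the recovery path the producer change supports, assuming
commitTransaction() mirrors the Kafka connector's producer API (it is not
shown in this patch) and with placeholder values throughout; imports are
omitted because the fully-qualified names are not visible in this diff:

    // Same package as FlinkPscInternalProducer, which is package-private.
    long recoveredProducerId = 4711L;          // placeholder; normally recovered
    short recoveredEpoch = 7;                  // from checkpointed state
    Properties props = new Properties();
    // Transactional producers require the cluster URI so the backend
    // producer can be initialized eagerly in the constructor.
    props.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG,
            "<cluster-uri>");                  // placeholder URI
    FlinkPscInternalProducer<byte[], byte[]> producer =
            new FlinkPscInternalProducer<>(props, "psc-sink-0-1");
    try {
        // Re-attach to the pre-failure transaction, then finish it.
        producer.resumeTransaction(recoveredProducerId, recoveredEpoch);
        producer.commitTransaction();          // assumed API, see note above
    } finally {
        producer.close();
    }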
 .../psc/sink/FlinkPscInternalProducer.java    | 20 +++----
 .../connector/psc/sink/PscCommitter.java      |  3 +-
 .../flink/connector/psc/sink/PscWriter.java   |  1 +
 .../connector/psc/sink/PscSinkITCase.java     | 57 ++++++++++---------
 .../KafkaTransactionManagerOperator.java      |  8 +++
 5 files changed, 48 insertions(+), 41 deletions(-)

diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java
index 5d1d16b..999dc5b 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/FlinkPscInternalProducer.java
@@ -34,6 +34,7 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.Properties;
 import java.util.concurrent.Future;
@@ -51,12 +52,13 @@ class FlinkPscInternalProducer<K, V> extends PscProducer<K, V> {
     @Nullable private String transactionalId;
     private volatile boolean inTransaction;
     private volatile boolean closed;
+    private TopicUri clusterUri = null;
 
     public FlinkPscInternalProducer(Properties properties, @Nullable String transactionalId) throws ConfigurationException, ProducerException, TopicUriSyntaxException {
         super(PscConfigurationUtils.propertiesToPscConfiguration(withTransactionalId(properties, transactionalId)));
         if (transactionalId != null) {
             // Producer is transactional, so the backend producer should be immediately initialized given the ClusterUri in the properties
-            TopicUri clusterUri = PscFlinkConfiguration.validateAndGetBaseClusterUri(properties);
+            this.clusterUri = PscFlinkConfiguration.validateAndGetBaseClusterUri(properties);
             getBackendProducerForTopicUri(validateTopicUri(clusterUri.getTopicUriAsString()));
         }
         this.transactionalId = transactionalId;
@@ -115,7 +117,7 @@ public void close() throws IOException {
             // If this producer is still in transaction, it should be committing.
             // However, at this point, we cannot decide that and we shouldn't prolong cancellation.
             // So hard kill this producer with all resources.
-            super.close();
+            super.close(Duration.ZERO);
         } else {
             // If this is outside of a transaction, we should be able to cleanly shutdown.
             super.close(Duration.ofHours(1));
@@ -209,6 +211,7 @@ private void flushNewPartitions() throws ProducerException {
     public void resumeTransaction(long producerId, short epoch) throws ProducerException {
         ensureOnlyOneBackendProducer();
         checkState(!inTransaction, "Already in transaction %s", transactionalId);
+        checkState(clusterUri != null, "ClusterUri is not set");
         checkState(
                 producerId >= 0 && epoch >= 0,
                 "Incorrect values for producerId %s and epoch %s",
@@ -220,18 +223,9 @@ public void resumeTransaction(long producerId, short epoch) throws ProducerExcep
                 producerId,
                 epoch);
 
-        Object transactionManager = getTransactionManager();
         PscProducerTransactionalProperties pscProducerTransactionalProperties = new PscProducerTransactionalProperties(producerId, epoch);
-        synchronized (transactionManager) {
-            TransactionManagerUtils.resumeTransaction(
-                    transactionManager,
-                    pscProducerTransactionalProperties
-            );
-            PscBackendProducer<K, V> backendProducer = getBackendProducers().iterator().next();
-            setBackendProducerTransactionalState(backendProducer, TransactionalState.IN_TRANSACTION);
-            setTransactionalState(TransactionalState.INIT_AND_BEGUN);
-            this.inTransaction = true;
-        }
+        super.resumeTransaction(pscProducerTransactionalProperties, Collections.singleton(clusterUri.getTopicUriAsString()));
+        this.inTransaction = true;
     }
 
     private void ensureOnlyOneBackendProducer() {
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java
index 2307440..4f75934 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscCommitter.java
@@ -63,7 +63,7 @@ public void commit(Collection<CommitRequest<PscCommittable>> requests)
         for (CommitRequest<PscCommittable> request : requests) {
             final PscCommittable committable = request.getCommittable();
             final String transactionalId = committable.getTransactionalId();
-            LOG.debug("Committing Kafka transaction {}", transactionalId);
+            LOG.debug("Committing transaction {}", transactionalId);
             Optional<Recyclable<? extends FlinkPscInternalProducer<?, ?>>> recyclable =
                     committable.getProducer();
             FlinkPscInternalProducer<?, ?> producer;
@@ -83,6 +83,7 @@ public void commit(Collection<CommitRequest<PscCommittable>> requests)
                 recyclable.ifPresent(Recyclable::close);
             } catch (Exception ex) {
                 // TODO: make exception handling backend-agnostic
+                LOG.warn("Caught exception while committing transaction {}", transactionalId, ex);
                 Throwable cause = ex.getCause();
                 try {
                     if (cause instanceof Exception)
diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java
index 7b41a04..1fb8a39 100644
--- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java
+++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java
@@ -52,6 +52,7 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java
index 6f0218f..43fb295 100644
--- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java
+++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscSinkITCase.java
@@ -244,7 +244,10 @@ public void testRecoveryWithExactlyOnceGuarantee() throws Exception {
                                 contains(
                                         LongStream.range(1, lastCheckpointedRecord.get().get() + 1)
                                                 .boxed()
-                                                .toArray())));
+                                                .toArray()
+                                )
+                        )
+                );
     }
 
     @Test
@@ -291,7 +294,7 @@ public void testAbortTransactionsOfPendingCheckpointsAfterFailure() throws Excep
         executeWithMapper(
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, "newPrefix");
         final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic, true);
+                drainAllRecordsFromTopic(topicUriStr, true);
         assertThat(
                 deserializeValues(collectedRecords),
                 contains(
@@ -312,7 +315,7 @@ public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exce
                     e.getCause().getCause().getMessage(),
                     containsString("Exceeded checkpoint tolerable failure"));
         }
-        assertTrue(deserializeValues(drainAllRecordsFromTopic(topic, true)).isEmpty());
+        assertTrue(deserializeValues(drainAllRecordsFromTopic(topicUriStr, true)).isEmpty());
 
         // Second job aborts all transactions from previous runs with higher parallelism
         config.set(CoreOptions.DEFAULT_PARALLELISM, 1);
@@ -320,7 +323,7 @@ public void testAbortTransactionsAfterScaleInBeforeFirstCheckpoint() throws Exce
         executeWithMapper(
                 new FailingCheckpointMapper(failed, lastCheckpointedRecord), config, null);
         final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic, true);
+                drainAllRecordsFromTopic(topicUriStr, true);
         assertThat(
                 deserializeValues(collectedRecords),
                 contains(
@@ -342,14 +345,14 @@ private void executeWithMapper(
         final PscSinkBuilder<Long> builder =
                 new PscSinkBuilder<Long>()
                         .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
-//                        .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+                        .setPscProducerConfig(getPscClientConfiguration())
                         .setRecordSerializer(
                                 PscRecordSerializationSchema.builder()
-                                        .setTopicUriString(topic)
+                                        .setTopicUriString(topicUriStr)
                                         .setValueSerializationSchema(new RecordSerializer())
                                         .build());
         if (transactionalIdPrefix == null) {
-            transactionalIdPrefix = "kafka-sink";
+            transactionalIdPrefix = "psc-sink";
         }
         builder.setTransactionalIdPrefix(transactionalIdPrefix);
         stream.sinkTo(builder.build());
@@ -368,22 +371,21 @@ private void testRecoveryWithAssertion(
         DataStreamSource<Long> source = env.fromSequence(1, 10);
         DataStream<Long> stream =
                 source.map(new FailingCheckpointMapper(failed, lastCheckpointedRecord));
-
         stream.sinkTo(
                 new PscSinkBuilder<Long>()
                         .setDeliverGuarantee(guarantee)
-//                        .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
+                        .setPscProducerConfig(getPscClientConfiguration())
                         .setRecordSerializer(
                                 PscRecordSerializationSchema.builder()
                                         .setTopicUriString(topicUriStr)
                                         .setValueSerializationSchema(new RecordSerializer())
                                         .build())
-                        .setTransactionalIdPrefix("kafka-sink")
+                        .setTransactionalIdPrefix("psc-sink")
                         .build());
         env.execute();
 
         final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
-                drainAllRecordsFromTopic(topic, guarantee == DeliveryGuarantee.EXACTLY_ONCE);
+                drainAllRecordsFromTopic(topicUriStr, guarantee == DeliveryGuarantee.EXACTLY_ONCE);
         recordsAssertion.accept(deserializeValues(collectedRecords));
         checkProducerLeak();
     }
@@ -404,20 +406,19 @@ private void writeRecordsToKafka(
         source.sinkTo(
                 new PscSinkBuilder<Long>()
                         .setPscProducerConfig(producerProperties)
-//                        .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers())
                         .setDeliverGuarantee(deliveryGuarantee)
                         .setRecordSerializer(
                                 PscRecordSerializationSchema.builder()
                                         .setTopicUriString(topicUriStr)
                                         .setValueSerializationSchema(new RecordSerializer())
                                         .build())
-                        .setTransactionalIdPrefix("kafka-sink")
+                        .setTransactionalIdPrefix("psc-sink")
                         .build());
         env.execute();
 
         final List<PscConsumerMessage<byte[], byte[]>> collectedRecords =
                 drainAllRecordsFromTopic(
-                        topic, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
+                        topicUriStr, deliveryGuarantee == DeliveryGuarantee.EXACTLY_ONCE);
         final long recordsCount = expectedRecords.get().get();
         assertEquals(collectedRecords.size(), recordsCount);
         assertThat(
@@ -439,16 +440,18 @@ record -> {
                 .collect(Collectors.toList());
     }
 
-    private static Properties getKafkaClientConfiguration() {
-        final Properties standardProps = new Properties();
-        standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
-        standardProps.put("group.id", UUID.randomUUID().toString());
-        standardProps.put("enable.auto.commit", false);
-        standardProps.put("auto.offset.reset", "earliest");
-        standardProps.put("max.partition.fetch.bytes", 256);
-        standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
-        standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
-        return standardProps;
+    private static Properties getPscClientConfiguration() {
+        Properties properties = new Properties();
+        properties.setProperty(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "PscSinkITCase");
+        properties.setProperty(PscConfiguration.PSC_PRODUCER_BATCH_DURATION_MAX_MS, "0");
+        properties.setProperty(PscConfiguration.PSC_PRODUCER_BUFFER_SEND_BYTES, "131072");
+        properties.setProperty(PscConfiguration.PSC_PRODUCER_BUFFER_RECEIVE_BYTES, "32768");
+        properties.setProperty(PscConfiguration.PSC_PRODUCER_RETRIES, "2147483647");
+        properties.setProperty(PscConfiguration.PSC_CONSUMER_CLIENT_ID, "PscSinkITCase");
+        properties.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "PscSinkITCase");
+        properties.setProperty(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
+        injectDiscoveryConfigs(properties, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX);
+        return properties;
     }
 
     private static PscConsumer<byte[], byte[]> createTestConsumer(
@@ -479,9 +482,9 @@ private void deleteTestTopic(String topic)
     }
 
     private List<PscConsumerMessage<byte[], byte[]>> drainAllRecordsFromTopic(
-            String topic, boolean committed) throws ConfigurationException, ConsumerException {
-        Properties properties = getKafkaClientConfiguration();
-        return PscUtil.drainAllRecordsFromTopic(topic, properties, committed);
+            String topicUriStr, boolean committed) throws ConfigurationException, ConsumerException {
+        Properties properties = getPscClientConfiguration();
+        return PscUtil.drainAllRecordsFromTopic(topicUriStr, properties, committed);
     }
 
     private static class RecordSerializer implements SerializationSchema<Long> {
diff --git a/psc/src/main/java/com/pinterest/psc/producer/transaction/kafka/KafkaTransactionManagerOperator.java b/psc/src/main/java/com/pinterest/psc/producer/transaction/kafka/KafkaTransactionManagerOperator.java
index f59c5c2..e093721 100644
--- a/psc/src/main/java/com/pinterest/psc/producer/transaction/kafka/KafkaTransactionManagerOperator.java
+++ b/psc/src/main/java/com/pinterest/psc/producer/transaction/kafka/KafkaTransactionManagerOperator.java
@@ -286,4 +286,12 @@ private void transitionTransactionManagerStateTo(
             Object transactionManager, String state) {
         PscCommon.invoke(transactionManager, "transitionTo", getTransactionManagerState(state));
     }
+
+    private String getCurrentTransactionManagerState(Object transactionManager) {
+        return PscCommon.getField(transactionManager, "currentState").toString();
+    }
+
+    private String getCurrentTransactionalId(Object transactionManager) {
"transactionalId").toString(); + } }