Skip to content

Commit

Permalink
WIP: finished PscSinkITCase integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Sep 24, 2024
1 parent 864ea72 commit 424b901
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 333 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class PscWriter<IN>

private boolean closed = false;
private long lastSync = System.currentTimeMillis();
private boolean isMetricsInitialized = false;

/**
* Constructor creating a PSC writer.
Expand Down Expand Up @@ -156,7 +157,7 @@ class PscWriter<IN>
this.timeService = sinkInitContext.getProcessingTimeService();
this.metricGroup = sinkInitContext.metricGroup();
this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
this.numRecordsOutCounter = metricGroup.getNumRecordsSendCounter();
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.pscSinkContext =
new DefaultPscSinkContext(
Expand All @@ -183,13 +184,10 @@ class PscWriter<IN>
|| deliveryGuarantee == DeliveryGuarantee.NONE) {
this.currentProducer = new FlinkPscInternalProducer<>(this.pscProducerConfig, null);
closer.register(this.currentProducer);
initPscMetrics(this.currentProducer);
} else {
throw new UnsupportedOperationException(
"Unsupported PSC writer semantic " + this.deliveryGuarantee);
}

initFlinkMetrics();
}

@Override
Expand All @@ -198,7 +196,10 @@ public void write(IN element, Context context) throws IOException {
recordSerializer.serialize(element, pscSinkContext, context.timestamp());
try {
currentProducer.send(record, deliveryCallback);
} catch (ProducerException | ConfigurationException e) {
if (!isMetricsInitialized) {
initPscAndFlinkMetrics(currentProducer);
}
} catch (ConfigurationException | ClientException e) {
throw new RuntimeException(e);
}
numRecordsOutCounter.inc();
Expand Down Expand Up @@ -345,7 +346,8 @@ private FlinkPscInternalProducer<byte[], byte[]> getOrCreateTransactionalProduce
producer = new FlinkPscInternalProducer<>(pscProducerConfig, transactionalId);
closer.register(producer);
producer.initTransactions();
initPscMetrics(producer);
if (!isMetricsInitialized)
initPscAndFlinkMetrics(producer);
} else {
producer.initTransactionId(transactionalId);
}
Expand Down Expand Up @@ -387,6 +389,16 @@ private void initPscMetrics(FlinkPscInternalProducer<byte[], byte[]> producer) t
};
}

/**
 * One-shot initialization of both PSC-producer-backed metrics and the Flink-side
 * metrics for this writer, marking the writer's metrics as initialized on success.
 *
 * <p>PSC metric registration may fail with a {@link ClientException}; that failure is
 * surfaced as an unchecked exception (cause preserved) since callers cannot recover.
 *
 * @param producer the internal producer whose client metrics are registered
 * @throws RuntimeException if PSC metric initialization fails
 */
private void initPscAndFlinkMetrics(FlinkPscInternalProducer<byte[], byte[]> producer) {
    try {
        // Register metrics exposed by the underlying PSC producer client.
        initPscMetrics(producer);
    } catch (ClientException clientException) {
        // Wrap as unchecked, keeping the original cause for diagnosis.
        throw new RuntimeException("Failed to initialize PSC metrics", clientException);
    }
    // Flink metric group wiring cannot throw; do it after PSC metrics succeed.
    initFlinkMetrics();
    // Guard flag so callers (write / transactional-producer creation) init only once.
    isMetricsInitialized = true;
}

private long computeSendTime() {
FlinkPscInternalProducer<byte[], byte[]> producer = this.currentProducer;
if (producer == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import com.pinterest.flink.connector.psc.PscFlinkConfiguration;
import com.pinterest.flink.connector.psc.sink.testutils.PscSinkExternalContextFactory;
import com.pinterest.flink.connector.psc.sink.testutils.PscSinkTestSuiteBase;
import com.pinterest.flink.connector.psc.testutils.PscUtil;
import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub;
import com.pinterest.psc.config.PscConfiguration;
Expand Down Expand Up @@ -182,7 +181,7 @@ public void tearDown() throws ExecutionException, InterruptedException, TimeoutE

/** Integration test based on connector testing framework. */
@Nested
class IntegrationTests extends PscSinkTestSuiteBase<String> {
class IntegrationTests extends SinkTestSuiteBase<String> {
// Defines test environment on Flink MiniCluster
@SuppressWarnings("unused")
@TestEnv
Expand Down
Loading

0 comments on commit 424b901

Please sign in to comment.