diff --git a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java index 1fb8a39..03a6e7d 100644 --- a/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java +++ b/psc-flink/src/main/java/com/pinterest/flink/connector/psc/sink/PscWriter.java @@ -97,6 +97,7 @@ class PscWriter private final SinkWriterMetricGroup metricGroup; private final boolean disabledMetrics; private final Counter numRecordsOutCounter; + private final Counter numRecordsSendCounter; private final Counter numBytesOutCounter; private final Counter numRecordsOutErrorsCounter; private final ProcessingTimeService timeService; @@ -158,7 +159,8 @@ class PscWriter this.timeService = sinkInitContext.getProcessingTimeService(); this.metricGroup = sinkInitContext.metricGroup(); this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); - this.numRecordsOutCounter = metricGroup.getNumRecordsSendCounter(); + this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter(); this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter(); this.pscSinkContext = new DefaultPscSinkContext( @@ -204,6 +206,7 @@ public void write(IN element, Context context) throws IOException { throw new RuntimeException(e); } numRecordsOutCounter.inc(); + numRecordsSendCounter.inc(); } @Override diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java index 73217f2..f41ba3b 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscTransactionLogITCase.java @@ -85,7 +85,7 @@ public void testGetTransactions() throws ConfigurationException, ConsumerExcepti lingeringTransaction(4); final PscTransactionLog transactionLog = - new PscTransactionLog(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString(), getKafkaClientConfiguration()); + new PscTransactionLog(PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_CLUSTER_URI.getTopicUriAsString(), getPscClientConfiguration()); final List transactions = transactionLog.getTransactions(); assertThat( transactions, @@ -162,7 +162,7 @@ private static String buildTransactionalId(long id) { } private static PscProducer createProducer(String transactionalId) throws ConfigurationException, ProducerException { - final Properties producerProperties = getKafkaClientConfiguration(); + final Properties producerProperties = getPscClientConfiguration(); producerProperties.put( PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName()); producerProperties.put( @@ -172,7 +172,7 @@ private static PscProducer createProducer(String transactionalI return new PscProducer<>(PscConfigurationUtils.propertiesToPscConfiguration(producerProperties)); } - private static Properties getKafkaClientConfiguration() { + private static Properties getPscClientConfiguration() { final Properties standardProps = new Properties(); // standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "flink-tests"); diff --git a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java index e2856a7..379a4c2 100644 --- a/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java +++ b/psc-flink/src/test/java/com/pinterest/flink/connector/psc/sink/PscWriterITCase.java @@ -17,7 +17,10 @@ package com.pinterest.flink.connector.psc.sink; +import com.pinterest.flink.connector.psc.PscFlinkConfiguration; +import com.pinterest.flink.streaming.connectors.psc.PscTestEnvironmentWithKafkaAsPubSub; import com.pinterest.psc.common.MessageId; +import com.pinterest.psc.config.PscConfiguration; import com.pinterest.psc.consumer.PscConsumerMessage; import com.pinterest.psc.exception.ClientException; import com.pinterest.psc.exception.startup.ConfigurationException; @@ -73,6 +76,7 @@ import java.util.function.Consumer; import java.util.stream.IntStream; +import static com.pinterest.flink.connector.psc.sink.testutils.PscTestUtils.injectDiscoveryConfigs; import static com.pinterest.flink.connector.psc.testutils.PscUtil.createKafkaContainer; import static com.pinterest.flink.connector.psc.testutils.PscUtil.drainAllRecordsFromTopic; import static org.apache.flink.util.DockerImageVersions.KAFKA; @@ -85,9 +89,10 @@ public class PscWriterITCase { private static final Logger LOG = LoggerFactory.getLogger(PscWriterITCase.class); private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; private static final Network NETWORK = Network.newNetwork(); - private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total"; + private static final String PSC_METRIC_WITH_GROUP_NAME = "PscProducer.incoming-byte-total"; private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext(); private String topic; + private String topicUriStr; private MetricListener metricListener; private TriggerTimeService timeService; @@ -113,14 +118,16 @@ public void setUp(TestInfo testInfo) { metricListener = new MetricListener(); timeService = new TriggerTimeService(); topic = testInfo.getDisplayName().replaceAll("\\W", ""); + topicUriStr = PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX + topic; } @ParameterizedTest @EnumSource(DeliveryGuarantee.class) public void testRegisterMetrics(DeliveryGuarantee guarantee) throws Exception { try (final PscWriter ignored = - createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) { - assertThat(metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()).isTrue(); + createWriterWithConfiguration(getPscClientConfiguration(), guarantee)) { + ignored.write(1, SINK_WRITER_CONTEXT); // write one record to trigger backendProducer creation + assertThat(metricListener.getGauge(PSC_METRIC_WITH_GROUP_NAME).isPresent()).isTrue(); } } @@ -140,7 +147,7 @@ public void testIncreasingRecordBasedCounters() throws Exception { metricListener.getMetricGroup(), operatorIOMetricGroup); try (final PscWriter writer = createWriterWithConfiguration( - getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { + getPscClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); @@ -165,13 +172,9 @@ public void testCurrentSendTimeMetric() throws Exception { InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); try (final PscWriter writer = createWriterWithConfiguration( - getKafkaClientConfiguration(), + getPscClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE, metricGroup)) { - final Optional> currentSendTime = - metricListener.getGauge("currentSendTime"); - assertThat(currentSendTime.isPresent()).isTrue(); - assertThat(currentSendTime.get().getValue()).isEqualTo(0L); IntStream.range(0, 100) .forEach( (run) -> { @@ -185,13 +188,17 @@ public void testCurrentSendTimeMetric() throws Exception { throw new RuntimeException("Failed writing Kafka record."); } }); + Thread.sleep(500L); + final Optional> currentSendTime = + metricListener.getGauge("currentSendTime"); + assertThat(currentSendTime.isPresent()).isTrue(); assertThat(currentSendTime.get().getValue()).isGreaterThan(0L); } } @Test void testNumRecordsOutErrorsCounterMetric() throws Exception { - Properties properties = getKafkaClientConfiguration(); + Properties properties = getPscClientConfiguration(); final InternalSinkWriterMetricGroup metricGroup = InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); @@ -210,8 +217,8 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception { new FlinkPscInternalProducer<>(properties, transactionalId)) { producer.initTransactions(); -// producer.beginTransaction(); - producer.send(new PscProducerMessage<>(topic, "2".getBytes())); + producer.beginTransaction(); + producer.send(new PscProducerMessage<>(topicUriStr, "2".getBytes())); producer.commitTransaction(); } @@ -227,14 +234,14 @@ public void testMetadataPublisher() throws Exception { List metadataList = new ArrayList<>(); try (final PscWriter writer = createWriterWithConfiguration( - getKafkaClientConfiguration(), + getPscClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE, InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), - meta -> metadataList.add(meta.toString()))) { + meta -> metadataList.add(meta.getTopicUriPartition().getTopicUriAsString() + "-" + meta.getTopicUriPartition().getPartition() + "@" + meta.getOffset()))) { List expected = new ArrayList<>(); for (int i = 0; i < 100; i++) { writer.write(1, SINK_WRITER_CONTEXT); - expected.add("testMetadataPublisher-0@" + i); + expected.add(topicUriStr + "-0@" + i); } writer.flush(false); assertThat(metadataList).usingRecursiveComparison().isEqualTo(expected); @@ -246,7 +253,7 @@ public void testMetadataPublisher() throws Exception { void testLingeringTransaction() throws Exception { final PscWriter failedWriter = createWriterWithConfiguration( - getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE); + getPscClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE); // create two lingering transactions failedWriter.flush(false); @@ -258,7 +265,7 @@ void testLingeringTransaction() throws Exception { try (final PscWriter recoveredWriter = createWriterWithConfiguration( - getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { + getPscClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { recoveredWriter.write(1, SINK_WRITER_CONTEXT); recoveredWriter.flush(false); @@ -271,7 +278,7 @@ void testLingeringTransaction() throws Exception { committable.getProducer().get().getObject().commitTransaction(); List> records = - drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true); + drainAllRecordsFromTopic(topicUriStr, getPscClientConfiguration(), true); assertThat(records).hasSize(1); } @@ -286,7 +293,7 @@ void testLingeringTransaction() throws Exception { mode = EnumSource.Mode.EXCLUDE) void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exception { try (final PscWriter writer = - createWriterWithConfiguration(getKafkaClientConfiguration(), guarantee)) { + createWriterWithConfiguration(getPscClientConfiguration(), guarantee)) { assertThat(writer.getProducerPool()).hasSize(0); FlinkPscInternalProducer firstProducer = writer.getCurrentProducer(); @@ -307,7 +314,7 @@ void useSameProducerForNonTransactional(DeliveryGuarantee guarantee) throws Exce void usePoolForTransactional() throws Exception { try (final PscWriter writer = createWriterWithConfiguration( - getKafkaClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { + getPscClientConfiguration(), DeliveryGuarantee.EXACTLY_ONCE)) { assertThat(writer.getProducerPool()).hasSize(0); writer.flush(false); @@ -348,11 +355,11 @@ void usePoolForTransactional() throws Exception { */ @Test void testAbortOnClose() throws Exception { - Properties properties = getKafkaClientConfiguration(); + Properties properties = getPscClientConfiguration(); try (final PscWriter writer = createWriterWithConfiguration(properties, DeliveryGuarantee.EXACTLY_ONCE)) { writer.write(1, SINK_WRITER_CONTEXT); - assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(0); + assertThat(drainAllRecordsFromTopic(topicUriStr, properties, true)).hasSize(0); } try (final PscWriter writer = @@ -372,18 +379,18 @@ void testAbortOnClose() throws Exception { producer.commitTransaction(); } - assertThat(drainAllRecordsFromTopic(topic, properties, true)).hasSize(1); + assertThat(drainAllRecordsFromTopic(topicUriStr, properties, true)).hasSize(1); } } private void assertKafkaMetricNotPresent( DeliveryGuarantee guarantee, String configKey, String configValue) throws Exception { - final Properties config = getKafkaClientConfiguration(); + final Properties config = getPscClientConfiguration(); config.put(configKey, configValue); try (final PscWriter ignored = createWriterWithConfiguration(config, guarantee)) { Assertions.assertFalse( - metricListener.getGauge(KAFKA_METRIC_WITH_GROUP_NAME).isPresent()); + metricListener.getGauge(PSC_METRIC_WITH_GROUP_NAME).isPresent()); } } @@ -421,14 +428,17 @@ private PscWriter createWriterWithConfiguration( } } - private static Properties getKafkaClientConfiguration() { + private static Properties getPscClientConfiguration() { final Properties standardProps = new Properties(); - standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); - standardProps.put("group.id", "kafkaWriter-tests"); - standardProps.put("enable.auto.commit", false); - standardProps.put("key.serializer", ByteArraySerializer.class.getName()); - standardProps.put("value.serializer", ByteArraySerializer.class.getName()); - standardProps.put("auto.offset.reset", "earliest"); + standardProps.put(PscConfiguration.PSC_CONSUMER_GROUP_ID, "pscWriter-tests"); + standardProps.put(PscConfiguration.PSC_CONSUMER_COMMIT_AUTO_ENABLED, false); + standardProps.put(PscConfiguration.PSC_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class.getName()); + standardProps.put(PscConfiguration.PSC_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class.getName()); + standardProps.put(PscConfiguration.PSC_PRODUCER_CLIENT_ID, "pscWriter-tests"); + standardProps.put(PscConfiguration.PSC_AUTO_RESOLUTION_ENABLED, false); + standardProps.put(PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET, PscConfiguration.PSC_CONSUMER_OFFSET_AUTO_RESET_EARLIEST); + standardProps.put(PscFlinkConfiguration.CLUSTER_URI_CONFIG, PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); + injectDiscoveryConfigs(standardProps, KAFKA_CONTAINER.getBootstrapServers(), PscTestEnvironmentWithKafkaAsPubSub.PSC_TEST_TOPIC_URI_PREFIX); return standardProps; } @@ -498,7 +508,7 @@ private class DummyRecordSerializer implements PscRecordSerializationSchema serialize( Integer element, PscSinkContext context, Long timestamp) { - return new PscProducerMessage<>(topic, ByteBuffer.allocate(4).putInt(element).array()); + return new PscProducerMessage<>(topicUriStr, ByteBuffer.allocate(4).putInt(element).array()); } } diff --git a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java index d597051..079a487 100644 --- a/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java +++ b/psc/src/main/java/com/pinterest/psc/producer/kafka/PscKafkaProducer.java @@ -780,10 +780,7 @@ private void handleNullTransactionManager() throws ProducerException { public Map metrics() throws ProducerException { if (kafkaProducer == null) handleUninitializedKafkaProducer("metrics()"); - - if (metricValueProvider.getMetrics().isEmpty()) { - reportProducerMetrics(); - } + reportProducerMetrics(); return metricValueProvider.getMetrics(); }