Skip to content

Commit

Permalink
Merge pull request #38 from ambud/master
Browse files Browse the repository at this point in the history
Create monitor for Kafka Producer metrics and expose them as singer metrics
  • Loading branch information
ambud authored Feb 13, 2020
2 parents 4d4cef2 + f6cee83 commit 22a66a2
Show file tree
Hide file tree
Showing 11 changed files with 183 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.1</version>
<version>0.8.0.2</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down
2 changes: 1 addition & 1 deletion singer-commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.1</version>
<version>0.8.0.2</version>
<relativePath>../pom.xml</relativePath>
</parent>
<developers>
Expand Down
2 changes: 1 addition & 1 deletion singer/deb.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.8.0.1
0.8.0.2
8 changes: 7 additions & 1 deletion singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.1</version>
<version>0.8.0.2</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand Down Expand Up @@ -189,6 +189,12 @@
<artifactId>aws-java-sdk-s3</artifactId>
<version>1.11.35</version>
</dependency>
<dependency>
<groupId>com.salesforce.kafka.test</groupId>
<artifactId>kafka-junit4</artifactId>
<version>3.2.0</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
8 changes: 8 additions & 0 deletions singer/src/main/java/com/pinterest/singer/SingerMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,14 @@ public void run() {
} catch (Throwable t) {
LOG.error("Shutdown failure: heartbeat generator : ", t);
}

try {
if (SingerSettings.getKafkaProducerMonitorThread() != null) {
SingerSettings.getKafkaProducerMonitorThread().interrupt();
}
}catch(Throwable t) {
LOG.error("Shutdown error: kafka producer metrics monitor : ", t);
}

try {
KafkaProducerManager.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import com.pinterest.singer.monitor.LogStreamManager;
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.thrift.configuration.SingerLogConfig;

import com.pinterest.singer.writer.KafkaProducerMetricsMonitor;
import com.twitter.ostrich.stats.Stats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
Expand Down Expand Up @@ -98,6 +98,8 @@ public final class SingerSettings {
// loggingAuditClient is used to send LoggingAuditEvent if LoggingAudit feature is enabled and
// a TopicAuditConfig is set for a given logStream.
private static LoggingAuditClient loggingAuditClient = null;

private static Thread kafkaProducerMonitorThread;

private SingerSettings() {
}
Expand Down Expand Up @@ -155,6 +157,10 @@ public static void initialize(SingerConfig config)

logWritingExecutors.put(clusterSig, threadPool);
}

kafkaProducerMonitorThread = new Thread(new KafkaProducerMetricsMonitor());
kafkaProducerMonitorThread.setDaemon(true);
kafkaProducerMonitorThread.start();

if (loggingAuditClient != null && logConfig.isEnableLoggingAudit() &&
logConfig.getAuditConfig() != null){
Expand Down Expand Up @@ -352,4 +358,8 @@ public static LoggingAuditClient getLoggingAuditClient() {
public static void setLoggingAuditClient(LoggingAuditClient loggingAuditClient) {
SingerSettings.loggingAuditClient = loggingAuditClient;
}

public static Thread getKafkaProducerMonitorThread() {
return kafkaProducerMonitorThread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
public class LogConfigUtils {

private static final Logger LOG = LoggerFactory.getLogger(LogConfigUtils.class);
private static final String DEFAULT_SERVERSET_DIR = "/var/serverset";
public static final String DEFAULT_SERVERSET_DIR = "/var/serverset";
private static final String DEFAULT_ACKS = "1";
private static final String ACKS_ALL = "all";
private static final long MaximumProcessingTimeSliceInMilliseconds = 864000000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -125,4 +126,8 @@ private void shutdownInternal() {
}
}
}

public Map<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> getProducers() {
return producers;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/**
* Copyright 2020 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.singer.writer;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
import com.pinterest.singer.utils.LogConfigUtils;

/**
* Responsible for pulling metrics from {@link KafkaProducer} and copying them
* to Ostrich so they can be accessed and forwarded. This helps provide
* additional instrumentation on Singer and how it's performing.
*/
public class KafkaProducerMetricsMonitor implements Runnable {

private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerMetricsMonitor.class);
public static final Set<String> PRODUCER_METRICS_WHITELIST = new HashSet<>(
Arrays.asList("buffer-total-bytes", "buffer-available-bytes"));
// sample every 60seconds
private static final int SAMPLING_INTERVAL = 60_000;

@Override
public void run() {
while (true) {
try {
publishKafkaProducerMetricsToOstrich();
} catch (Exception e) {
LOG.warn("Error publishing KafkaProducer metrics", e);
}
try {
Thread.sleep(SAMPLING_INTERVAL);
} catch (InterruptedException e) {
LOG.warn("KafkaProducerMetricsMonitor thread interrupted, exiting");
break;
}
}
}

@SuppressWarnings({ "deprecation" })
protected void publishKafkaProducerMetricsToOstrich() {
Map<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> producers = KafkaProducerManager
.getInstance().getProducers();
for (Entry<KafkaProducerConfig, KafkaProducer<byte[], byte[]>> kafkaProducerEntry : producers
.entrySet()) {
KafkaProducerConfig key = kafkaProducerEntry.getKey();
String signature = convertSignatureToTag(key);
Map<MetricName, ? extends Metric> metrics = kafkaProducerEntry.getValue().metrics();
for (Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
if (PRODUCER_METRICS_WHITELIST.contains(entry.getKey().name())) {
OpenTsdbMetricConverter.gauge("kafkaproducer." + entry.getKey().name(),
entry.getValue().value(), "cluster=" + signature);
}
}
}
}

public static String convertSignatureToTag(KafkaProducerConfig key) {
return key.getKafkaClusterSignature()
.replaceAll("(" + LogConfigUtils.DEFAULT_SERVERSET_DIR + "|discovery|/|prod|\\.)", "");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Copyright 2020 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.singer.writer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.util.Arrays;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.junit.Test;

import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
import com.twitter.ostrich.stats.Stats;

public class TestKafkaProducerMetricsMonitor {

@Test
public void testSignatureTagExtractionTLS() {
KafkaProducerConfig config = new KafkaProducerConfig("/var/serverset/discovery/kafka_tls/prod",
Arrays.asList(), "-1");
assertEquals("kafka_tls", KafkaProducerMetricsMonitor.convertSignatureToTag(config));
}

@Test
public void testSignatureTagExtraction() {
KafkaProducerConfig config = new KafkaProducerConfig("/var/serverset/discovery/kafka/prod",
Arrays.asList(), "-1");
assertEquals("kafka", KafkaProducerMetricsMonitor.convertSignatureToTag(config));
}

@Test
public void testPublishMetrics() {
KafkaProducerConfig config = new KafkaProducerConfig("/var/serverset/discovery.kafka.prod",
Arrays.asList("localhost:9092"), "-1");
KafkaProducerManager.getInstance().getProducers().clear();
KafkaProducer<byte[], byte[]> producer = KafkaProducerManager.getProducer(config);
KafkaProducerMetricsMonitor monitor = new KafkaProducerMetricsMonitor();
monitor.publishKafkaProducerMetricsToOstrich();
producer.close();
for (String metricName : KafkaProducerMetricsMonitor.PRODUCER_METRICS_WHITELIST) {
Object gauge = Stats.getGauge("kafkaproducer." + metricName + " cluster=kafka").get();
assertNotNull(gauge);
}
}

}
2 changes: 1 addition & 1 deletion thrift-logger/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.8.0.1</version>
<version>0.8.0.2</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>thrift-logger</artifactId>
Expand Down

0 comments on commit 22a66a2

Please sign in to comment.