diff --git a/pom.xml b/pom.xml
index 4467edbd..ad0a496e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
4.0.0
com.pinterest.singer
singer-package
- 0.7.3.21
+ 0.7.3.25
pom
Singer Logging Agent modules
2013
diff --git a/singer/deb.version b/singer/deb.version
index b4eb0c59..fa388981 100644
--- a/singer/deb.version
+++ b/singer/deb.version
@@ -1 +1 @@
-0.7.3.16
\ No newline at end of file
+0.7.3.25
\ No newline at end of file
diff --git a/singer/pom.xml b/singer/pom.xml
index 3534d6a8..94366b89 100644
--- a/singer/pom.xml
+++ b/singer/pom.xml
@@ -7,7 +7,7 @@
com.pinterest.singer
singer-package
- 0.7.3.21
+ 0.7.3.25
../pom.xml
@@ -200,7 +200,7 @@
org.apache.pulsar
pulsar-client
- 2.2.1
+ 2.3.2
org.mockito
diff --git a/singer/src/main/java/com/pinterest/singer/SingerMain.java b/singer/src/main/java/com/pinterest/singer/SingerMain.java
index e9721451..5c3ced91 100644
--- a/singer/src/main/java/com/pinterest/singer/SingerMain.java
+++ b/singer/src/main/java/com/pinterest/singer/SingerMain.java
@@ -20,8 +20,8 @@
import com.pinterest.singer.config.DirectorySingerConfigurator;
import com.pinterest.singer.config.SingerConfigurator;
import com.pinterest.singer.heartbeat.HeartbeatGenerator;
+import com.pinterest.singer.metrics.StatsPusher;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
-import com.pinterest.singer.metrics.OpenTsdbMetricsPusher;
import com.pinterest.singer.metrics.OstrichAdminService;
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.utils.SingerUtils;
@@ -34,10 +34,11 @@
public final class SingerMain {
+ private static final String SINGER_METRICS_PREFIX = "singer";
private static final Logger LOG = LoggerFactory.getLogger(SingerMain.class);
- private static final int TSDB_METRICS_PUSH_INTERVAL_IN_MILLISECONDS = 10 * 1000;
+ private static final int STATS_PUSH_INTERVAL_IN_MILLISECONDS = 10 * 1000;
protected static final String hostName = SingerUtils.getHostname();
- private static OpenTsdbMetricsPusher metricsPusher = null;
+ private static StatsPusher statsPusher = null;
private static String singerPath = "";
static class SingerCleanupThread extends Thread {
@@ -70,8 +71,8 @@ public void run() {
try {
OpenTsdbMetricConverter.incr("singer.shutdown", 1);
- if (metricsPusher!= null) {
- metricsPusher.sendMetrics(false);
+ if (statsPusher!= null) {
+ statsPusher.sendMetrics(false);
} else {
LOG.error("metricsPusher was not initialized properly.");
}
@@ -81,24 +82,28 @@ public void run() {
}
}
- static void startOstrichService(SingerConfig singerConfig) {
+ public static void startOstrichService(SingerConfig singerConfig) {
// do not start ostrich if Ostrich server is disabled
if (System.getenv(SingerMetrics.DISABLE_SINGER_OSTRICH) == null) {
OstrichAdminService ostrichService = new OstrichAdminService(singerConfig.getOstrichPort());
ostrichService.start();
}
+ // start the stats pusher only when a push host:port is configured
if (singerConfig.isSetStatsPusherHostPort()) {
- LOG.info("Starting the OpenTsdb metrics pusher");
+ LOG.info("Starting the stats pusher");
try {
+ @SuppressWarnings("unchecked")
+ Class pusherClass = (Class) Class.forName(singerConfig.getStatsPusherClass());
+ statsPusher = pusherClass.newInstance();
HostAndPort pushHostPort = HostAndPort.fromString(singerConfig.getStatsPusherHostPort());
- metricsPusher = new OpenTsdbMetricsPusher(
- pushHostPort.getHost(),
- pushHostPort.getPort(),
- // TODO: make the following 'prefix' and 'interval' configurable.
- new OpenTsdbMetricConverter("singer", hostName),
- TSDB_METRICS_PUSH_INTERVAL_IN_MILLISECONDS);
- metricsPusher.start();
- LOG.info("OpenTsdb metrics pusher started!");
+ // allows hostname to be overridden based on environment
+ statsPusher.configure(SingerSettings.getEnvironment().getHostname()
+ , SINGER_METRICS_PREFIX
+ , pushHostPort.getHost()
+ , pushHostPort.getPort()
+ , STATS_PUSH_INTERVAL_IN_MILLISECONDS);
+ statsPusher.start();
+ LOG.info("Stats pusher started!");
} catch (Throwable t) {
// pusher fail is OK, do
LOG.error("Exception when starting stats pusher: ", t);
diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
index 55772730..1d569855 100644
--- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
+++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java
@@ -66,6 +66,9 @@ public class SingerMetrics {
public static final String NUM_KAFKA_MESSAGES = SINGER_WRITER + "num_kafka_messages_delivery_success";
public static final String OVERSIZED_MESSAGES = SINGER_WRITER + "num_oversized_messages";
public static final String WRITE_FAILURE = SINGER_WRITER + "kafka_write_failure";
+ public static final String BROKER_WRITE_FAILURE = SINGER_WRITER + "broker_write_failure";
+ public static final String BROKER_WRITE_SUCCESS = SINGER_WRITER + "broker_write_success";
+ public static final String BROKER_WRITE_LATENCY = SINGER_WRITER + "broker_write_latency";
public static final String WRITER_BATCH_SIZE = SINGER_WRITER + "message_batch_size";
public static final String WRITER_SSL_EXCEPTION = SINGER_WRITER + "ssl_exception";
public static final String KAFKA_THROUGHPUT = SINGER_WRITER + "topic_kafka_throughput";
@@ -93,5 +96,7 @@ public class SingerMetrics {
public static final String SINGER_CONFIGURATOR_CONFIG_ERRORS_UNKNOWN = "singer.configurator.unexpected_config_errors";
public static final String LOCALITY_MISSING = "singer.locality.missing";
public static final String DISABLE_SINGER_OSTRICH = "DISABLE_OSTRICH_METRICS";
+ public static final String LEADER_INFO_EXCEPTION = SINGER_WRITER + "leader_info_exception";
+ public static final String MISSING_LOCAL_PARTITIONS = "singer.locality.missing_local_partitions";
}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java b/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java
index 7977f0c0..a2585a4f 100644
--- a/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java
+++ b/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java
@@ -17,6 +17,8 @@
import com.pinterest.singer.common.errors.SingerLogException;
import com.pinterest.singer.config.SingerDirectoryWatcher;
+import com.pinterest.singer.environment.Environment;
+import com.pinterest.singer.environment.EnvironmentProvider;
import com.pinterest.singer.heartbeat.HeartbeatGenerator;
import com.pinterest.singer.kubernetes.KubeService;
import com.pinterest.singer.monitor.FileSystemMonitor;
@@ -24,6 +26,7 @@
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
import com.twitter.ostrich.stats.Stats;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,6 +89,9 @@ public final class SingerSettings {
// initialized here so that unit tests aren't required to call the initialize method below
private static SortedMap> logConfigMap = new TreeMap<>();
+
+ // default environment; replaced during initialize() if a provider class is configured
+ private static Environment environment = new Environment();
private SingerSettings() {
}
@@ -98,6 +104,9 @@ public static void initialize(SingerConfig config)
NoSuchMethodException,
SingerLogException {
setSingerConfig(config);
+
+ loadAndSetSingerEnvironmentIfConfigured(config);
+ LOG.warn("Singer environment has been configured to:" + environment);
SingerSettings.logProcessorExecutor = Executors.newScheduledThreadPool(
singerConfig.getThreadPoolSize(),
@@ -224,6 +233,25 @@ public static synchronized FileSystemMonitor getOrCreateFileSystemMonitor(String
return mon;
}
+  protected static void loadAndSetSingerEnvironmentIfConfigured(SingerConfig config) {
+    // the default environment is kept unless a provider class is configured and loads cleanly
+    String environmentProviderClass = config.getEnvironmentProviderClass();
+    if (environmentProviderClass != null) {
+      try {
+        // asSubclass() rejects a wrong type up front instead of relying on an unchecked cast
+        EnvironmentProvider provider = Class.forName(environmentProviderClass)
+            .asSubclass(EnvironmentProvider.class)
+            .getDeclaredConstructor().newInstance();
+        Environment env = provider.getEnvironment();
+        if (env != null) {
+          environment = env;
+        }
+      } catch (Exception e) {
+        LOG.error("Failed to load Singer Environment configuration", e);
+      }
+    }
+  }
+
public static Map getFsMonitorMap() {
return fsMonitorMap;
}
@@ -282,4 +310,13 @@ public static ScheduledExecutorService getLogProcessorExecutor() {
public static void initializeConfigMap(SingerConfig config) {
logConfigMap = loadLogConfigMap(config);
}
+
+ public static Environment getEnvironment() {
+ return environment;
+ }
+
+ @VisibleForTesting
+ public static void setEnvironment(Environment environment) {
+ SingerSettings.environment = environment;
+ }
}
diff --git a/singer/src/main/java/com/pinterest/singer/environment/EnvVariableBasedEnvironmentProvider.java b/singer/src/main/java/com/pinterest/singer/environment/EnvVariableBasedEnvironmentProvider.java
new file mode 100644
index 00000000..07031711
--- /dev/null
+++ b/singer/src/main/java/com/pinterest/singer/environment/EnvVariableBasedEnvironmentProvider.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.environment;
+
+/**
+ * Reads locality and deployment stage from the LOCALITY and DEPLOYMENT_STAGE environment
+ * variables. Unset locality maps to LOCALITY_NOT_AVAILABLE; unset stage stays null.
+ */
+public class EnvVariableBasedEnvironmentProvider extends EnvironmentProvider {
+
+ private static final String DEPLOYMENT_STAGE = "DEPLOYMENT_STAGE";
+ private static final String LOCALITY = "LOCALITY";
+
+ @Override
+ protected String getLocality() {
+ String locality = System.getenv(LOCALITY);
+ if (locality != null) {
+ return locality;
+ } else {
+ return Environment.LOCALITY_NOT_AVAILABLE;
+ }
+ }
+
+ @Override
+ protected String getDeploymentStage () {
+ return System.getenv(DEPLOYMENT_STAGE);
+ }
+
+}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/environment/Environment.java b/singer/src/main/java/com/pinterest/singer/environment/Environment.java
new file mode 100644
index 00000000..7690a22e
--- /dev/null
+++ b/singer/src/main/java/com/pinterest/singer/environment/Environment.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.environment;
+
+import com.pinterest.singer.utils.SingerUtils;
+
+/**
+ * Describes the environment Singer is running in.
+ *
+ * The Singer Environment can subsequently be used by any component that
+ * needs to switch functionality based on the environment it is running in.
+ *
+ * NOTE: every field MUST have a default in case the provider fails to load;
+ * getters must return a NON-NULL value unless NULL is an expected value.
+ */
+public class Environment {
+
+ public static final String LOCALITY_NOT_AVAILABLE = "n/a";
+ public static final String DEFAULT_HOSTNAME = SingerUtils.getHostname();
+ private String locality = LOCALITY_NOT_AVAILABLE;
+ private String deploymentStage;
+ private String hostname = DEFAULT_HOSTNAME;
+
+ /**
+ * @return the locality; {@link #LOCALITY_NOT_AVAILABLE} unless one has been set
+ */
+ public String getLocality() {
+ return locality;
+ }
+
+ /**
+ * @param locality the locality to set; stored as-is without validation
+ */
+ public void setLocality(String locality) {
+ this.locality = locality;
+ }
+
+ /**
+ * @return the deployment stage; null unless one has been set
+ */
+ public String getDeploymentStage() {
+ return deploymentStage;
+ }
+
+ /**
+ * @param deploymentStage the deployment stage to set; stored as-is without validation
+ */
+ public void setDeploymentStage(String deploymentStage) {
+ this.deploymentStage = deploymentStage;
+ }
+
+ /**
+ * @return the hostname; {@link #DEFAULT_HOSTNAME} unless one has been set
+ */
+ public String getHostname() {
+ return hostname;
+ }
+
+ /**
+ * @param hostname the hostname to set; stored as-is without validation
+ */
+ public void setHostname(String hostname) {
+ this.hostname = hostname;
+ }
+
+}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/environment/EnvironmentProvider.java b/singer/src/main/java/com/pinterest/singer/environment/EnvironmentProvider.java
new file mode 100644
index 00000000..d8e83f3b
--- /dev/null
+++ b/singer/src/main/java/com/pinterest/singer/environment/EnvironmentProvider.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.environment;
+
+/**
+ * Builds the {@link Environment} for Singer from values supplied by subclasses. The
+ * constructor calls the overridable getters, so they must not read subclass instance fields.
+ */
+public abstract class EnvironmentProvider {
+
+ protected Environment environment = new Environment();
+
+ public EnvironmentProvider() {
+ environment.setDeploymentStage(getDeploymentStage());
+ environment.setLocality(getLocality());
+ environment.setHostname(getHostname());
+ }
+
+ protected abstract String getLocality();
+
+ protected abstract String getDeploymentStage();
+
+ protected String getHostname() {
+ return Environment.DEFAULT_HOSTNAME;
+ }
+
+ public Environment getEnvironment() {
+ return environment;
+ }
+
+}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java
index 491f4dd2..e9d20d99 100644
--- a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java
+++ b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java
@@ -21,17 +21,11 @@
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
-import java.security.KeyManagementException;
-import java.security.KeyStoreException;
-import java.security.NoSuchAlgorithmException;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -265,7 +259,7 @@ public void updatePodWatchers(String podUid, boolean isDelete) {
*/
public Set fetchPodUidsFromMetadata() throws IOException {
LOG.debug("Attempting to make pod md request");
- String response = makeGetRequest(PODS_MD_URL);
+ String response = SingerUtils.makeGetRequest(PODS_MD_URL);
LOG.debug("Received pod md response:" + response);
Gson gson = new Gson();
JsonObject obj = gson.fromJson(response, JsonObject.class);
@@ -283,30 +277,6 @@ public Set fetchPodUidsFromMetadata() throws IOException {
return podUids;
}
- /**
- * Make an HTTP Get request on the supplied URI and return the response entity
- * as {@link String}
- *
- * @param uri
- * @return
- * @throws IOException
- */
- private String makeGetRequest(String uri) throws IOException {
- HttpGet getPodRequest = new HttpGet(uri);
- try {
- CloseableHttpResponse response = SingerUtils.makeRequest(getPodRequest);
- if (response.getStatusLine().getStatusCode() != 200) {
- LOG.warn("Non-200 status code(" + response.getStatusLine().getStatusCode() + ") reason:"
- + response.getStatusLine().getReasonPhrase());
- }
- String entity = EntityUtils.toString(response.getEntity());
- response.close();
- return entity;
- } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) {
- throw new IOException(e);
- }
- }
-
/**
* Return the poll frequency in milliseconds
*
diff --git a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java
index 6b143a5e..8b5375fb 100644
--- a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java
+++ b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java
@@ -533,7 +533,6 @@ public void reportStats() {
"log=" + log.replace(":", "/")), maxLatency);
}
}
- Stats.setGauge(SingerMetrics.CURRENT_PROCESSOR_LATENCY, overallMaxLatency);
for (String log : perLogStuck.keySet()) {
OpenTsdbMetricConverter
.incr("singer.processor.stuck", perLogStuck.get(log), "log=" + log, "host=" + HOSTNAME);
diff --git a/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java b/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java
index 1025f29e..b11205c0 100644
--- a/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java
+++ b/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java
@@ -17,8 +17,21 @@
import java.io.File;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.kafka.clients.admin.AdminClient;
import com.pinterest.singer.config.DirectorySingerConfigurator;
+import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
+import com.pinterest.singer.thrift.configuration.KafkaWriterConfig;
+import com.pinterest.singer.thrift.configuration.SingerLogConfig;
+import com.pinterest.singer.thrift.configuration.WriterType;
import com.pinterest.singer.utils.LogConfigUtils;
/**
@@ -26,7 +39,7 @@
*/
public class LogConfigCheckTool {
- public static void main(String[] args) {
+ public static void main(String[] args) throws InterruptedException, ExecutionException {
if (args.length < 1) {
System.err.println("Must specify the configuration directory");
System.exit(-1);
@@ -47,17 +60,59 @@ public static void main(String[] args) {
}
}
+ Map> topicMap = new HashMap<>();
File[] listFiles = confDir.listFiles();
Arrays.sort(listFiles);
for (File file : listFiles) {
try {
- LogConfigUtils.parseLogConfigFromFile(file);
+ SingerLogConfig slc = LogConfigUtils.parseLogConfigFromFile(file);
+ KafkaWriterConfig kafkaWriterConfig = slc.getLogStreamWriterConfig().getKafkaWriterConfig();
+ if ((slc.getLogStreamWriterConfig().getType() == WriterType.KAFKA
+ || slc.getLogStreamWriterConfig().getType() == WriterType.KAFKA08)) {
+ if (kafkaWriterConfig == null) {
+ System.err
+ .println("Kafka writer specified with missing Kafka config:" + file.getName());
+ System.exit(-1);
+ } else {
+ KafkaProducerConfig producerConfig = kafkaWriterConfig.getProducerConfig();
+ if (producerConfig.getBrokerLists().isEmpty()) {
+ System.err
+ .println("No brokers in the kafka configuration specified:" + file.getName());
+ System.exit(-1);
+ }
+ String broker = producerConfig.getBrokerLists().get(0);
+ Set topics = topicMap.get(broker);
+ if (topics == null) {
+ topics = new HashSet<>();
+ topicMap.put(broker, topics);
+ }
+ topics.add(kafkaWriterConfig.getTopic() + " " + file.getName());
+ }
+ }
} catch (Exception e) {
e.printStackTrace();
System.err.println("Bad configuration file:" + file.getName());
System.exit(-1);
}
}
+ for (Entry> entry : topicMap.entrySet()) {
+ String bootstrapBroker = entry.getKey();
+ Properties properties = new Properties();
+ bootstrapBroker = bootstrapBroker.replace("9093", "9092");
+ properties.put("bootstrap.servers", bootstrapBroker);
+ AdminClient cl = AdminClient.create(properties);
+ Set topicSet = cl.listTopics().names().get();
+ for (String topicFileMap : entry.getValue()) {
+ String[] splits = topicFileMap.split(" ");
+ String topic = splits[0];
+ if (!topicSet.contains(topic)) {
+ System.err.println("Topic:" + topic + " doesn't exist for file:" + splits[1]);
+ cl.close();
+ System.exit(-1);
+ }
+ }
+ cl.close();
+ }
}
}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java
index 68a7681a..91f8b167 100644
--- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java
+++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java
@@ -18,6 +18,8 @@
import com.pinterest.singer.common.SingerConfigDef;
import com.pinterest.singer.config.ConfigFileServerSet;
import com.pinterest.singer.config.DirectorySingerConfigurator;
+import com.pinterest.singer.environment.EnvironmentProvider;
+import com.pinterest.singer.metrics.StatsPusher;
import com.pinterest.singer.thrift.configuration.DummyWriteConfig;
import com.pinterest.singer.thrift.configuration.FileNameMatchMode;
import com.pinterest.singer.thrift.configuration.HeartbeatWriterConfig;
@@ -143,11 +145,6 @@ public static SingerConfig parseDirBasedSingerConfigHeader(String singerConfigFi
errors.add("singer.setLogConfigPollIntervalSecs " + "is required for Singer Configuration");
}
- String statsPusherHostPort = singerConfiguration.getString("statsPusherHostPort");
- if (statsPusherHostPort != null) {
- result.setStatsPusherHostPort(statsPusherHostPort);
- }
-
try {
result.singerRestartConfig = parseSingerRestartConfig(configHeader);
} catch (ConfigurationException x) {
@@ -391,11 +388,16 @@ public static SingerLogConfig[] parseLogStreamConfig(AbstractConfiguration confi
/**
* Parse the common singer configuration
+ * @throws ConfigurationException if a configured provider or pusher class is missing or has the wrong type
*/
- private static SingerConfig parseCommonSingerConfigHeader(AbstractConfiguration singerConfiguration) {
+ public static SingerConfig parseCommonSingerConfigHeader(AbstractConfiguration singerConfiguration) throws ConfigurationException {
SingerConfig singerConfig = new SingerConfig();
- singerConfig.setThreadPoolSize(singerConfiguration.getInt("threadPoolSize"));
- singerConfig.setOstrichPort(singerConfiguration.getInt("ostrichPort"));
+ if (singerConfiguration.containsKey("threadPoolSize")) {
+ singerConfig.setThreadPoolSize(singerConfiguration.getInt("threadPoolSize"));
+ }
+ if (singerConfiguration.containsKey("ostrichPort")) {
+ singerConfig.setOstrichPort(singerConfiguration.getInt("ostrichPort"));
+ }
if (singerConfiguration.containsKey("writerThreadPoolSize")) {
int writerThreadPoolSize = singerConfiguration.getInt("writerThreadPoolSize");
singerConfig.setWriterThreadPoolSize(writerThreadPoolSize);
@@ -418,7 +420,41 @@ private static SingerConfig parseCommonSingerConfigHeader(AbstractConfiguration
if (singerConfiguration.containsKey("heartbeatEnabled")) {
singerConfig.setHeartbeatEnabled(singerConfiguration.getBoolean("heartbeatEnabled"));
}
+
+ if (singerConfiguration.containsKey("environmentProviderClass")) {
+ String envProviderClass = singerConfiguration.getString("environmentProviderClass");
+ singerConfig.setEnvironmentProviderClass(envProviderClass);
+ // environmentProviderClass can be null therefore validation is only needed
+ // if user has configured it
+ try {
+ Class> cls = Class.forName(envProviderClass);
+ if (!EnvironmentProvider.class.isAssignableFrom(cls)) {
+ throw new ConfigurationException("environmentProviderClass " + envProviderClass
+ + " doesn't extend " + EnvironmentProvider.class.getName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new ConfigurationException("Couldn't find environmentProviderClass " + envProviderClass);
+ }
+ }
+    String statsPusherHostPort = singerConfiguration.getString("statsPusherHostPort");
+    if (statsPusherHostPort != null) {
+      singerConfig.setStatsPusherHostPort(statsPusherHostPort);
+      // an explicit statsPusherClass overrides whatever class the config already carries
+      String statsPusherClass = singerConfiguration.getString("statsPusherClass");
+      if (statsPusherClass != null) {
+        singerConfig.setStatsPusherClass(statsPusherClass);
+      }
+      try {
+        Class<?> cls = Class.forName(singerConfig.getStatsPusherClass());
+        if (!StatsPusher.class.isAssignableFrom(cls)) {
+          throw new ConfigurationException("statsPusherClass " + singerConfig.getStatsPusherClass()
+              + " doesn't extend " + StatsPusher.class.getName());
+        }
+      } catch (ClassNotFoundException e) {
+        throw new ConfigurationException("Couldn't find statsPusherClass " + singerConfig.getStatsPusherClass());
+      }
+    }
return singerConfig;
}
@@ -466,7 +502,7 @@ private static PulsarProducerConfig parsePulsarProducerConfig(SubsetConfiguratio
.setCompressionType(subsetConfiguration.getString(SingerConfigDef.COMPRESSION_TYPE));
}
if (subsetConfiguration.containsKey(SingerConfigDef.PARTITIONER_CLASS)) {
- pulsarProducerConfig.setPartitionerClass(SingerConfigDef.PARTITIONER_CLASS);
+ pulsarProducerConfig.setPartitionerClass(subsetConfiguration.getString(SingerConfigDef.PARTITIONER_CLASS));
}
if (subsetConfiguration.containsKey(SingerConfigDef.PULSAR_SERVICE_URL)) {
pulsarProducerConfig
diff --git a/singer/src/main/java/com/pinterest/singer/utils/PartitionComparator.java b/singer/src/main/java/com/pinterest/singer/utils/PartitionComparator.java
new file mode 100644
index 00000000..4c7c9f32
--- /dev/null
+++ b/singer/src/main/java/com/pinterest/singer/utils/PartitionComparator.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.utils;
+
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.Comparator;
+
+/**
+ * Orders {@link PartitionInfo} instances by ascending partition id.
+ */
+public class PartitionComparator implements Comparator<PartitionInfo> {
+
+  @Override
+  public int compare(PartitionInfo p1, PartitionInfo p2) {
+    return Integer.compare(p1.partition(), p2.partition());
+  }
+
+}
diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
index c8a93067..e33cb8bf 100644
--- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
+++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java
@@ -55,10 +55,12 @@
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpRequestBase;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
/**
* The utility methods for Singer
@@ -274,4 +276,28 @@ public int compare(File file1, File file2) {
return NameFileComparator.NAME_REVERSE.compare(file1, file2);
}
}
+
+  /**
+   * Make an HTTP GET request on the supplied URI and return the response entity
+   * as {@link String}. A non-200 status is only logged; the body is still returned.
+   *
+   * @param uri the URI to issue the GET request against
+   * @return the response entity converted with {@link EntityUtils#toString}
+   * @throws IOException if the request fails, including wrapped key-management/key-store errors
+   */
+  public static String makeGetRequest(String uri) throws IOException {
+    HttpGet request = new HttpGet(uri);
+    // try-with-resources closes the response even when reading the entity throws
+    try (CloseableHttpResponse response = SingerUtils.makeRequest(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      if (statusCode != 200) {
+        LOG.warn("Non-200 status code(" + statusCode + ") reason:"
+            + response.getStatusLine().getReasonPhrase());
+      }
+      return EntityUtils.toString(response.getEntity());
+    } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) {
+      // security setup failures are surfaced to callers as IOException
+      throw new IOException(e);
+    }
+  }
}
diff --git a/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java b/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java
index ef7cf9ae..5aa7b91e 100644
--- a/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java
+++ b/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java
@@ -33,4 +33,8 @@ public static String getLogStreamStatName(LogStream logStream, String group, Str
public static String getLogStatName(SingerLog singerLog, String stat) {
return String.format("singer.%s.%s", singerLog.getLogName(), stat);
}
+
+  public static String pulsarTopicToMetricTag(String pulsarTopic) {
+    return pulsarTopic.replace("persistent://", ""); // literal match, no regex needed
+  }
}
diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java
index 9246cefb..54aabf9b 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java
@@ -85,6 +85,11 @@ private KafkaProducer getProducerInternal(KafkaProducerConfig co
result = producers.get(config);
return result;
}
+
+
+ public static void injectTestProducer(KafkaProducerConfig config, KafkaProducer producer) {
+ KafkaProducerManager.getInstance().producers.put(config, producer);
+ }
/**
* Reset the kafka producer
diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java
index 86444c17..f00069d6 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java
@@ -25,7 +25,7 @@
import com.pinterest.singer.thrift.LogMessage;
import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
import com.pinterest.singer.thrift.configuration.SingerRestartConfig;
-import com.pinterest.singer.utils.SingerUtils;
+import com.pinterest.singer.utils.PartitionComparator;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -52,8 +53,9 @@
*/
public class KafkaWriter implements LogStreamWriter {
- public static final String HOSTNAME = SingerUtils.getHostname();
+ public static final String HOSTNAME = SingerSettings.getEnvironment().getHostname();
private static final Logger LOG = LoggerFactory.getLogger(KafkaWriter.class);
+ private static final PartitionComparator COMPARATOR = new PartitionComparator();
// Counter for the number of batch message writing failures
private static AtomicInteger failureCounter = new AtomicInteger(0);
@@ -118,20 +120,23 @@ public KafkaWriter(LogStream logStream,
this.serializer = new TSerializer();
}
- protected KafkaWriter(KafkaMessagePartitioner partitioner,
- boolean skipNoLeaderPartitions) {
+ protected KafkaWriter(KafkaProducerConfig producerConfig,
+ KafkaMessagePartitioner partitioner,
+ String topic,
+ boolean skipNoLeaderPartitions,
+ ExecutorService clusterThreadPool) {
this.partitioner = partitioner;
this.skipNoLeaderPartitions = skipNoLeaderPartitions;
+ this.producerConfig = producerConfig;
+ this.clusterThreadPool = clusterThreadPool;
+ this.topic = topic;
logStream = null;
logName = null;
- topic = null;
writeTimeoutInSeconds = 0;
serializer = null;
auditingEnabled = false;
auditTopic = null;
- producerConfig = null;
kafkaClusterSig = null;
- clusterThreadPool = null;
}
@Override
@@ -167,20 +172,19 @@ private void includeAuditMessageInBatch(List> mes
*
* If skipNoLeaderPartitions flag is set, we will skip the partitions that have no leader.
*
- * @param producer kafkaProducer object
+ * @param partitions unordered list of partitionInfo to be used for partitioning this batch
* @param topic the kafka topic
* @param logMessages the messages that will be written to kafka
* @return a list of message lists that are classified based on partitions.
*/
List>> messageCollation(
- KafkaProducer producer,
+ List partitions,
String topic,
List logMessages) throws Exception {
LOG.info("Collate " + logMessages.size() + " messages");
List>> buckets = new ArrayList<>();
try {
- List partitions = producer.partitionsFor(topic);
List validPartitions = partitions;
if (skipNoLeaderPartitions) {
validPartitions = new ArrayList<>();
@@ -219,7 +223,7 @@ List>> messageCollation(
@Override
public void writeLogMessages(List logMessages) throws LogStreamWriterException {
- Set> set = new HashSet<>();
+ Set> resultSet = new HashSet<>();
KafkaProducer producer = KafkaProducerManager.getProducer(producerConfig);
Preconditions.checkNotNull(producer);
@@ -227,22 +231,30 @@ public void writeLogMessages(List logMessages) throws LogStreamWrite
producer.beginTransaction();
}
try {
+ List partitions = producer.partitionsFor(topic);
List>>
- buckets = messageCollation(producer, topic, logMessages);
+ buckets = messageCollation(partitions, topic, logMessages);
+
+ // the partition info is sorted after collation; we have to sort a copy of the
+ // data since the returned list is immutable
+ List sortedPartitions = new ArrayList<>(partitions);
+ Collections.sort(sortedPartitions, COMPARATOR);
for (List> msgs : buckets) {
- if (auditingEnabled && msgs.size() > 0) {
- includeAuditMessageInBatch(msgs);
+ if (msgs.size() > 0) {
+ if (auditingEnabled) {
+ includeAuditMessageInBatch(msgs);
+ }
+ Callable worker =
+ new KafkaWritingTask(producer, msgs, writeTimeoutInSeconds, sortedPartitions);
+ Future future = clusterThreadPool.submit(worker);
+ resultSet.add(future);
}
- Callable worker =
- new KafkaWritingTask(producer, msgs, writeTimeoutInSeconds);
- Future future = clusterThreadPool.submit(worker);
- set.add(future);
}
int bytesWritten = 0;
int maxKafkaBatchWriteLatency = 0;
- for (Future f : set) {
+ for (Future f : resultSet) {
KafkaWritingTaskResult result = f.get();
if (!result.success) {
LOG.error("Failed to write messages to kafka topic {}", topic, result.exception);
@@ -286,7 +298,7 @@ public void writeLogMessages(List logMessages) throws LogStreamWrite
throw new LogStreamWriterException("Failed to write messages to topic " + topic, e);
} finally {
- for (Future f : set) {
+ for (Future f : resultSet) {
if (!f.isDone() && !f.isCancelled()) {
f.cancel(true);
}
diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java
index 849eabd1..702eb445 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java
@@ -15,20 +15,21 @@
*/
package com.pinterest.singer.writer;
-import com.pinterest.singer.common.SingerMetrics;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
-import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import com.pinterest.singer.common.SingerMetrics;
+import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
public class KafkaWritingTask implements Callable {
@@ -38,14 +39,23 @@ public class KafkaWritingTask implements Callable {
private List> messages;
private int writeTimeoutInSeconds;
private long taskCreationTimeInMillis;
+ private String leaderNode;
public KafkaWritingTask(KafkaProducer producer,
List> msgs,
- int writeTimeoutInSeconds) {
+ int writeTimeoutInSeconds,
+ List sortedPartitions) {
this.producer = producer;
this.messages = msgs;
this.writeTimeoutInSeconds = writeTimeoutInSeconds;
this.taskCreationTimeInMillis = System.currentTimeMillis();
+ try {
+ leaderNode = sortedPartitions.get(msgs.get(0).partition()).leader().host();
+ } catch (Exception e) {
+ LOG.error("Error getting leader node from partition metadata", e);
+ OpenTsdbMetricConverter.incr(SingerMetrics.LEADER_INFO_EXCEPTION, 1, "host=" + KafkaWriter.HOSTNAME);
+ leaderNode = "n/a";
+ }
}
@Override
@@ -81,10 +91,13 @@ public KafkaWritingTaskResult call() {
}
if (result == null) {
- long kafkaLatency = System.currentTimeMillis() - taskCreationTimeInMillis;
+ // we can down convert since latency should be less that Integer.MAX_VALUE
+ int kafkaLatency = (int)(System.currentTimeMillis() - taskCreationTimeInMillis);
// we shouldn't have latency creater than 2B milliseoncds so it should be okay
// to downcast to integer
result = new KafkaWritingTaskResult(true, bytesWritten, (int) kafkaLatency);
+ OpenTsdbMetricConverter.incrGranular(SingerMetrics.BROKER_WRITE_SUCCESS, 1, "broker=" + leaderNode);
+ OpenTsdbMetricConverter.addGranularMetric(SingerMetrics.BROKER_WRITE_LATENCY, kafkaLatency, "broker=" + leaderNode);
}
} catch (org.apache.kafka.common.errors.RecordTooLargeException e) {
LOG.error("Kafka write failure due to excessively large message size", e);
@@ -100,6 +113,7 @@ public KafkaWritingTaskResult call() {
String errorMsg = "Failed to write " + messages.size() + " messages to kafka";
LOG.error(errorMsg, e);
OpenTsdbMetricConverter.incr(SingerMetrics.WRITE_FAILURE, 1, "topic=" + topic, "host=" + KafkaWriter.HOSTNAME);
+ OpenTsdbMetricConverter.incrGranular(SingerMetrics.BROKER_WRITE_FAILURE, 1, "broker=" + leaderNode);
result = new KafkaWritingTaskResult(false, e);
} finally {
if (!result.success) {
diff --git a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwarePartitioner.java b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwarePartitioner.java
new file mode 100644
index 00000000..e2eca80a
--- /dev/null
+++ b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwarePartitioner.java
@@ -0,0 +1,117 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.writer.partitioners;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.pinterest.singer.common.SingerMetrics;
+import com.pinterest.singer.common.SingerSettings;
+import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
+import com.pinterest.singer.writer.KafkaMessagePartitioner;
+import com.pinterest.singer.writer.KafkaWriter;
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Locality (AWS EC2 AZ) aware Kafka partitioner. This partitioner attempts to
+ * write to partitions whose leaders are in the same locality as this Singer
+ * agent; if no such partitions exist, all supplied partitions are used.
+ */
+public abstract class LocalityAwarePartitioner implements KafkaMessagePartitioner {
+
+ // the locality info is constant for a given instance of Singer and will
+ // not change during the Singer process lifetime for a given host
+ protected static String rack;
+ private long nextRefreshTime;
+ private long refreshIntervalMs;
+ protected List localPartitions;
+
+ public LocalityAwarePartitioner(long refreshIntervalMs) {
+ this.refreshIntervalMs = refreshIntervalMs;
+ // do a singleton based implementation
+ if (rack == null) {
+ synchronized (LocalityAwarePartitioner.class) {
+ if (rack == null) {
+ // load rack information from SingerEnvironment
+ rack = SingerSettings.getEnvironment().getLocality();
+ }
+ }
+ }
+ }
+
+ protected LocalityAwarePartitioner(String rack, long refreshIntervalMs) {
+ LocalityAwarePartitioner.rack = rack;
+ this.refreshIntervalMs = refreshIntervalMs;
+ }
+
+ protected void updateNextRefreshTime() {
+ nextRefreshTime = System.currentTimeMillis() + refreshIntervalMs;
+ }
+
+ /**
+ * Retain partitions that are local to this agent
+ * @param partitions
+ * @return local partitions
+ */
+ protected List retainLocalPartitions(List partitions) {
+ return partitions.stream()
+ .filter(p -> p.leader() != null && p.leader().hasRack() && rack.equals(p.leader().rack()))
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Check whether local partitions are available: if so, assign them to localPartitions;
+ * otherwise assign all supplied partitions to localPartitions.
+ * @param partitions
+ */
+ protected void checkAndAssignLocalPartitions(List partitions) {
+ List retainLocalPartition = retainLocalPartitions(partitions);
+ if (retainLocalPartition.isEmpty()) {
+ OpenTsdbMetricConverter.gauge(SingerMetrics.MISSING_LOCAL_PARTITIONS, 1, "locality=" + rack,
+ "host=" + KafkaWriter.HOSTNAME);
+ // reset to all partitions if no local partitions are available
+ localPartitions = partitions;
+ } else {
+ localPartitions = retainLocalPartition;
+ }
+ }
+
+ protected boolean isTimeToRefresh() {
+ return System.currentTimeMillis() > nextRefreshTime;
+ }
+
+ protected void setNextRefreshTime(long nextRefreshTime) {
+ this.nextRefreshTime = nextRefreshTime;
+ }
+
+ public List getLocalPartitions() {
+ return localPartitions;
+ }
+
+ public String getRack() {
+ return rack;
+ }
+
+ @VisibleForTesting
+ protected static void nullifyRack() {
+ rack = null;
+ }
+
+ public long getRefreshIntervalMs() {
+ return refreshIntervalMs;
+ }
+}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java
index 50188949..95b0e0e4 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java
@@ -15,64 +15,32 @@
*/
package com.pinterest.singer.writer.partitioners;
-import java.io.IOException;
-import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.stream.Collectors;
import org.apache.kafka.common.PartitionInfo;
-import com.google.common.annotations.VisibleForTesting;
-import com.pinterest.singer.writer.KafkaMessagePartitioner;
-import com.pinterest.singer.writer.kafka.localityaware.EC2LocalityInfoProvider;
-
/**
- * Locality (aws ec2 az) aware Kafka partitioner. This partitioner attempts to
- * write to partitions whose leaders are in the same locality as the Kafka
- * leader for the partition. If no partitions are available in the same
- * locality, a random partition from all partitions will be picked.
+ * Locality-aware random partitioner
*/
-public class LocalityAwareRandomPartitioner implements KafkaMessagePartitioner {
+public class LocalityAwareRandomPartitioner extends LocalityAwarePartitioner {
// refresh every 10 seconds
public static final long REFRESH_INTERVAL_MS = 10_000;
- // the locality info is constant for a given instance of Singer and will
- // not change during the Singer process lifetime for a given host
- private static String rack;
- private long nextRefreshTime;
- private long refreshIntervalMs;
- private List localPartitions;
private ThreadLocalRandom random = ThreadLocalRandom.current();
public LocalityAwareRandomPartitioner() {
- refreshIntervalMs = REFRESH_INTERVAL_MS;
- // do a singleton based implementation on LocalityInfo provider instantiation
- if (rack == null) {
- synchronized (LocalityAwareRandomPartitioner.class) {
- if (rack == null) {
- EC2LocalityInfoProvider provider = new EC2LocalityInfoProvider();
- try {
- provider.init(new HashMap<>());
- } catch (IllegalArgumentException | IOException e) {
- throw new RuntimeException("Failed to initialize EC2 locality provider", e);
- }
- rack = provider.getLocalityInfoForLocalhost();
- }
- }
- }
+ super(REFRESH_INTERVAL_MS);
}
protected LocalityAwareRandomPartitioner(String rack, long refreshIntervalMs) {
- LocalityAwareRandomPartitioner.rack = rack;
- this.refreshIntervalMs = refreshIntervalMs;
+ super(rack, refreshIntervalMs);
}
@Override
public int partition(Object messageKey, List partitions) {
if (localPartitions == null || isTimeToRefresh()) {
- List retainLocalPartition = retainLocalPartitions(partitions);
- localPartitions = retainLocalPartition.size() > 0 ? retainLocalPartition : partitions;
+ checkAndAssignLocalPartitions(partitions);
// set next refresh time
updateNextRefreshTime();
}
@@ -80,43 +48,4 @@ public int partition(Object messageKey, List partitions) {
return localPartitions.get(Math.abs(random.nextInt() % localPartitions.size())).partition();
}
- protected void updateNextRefreshTime() {
- nextRefreshTime = System.currentTimeMillis() + refreshIntervalMs;
- }
-
- /**
- * Retain partitions that are local to this agent
- * @param partitions
- * @return local partitions
- */
- protected List retainLocalPartitions(List partitions) {
- return partitions.stream()
- .filter(p -> p.leader() != null && p.leader().hasRack() && rack.equals(p.leader().rack()))
- .collect(Collectors.toList());
- }
-
- protected boolean isTimeToRefresh() {
- return System.currentTimeMillis() > nextRefreshTime;
- }
-
- protected void setNextRefreshTime(long nextRefreshTime) {
- this.nextRefreshTime = nextRefreshTime;
- }
-
- public List getLocalPartitions() {
- return localPartitions;
- }
-
- public String getRack() {
- return rack;
- }
-
- @VisibleForTesting
- protected static void nullifyRack() {
- rack = null;
- }
-
- public long getRefreshIntervalMs() {
- return refreshIntervalMs;
- }
}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareSinglePartitionPartitioner.java b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareSinglePartitionPartitioner.java
new file mode 100644
index 00000000..9c256a74
--- /dev/null
+++ b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareSinglePartitionPartitioner.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.writer.partitioners;
+
+import org.apache.kafka.common.PartitionInfo;
+
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Locality-aware single-partition partitioner. This partitioner can
+ * heuristically provide better compression ratios for data since all data from
+ * a given singer agent is written to one partition. For some datasets we have
+ * seen ~2x compression ratios.
+ */
+public class LocalityAwareSinglePartitionPartitioner extends LocalityAwarePartitioner {
+
+ // refresh every 30 seconds
+ public static final long REFRESH_INTERVAL_MS = 30_000;
+ private ThreadLocalRandom random = ThreadLocalRandom.current();
+ private int partitionId;
+
+ public LocalityAwareSinglePartitionPartitioner() {
+ super(REFRESH_INTERVAL_MS);
+ }
+
+ protected LocalityAwareSinglePartitionPartitioner(String rack, long refreshIntervalMs) {
+ super(rack, refreshIntervalMs);
+ }
+
+ @Override
+ public int partition(Object messageKey, List partitions) {
+ if (localPartitions == null || isTimeToRefresh()) {
+ checkAndAssignLocalPartitions(partitions);
+ // set next refresh time
+ updateNextRefreshTime();
+ // NOTE: we are not doing a delta update here since the PartitionInfo object doesn't
+ // have an overridden hashCode and equals implementation; therefore the delta computation
+ // would be cumbersome
+ partitionId = localPartitions.get(Math.abs(random.nextInt() % localPartitions.size()))
+ .partition();
+ }
+ return partitionId;
+ }
+
+}
\ No newline at end of file
diff --git a/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java b/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java
index 9764eb5e..c18dad8d 100644
--- a/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java
+++ b/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java
@@ -18,7 +18,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -42,6 +44,7 @@
import com.pinterest.singer.thrift.configuration.PulsarProducerConfig;
import com.pinterest.singer.thrift.configuration.PulsarWriterConfig;
import com.pinterest.singer.utils.SingerUtils;
+import com.pinterest.singer.utils.StatsUtils;
/**
* Pulsar Writer is capable of writing to a Pulsar cluster.1 Pulsar Writer
@@ -75,10 +78,14 @@ public class PulsarWriter implements LogStreamWriter {
+ "topic_pulsar_throughput";
public static final String PULSAR_LATENCY = SingerMetrics.SINGER_WRITER
+ "max_pulsar_batch_write_latency";
+ private static Map> producerCache = new ConcurrentHashMap<>();
private LogStream logStream;
- private PulsarClient pulsarClient;
private Producer producer;
private String topic;
+
+ // pulsar topics have colons in the topic name, which is invalid in tsdb.
+ // Use this string to publish pulsar related metrics.
+ private String metricTag;
private String logName;
@SuppressWarnings("unchecked")
@@ -102,7 +109,6 @@ public void validateConfig(PulsarProducerConfig producerConfig) throws Configura
}
}
- @SuppressWarnings("unchecked")
public PulsarWriter init(LogStream logStream, PulsarWriterConfig writerConfig)
throws ConfigurationException, LogStreamWriterException {
PulsarProducerConfig producerConfig = writerConfig.getProducerConfig();
@@ -110,41 +116,63 @@ public PulsarWriter init(LogStream logStream, PulsarWriterConfig writerConfig)
this.logStream = logStream;
this.logName = logStream.getLogStreamName();
this.topic = writerConfig.getTopic();
+ this.metricTag = StatsUtils.pulsarTopicToMetricTag(this.topic);
+
+ if ((producer = producerCache.get(getProducerKey(producerConfig, topic))) == null) {
+ synchronized (producerCache) {
+ if ((producer = producerCache.get(getProducerKey(producerConfig, topic))) == null) {
+ producer = createProducer(topic, logName, producerConfig);
+ producerCache.put(getProducerKey(producerConfig, topic), producer);
+ LOG.info("Created new Pulsar producer with pulsarServiceUrl:" + producerConfig.getServiceUrl()
+ + " topic:" + topic + " logStream:" + logName);
+ }
+ }
+ }
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Producer createProducer(String topic, String logName, PulsarProducerConfig producerConfig) throws LogStreamWriterException {
+ PulsarClient pulsarClient = null;
try {
- pulsarClient = PulsarClient.builder().serviceUrl(producerConfig.getServiceUrl()).build();
+ pulsarClient = PulsarClient.builder().serviceUrl(producerConfig.getServiceUrl())
+ .build();
} catch (PulsarClientException e) {
throw new LogStreamWriterException(
- "Failed to build Pulsar client with service URL:" + producerConfig.getServiceUrl(), e);
+ "Failed to build Pulsar client with service URL:" + producerConfig.getServiceUrl(),
+ e);
}
- LOG.info("Created Pulsar client to connect to:" + producerConfig.getServiceUrl() + " topic:"
- + topic + " logStream:" + logName);
+ LOG.info("Created Pulsar client to connect to:" + producerConfig.getServiceUrl()
+ + " topic:" + topic + " logStream:" + logName);
try {
- producer = pulsarClient.newProducer()
+ Producer producer = pulsarClient.newProducer()
.compressionType(
CompressionType.valueOf(producerConfig.getCompressionType().toUpperCase()))
.messageRoutingMode(MessageRoutingMode.CustomPartition)
- .messageRouter(new PulsarMessageRouter(
- (Class) Class.forName(producerConfig.getPartitionerClass())))
- .sendTimeout(producerConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS).topic(topic)
- .blockIfQueueFull(true).batchingMaxMessages(producerConfig.getBatchingMaxMessages())
+ .messageRouter(new PulsarMessageRouter((Class) Class
+ .forName(producerConfig.getPartitionerClass())))
+ .sendTimeout(producerConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS)
+ .topic(topic).blockIfQueueFull(true)
+ .batchingMaxMessages(producerConfig.getBatchingMaxMessages())
.maxPendingMessages(producerConfig.getMaxPendingMessages())
.batchingMaxPublishDelay(producerConfig.getBatchingMaxPublishDelayInMilli(),
TimeUnit.MILLISECONDS)
.enableBatching(true).maxPendingMessagesAcrossPartitions(
producerConfig.getMaxPendingMessagesAcrossPartitions())
.create();
+ return producer;
} catch (PulsarClientException | InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
throw new LogStreamWriterException("Failed to initialize Pulsar writer", e);
}
- LOG.info("Created Pulsar producer with pulsarServiceUrl:" + producerConfig.getServiceUrl()
- + " topic:" + topic + " logStream:" + logName);
- return this;
+ }
+
+ public static String getProducerKey(PulsarProducerConfig producerConfig, String topic) {
+ return producerConfig.getServiceUrl() + "__" + topic;
}
@Override
public void close() throws IOException {
- // TODO Auto-generated method stub
producer.flush();
}
@@ -179,20 +207,19 @@ public void writeLogMessages(List messages) throws LogStreamWriterEx
future.get();
}
} catch (PulsarClientException | InterruptedException | ExecutionException e) {
- OpenTsdbMetricConverter.incr(PULSAR_WRITE_FAILURE, messages.size(), "topic=" + topic,
+ OpenTsdbMetricConverter.incr(PULSAR_WRITE_FAILURE, messages.size(), "topic=" + metricTag,
"host=" + HOSTNAME, "logname=" + logName);
throw new LogStreamWriterException("Message delivery failed", e);
}
-
- maxPulsarWriteLatency = System.currentTimeMillis() - maxPulsarWriteLatency;
- OpenTsdbMetricConverter.gauge(PULSAR_THROUGHPUT, bytesWritten, "topic=" + topic,
+ maxPulsarWriteLatency = System.currentTimeMillis() - maxPulsarWriteLatency;
+ OpenTsdbMetricConverter.gauge(PULSAR_THROUGHPUT, bytesWritten, "topic=" + metricTag,
"host=" + HOSTNAME, "logname=" + logName);
- OpenTsdbMetricConverter.gauge(PULSAR_LATENCY, maxPulsarWriteLatency, "topic=" + topic,
+ OpenTsdbMetricConverter.gauge(PULSAR_LATENCY, maxPulsarWriteLatency, "topic=" + metricTag,
"host=" + HOSTNAME, "logname=" + logName);
- OpenTsdbMetricConverter.incr(NUM_PULSAR_MESSAGES, messages.size(), "topic=" + topic,
+ OpenTsdbMetricConverter.incr(NUM_PULSAR_MESSAGES, messages.size(), "topic=" + metricTag,
"host=" + HOSTNAME, "logname=" + logName);
- LOG.info("Completed batch writes to Pulsar topic:" + topic + " size:" + messages.size());
+ LOG.info("Completed batch writes to Pulsar topic:" + metricTag + " size:" + messages.size());
}
diff --git a/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java b/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java
index acf6c279..22098217 100644
--- a/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java
+++ b/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java
@@ -15,12 +15,23 @@
*/
package com.pinterest.singer.common;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.contrib.java.lang.system.EnvironmentVariables;
+import com.pinterest.singer.environment.EnvVariableBasedEnvironmentProvider;
import com.pinterest.singer.monitor.DefaultLogMonitor;
+import com.pinterest.singer.thrift.configuration.SingerConfig;
public class TestSingerSettings {
+ @Rule
+ public final EnvironmentVariables environmentVariables = new EnvironmentVariables();
+
/**
* This test is to ensure that if there are any changes to the static method's
* interface and no corresponding changes made to the initializer, we can catch
@@ -36,4 +47,36 @@ public void testLogMonitorMethodInstance() throws ClassNotFoundException, NoSuch
SingerSettings.getLogMonitorStaticInstanceMethod(DefaultLogMonitor.class.getName());
}
+ @Test
+ public void testEnvironmentLoader() {
+ SingerConfig config = new SingerConfig();
+ SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config);
+ assertNotNull(SingerSettings.getEnvironment());
+ // check that by default the environment provider class is null
+ assertNull(config.getEnvironmentProviderClass());
+ assertEquals(null, SingerSettings.getEnvironment().getDeploymentStage());
+
+ // bad class name should cause an exception to be logged but MUST not throw an
+ // error and environment should still be correct
+ config.setEnvironmentProviderClass("com.pinterest.xyz");
+ SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config);
+ assertNotNull(SingerSettings.getEnvironment());
+ assertEquals(null, SingerSettings.getEnvironment().getDeploymentStage());
+
+ environmentVariables.set("DEPLOYMENT_STAGE", "CANARY");
+ environmentVariables.set("LOCALITY", "us-east-1a");
+ config.setEnvironmentProviderClass(EnvVariableBasedEnvironmentProvider.class.getName());
+ SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config);
+ assertNotNull(SingerSettings.getEnvironment());
+ assertEquals("CANARY", SingerSettings.getEnvironment().getDeploymentStage());
+ assertEquals("us-east-1a", SingerSettings.getEnvironment().getLocality());
+
+ environmentVariables.set("DEPLOYMENT_STAGE", "DEV");
+ config.setEnvironmentProviderClass(EnvVariableBasedEnvironmentProvider.class.getName());
+ SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config);
+ assertNotNull(SingerSettings.getEnvironment());
+ assertEquals("DEV", SingerSettings.getEnvironment().getDeploymentStage());
+ assertEquals("us-east-1a", SingerSettings.getEnvironment().getLocality());
+ }
+
}
diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java
index 6e4cd1c4..0888daa0 100644
--- a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java
+++ b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java
@@ -36,7 +36,7 @@
public class TestLogConfigUtils {
@Test
- public void testRealpinTTLParsing() throws ConfigurationException {
+ public void testRealPinTTLParsing() throws ConfigurationException {
String CONFIG = "" + "topic=test\n" + "objectType=mobile_perf_log\n"
+ "serverSetPath=/xyz/realpin/prod";
PropertiesConfiguration config = new PropertiesConfiguration();
@@ -103,12 +103,11 @@ public void testKafkaProducerConfigCatchBadPartitioner() throws ConfigurationExc
}
for (String type : Arrays.asList(
- "com.pinterest.singer.writer.partitioners.Crc32ByteArrayPartitioner",
- "com.pinterest.singer.writer.partitioners.DefaultPartitioner",
- "com.pinterest.singer.writer.partitioners.SimpleRoundRobinPartitioner",
+ "com.pinterest.singer.writer.partitioners.Crc32ByteArrayPartitioner",
+ "com.pinterest.singer.writer.partitioners.DefaultPartitioner",
+ "com.pinterest.singer.writer.partitioners.SimpleRoundRobinPartitioner",
"com.pinterest.singer.writer.partitioners.SinglePartitionPartitioner",
- "com.pinterest.singer.writer.partitioners.LocalityAwareRandomPartitioner"
- )) {
+ "com.pinterest.singer.writer.partitioners.LocalityAwareRandomPartitioner")) {
map.put("partitioner.class", type);
try {
LogConfigUtils.parseProducerConfig(config);
@@ -118,6 +117,50 @@ public void testKafkaProducerConfigCatchBadPartitioner() throws ConfigurationExc
}
}
+ @Test
+ public void testBaseSingerConfig() throws ConfigurationException {
+ Map map = new HashMap<>();
+ map.put("monitor.monitorIntervalInSecs", "10");
+ map.put("statsPusherHostPort", "localhost:1900");
+ AbstractConfiguration config = new MapConfiguration(map);
+ // defaults should be correct
+ LogConfigUtils.parseCommonSingerConfigHeader(config);
+
+ // must throw configuration exception
+ map.put("statsPusherClass", "com.pinterest.singer.monitor.DefaultLogMonitor");
+ try {
+ LogConfigUtils.parseCommonSingerConfigHeader(config);
+ fail(
+ "Must fail since the supplied class is not a valid StatsPusher class but it is a valid class");
+ } catch (Exception e) {
+ }
+
+ map.put("statsPusherClass", "com.pinterest.singer.monitor.Xyz");
+ try {
+ LogConfigUtils.parseCommonSingerConfigHeader(config);
+ fail(
+ "Must fail since the supplied class is not a valid class");
+ } catch (Exception e) {
+ }
+ map.remove("statsPusherClass");
+ // cleanup after stats test
+
+ map.put("environmentProviderClass", "com.pinterest.singer.monitor.Xyz");
+ try {
+ LogConfigUtils.parseCommonSingerConfigHeader(config);
+ fail(
+ "Must fail since the supplied class is not a valid class");
+ } catch (Exception e) {
+ }
+ map.put("environmentProviderClass", "com.pinterest.singer.monitor.DefaultLogMonitor");
+ try {
+ LogConfigUtils.parseCommonSingerConfigHeader(config);
+ fail(
+ "Must fail since the supplied class is not a valid class");
+ } catch (Exception e) {
+ }
+ }
+
@Test
public void testKafkaProducerConfigAck() throws ConfigurationException {
Map map = new HashMap<>();
diff --git a/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java b/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java
index e586419a..b0af9f01 100644
--- a/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java
+++ b/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java
@@ -2,6 +2,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
@@ -10,11 +11,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.pulsar.shade.org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -23,7 +28,10 @@
import com.google.common.collect.ImmutableList;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
+import com.pinterest.singer.common.SingerSettings;
import com.pinterest.singer.thrift.LogMessage;
+import com.pinterest.singer.thrift.configuration.KafkaProducerConfig;
+import com.pinterest.singer.thrift.configuration.SingerConfig;
@RunWith(MockitoJUnitRunner.class)
public class TestKafkaWriter {
@@ -35,8 +43,12 @@ public class TestKafkaWriter {
@Test
public void testPartitioningParityCRC() throws Exception {
KafkaMessagePartitioner partitioner = new Crc32ByteArrayPartitioner();
+ KafkaProducerConfig config = new KafkaProducerConfig();
+ SingerSettings.setSingerConfig(new SingerConfig());
+ KafkaProducerManager.injectTestProducer(config, producer);
// default value for skip noleader partition is false
- KafkaWriter writer = new KafkaWriter(partitioner, false);
+ KafkaWriter writer = new KafkaWriter(config, partitioner, "topicx", false,
+ Executors.newCachedThreadPool());
List partitions = ImmutableList.copyOf(Arrays.asList(
new PartitionInfo("topicx", 1, new Node(2, "broker2", 9092, "us-east-1b"), null, null),
@@ -51,6 +63,8 @@ public void testPartitioningParityCRC() throws Exception {
new PartitionInfo("topicx", 9, new Node(5, "broker5", 9092, "us-east-1b"), null, null),
new PartitionInfo("topicx", 10, new Node(1, "broker1", 9092, "us-east-1a"), null, null)));
when(producer.partitionsFor("topicx")).thenReturn(partitions);
+ when(producer.send(any())).thenReturn(ConcurrentUtils.constantFuture(
+ new RecordMetadata(new TopicPartition("topicx", 0), 0L, 0L, 0L, 0L, 0, 0)));
Map> msgPartitionMap = new HashMap<>();
HashFunction crc32 = Hashing.crc32();
@@ -70,8 +84,8 @@ public void testPartitioningParityCRC() throws Exception {
list.add(logMessage);
}
- List>> messageCollation = writer.messageCollation(producer,
- "topicx", logMessages);
+ List>> messageCollation = writer
+ .messageCollation(partitions, "topicx", logMessages);
for (int i = 0; i < messageCollation.size(); i++) {
List> writerOutput = messageCollation.get(i);
List originalData = msgPartitionMap.get(i);
@@ -84,6 +98,9 @@ public void testPartitioningParityCRC() throws Exception {
assertTrue(Arrays.equals(originalData.get(j).getKey(), writerOutput.get(j).key()));
}
}
+
+ // validate if writes are throwing any error
+ writer.writeLogMessages(logMessages);
writer.close();
}
diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml
index e0415d21..af5229b1 100644
--- a/thrift-logger/pom.xml
+++ b/thrift-logger/pom.xml
@@ -4,7 +4,7 @@
com.pinterest.singer
singer-package
- 0.7.3.21
+ 0.7.3.25
../pom.xml
thrift-logger
diff --git a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java
index d603ce8f..f17de032 100644
--- a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java
+++ b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java
@@ -47,9 +47,11 @@
* The "addMetric" static function is provided to make it easier to add Ostrich metric names that
* contain tags.
*/
-public class OpenTsdbMetricConverter {
+public class OpenTsdbMetricConverter {
+
// According to http://opentsdb.net/docs/build/html/user_guide/writing.html
public static final String VALID_OPENSTD_STAT_TAG_PATTERN = "[a-zA-Z0-9_./-]+";
+ private static boolean enableGranularMetrics;
private final String prefix;
private final String defaultTags;
@@ -156,6 +158,13 @@ public static void addMetric(String name, int value) {
public static void addMetric(String name, int value, String... tags) {
Stats.addMetric(nameMetric(name, tags), value);
}
+
+  /** Forwards to Stats.addMetric under nameMetric(name, tags); no-op unless granular metrics are enabled. */
+  public static void addGranularMetric(String name, int value, String... tags) {
+    if (enableGranularMetrics) {
+      Stats.addMetric(nameMetric(name, tags), value);
+    }
+  }
public static void incr(String name) {
Stats.incr(name);
@@ -176,4 +185,26 @@ public static void gauge(String name, double value) {
public static void gauge(String name, double value, String... tags) {
Stats.setGauge(nameMetric(name, tags), value);
}
-}
+
+  /** Forwards to Stats.setGauge under nameMetric(name, tags); no-op unless granular metrics are enabled. */
+  public static void gaugeGranular(String name, double value, String... tags) {
+    if (enableGranularMetrics) {
+      Stats.setGauge(nameMetric(name, tags), value);
+    }
+  }
+
+  /** Forwards to Stats.incr(nameMetric(name, tags), i); no-op unless granular metrics are enabled. */
+  public static void incrGranular(String name, int i, String... tags) {
+    if (enableGranularMetrics) {
+      Stats.incr(nameMetric(name, tags), i);
+    }
+  }
+
+  public static void setEnableGranularMetrics(boolean granularMetrics) {
+    OpenTsdbMetricConverter.enableGranularMetrics = granularMetrics; // flag checked by the *Granular methods
+  }
+
+  public static boolean isEnableGranularMetrics() {
+    return OpenTsdbMetricConverter.enableGranularMetrics; // current value of the granular-metrics flag
+  }
+}
\ No newline at end of file
diff --git a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java
index d5caa032..15088c26 100644
--- a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java
+++ b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java
@@ -50,6 +50,7 @@ public void addHandler(String path, CustomHttpHandler handler) {
this.customHttpHandlerMap.put(path, handler);
}
+ @SuppressWarnings("restriction")
public void start() {
try {
Properties properties = new Properties();
@@ -62,7 +63,6 @@ public void start() {
Duration[] defaultLatchIntervals = {Duration.apply(1, TimeUnit.MINUTES)};
Iterator durationIterator = Arrays.asList(defaultLatchIntervals).iterator();
- @SuppressWarnings("deprecation")
AdminServiceFactory adminServiceFactory = new AdminServiceFactory(
this.port,
20,
diff --git a/thrift-logger/src/main/java/com/pinterest/singer/metrics/StatsPusher.java b/thrift-logger/src/main/java/com/pinterest/singer/metrics/StatsPusher.java
new file mode 100644
index 00000000..ba3c5b1a
--- /dev/null
+++ b/thrift-logger/src/main/java/com/pinterest/singer/metrics/StatsPusher.java
@@ -0,0 +1,80 @@
+/**
+ * Copyright 2019 Pinterest, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.pinterest.singer.metrics;
+
+import com.twitter.ostrich.stats.Stats$;
+import com.twitter.ostrich.stats.StatsListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Provides an abstraction to plugin custom metrics destination for Singer
+ * metrics. To implement your own {@link StatsPusher} you will need to
+ * override the sendMetrics method which is invoked at the configured frequency.
+ *
+ * {@link StatsPusher} is designed to run as an independent Daemon Thread.
+ */
+public abstract class StatsPusher extends Thread {
+
+  private static final Logger LOG = LoggerFactory.getLogger(StatsPusher.class);
+  protected StatsListener statsListener;
+  protected long pollMillis;
+  protected String sourceHostname;
+  protected String metricsPrefix;
+  protected String destinationHost;
+  protected int destinationPort;
+
+  /** Creates the pusher as a daemon thread with a listener on the Ostrich Stats singleton. */
+  public StatsPusher() {
+    this.statsListener = new StatsListener(Stats$.MODULE$);
+    setDaemon(true);
+  }
+
+  /** Stores the source host, metrics prefix, destination host/port and poll interval (ms). */
+  public void configure(String sourceHostname, String metricsPrefix, String destinationHost,
+                        int destinationPort, long pollMillis) throws IOException {
+    this.sourceHostname = sourceHostname;
+    this.metricsPrefix = metricsPrefix;
+    this.destinationHost = destinationHost;
+    this.destinationPort = destinationPort;
+    this.pollMillis = pollMillis;
+  }
+
+  /** Calls sendMetrics(true) about every pollMillis until interrupted or an error is thrown. */
+  @Override
+  public void run() {
+    try {
+      // Discard the first interval: we don't know when stats started being recorded,
+      // and we want all reported intervals to be roughly the same length.
+      statsListener.get();
+      Thread.sleep(pollMillis);
+      while (!Thread.currentThread().isInterrupted()) {
+        long elapsedTimeMillis = sendMetrics(true);
+        Thread.sleep(Math.max(0, pollMillis - elapsedTimeMillis));
+      }
+    } catch (InterruptedException ex) {
+      LOG.info("StatsPusher thread interrupted, exiting");
+    } catch (Exception ex) {
+      LOG.error("Unexpected error in StatsPusher, exiting", ex);
+    }
+  }
+
+  /** Sends the metrics; run() treats the returned value as elapsed time in milliseconds. */
+  public abstract long sendMetrics(boolean retryOnFailure) throws InterruptedException, IOException;
+
+}
\ No newline at end of file
diff --git a/thrift-logger/src/main/thrift/config.thrift b/thrift-logger/src/main/thrift/config.thrift
index 5ca83543..b1d9b47d 100644
--- a/thrift-logger/src/main/thrift/config.thrift
+++ b/thrift-logger/src/main/thrift/config.thrift
@@ -127,7 +127,7 @@ struct DummyWriteConfig {
enum RealpinObjectType {
MOBILE_PERF_LOG = 0
- PROMOTIONS_INSERTION = 1
+ PIN_PROMOTIONS_INSERTION = 1
}
struct RealpinWriterConfig {
@@ -146,7 +146,7 @@ struct PulsarProducerConfig {
2: required string serviceUrl;
3: optional string compressionType = "NONE";
4: optional i32 writeTimeoutInSeconds = 60;
- 5: optional string partitionerClass = "com.pinterest.singer.writer.partitioners.DefaultPartitioner";
+ 5: optional string partitionerClass = "com.pinterest.singer.writer.pulsar.DefaultPartitioner";
6: optional i32 batchingMaxMessages = 100;
7: optional i32 maxPendingMessages = 2000;
8: optional i32 batchingMaxPublishDelayInMilli = 10;
@@ -243,12 +243,12 @@ struct SingerConfig {
/**
* global thread pool size. Parsed directly from the singer config file.
*/
- 1: required i32 threadPoolSize;
+ 1: required i32 threadPoolSize = 20;
/**
* port number of ostrich. Parsed directly from the singer config file.
*/
- 2: required i32 ostrichPort;
+ 2: required i32 ostrichPort = 2047;
/**
* log monitor parameters. Parsed directly from the singer config file.
@@ -266,7 +266,7 @@ struct SingerConfig {
5: optional i32 logConfigPollIntervalSecs = 10;
/**
- * stats pusher host and port
+ * Stats pusher host and port
*/
6: optional string statsPusherHostPort;
@@ -320,4 +320,14 @@ struct SingerConfig {
*/
16: optional bool runCommandServer = false;
+ /**
+ * Class name for stats pusher
+ */
+ 17: optional string statsPusherClass = "com.pinterest.singer.metrics.OpenTsdbStatsPusher";
+
+ /**
+ * Singer Environment provider class
+ */
+ 18: optional string environmentProviderClass;
+
}
\ No newline at end of file
diff --git a/thrift-logger/src/main/thrift/thrift_message.thrift b/thrift-logger/src/main/thrift/thrift_message.thrift
index cae98ba9..5023dcf8 100644
--- a/thrift-logger/src/main/thrift/thrift_message.thrift
+++ b/thrift-logger/src/main/thrift/thrift_message.thrift
@@ -15,6 +15,6 @@ struct Event {
// Note that we call this time instead of timestamp since timestamp is reserved in Hive.
1: i64 time,
2: i32 eventType,
- 3: i64 userId, // user id
- 4: i64 objectId, // field to store generic object info
+ 3: i64 userId, // the user performing the action
+ 4: i64 objectId, // can be a user, pin, board, comment
}
\ No newline at end of file