From 7282d5c7dfa24337d628768855bf633445296c46 Mon Sep 17 00:00:00 2001 From: Yu Yang Date: Wed, 21 Aug 2019 22:16:50 -0700 Subject: [PATCH] update singer with 0.7.3.25 changes (#3) --- pom.xml | 2 +- singer/deb.version | 2 +- singer/pom.xml | 4 +- .../java/com/pinterest/singer/SingerMain.java | 35 +++--- .../singer/common/SingerMetrics.java | 5 + .../singer/common/SingerSettings.java | 37 ++++++ .../EnvVariableBasedEnvironmentProvider.java | 42 +++++++ .../singer/environment/Environment.java | 79 ++++++++++++ .../environment/EnvironmentProvider.java | 44 +++++++ .../singer/kubernetes/KubeService.java | 32 +---- .../singer/monitor/DefaultLogMonitor.java | 1 - .../singer/tools/LogConfigCheckTool.java | 59 ++++++++- .../singer/utils/LogConfigUtils.java | 54 ++++++-- .../singer/utils/PartitionComparator.java | 32 +++++ .../pinterest/singer/utils/SingerUtils.java | 26 ++++ .../pinterest/singer/utils/StatsUtils.java | 4 + .../singer/writer/KafkaProducerManager.java | 5 + .../pinterest/singer/writer/KafkaWriter.java | 52 +++++--- .../singer/writer/KafkaWritingTask.java | 32 +++-- .../LocalityAwarePartitioner.java | 117 ++++++++++++++++++ .../LocalityAwareRandomPartitioner.java | 81 +----------- ...calityAwareSinglePartitionPartitioner.java | 59 +++++++++ .../singer/writer/pulsar/PulsarWriter.java | 71 +++++++---- .../singer/common/TestSingerSettings.java | 43 +++++++ .../singer/utils/TestLogConfigUtils.java | 55 +++++++- .../singer/writer/TestKafkaWriter.java | 23 +++- thrift-logger/pom.xml | 2 +- .../metrics/OpenTsdbMetricConverter.java | 35 +++++- .../singer/metrics/OstrichAdminService.java | 2 +- .../pinterest/singer/metrics/StatsPusher.java | 80 ++++++++++++ thrift-logger/src/main/thrift/config.thrift | 20 ++- .../src/main/thrift/thrift_message.thrift | 4 +- 32 files changed, 930 insertions(+), 209 deletions(-) create mode 100644 singer/src/main/java/com/pinterest/singer/environment/EnvVariableBasedEnvironmentProvider.java create mode 100644 
singer/src/main/java/com/pinterest/singer/environment/Environment.java create mode 100644 singer/src/main/java/com/pinterest/singer/environment/EnvironmentProvider.java create mode 100644 singer/src/main/java/com/pinterest/singer/utils/PartitionComparator.java create mode 100644 singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwarePartitioner.java create mode 100644 singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareSinglePartitionPartitioner.java create mode 100644 thrift-logger/src/main/java/com/pinterest/singer/metrics/StatsPusher.java diff --git a/pom.xml b/pom.xml index 4467edbd..ad0a496e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.pinterest.singer singer-package - 0.7.3.21 + 0.7.3.25 pom Singer Logging Agent modules 2013 diff --git a/singer/deb.version b/singer/deb.version index b4eb0c59..fa388981 100644 --- a/singer/deb.version +++ b/singer/deb.version @@ -1 +1 @@ -0.7.3.16 \ No newline at end of file +0.7.3.25 \ No newline at end of file diff --git a/singer/pom.xml b/singer/pom.xml index 3534d6a8..94366b89 100644 --- a/singer/pom.xml +++ b/singer/pom.xml @@ -7,7 +7,7 @@ com.pinterest.singer singer-package - 0.7.3.21 + 0.7.3.25 ../pom.xml @@ -200,7 +200,7 @@ org.apache.pulsar pulsar-client - 2.2.1 + 2.3.2 org.mockito diff --git a/singer/src/main/java/com/pinterest/singer/SingerMain.java b/singer/src/main/java/com/pinterest/singer/SingerMain.java index e9721451..5c3ced91 100644 --- a/singer/src/main/java/com/pinterest/singer/SingerMain.java +++ b/singer/src/main/java/com/pinterest/singer/SingerMain.java @@ -20,8 +20,8 @@ import com.pinterest.singer.config.DirectorySingerConfigurator; import com.pinterest.singer.config.SingerConfigurator; import com.pinterest.singer.heartbeat.HeartbeatGenerator; +import com.pinterest.singer.metrics.StatsPusher; import com.pinterest.singer.metrics.OpenTsdbMetricConverter; -import com.pinterest.singer.metrics.OpenTsdbMetricsPusher; import 
com.pinterest.singer.metrics.OstrichAdminService; import com.pinterest.singer.thrift.configuration.SingerConfig; import com.pinterest.singer.utils.SingerUtils; @@ -34,10 +34,11 @@ public final class SingerMain { + private static final String SINGER_METRICS_PREFIX = "singer"; private static final Logger LOG = LoggerFactory.getLogger(SingerMain.class); - private static final int TSDB_METRICS_PUSH_INTERVAL_IN_MILLISECONDS = 10 * 1000; + private static final int STATS_PUSH_INTERVAL_IN_MILLISECONDS = 10 * 1000; protected static final String hostName = SingerUtils.getHostname(); - private static OpenTsdbMetricsPusher metricsPusher = null; + private static StatsPusher statsPusher = null; private static String singerPath = ""; static class SingerCleanupThread extends Thread { @@ -70,8 +71,8 @@ public void run() { try { OpenTsdbMetricConverter.incr("singer.shutdown", 1); - if (metricsPusher!= null) { - metricsPusher.sendMetrics(false); + if (statsPusher!= null) { + statsPusher.sendMetrics(false); } else { LOG.error("metricsPusher was not initialized properly."); } @@ -81,24 +82,28 @@ public void run() { } } - static void startOstrichService(SingerConfig singerConfig) { + public static void startOstrichService(SingerConfig singerConfig) { // do not start ostrich if Ostrich server is disabled if (System.getenv(SingerMetrics.DISABLE_SINGER_OSTRICH) == null) { OstrichAdminService ostrichService = new OstrichAdminService(singerConfig.getOstrichPort()); ostrichService.start(); } + // enable high granularity metrics we are running in canary if (singerConfig.isSetStatsPusherHostPort()) { - LOG.info("Starting the OpenTsdb metrics pusher"); + LOG.info("Starting the stats pusher"); try { + @SuppressWarnings("unchecked") + Class pusherClass = (Class) Class.forName(singerConfig.getStatsPusherClass()); + statsPusher = pusherClass.newInstance(); HostAndPort pushHostPort = HostAndPort.fromString(singerConfig.getStatsPusherHostPort()); - metricsPusher = new OpenTsdbMetricsPusher( - 
pushHostPort.getHost(), - pushHostPort.getPort(), - // TODO: make the following 'prefix' and 'interval' configurable. - new OpenTsdbMetricConverter("singer", hostName), - TSDB_METRICS_PUSH_INTERVAL_IN_MILLISECONDS); - metricsPusher.start(); - LOG.info("OpenTsdb metrics pusher started!"); + // allows hostname to be overridden based on environment + statsPusher.configure(SingerSettings.getEnvironment().getHostname() + , SINGER_METRICS_PREFIX + , pushHostPort.getHost() + , pushHostPort.getPort() + , STATS_PUSH_INTERVAL_IN_MILLISECONDS); + statsPusher.start(); + LOG.info("Stats pusher started!"); } catch (Throwable t) { // pusher fail is OK, do LOG.error("Exception when starting stats pusher: ", t); diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java index 55772730..1d569855 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerMetrics.java @@ -66,6 +66,9 @@ public class SingerMetrics { public static final String NUM_KAFKA_MESSAGES = SINGER_WRITER + "num_kafka_messages_delivery_success"; public static final String OVERSIZED_MESSAGES = SINGER_WRITER + "num_oversized_messages"; public static final String WRITE_FAILURE = SINGER_WRITER + "kafka_write_failure"; + public static final String BROKER_WRITE_FAILURE = SINGER_WRITER + "broker_write_failure"; + public static final String BROKER_WRITE_SUCCESS = SINGER_WRITER + "broker_write_success"; + public static final String BROKER_WRITE_LATENCY = SINGER_WRITER + "broker_write_latency"; public static final String WRITER_BATCH_SIZE = SINGER_WRITER + "message_batch_size"; public static final String WRITER_SSL_EXCEPTION = SINGER_WRITER + "ssl_exception"; public static final String KAFKA_THROUGHPUT = SINGER_WRITER + "topic_kafka_throughput"; @@ -93,5 +96,7 @@ public class SingerMetrics { public static final String 
SINGER_CONFIGURATOR_CONFIG_ERRORS_UNKNOWN = "singer.configurator.unexpected_config_errors"; public static final String LOCALITY_MISSING = "singer.locality.missing"; public static final String DISABLE_SINGER_OSTRICH = "DISABLE_OSTRICH_METRICS"; + public static final String LEADER_INFO_EXCEPTION = SINGER_WRITER + "leader_info_exception"; + public static final String MISSING_LOCAL_PARTITIONS = "singer.locality.missing_local_partitions"; } \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java b/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java index 7977f0c0..a2585a4f 100644 --- a/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java +++ b/singer/src/main/java/com/pinterest/singer/common/SingerSettings.java @@ -17,6 +17,8 @@ import com.pinterest.singer.common.errors.SingerLogException; import com.pinterest.singer.config.SingerDirectoryWatcher; +import com.pinterest.singer.environment.Environment; +import com.pinterest.singer.environment.EnvironmentProvider; import com.pinterest.singer.heartbeat.HeartbeatGenerator; import com.pinterest.singer.kubernetes.KubeService; import com.pinterest.singer.monitor.FileSystemMonitor; @@ -24,6 +26,7 @@ import com.pinterest.singer.thrift.configuration.SingerConfig; import com.pinterest.singer.thrift.configuration.SingerLogConfig; import com.twitter.ostrich.stats.Stats; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,6 +89,9 @@ public final class SingerSettings { // initialized here so that unit tests aren't required to call the initialize method below private static SortedMap> logConfigMap = new TreeMap<>(); + + //environment is production by default + private static Environment environment = new Environment(); private SingerSettings() { } @@ -98,6 +104,9 @@ public static void initialize(SingerConfig config) 
NoSuchMethodException, SingerLogException { setSingerConfig(config); + + loadAndSetSingerEnvironmentIfConfigured(config); + LOG.warn("Singer environment has been configured to:" + environment); SingerSettings.logProcessorExecutor = Executors.newScheduledThreadPool( singerConfig.getThreadPoolSize(), @@ -224,6 +233,25 @@ public static synchronized FileSystemMonitor getOrCreateFileSystemMonitor(String return mon; } + protected static void loadAndSetSingerEnvironmentIfConfigured(SingerConfig config) { + if (config.getEnvironmentProviderClass() != null) { + try { + String environmentProviderClass = config.getEnvironmentProviderClass(); + @SuppressWarnings("unchecked") + Class providerClass = (Class) + Class.forName(environmentProviderClass); + EnvironmentProvider provider = providerClass.newInstance(); + Environment env = provider.getEnvironment(); + if (env != null) { + environment = env; + return; + } + } catch (Exception e) { + LOG.error("Failed to load Singer Environment configuration", e); + } + } + } + public static Map getFsMonitorMap() { return fsMonitorMap; } @@ -282,4 +310,13 @@ public static ScheduledExecutorService getLogProcessorExecutor() { public static void initializeConfigMap(SingerConfig config) { logConfigMap = loadLogConfigMap(config); } + + public static Environment getEnvironment() { + return environment; + } + + @VisibleForTesting + public static void setEnvironment(Environment environment) { + SingerSettings.environment = environment; + } } diff --git a/singer/src/main/java/com/pinterest/singer/environment/EnvVariableBasedEnvironmentProvider.java b/singer/src/main/java/com/pinterest/singer/environment/EnvVariableBasedEnvironmentProvider.java new file mode 100644 index 00000000..07031711 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/environment/EnvVariableBasedEnvironmentProvider.java @@ -0,0 +1,42 @@ +/** + * Copyright 2019 Pinterest, Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.environment; + +/** + * Uses environment variables to specify the provider. This class defaults to + * Environment.PROD unless the environment variable is explicitly configured. + */ +public class EnvVariableBasedEnvironmentProvider extends EnvironmentProvider { + + private static final String DEPLOYMENT_STAGE = "DEPLOYMENT_STAGE"; + private static final String LOCALITY = "LOCALITY"; + + @Override + protected String getLocality() { + String locality = System.getenv(LOCALITY); + if (locality != null) { + return locality; + } else { + return Environment.LOCALITY_NOT_AVAILABLE; + } + } + + @Override + protected String getDeploymentStage () { + return System.getenv(DEPLOYMENT_STAGE); + } + +} \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/environment/Environment.java b/singer/src/main/java/com/pinterest/singer/environment/Environment.java new file mode 100644 index 00000000..7690a22e --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/environment/Environment.java @@ -0,0 +1,79 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.environment; + +import com.pinterest.singer.utils.SingerUtils; + +/** + * This indicates what environment is Singer running in. + * + * Singer Environment indicator can subsequently be used by any component that + * needs to switch functionality based on the environment it is running in. + * + * NOTE: all variable MUST have default initialized in case the loader doesn't + * work, all getters must return a NON-NULL value unless NULLs are expected. + */ +public class Environment { + + public static final String LOCALITY_NOT_AVAILABLE = "n/a"; + public static final String DEFAULT_HOSTNAME = SingerUtils.getHostname(); + private String locality = LOCALITY_NOT_AVAILABLE; + private String deploymentStage; + private String hostname = DEFAULT_HOSTNAME; + + /** + * @return the locality + */ + public String getLocality() { + return locality; + } + + /** + * @param locality the locality to set + */ + public void setLocality(String locality) { + this.locality = locality; + } + + /** + * @return the deploymentStage + */ + public String getDeploymentStage() { + return deploymentStage; + } + + /** + * @param deploymentStage the deploymentStage to set + */ + public void setDeploymentStage(String deploymentStage) { + this.deploymentStage = deploymentStage; + } + + /** + * @return the hostname + */ + public String getHostname() { + return hostname; + } + + /** + * @param hostname the hostname to set + */ + public void setHostname(String hostname) { + this.hostname = hostname; + } + +} \ No newline at end of file diff --git 
a/singer/src/main/java/com/pinterest/singer/environment/EnvironmentProvider.java b/singer/src/main/java/com/pinterest/singer/environment/EnvironmentProvider.java new file mode 100644 index 00000000..d8e83f3b --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/environment/EnvironmentProvider.java @@ -0,0 +1,44 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.environment; + +/** + * Environment provider for Singer, defaults to production. This class can be + * extended to change the behavior of environment provider. 
+ */ +public abstract class EnvironmentProvider { + + protected Environment environment = new Environment(); + + public EnvironmentProvider() { + environment.setDeploymentStage(getDeploymentStage()); + environment.setLocality(getLocality()); + environment.setHostname(getHostname()); + } + + protected abstract String getLocality(); + + protected abstract String getDeploymentStage(); + + protected String getHostname() { + return Environment.DEFAULT_HOSTNAME; + } + + public Environment getEnvironment() { + return environment; + } + +} \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java index 491f4dd2..e9d20d99 100644 --- a/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java +++ b/singer/src/main/java/com/pinterest/singer/kubernetes/KubeService.java @@ -21,17 +21,11 @@ import java.nio.file.Path; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; -import java.security.KeyManagementException; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; import java.util.Date; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -265,7 +259,7 @@ public void updatePodWatchers(String podUid, boolean isDelete) { */ public Set fetchPodUidsFromMetadata() throws IOException { LOG.debug("Attempting to make pod md request"); - String response = makeGetRequest(PODS_MD_URL); + String response = SingerUtils.makeGetRequest(PODS_MD_URL); LOG.debug("Received pod md response:" + response); Gson gson = new Gson(); JsonObject obj = gson.fromJson(response, JsonObject.class); @@ -283,30 +277,6 @@ public Set fetchPodUidsFromMetadata() 
throws IOException { return podUids; } - /** - * Make an HTTP Get request on the supplied URI and return the response entity - * as {@link String} - * - * @param uri - * @return - * @throws IOException - */ - private String makeGetRequest(String uri) throws IOException { - HttpGet getPodRequest = new HttpGet(uri); - try { - CloseableHttpResponse response = SingerUtils.makeRequest(getPodRequest); - if (response.getStatusLine().getStatusCode() != 200) { - LOG.warn("Non-200 status code(" + response.getStatusLine().getStatusCode() + ") reason:" - + response.getStatusLine().getReasonPhrase()); - } - String entity = EntityUtils.toString(response.getEntity()); - response.close(); - return entity; - } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) { - throw new IOException(e); - } - } - /** * Return the poll frequency in milliseconds * diff --git a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java index 6b143a5e..8b5375fb 100644 --- a/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java +++ b/singer/src/main/java/com/pinterest/singer/monitor/DefaultLogMonitor.java @@ -533,7 +533,6 @@ public void reportStats() { "log=" + log.replace(":", "/")), maxLatency); } } - Stats.setGauge(SingerMetrics.CURRENT_PROCESSOR_LATENCY, overallMaxLatency); for (String log : perLogStuck.keySet()) { OpenTsdbMetricConverter .incr("singer.processor.stuck", perLogStuck.get(log), "log=" + log, "host=" + HOSTNAME); diff --git a/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java b/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java index 1025f29e..b11205c0 100644 --- a/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java +++ b/singer/src/main/java/com/pinterest/singer/tools/LogConfigCheckTool.java @@ -17,8 +17,21 @@ import java.io.File; import java.util.Arrays; +import 
java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import org.apache.kafka.clients.admin.AdminClient; import com.pinterest.singer.config.DirectorySingerConfigurator; +import com.pinterest.singer.thrift.configuration.KafkaProducerConfig; +import com.pinterest.singer.thrift.configuration.KafkaWriterConfig; +import com.pinterest.singer.thrift.configuration.SingerLogConfig; +import com.pinterest.singer.thrift.configuration.WriterType; import com.pinterest.singer.utils.LogConfigUtils; /** @@ -26,7 +39,7 @@ */ public class LogConfigCheckTool { - public static void main(String[] args) { + public static void main(String[] args) throws InterruptedException, ExecutionException { if (args.length < 1) { System.err.println("Must specify the configuration directory"); System.exit(-1); @@ -47,17 +60,59 @@ public static void main(String[] args) { } } + Map> topicMap = new HashMap<>(); File[] listFiles = confDir.listFiles(); Arrays.sort(listFiles); for (File file : listFiles) { try { - LogConfigUtils.parseLogConfigFromFile(file); + SingerLogConfig slc = LogConfigUtils.parseLogConfigFromFile(file); + KafkaWriterConfig kafkaWriterConfig = slc.getLogStreamWriterConfig().getKafkaWriterConfig(); + if ((slc.getLogStreamWriterConfig().getType() == WriterType.KAFKA + || slc.getLogStreamWriterConfig().getType() == WriterType.KAFKA08)) { + if (kafkaWriterConfig == null) { + System.err + .println("Kafka writer specified with missing Kafka config:" + file.getName()); + System.exit(-1); + } else { + KafkaProducerConfig producerConfig = kafkaWriterConfig.getProducerConfig(); + if (producerConfig.getBrokerLists().isEmpty()) { + System.err + .println("No brokers in the kafka configuration specified:" + file.getName()); + System.exit(-1); + } + String broker = producerConfig.getBrokerLists().get(0); + Set topics = topicMap.get(broker); + if (topics 
== null) { + topics = new HashSet<>(); + topicMap.put(broker, topics); + } + topics.add(kafkaWriterConfig.getTopic() + " " + file.getName()); + } + } } catch (Exception e) { e.printStackTrace(); System.err.println("Bad configuration file:" + file.getName()); System.exit(-1); } } + for (Entry> entry : topicMap.entrySet()) { + String bootstrapBroker = entry.getKey(); + Properties properties = new Properties(); + bootstrapBroker = bootstrapBroker.replace("9093", "9092"); + properties.put("bootstrap.servers", bootstrapBroker); + AdminClient cl = AdminClient.create(properties); + Set topicSet = cl.listTopics().names().get(); + for (String topicFileMap : entry.getValue()) { + String[] splits = topicFileMap.split(" "); + String topic = splits[0]; + if (!topicSet.contains(topic)) { + System.err.println("Topic:" + topic + " doesn't exist for file:" + splits[1]); + cl.close(); + System.exit(-1); + } + } + cl.close(); + } } } \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java index 68a7681a..91f8b167 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/LogConfigUtils.java @@ -18,6 +18,8 @@ import com.pinterest.singer.common.SingerConfigDef; import com.pinterest.singer.config.ConfigFileServerSet; import com.pinterest.singer.config.DirectorySingerConfigurator; +import com.pinterest.singer.environment.EnvironmentProvider; +import com.pinterest.singer.metrics.StatsPusher; import com.pinterest.singer.thrift.configuration.DummyWriteConfig; import com.pinterest.singer.thrift.configuration.FileNameMatchMode; import com.pinterest.singer.thrift.configuration.HeartbeatWriterConfig; @@ -143,11 +145,6 @@ public static SingerConfig parseDirBasedSingerConfigHeader(String singerConfigFi errors.add("singer.setLogConfigPollIntervalSecs " + "is required for Singer Configuration"); } - 
String statsPusherHostPort = singerConfiguration.getString("statsPusherHostPort"); - if (statsPusherHostPort != null) { - result.setStatsPusherHostPort(statsPusherHostPort); - } - try { result.singerRestartConfig = parseSingerRestartConfig(configHeader); } catch (ConfigurationException x) { @@ -391,11 +388,16 @@ public static SingerLogConfig[] parseLogStreamConfig(AbstractConfiguration confi /** * Parse the common singer configuration + * @throws ConfigurationException */ - private static SingerConfig parseCommonSingerConfigHeader(AbstractConfiguration singerConfiguration) { + public static SingerConfig parseCommonSingerConfigHeader(AbstractConfiguration singerConfiguration) throws ConfigurationException { SingerConfig singerConfig = new SingerConfig(); - singerConfig.setThreadPoolSize(singerConfiguration.getInt("threadPoolSize")); - singerConfig.setOstrichPort(singerConfiguration.getInt("ostrichPort")); + if (singerConfiguration.containsKey("threadPoolSize")) { + singerConfig.setThreadPoolSize(singerConfiguration.getInt("threadPoolSize")); + } + if (singerConfiguration.containsKey("ostrichPort")) { + singerConfig.setOstrichPort(singerConfiguration.getInt("ostrichPort")); + } if (singerConfiguration.containsKey("writerThreadPoolSize")) { int writerThreadPoolSize = singerConfiguration.getInt("writerThreadPoolSize"); singerConfig.setWriterThreadPoolSize(writerThreadPoolSize); @@ -418,7 +420,41 @@ private static SingerConfig parseCommonSingerConfigHeader(AbstractConfiguration if (singerConfiguration.containsKey("heartbeatEnabled")) { singerConfig.setHeartbeatEnabled(singerConfiguration.getBoolean("heartbeatEnabled")); } + + if (singerConfiguration.containsKey("environmentProviderClass")) { + String envProviderClass = singerConfiguration.getString("environmentProviderClass"); + singerConfig.setEnvironmentProviderClass(envProviderClass); + // environmentProviderClass can be null therefore validation is only needed + // if user has configured it + try { + Class cls = 
Class.forName(envProviderClass); + if (!EnvironmentProvider.class.isAssignableFrom(cls)) { + throw new ConfigurationException("environmentProviderClass " + envProviderClass + + " doesn't extend " + EnvironmentProvider.class.getName()); + } + } catch (ClassNotFoundException e) { + throw new ConfigurationException("Couldn't find environmentProviderClass " + envProviderClass); + } + } + String statsPusherHostPort = singerConfiguration.getString("statsPusherHostPort"); + if (statsPusherHostPort != null) { + singerConfig.setStatsPusherHostPort(statsPusherHostPort); + + String statsPusherClass = singerConfiguration.getString("statsPusherClass"); + if (statsPusherClass != null) { + singerConfig.setStatsPusherClass(statsPusherClass); + } + try { + Class cls = Class.forName(singerConfig.getStatsPusherClass()); + if (!StatsPusher.class.isAssignableFrom(cls)) { + throw new ConfigurationException("statsPusherClass " + singerConfig.getStatsPusherClass() + + " doesn't extend " + StatsPusher.class.getName()); + } + } catch (ClassNotFoundException e) { + throw new ConfigurationException("Couldn't find statsPusherClass " + statsPusherClass); + } + } return singerConfig; } @@ -466,7 +502,7 @@ private static PulsarProducerConfig parsePulsarProducerConfig(SubsetConfiguratio .setCompressionType(subsetConfiguration.getString(SingerConfigDef.COMPRESSION_TYPE)); } if (subsetConfiguration.containsKey(SingerConfigDef.PARTITIONER_CLASS)) { - pulsarProducerConfig.setPartitionerClass(SingerConfigDef.PARTITIONER_CLASS); + pulsarProducerConfig.setPartitionerClass(subsetConfiguration.getString(SingerConfigDef.PARTITIONER_CLASS)); } if (subsetConfiguration.containsKey(SingerConfigDef.PULSAR_SERVICE_URL)) { pulsarProducerConfig diff --git a/singer/src/main/java/com/pinterest/singer/utils/PartitionComparator.java b/singer/src/main/java/com/pinterest/singer/utils/PartitionComparator.java new file mode 100644 index 00000000..4c7c9f32 --- /dev/null +++ 
b/singer/src/main/java/com/pinterest/singer/utils/PartitionComparator.java @@ -0,0 +1,32 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.utils; + +import org.apache.kafka.common.PartitionInfo; + +import java.util.Comparator; + +/** + * Used to sort {@link PartitionInfo} by partitionid + */ +public class PartitionComparator implements Comparator { + + @Override + public int compare(PartitionInfo p1, PartitionInfo p2) { + return Integer.compare(p1.partition(), p2.partition()); + } + +} diff --git a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java index c8a93067..e33cb8bf 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/SingerUtils.java @@ -55,10 +55,12 @@ import org.apache.http.client.CredentialsProvider; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; /** * The utility methods for Singer @@ -274,4 +276,28 @@ public int compare(File file1, File file2) { 
return NameFileComparator.NAME_REVERSE.compare(file1, file2); } } + + /** + * Make an HTTP Get request on the supplied URI and return the response entity + * as {@link String} + * + * @param uri + * @return + * @throws IOException + */ + public static String makeGetRequest(String uri) throws IOException { + HttpGet getPodRequest = new HttpGet(uri); + try { + CloseableHttpResponse response = SingerUtils.makeRequest(getPodRequest); + if (response.getStatusLine().getStatusCode() != 200) { + LOG.warn("Non-200 status code(" + response.getStatusLine().getStatusCode() + ") reason:" + + response.getStatusLine().getReasonPhrase()); + } + String entity = EntityUtils.toString(response.getEntity()); + response.close(); + return entity; + } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException | IOException e) { + throw new IOException(e); + } + } } diff --git a/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java b/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java index ef7cf9ae..5aa7b91e 100644 --- a/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java +++ b/singer/src/main/java/com/pinterest/singer/utils/StatsUtils.java @@ -33,4 +33,8 @@ public static String getLogStreamStatName(LogStream logStream, String group, Str public static String getLogStatName(SingerLog singerLog, String stat) { return String.format("singer.%s.%s", singerLog.getLogName(), stat); } + + public static String pulsarTopicToMetricTag(String pulsarTopic) { + return pulsarTopic.replaceAll("persistent://", ""); + } } diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java index 9246cefb..54aabf9b 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java +++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaProducerManager.java @@ -85,6 +85,11 @@ private KafkaProducer getProducerInternal(KafkaProducerConfig 
co result = producers.get(config); return result; } + + + public static void injectTestProducer(KafkaProducerConfig config, KafkaProducer producer) { + KafkaProducerManager.getInstance().producers.put(config, producer); + } /** * Reset the kafka producer diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java index 86444c17..f00069d6 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java +++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaWriter.java @@ -25,7 +25,7 @@ import com.pinterest.singer.thrift.LogMessage; import com.pinterest.singer.thrift.configuration.KafkaProducerConfig; import com.pinterest.singer.thrift.configuration.SingerRestartConfig; -import com.pinterest.singer.utils.SingerUtils; +import com.pinterest.singer.utils.PartitionComparator; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import org.apache.kafka.clients.producer.KafkaProducer; @@ -37,6 +37,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -52,8 +53,9 @@ */ public class KafkaWriter implements LogStreamWriter { - public static final String HOSTNAME = SingerUtils.getHostname(); + public static final String HOSTNAME = SingerSettings.getEnvironment().getHostname(); private static final Logger LOG = LoggerFactory.getLogger(KafkaWriter.class); + private static final PartitionComparator COMPARATOR = new PartitionComparator(); // Counter for the number of batch message writing failures private static AtomicInteger failureCounter = new AtomicInteger(0); @@ -118,20 +120,23 @@ public KafkaWriter(LogStream logStream, this.serializer = new TSerializer(); } - protected KafkaWriter(KafkaMessagePartitioner partitioner, - boolean skipNoLeaderPartitions) { + protected KafkaWriter(KafkaProducerConfig producerConfig, + 
KafkaMessagePartitioner partitioner, + String topic, + boolean skipNoLeaderPartitions, + ExecutorService clusterThreadPool) { this.partitioner = partitioner; this.skipNoLeaderPartitions = skipNoLeaderPartitions; + this.producerConfig = producerConfig; + this.clusterThreadPool = clusterThreadPool; + this.topic = topic; logStream = null; logName = null; - topic = null; writeTimeoutInSeconds = 0; serializer = null; auditingEnabled = false; auditTopic = null; - producerConfig = null; kafkaClusterSig = null; - clusterThreadPool = null; } @Override @@ -167,20 +172,19 @@ private void includeAuditMessageInBatch(List> mes * * If skipNoLeaderPartitions flag is set, we will skip the partitions that have no leader. * - * @param producer kafkaProducer object + * @param partitions unordered list of partitionInfo to be used for partitioning this batch * @param topic the kafka topic * @param logMessages the messages that will be written to kafka * @return a list of message lists that are classified based on partitions. 
*/ List>> messageCollation( - KafkaProducer producer, + List partitions, String topic, List logMessages) throws Exception { LOG.info("Collate " + logMessages.size() + " messages"); List>> buckets = new ArrayList<>(); try { - List partitions = producer.partitionsFor(topic); List validPartitions = partitions; if (skipNoLeaderPartitions) { validPartitions = new ArrayList<>(); @@ -219,7 +223,7 @@ List>> messageCollation( @Override public void writeLogMessages(List logMessages) throws LogStreamWriterException { - Set> set = new HashSet<>(); + Set> resultSet = new HashSet<>(); KafkaProducer producer = KafkaProducerManager.getProducer(producerConfig); Preconditions.checkNotNull(producer); @@ -227,22 +231,30 @@ public void writeLogMessages(List logMessages) throws LogStreamWrite producer.beginTransaction(); } try { + List partitions = producer.partitionsFor(topic); List>> - buckets = messageCollation(producer, topic, logMessages); + buckets = messageCollation(partitions, topic, logMessages); + + // we sort this info after, we have to create a copy of the data since + // the returned list is immutable + List sortedPartitions = new ArrayList<>(partitions); + Collections.sort(sortedPartitions, COMPARATOR); for (List> msgs : buckets) { - if (auditingEnabled && msgs.size() > 0) { - includeAuditMessageInBatch(msgs); + if (msgs.size() > 0) { + if (auditingEnabled) { + includeAuditMessageInBatch(msgs); + } + Callable worker = + new KafkaWritingTask(producer, msgs, writeTimeoutInSeconds, sortedPartitions); + Future future = clusterThreadPool.submit(worker); + resultSet.add(future); } - Callable worker = - new KafkaWritingTask(producer, msgs, writeTimeoutInSeconds); - Future future = clusterThreadPool.submit(worker); - set.add(future); } int bytesWritten = 0; int maxKafkaBatchWriteLatency = 0; - for (Future f : set) { + for (Future f : resultSet) { KafkaWritingTaskResult result = f.get(); if (!result.success) { LOG.error("Failed to write messages to kafka topic {}", topic, 
result.exception); @@ -286,7 +298,7 @@ public void writeLogMessages(List logMessages) throws LogStreamWrite throw new LogStreamWriterException("Failed to write messages to topic " + topic, e); } finally { - for (Future f : set) { + for (Future f : resultSet) { if (!f.isDone() && !f.isCancelled()) { f.cancel(true); } diff --git a/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java b/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java index 849eabd1..702eb445 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java +++ b/singer/src/main/java/com/pinterest/singer/writer/KafkaWritingTask.java @@ -15,20 +15,21 @@ */ package com.pinterest.singer.writer; -import com.pinterest.singer.common.SingerMetrics; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; -import com.pinterest.singer.metrics.OpenTsdbMetricConverter; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; +import com.pinterest.singer.common.SingerMetrics; +import com.pinterest.singer.metrics.OpenTsdbMetricConverter; public class KafkaWritingTask implements Callable { @@ -38,14 +39,23 @@ public class KafkaWritingTask implements Callable { private List> messages; private int writeTimeoutInSeconds; private long taskCreationTimeInMillis; + private String leaderNode; public KafkaWritingTask(KafkaProducer producer, List> msgs, - int writeTimeoutInSeconds) { + int writeTimeoutInSeconds, + List sortedPartitions) { this.producer = producer; this.messages = 
msgs; this.writeTimeoutInSeconds = writeTimeoutInSeconds; this.taskCreationTimeInMillis = System.currentTimeMillis(); + try { + leaderNode = sortedPartitions.get(msgs.get(0).partition()).leader().host(); + } catch (Exception e) { + LOG.error("Error getting leader node from partition metadata", e); + OpenTsdbMetricConverter.incr(SingerMetrics.LEADER_INFO_EXCEPTION, 1, "host=" + KafkaWriter.HOSTNAME); + leaderNode = "n/a"; + } } @Override @@ -81,10 +91,13 @@ public KafkaWritingTaskResult call() { } if (result == null) { - long kafkaLatency = System.currentTimeMillis() - taskCreationTimeInMillis; + // we can down convert since latency should be less than Integer.MAX_VALUE + int kafkaLatency = (int)(System.currentTimeMillis() - taskCreationTimeInMillis); // we shouldn't have latency creater than 2B milliseoncds so it should be okay // to downcast to integer result = new KafkaWritingTaskResult(true, bytesWritten, (int) kafkaLatency); + OpenTsdbMetricConverter.incrGranular(SingerMetrics.BROKER_WRITE_SUCCESS, 1, "broker=" + leaderNode); + OpenTsdbMetricConverter.addGranularMetric(SingerMetrics.BROKER_WRITE_LATENCY, kafkaLatency, "broker=" + leaderNode); } } catch (org.apache.kafka.common.errors.RecordTooLargeException e) { LOG.error("Kafka write failure due to excessively large message size", e); @@ -100,6 +113,7 @@ public KafkaWritingTaskResult call() { String errorMsg = "Failed to write " + messages.size() + " messages to kafka"; LOG.error(errorMsg, e); OpenTsdbMetricConverter.incr(SingerMetrics.WRITE_FAILURE, 1, "topic=" + topic, "host=" + KafkaWriter.HOSTNAME); + OpenTsdbMetricConverter.incrGranular(SingerMetrics.BROKER_WRITE_FAILURE, 1, "broker=" + leaderNode); result = new KafkaWritingTaskResult(false, e); } finally { if (!result.success) { diff --git a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwarePartitioner.java b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwarePartitioner.java new file mode 100644 index
00000000..e2eca80a --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwarePartitioner.java @@ -0,0 +1,117 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.writer.partitioners; + +import com.google.common.annotations.VisibleForTesting; +import com.pinterest.singer.common.SingerMetrics; +import com.pinterest.singer.common.SingerSettings; +import com.pinterest.singer.metrics.OpenTsdbMetricConverter; +import com.pinterest.singer.writer.KafkaMessagePartitioner; +import com.pinterest.singer.writer.KafkaWriter; +import org.apache.kafka.common.PartitionInfo; + +import java.util.List; +import java.util.stream.Collectors; + +/** + * Locality (aws ec2 az) aware Kafka partitioner. This partitioner attempts to + * write to partitions whose leaders are in the same locality (rack) as the + * Singer agent itself.
+ */ +public abstract class LocalityAwarePartitioner implements KafkaMessagePartitioner { + + // the locality info is constant for a given instance of Singer and will + // not change during the Singer process lifetime for a given host + protected static String rack; + private long nextRefreshTime; + private long refreshIntervalMs; + protected List localPartitions; + + public LocalityAwarePartitioner(long refreshIntervalMs) { + this.refreshIntervalMs = refreshIntervalMs; + // do a singleton based implementation + if (rack == null) { + synchronized (LocalityAwarePartitioner.class) { + if (rack == null) { + // load rack information from SingerEnvironment + rack = SingerSettings.getEnvironment().getLocality(); + } + } + } + } + + protected LocalityAwarePartitioner(String rack, long refreshIntervalMs) { + LocalityAwarePartitioner.rack = rack; + this.refreshIntervalMs = refreshIntervalMs; + } + + protected void updateNextRefreshTime() { + nextRefreshTime = System.currentTimeMillis() + refreshIntervalMs; + } + + /** + * Retain partitions that are local to this agent + * @param partitions + * @return local partitions + */ + protected List retainLocalPartitions(List partitions) { + return partitions.stream() + .filter(p -> p.leader() != null && p.leader().hasRack() && rack.equals(p.leader().rack())) + .collect(Collectors.toList()); + } + + /** + * Check if local partitions are available if yes then assign those to the localPartitions object + * else assigns all supplied partitions to the localPartitions object + * @param partitions + */ + protected void checkAndAssignLocalPartitions(List partitions) { + List retainLocalPartition = retainLocalPartitions(partitions); + if (retainLocalPartition.isEmpty()) { + OpenTsdbMetricConverter.gauge(SingerMetrics.MISSING_LOCAL_PARTITIONS, 1, "locality=" + rack, + "host=" + KafkaWriter.HOSTNAME); + // reset to all partitions if no local partitions are available + localPartitions = partitions; + } else { + localPartitions = 
retainLocalPartition; + } + } + + protected boolean isTimeToRefresh() { + return System.currentTimeMillis() > nextRefreshTime; + } + + protected void setNextRefreshTime(long nextRefreshTime) { + this.nextRefreshTime = nextRefreshTime; + } + + public List getLocalPartitions() { + return localPartitions; + } + + public String getRack() { + return rack; + } + + @VisibleForTesting + protected static void nullifyRack() { + rack = null; + } + + public long getRefreshIntervalMs() { + return refreshIntervalMs; + } +} \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java index 50188949..95b0e0e4 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java +++ b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareRandomPartitioner.java @@ -15,64 +15,32 @@ */ package com.pinterest.singer.writer.partitioners; -import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.concurrent.ThreadLocalRandom; -import java.util.stream.Collectors; import org.apache.kafka.common.PartitionInfo; -import com.google.common.annotations.VisibleForTesting; -import com.pinterest.singer.writer.KafkaMessagePartitioner; -import com.pinterest.singer.writer.kafka.localityaware.EC2LocalityInfoProvider; - /** - * Locality (aws ec2 az) aware Kafka partitioner. This partitioner attempts to - * write to partitions whose leaders are in the same locality as the Kafka - * leader for the partition. If no partitions are available in the same - * locality, a random partition from all partitions will be picked. 
+ * Locality aware random partitioner + */ -public class LocalityAwareRandomPartitioner implements KafkaMessagePartitioner { +public class LocalityAwareRandomPartitioner extends LocalityAwarePartitioner { // refresh every 10 seconds public static final long REFRESH_INTERVAL_MS = 10_000; - // the locality info is constant for a given instance of Singer and will - // not change during the Singer process lifetime for a given host - private static String rack; - private long nextRefreshTime; - private long refreshIntervalMs; - private List localPartitions; private ThreadLocalRandom random = ThreadLocalRandom.current(); public LocalityAwareRandomPartitioner() { - refreshIntervalMs = REFRESH_INTERVAL_MS; - // do a singleton based implementation on LocalityInfo provider instantiation - if (rack == null) { - synchronized (LocalityAwareRandomPartitioner.class) { - if (rack == null) { - EC2LocalityInfoProvider provider = new EC2LocalityInfoProvider(); - try { - provider.init(new HashMap<>()); - } catch (IllegalArgumentException | IOException e) { - throw new RuntimeException("Failed to initialize EC2 locality provider", e); - } - rack = provider.getLocalityInfoForLocalhost(); - } - } - } + super(REFRESH_INTERVAL_MS); } protected LocalityAwareRandomPartitioner(String rack, long refreshIntervalMs) { - LocalityAwareRandomPartitioner.rack = rack; - this.refreshIntervalMs = refreshIntervalMs; + super(rack, refreshIntervalMs); } @Override public int partition(Object messageKey, List partitions) { if (localPartitions == null || isTimeToRefresh()) { - List retainLocalPartition = retainLocalPartitions(partitions); - localPartitions = retainLocalPartition.size() > 0 ? 
retainLocalPartition : partitions; + checkAndAssignLocalPartitions(partitions); // set next refresh time updateNextRefreshTime(); } @@ -80,43 +48,4 @@ public int partition(Object messageKey, List partitions) { return localPartitions.get(Math.abs(random.nextInt() % localPartitions.size())).partition(); } - protected void updateNextRefreshTime() { - nextRefreshTime = System.currentTimeMillis() + refreshIntervalMs; - } - - /** - * Retain partitions that are local to this agent - * @param partitions - * @return local partitions - */ - protected List retainLocalPartitions(List partitions) { - return partitions.stream() - .filter(p -> p.leader() != null && p.leader().hasRack() && rack.equals(p.leader().rack())) - .collect(Collectors.toList()); - } - - protected boolean isTimeToRefresh() { - return System.currentTimeMillis() > nextRefreshTime; - } - - protected void setNextRefreshTime(long nextRefreshTime) { - this.nextRefreshTime = nextRefreshTime; - } - - public List getLocalPartitions() { - return localPartitions; - } - - public String getRack() { - return rack; - } - - @VisibleForTesting - protected static void nullifyRack() { - rack = null; - } - - public long getRefreshIntervalMs() { - return refreshIntervalMs; - } } \ No newline at end of file diff --git a/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareSinglePartitionPartitioner.java b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareSinglePartitionPartitioner.java new file mode 100644 index 00000000..9c256a74 --- /dev/null +++ b/singer/src/main/java/com/pinterest/singer/writer/partitioners/LocalityAwareSinglePartitionPartitioner.java @@ -0,0 +1,59 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.writer.partitioners; + +import org.apache.kafka.common.PartitionInfo; + +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Locality aware single partition partitioner. This partitioner can + * heuristically provide better compression ratios for data since all data from + * a given singer agent is written to one partition. For some datasets we have + * seen ~2x compression ratios. + */ +public class LocalityAwareSinglePartitionPartitioner extends LocalityAwarePartitioner { + + // refresh every 30 seconds + public static final long REFRESH_INTERVAL_MS = 30_000; + private ThreadLocalRandom random = ThreadLocalRandom.current(); + private int partitionId; + + public LocalityAwareSinglePartitionPartitioner() { + super(REFRESH_INTERVAL_MS); + } + + protected LocalityAwareSinglePartitionPartitioner(String rack, long refreshIntervalMs) { + super(rack, refreshIntervalMs); + } + + @Override + public int partition(Object messageKey, List partitions) { + if (localPartitions == null || isTimeToRefresh()) { + checkAndAssignLocalPartitions(partitions); + // set next refresh time + updateNextRefreshTime(); + // NOTE we are not doing a delta update here since PartitionInfo object doesn't + // have an overridden hashcode and equals implementation therefore the delta computation + // will be cumbersome + partitionId = localPartitions.get(Math.abs(random.nextInt() % localPartitions.size())) + .partition(); + } + return partitionId; + } + +} \ No newline at end of file diff --git
a/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java b/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java index 9764eb5e..c18dad8d 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java +++ b/singer/src/main/java/com/pinterest/singer/writer/pulsar/PulsarWriter.java @@ -18,7 +18,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -42,6 +44,7 @@ import com.pinterest.singer.thrift.configuration.PulsarProducerConfig; import com.pinterest.singer.thrift.configuration.PulsarWriterConfig; import com.pinterest.singer.utils.SingerUtils; +import com.pinterest.singer.utils.StatsUtils; /** * Pulsar Writer is capable of writing to a Pulsar cluster.1 Pulsar Writer @@ -75,10 +78,14 @@ public class PulsarWriter implements LogStreamWriter { + "topic_pulsar_throughput"; public static final String PULSAR_LATENCY = SingerMetrics.SINGER_WRITER + "max_pulsar_batch_write_latency"; + private static Map> producerCache = new ConcurrentHashMap<>(); private LogStream logStream; - private PulsarClient pulsarClient; private Producer producer; private String topic; + + // pulsar topics have colons in the topic name, which is invalid in tsdb. + // Use this string to publish pulsar related metrics. 
+ private String metricTag; private String logName; @SuppressWarnings("unchecked") @@ -102,7 +109,6 @@ public void validateConfig(PulsarProducerConfig producerConfig) throws Configura } } - @SuppressWarnings("unchecked") public PulsarWriter init(LogStream logStream, PulsarWriterConfig writerConfig) throws ConfigurationException, LogStreamWriterException { PulsarProducerConfig producerConfig = writerConfig.getProducerConfig(); @@ -110,41 +116,63 @@ public PulsarWriter init(LogStream logStream, PulsarWriterConfig writerConfig) this.logStream = logStream; this.logName = logStream.getLogStreamName(); this.topic = writerConfig.getTopic(); + this.metricTag = StatsUtils.pulsarTopicToMetricTag(this.topic); + + if ((producer = producerCache.get(getProducerKey(producerConfig, topic))) == null) { + synchronized (producerCache) { + if ((producer = producerCache.get(getProducerKey(producerConfig, topic))) == null) { + producer = createProducer(topic, logName, producerConfig); + producerCache.put(getProducerKey(producerConfig, topic), producer); + LOG.info("Created new Pulsar producer with pulsarServiceUrl:" + producerConfig.getServiceUrl() + + " topic:" + topic + " logStream:" + logName); + } + } + } + return this; + } + + @SuppressWarnings("unchecked") + public static Producer createProducer(String topic, String logName, PulsarProducerConfig producerConfig) throws LogStreamWriterException { + PulsarClient pulsarClient = null; try { - pulsarClient = PulsarClient.builder().serviceUrl(producerConfig.getServiceUrl()).build(); + pulsarClient = PulsarClient.builder().serviceUrl(producerConfig.getServiceUrl()) + .build(); } catch (PulsarClientException e) { throw new LogStreamWriterException( - "Failed to build Pulsar client with service URL:" + producerConfig.getServiceUrl(), e); + "Failed to build Pulsar client with service URL:" + producerConfig.getServiceUrl(), + e); } - LOG.info("Created Pulsar client to connect to:" + producerConfig.getServiceUrl() + " topic:" - + topic + " 
logStream:" + logName); + LOG.info("Created Pulsar client to connect to:" + producerConfig.getServiceUrl() + + " topic:" + topic + " logStream:" + logName); try { - producer = pulsarClient.newProducer() + Producer producer = pulsarClient.newProducer() .compressionType( CompressionType.valueOf(producerConfig.getCompressionType().toUpperCase())) .messageRoutingMode(MessageRoutingMode.CustomPartition) - .messageRouter(new PulsarMessageRouter( - (Class) Class.forName(producerConfig.getPartitionerClass()))) - .sendTimeout(producerConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS).topic(topic) - .blockIfQueueFull(true).batchingMaxMessages(producerConfig.getBatchingMaxMessages()) + .messageRouter(new PulsarMessageRouter((Class) Class + .forName(producerConfig.getPartitionerClass()))) + .sendTimeout(producerConfig.getWriteTimeoutInSeconds(), TimeUnit.SECONDS) + .topic(topic).blockIfQueueFull(true) + .batchingMaxMessages(producerConfig.getBatchingMaxMessages()) .maxPendingMessages(producerConfig.getMaxPendingMessages()) .batchingMaxPublishDelay(producerConfig.getBatchingMaxPublishDelayInMilli(), TimeUnit.MILLISECONDS) .enableBatching(true).maxPendingMessagesAcrossPartitions( producerConfig.getMaxPendingMessagesAcrossPartitions()) .create(); + return producer; } catch (PulsarClientException | InstantiationException | IllegalAccessException | ClassNotFoundException e) { throw new LogStreamWriterException("Failed to initialize Pulsar writer", e); } - LOG.info("Created Pulsar producer with pulsarServiceUrl:" + producerConfig.getServiceUrl() - + " topic:" + topic + " logStream:" + logName); - return this; + } + + public static String getProducerKey(PulsarProducerConfig producerConfig, String topic) { + return producerConfig.getServiceUrl() + "__" + topic; } @Override public void close() throws IOException { - // TODO Auto-generated method stub producer.flush(); } @@ -179,20 +207,19 @@ public void writeLogMessages(List messages) throws LogStreamWriterEx future.get(); } } catch 
(PulsarClientException | InterruptedException | ExecutionException e) { - OpenTsdbMetricConverter.incr(PULSAR_WRITE_FAILURE, messages.size(), "topic=" + topic, + OpenTsdbMetricConverter.incr(PULSAR_WRITE_FAILURE, messages.size(), "topic=" + metricTag, "host=" + HOSTNAME, "logname=" + logName); throw new LogStreamWriterException("Message delivery failed", e); } - - maxPulsarWriteLatency = System.currentTimeMillis() - maxPulsarWriteLatency; - OpenTsdbMetricConverter.gauge(PULSAR_THROUGHPUT, bytesWritten, "topic=" + topic, + maxPulsarWriteLatency = System.currentTimeMillis() - maxPulsarWriteLatency; + OpenTsdbMetricConverter.gauge(PULSAR_THROUGHPUT, bytesWritten, "topic=" + metricTag, "host=" + HOSTNAME, "logname=" + logName); - OpenTsdbMetricConverter.gauge(PULSAR_LATENCY, maxPulsarWriteLatency, "topic=" + topic, + OpenTsdbMetricConverter.gauge(PULSAR_LATENCY, maxPulsarWriteLatency, "topic=" + metricTag, "host=" + HOSTNAME, "logname=" + logName); - OpenTsdbMetricConverter.incr(NUM_PULSAR_MESSAGES, messages.size(), "topic=" + topic, + OpenTsdbMetricConverter.incr(NUM_PULSAR_MESSAGES, messages.size(), "topic=" + metricTag, "host=" + HOSTNAME, "logname=" + logName); - LOG.info("Completed batch writes to Pulsar topic:" + topic + " size:" + messages.size()); + LOG.info("Completed batch writes to Pulsar topic:" + metricTag + " size:" + messages.size()); } diff --git a/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java b/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java index acf6c279..22098217 100644 --- a/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java +++ b/singer/src/test/java/com/pinterest/singer/common/TestSingerSettings.java @@ -15,12 +15,23 @@ */ package com.pinterest.singer.common; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import org.junit.Rule; import org.junit.Test; +import 
org.junit.contrib.java.lang.system.EnvironmentVariables; +import com.pinterest.singer.environment.EnvVariableBasedEnvironmentProvider; import com.pinterest.singer.monitor.DefaultLogMonitor; +import com.pinterest.singer.thrift.configuration.SingerConfig; public class TestSingerSettings { + @Rule + public final EnvironmentVariables environmentVariables = new EnvironmentVariables(); + /** * This test is to ensure that if there are any changes to the static method's * interface and no corresponding changes made to the initializer, we can catch @@ -36,4 +47,36 @@ public void testLogMonitorMethodInstance() throws ClassNotFoundException, NoSuch SingerSettings.getLogMonitorStaticInstanceMethod(DefaultLogMonitor.class.getName()); } + @Test + public void testEnvironmentLoader() { + SingerConfig config = new SingerConfig(); + SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config); + assertNotNull(SingerSettings.getEnvironment()); + // check by default environment provider class is null + assertNull(config.getEnvironmentProviderClass()); + assertEquals(null, SingerSettings.getEnvironment().getDeploymentStage()); + + // bad class name should cause an exception to be logged but MUST not throw an + // error and environment should still be correct + config.setEnvironmentProviderClass("com.pinterest.xyz"); + SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config); + assertNotNull(SingerSettings.getEnvironment()); + assertEquals(null, SingerSettings.getEnvironment().getDeploymentStage()); + + environmentVariables.set("DEPLOYMENT_STAGE", "CANARY"); + environmentVariables.set("LOCALITY", "us-east-1a"); + config.setEnvironmentProviderClass(EnvVariableBasedEnvironmentProvider.class.getName()); + SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config); + assertNotNull(SingerSettings.getEnvironment()); + assertEquals("CANARY", SingerSettings.getEnvironment().getDeploymentStage()); + assertEquals("us-east-1a", SingerSettings.getEnvironment().getLocality()); + + 
environmentVariables.set("DEPLOYMENT_STAGE", "DEV"); + config.setEnvironmentProviderClass(EnvVariableBasedEnvironmentProvider.class.getName()); + SingerSettings.loadAndSetSingerEnvironmentIfConfigured(config); + assertNotNull(SingerSettings.getEnvironment()); + assertEquals("DEV", SingerSettings.getEnvironment().getDeploymentStage()); + assertEquals("us-east-1a", SingerSettings.getEnvironment().getLocality()); + } + } diff --git a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java index 6e4cd1c4..0888daa0 100644 --- a/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java +++ b/singer/src/test/java/com/pinterest/singer/utils/TestLogConfigUtils.java @@ -36,7 +36,7 @@ public class TestLogConfigUtils { @Test - public void testRealpinTTLParsing() throws ConfigurationException { + public void testRealPinTTLParsing() throws ConfigurationException { String CONFIG = "" + "topic=test\n" + "objectType=mobile_perf_log\n" + "serverSetPath=/xyz/realpin/prod"; PropertiesConfiguration config = new PropertiesConfiguration(); @@ -103,12 +103,11 @@ public void testKafkaProducerConfigCatchBadPartitioner() throws ConfigurationExc } for (String type : Arrays.asList( - "com.pinterest.singer.writer.partitioners.Crc32ByteArrayPartitioner", - "com.pinterest.singer.writer.partitioners.DefaultPartitioner", - "com.pinterest.singer.writer.partitioners.SimpleRoundRobinPartitioner", + "com.pinterest.singer.writer.partitioners.Crc32ByteArrayPartitioner", + "com.pinterest.singer.writer.partitioners.DefaultPartitioner", + "com.pinterest.singer.writer.partitioners.SimpleRoundRobinPartitioner", "com.pinterest.singer.writer.partitioners.SinglePartitionPartitioner", - "com.pinterest.singer.writer.partitioners.LocalityAwareRandomPartitioner" - )) { + "com.pinterest.singer.writer.partitioners.LocalityAwareRandomPartitioner")) { map.put("partitioner.class", type); try { 
LogConfigUtils.parseProducerConfig(config); @@ -118,6 +117,50 @@ public void testKafkaProducerConfigCatchBadPartitioner() throws ConfigurationExc } } + @Test + public void testBaseSingerConfig() throws ConfigurationException { + Map map = new HashMap<>(); + map.put("monitor.monitorIntervalInSecs", "10"); + map.put("statsPusherHostPort", "localhost:1900"); + AbstractConfiguration config = new MapConfiguration(map); + // defaults should be correct + LogConfigUtils.parseCommonSingerConfigHeader(config); + + // must throw configuration exception + map.put("statsPusherClass", "com.pinterest.singer.monitor.DefaultLogMonitor"); + try { + LogConfigUtils.parseCommonSingerConfigHeader(config); + fail( + "Must fail since the supplied class is not a valid StatsPusher class but it is a valid class"); + } catch (Exception e) { + } + + map.put("statsPusherClass", "com.pinterest.singer.monitor.Xyz"); + try { + LogConfigUtils.parseCommonSingerConfigHeader(config); + fail( + "Must fail since the supplied class is not a valid class"); + } catch (Exception e) { + } + map.remove("statsPusherClass"); + // cleanup after stats test + + map.put("environmentProviderClass", "com.pinterest.singer.monitor.Xyz"); + try { + LogConfigUtils.parseCommonSingerConfigHeader(config); + fail( + "Must fail since the supplied class is not a valid class"); + } catch (Exception e) { + } + map.put("environmentProviderClass", "com.pinterest.singer.monitor.DefaultLogMonitor"); + try { + LogConfigUtils.parseCommonSingerConfigHeader(config); + fail( + "Must fail since the supplied class is not a valid class"); + } catch (Exception e) { + } + } + @Test public void testKafkaProducerConfigAck() throws ConfigurationException { Map map = new HashMap<>(); diff --git a/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java b/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java index e586419a..b0af9f01 100644 --- a/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java +++ 
b/singer/src/test/java/com/pinterest/singer/writer/TestKafkaWriter.java @@ -2,6 +2,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.when; import java.nio.ByteBuffer; @@ -10,11 +11,15 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.pulsar.shade.org.apache.commons.lang3.concurrent.ConcurrentUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -23,7 +28,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; +import com.pinterest.singer.common.SingerSettings; import com.pinterest.singer.thrift.LogMessage; +import com.pinterest.singer.thrift.configuration.KafkaProducerConfig; +import com.pinterest.singer.thrift.configuration.SingerConfig; @RunWith(MockitoJUnitRunner.class) public class TestKafkaWriter { @@ -35,8 +43,12 @@ public class TestKafkaWriter { @Test public void testPartitioningParityCRC() throws Exception { KafkaMessagePartitioner partitioner = new Crc32ByteArrayPartitioner(); + KafkaProducerConfig config = new KafkaProducerConfig(); + SingerSettings.setSingerConfig(new SingerConfig()); + KafkaProducerManager.injectTestProducer(config, producer); // default value for skip noleader partition is false - KafkaWriter writer = new KafkaWriter(partitioner, false); + KafkaWriter writer = new KafkaWriter(config, partitioner, "topicx", false, + Executors.newCachedThreadPool()); List partitions = ImmutableList.copyOf(Arrays.asList( new 
PartitionInfo("topicx", 1, new Node(2, "broker2", 9092, "us-east-1b"), null, null), @@ -51,6 +63,8 @@ public void testPartitioningParityCRC() throws Exception { new PartitionInfo("topicx", 9, new Node(5, "broker5", 9092, "us-east-1b"), null, null), new PartitionInfo("topicx", 10, new Node(1, "broker1", 9092, "us-east-1a"), null, null))); when(producer.partitionsFor("topicx")).thenReturn(partitions); + when(producer.send(any())).thenReturn(ConcurrentUtils.constantFuture( + new RecordMetadata(new TopicPartition("topicx", 0), 0L, 0L, 0L, 0L, 0, 0))); Map> msgPartitionMap = new HashMap<>(); HashFunction crc32 = Hashing.crc32(); @@ -70,8 +84,8 @@ public void testPartitioningParityCRC() throws Exception { list.add(logMessage); } - List>> messageCollation = writer.messageCollation(producer, - "topicx", logMessages); + List>> messageCollation = writer + .messageCollation(partitions, "topicx", logMessages); for (int i = 0; i < messageCollation.size(); i++) { List> writerOutput = messageCollation.get(i); List originalData = msgPartitionMap.get(i); @@ -84,6 +98,9 @@ public void testPartitioningParityCRC() throws Exception { assertTrue(Arrays.equals(originalData.get(j).getKey(), writerOutput.get(j).key())); } } + + // validate if writes are throwing any error + writer.writeLogMessages(logMessages); writer.close(); } diff --git a/thrift-logger/pom.xml b/thrift-logger/pom.xml index e0415d21..af5229b1 100644 --- a/thrift-logger/pom.xml +++ b/thrift-logger/pom.xml @@ -4,7 +4,7 @@ com.pinterest.singer singer-package - 0.7.3.21 + 0.7.3.25 ../pom.xml thrift-logger diff --git a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java index d603ce8f..f17de032 100644 --- a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java +++ b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OpenTsdbMetricConverter.java @@ -47,9 +47,11 @@ * The 
"addMetric" static function is provided to make it easier to add Ostrich metric names that * contain tags. */ -public class OpenTsdbMetricConverter { +public class OpenTsdbMetricConverter { + // According to http://opentsdb.net/docs/build/html/user_guide/writing.html public static final String VALID_OPENSTD_STAT_TAG_PATTERN = "[a-zA-Z0-9_./-]+"; + private static boolean enableGranularMetrics; private final String prefix; private final String defaultTags; @@ -156,6 +158,13 @@ public static void addMetric(String name, int value) { public static void addMetric(String name, int value, String... tags) { Stats.addMetric(nameMetric(name, tags), value); } + + public static void addGranularMetric(String name, int value, String... tags) { + if (!enableGranularMetrics) { + return; + } + Stats.addMetric(nameMetric(name, tags), value); + } public static void incr(String name) { Stats.incr(name); @@ -176,4 +185,26 @@ public static void gauge(String name, double value) { public static void gauge(String name, double value, String... tags) { Stats.setGauge(nameMetric(name, tags), value); } -} + + public static void gaugeGranular(String name, double value, String... tags) { + if (!enableGranularMetrics) { + return; + } + Stats.setGauge(nameMetric(name, tags), value); + } + + public static void incrGranular(String name, int i, String... 
tags) { + if (!enableGranularMetrics) { + return; + } + Stats.incr(nameMetric(name, tags), i); + } + + public static void setEnableGranularMetrics(boolean granularMetrics) { + enableGranularMetrics = granularMetrics; + } + + public static boolean isEnableGranularMetrics() { + return enableGranularMetrics; + } +} \ No newline at end of file diff --git a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java index d5caa032..15088c26 100644 --- a/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java +++ b/thrift-logger/src/main/java/com/pinterest/singer/metrics/OstrichAdminService.java @@ -50,6 +50,7 @@ public void addHandler(String path, CustomHttpHandler handler) { this.customHttpHandlerMap.put(path, handler); } + @SuppressWarnings("restriction") public void start() { try { Properties properties = new Properties(); @@ -62,7 +63,6 @@ public void start() { Duration[] defaultLatchIntervals = {Duration.apply(1, TimeUnit.MINUTES)}; Iterator durationIterator = Arrays.asList(defaultLatchIntervals).iterator(); - @SuppressWarnings("deprecation") AdminServiceFactory adminServiceFactory = new AdminServiceFactory( this.port, 20, diff --git a/thrift-logger/src/main/java/com/pinterest/singer/metrics/StatsPusher.java b/thrift-logger/src/main/java/com/pinterest/singer/metrics/StatsPusher.java new file mode 100644 index 00000000..ba3c5b1a --- /dev/null +++ b/thrift-logger/src/main/java/com/pinterest/singer/metrics/StatsPusher.java @@ -0,0 +1,80 @@ +/** + * Copyright 2019 Pinterest, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.pinterest.singer.metrics; + +import com.twitter.ostrich.stats.Stats$; +import com.twitter.ostrich.stats.StatsListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Provides an abstraction to plugin custom metrics destination for Singer + * metrics. To implement your own {@link StatsPusher} you will need to + * override the sendMetrics method which is invoked at the configured frequency. + *

+ * {@link StatsPusher} is designed to run as an independent Daemon Thread. + */ +public abstract class StatsPusher extends Thread { + + private static final Logger LOG = LoggerFactory.getLogger(StatsPusher.class); + protected StatsListener statsListener; + protected long pollMillis; + protected String sourceHostname; + protected String metricsPrefix; + protected String destinationHost; + protected int destinationPort; + + public StatsPusher() { + this.statsListener = new StatsListener(Stats$.MODULE$); + setDaemon(true); + } + + public void configure(String sourceHostname, + String metricsPrefix, + String destinationHost, + int destinationPort, + long pollMillis) throws IOException { + this.sourceHostname = sourceHostname; + this.metricsPrefix = metricsPrefix; + this.destinationHost = destinationHost; + this.destinationPort = destinationPort; + this.pollMillis = pollMillis; + } + + @Override + public void run() { + try { + // Ignore the first interval, since we don't know when stats started being + // recorded, + // and we want to make sure all intervals are roughly the same length. 
+ statsListener.get(); + Thread.sleep(pollMillis); + while (!Thread.currentThread().isInterrupted()) { + long elapsedTimeMillis = sendMetrics(true); + Thread.sleep(Math.max(0, pollMillis - elapsedTimeMillis)); + } + } catch (InterruptedException ex) { + LOG.info("OpenTsdbMetricsPusher thread interrupted, exiting"); + } catch (Exception ex) { + LOG.error("Unexpected error in OpenTSDBMetricsPusher, exiting", ex); + } + } + + public abstract long sendMetrics(boolean retryOnFailure) throws InterruptedException, IOException; + +} \ No newline at end of file diff --git a/thrift-logger/src/main/thrift/config.thrift b/thrift-logger/src/main/thrift/config.thrift index 5ca83543..b1d9b47d 100644 --- a/thrift-logger/src/main/thrift/config.thrift +++ b/thrift-logger/src/main/thrift/config.thrift @@ -127,7 +127,7 @@ struct DummyWriteConfig { enum RealpinObjectType { MOBILE_PERF_LOG = 0 - PROMOTIONS_INSERTION = 1 + PIN_PROMOTIONS_INSERTION = 1 } struct RealpinWriterConfig { @@ -146,7 +146,7 @@ struct PulsarProducerConfig { 2: required string serviceUrl; 3: optional string compressionType = "NONE"; 4: optional i32 writeTimeoutInSeconds = 60; - 5: optional string partitionerClass = "com.pinterest.singer.writer.partitioners.DefaultPartitioner"; + 5: optional string partitionerClass = "com.pinterest.singer.writer.pulsar.DefaultPartitioner"; 6: optional i32 batchingMaxMessages = 100; 7: optional i32 maxPendingMessages = 2000; 8: optional i32 batchingMaxPublishDelayInMilli = 10; @@ -243,12 +243,12 @@ struct SingerConfig { /** * global thread pool size. Parsed directly from the singer config file. */ - 1: required i32 threadPoolSize; + 1: required i32 threadPoolSize = 20; /** * port number of ostrich. Parsed directly from the singer config file. */ - 2: required i32 ostrichPort; + 2: required i32 ostrichPort = 2047; /** * log monitor parameters. Parsed directly from the singer config file. 
@@ -266,7 +266,7 @@ struct SingerConfig { 5: optional i32 logConfigPollIntervalSecs = 10; /** - * stats pusher host and port + * Stats pusher host and port */ 6: optional string statsPusherHostPort; @@ -320,4 +320,14 @@ struct SingerConfig { */ 16: optional bool runCommandServer = false; + /** + * Class name for stats pusher + */ + 17: optional string statsPusherClass = "com.pinterest.singer.metrics.OpenTsdbStatsPusher"; + + /** + * Singer Environment provider class + */ + 18: optional string environmentProviderClass; + } \ No newline at end of file diff --git a/thrift-logger/src/main/thrift/thrift_message.thrift b/thrift-logger/src/main/thrift/thrift_message.thrift index cae98ba9..5023dcf8 100644 --- a/thrift-logger/src/main/thrift/thrift_message.thrift +++ b/thrift-logger/src/main/thrift/thrift_message.thrift @@ -15,6 +15,6 @@ struct Event { // Note that we call this time instead of timestamp since timestamp is reserved in Hive. 1: i64 time, 2: i32 eventType, - 3: i64 userId, // user id - 4: i64 objectId, // field to store generic object info + 3: i64 userId, // the user performing the action + 4: i64 objectId, // can be a user, pin, board, comment } \ No newline at end of file