Skip to content

Commit

Permalink
update singer with 0.7.3.25 changes (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuyang08 authored Aug 22, 2019
1 parent 0f08daa commit 7282d5c
Show file tree
Hide file tree
Showing 32 changed files with 930 additions and 209 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.7.3.21</version>
<version>0.7.3.25</version>
<packaging>pom</packaging>
<description>Singer Logging Agent modules</description>
<inceptionYear>2013</inceptionYear>
Expand Down
2 changes: 1 addition & 1 deletion singer/deb.version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.7.3.16
0.7.3.25
4 changes: 2 additions & 2 deletions singer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>com.pinterest.singer</groupId>
<artifactId>singer-package</artifactId>
<version>0.7.3.21</version>
<version>0.7.3.25</version>
<relativePath>../pom.xml</relativePath>
</parent>
<licenses>
Expand Down Expand Up @@ -200,7 +200,7 @@
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.2.1</version>
<version>2.3.2</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
Expand Down
35 changes: 20 additions & 15 deletions singer/src/main/java/com/pinterest/singer/SingerMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.pinterest.singer.config.DirectorySingerConfigurator;
import com.pinterest.singer.config.SingerConfigurator;
import com.pinterest.singer.heartbeat.HeartbeatGenerator;
import com.pinterest.singer.metrics.StatsPusher;
import com.pinterest.singer.metrics.OpenTsdbMetricConverter;
import com.pinterest.singer.metrics.OpenTsdbMetricsPusher;
import com.pinterest.singer.metrics.OstrichAdminService;
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.utils.SingerUtils;
Expand All @@ -34,10 +34,11 @@

public final class SingerMain {

private static final String SINGER_METRICS_PREFIX = "singer";
private static final Logger LOG = LoggerFactory.getLogger(SingerMain.class);
private static final int TSDB_METRICS_PUSH_INTERVAL_IN_MILLISECONDS = 10 * 1000;
private static final int STATS_PUSH_INTERVAL_IN_MILLISECONDS = 10 * 1000;
protected static final String hostName = SingerUtils.getHostname();
private static OpenTsdbMetricsPusher metricsPusher = null;
private static StatsPusher statsPusher = null;
private static String singerPath = "";

static class SingerCleanupThread extends Thread {
Expand Down Expand Up @@ -70,8 +71,8 @@ public void run() {

try {
OpenTsdbMetricConverter.incr("singer.shutdown", 1);
if (metricsPusher!= null) {
metricsPusher.sendMetrics(false);
if (statsPusher!= null) {
statsPusher.sendMetrics(false);
} else {
LOG.error("metricsPusher was not initialized properly.");
}
Expand All @@ -81,24 +82,28 @@ public void run() {
}
}

static void startOstrichService(SingerConfig singerConfig) {
public static void startOstrichService(SingerConfig singerConfig) {
// do not start ostrich if Ostrich server is disabled
if (System.getenv(SingerMetrics.DISABLE_SINGER_OSTRICH) == null) {
OstrichAdminService ostrichService = new OstrichAdminService(singerConfig.getOstrichPort());
ostrichService.start();
}
// enable high granularity metrics we are running in canary
if (singerConfig.isSetStatsPusherHostPort()) {
LOG.info("Starting the OpenTsdb metrics pusher");
LOG.info("Starting the stats pusher");
try {
@SuppressWarnings("unchecked")
Class<StatsPusher> pusherClass = (Class<StatsPusher>) Class.forName(singerConfig.getStatsPusherClass());
statsPusher = pusherClass.newInstance();
HostAndPort pushHostPort = HostAndPort.fromString(singerConfig.getStatsPusherHostPort());
metricsPusher = new OpenTsdbMetricsPusher(
pushHostPort.getHost(),
pushHostPort.getPort(),
// TODO: make the following 'prefix' and 'interval' configurable.
new OpenTsdbMetricConverter("singer", hostName),
TSDB_METRICS_PUSH_INTERVAL_IN_MILLISECONDS);
metricsPusher.start();
LOG.info("OpenTsdb metrics pusher started!");
// allows hostname to be overridden based on environment
statsPusher.configure(SingerSettings.getEnvironment().getHostname()
, SINGER_METRICS_PREFIX
, pushHostPort.getHost()
, pushHostPort.getPort()
, STATS_PUSH_INTERVAL_IN_MILLISECONDS);
statsPusher.start();
LOG.info("Stats pusher started!");
} catch (Throwable t) {
// pusher fail is OK, do
LOG.error("Exception when starting stats pusher: ", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public class SingerMetrics {
public static final String NUM_KAFKA_MESSAGES = SINGER_WRITER + "num_kafka_messages_delivery_success";
public static final String OVERSIZED_MESSAGES = SINGER_WRITER + "num_oversized_messages";
public static final String WRITE_FAILURE = SINGER_WRITER + "kafka_write_failure";
public static final String BROKER_WRITE_FAILURE = SINGER_WRITER + "broker_write_failure";
public static final String BROKER_WRITE_SUCCESS = SINGER_WRITER + "broker_write_success";
public static final String BROKER_WRITE_LATENCY = SINGER_WRITER + "broker_write_latency";
public static final String WRITER_BATCH_SIZE = SINGER_WRITER + "message_batch_size";
public static final String WRITER_SSL_EXCEPTION = SINGER_WRITER + "ssl_exception";
public static final String KAFKA_THROUGHPUT = SINGER_WRITER + "topic_kafka_throughput";
Expand Down Expand Up @@ -93,5 +96,7 @@ public class SingerMetrics {
public static final String SINGER_CONFIGURATOR_CONFIG_ERRORS_UNKNOWN = "singer.configurator.unexpected_config_errors";
public static final String LOCALITY_MISSING = "singer.locality.missing";
public static final String DISABLE_SINGER_OSTRICH = "DISABLE_OSTRICH_METRICS";
public static final String LEADER_INFO_EXCEPTION = SINGER_WRITER + "leader_info_exception";
public static final String MISSING_LOCAL_PARTITIONS = "singer.locality.missing_local_partitions";

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

import com.pinterest.singer.common.errors.SingerLogException;
import com.pinterest.singer.config.SingerDirectoryWatcher;
import com.pinterest.singer.environment.Environment;
import com.pinterest.singer.environment.EnvironmentProvider;
import com.pinterest.singer.heartbeat.HeartbeatGenerator;
import com.pinterest.singer.kubernetes.KubeService;
import com.pinterest.singer.monitor.FileSystemMonitor;
import com.pinterest.singer.monitor.LogStreamManager;
import com.pinterest.singer.thrift.configuration.SingerConfig;
import com.pinterest.singer.thrift.configuration.SingerLogConfig;
import com.twitter.ostrich.stats.Stats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -86,6 +89,9 @@ public final class SingerSettings {

// initialized here so that unit tests aren't required to call the initialize method below
private static SortedMap<String, Collection<SingerLogConfig>> logConfigMap = new TreeMap<>();

//environment is production by default
private static Environment environment = new Environment();

private SingerSettings() {
}
Expand All @@ -98,6 +104,9 @@ public static void initialize(SingerConfig config)
NoSuchMethodException,
SingerLogException {
setSingerConfig(config);

loadAndSetSingerEnvironmentIfConfigured(config);
LOG.warn("Singer environment has been configured to:" + environment);

SingerSettings.logProcessorExecutor = Executors.newScheduledThreadPool(
singerConfig.getThreadPoolSize(),
Expand Down Expand Up @@ -224,6 +233,25 @@ public static synchronized FileSystemMonitor getOrCreateFileSystemMonitor(String
return mon;
}

protected static void loadAndSetSingerEnvironmentIfConfigured(SingerConfig config) {
if (config.getEnvironmentProviderClass() != null) {
try {
String environmentProviderClass = config.getEnvironmentProviderClass();
@SuppressWarnings("unchecked")
Class<EnvironmentProvider> providerClass = (Class<EnvironmentProvider>)
Class.forName(environmentProviderClass);
EnvironmentProvider provider = providerClass.newInstance();
Environment env = provider.getEnvironment();
if (env != null) {
environment = env;
return;
}
} catch (Exception e) {
LOG.error("Failed to load Singer Environment configuration", e);
}
}
}

public static Map<String, FileSystemMonitor> getFsMonitorMap() {
return fsMonitorMap;
}
Expand Down Expand Up @@ -282,4 +310,13 @@ public static ScheduledExecutorService getLogProcessorExecutor() {
public static void initializeConfigMap(SingerConfig config) {
logConfigMap = loadLogConfigMap(config);
}

public static Environment getEnvironment() {
return environment;
}

@VisibleForTesting
public static void setEnvironment(Environment environment) {
SingerSettings.environment = environment;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright 2019 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.singer.environment;

/**
* Uses environment variables to specify the provider. This class defaults to
* Environment.PROD unless the environment variable is explicitly configured.
*/
public class EnvVariableBasedEnvironmentProvider extends EnvironmentProvider {

private static final String DEPLOYMENT_STAGE = "DEPLOYMENT_STAGE";
private static final String LOCALITY = "LOCALITY";

@Override
protected String getLocality() {
String locality = System.getenv(LOCALITY);
if (locality != null) {
return locality;
} else {
return Environment.LOCALITY_NOT_AVAILABLE;
}
}

@Override
protected String getDeploymentStage () {
return System.getenv(DEPLOYMENT_STAGE);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright 2019 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.singer.environment;

import com.pinterest.singer.utils.SingerUtils;

/**
* This indicates what environment is Singer running in.
*
* Singer Environment indicator can subsequently be used by any component that
* needs to switch functionality based on the environment it is running in.
*
* NOTE: all variable MUST have default initialized in case the loader doesn't
* work, all getters must return a NON-NULL value unless NULLs are expected.
*/
public class Environment {

public static final String LOCALITY_NOT_AVAILABLE = "n/a";
public static final String DEFAULT_HOSTNAME = SingerUtils.getHostname();
private String locality = LOCALITY_NOT_AVAILABLE;
private String deploymentStage;
private String hostname = DEFAULT_HOSTNAME;

/**
* @return the locality
*/
public String getLocality() {
return locality;
}

/**
* @param locality the locality to set
*/
public void setLocality(String locality) {
this.locality = locality;
}

/**
* @return the deploymentStage
*/
public String getDeploymentStage() {
return deploymentStage;
}

/**
* @param deploymentStage the deploymentStage to set
*/
public void setDeploymentStage(String deploymentStage) {
this.deploymentStage = deploymentStage;
}

/**
* @return the hostname
*/
public String getHostname() {
return hostname;
}

/**
* @param hostname the hostname to set
*/
public void setHostname(String hostname) {
this.hostname = hostname;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/**
* Copyright 2019 Pinterest, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.pinterest.singer.environment;

/**
* Environment provider for Singer, defaults to production. This class can be
* extended to change the behavior of environment provider.
*/
public abstract class EnvironmentProvider {

protected Environment environment = new Environment();

public EnvironmentProvider() {
environment.setDeploymentStage(getDeploymentStage());
environment.setLocality(getLocality());
environment.setHostname(getHostname());
}

protected abstract String getLocality();

protected abstract String getDeploymentStage();

protected String getHostname() {
return Environment.DEFAULT_HOSTNAME;
}

public Environment getEnvironment() {
return environment;
}

}
Loading

0 comments on commit 7282d5c

Please sign in to comment.