Skip to content

Commit

Permalink
Release AmazonKinesisAgent 1.1.2
Browse files Browse the repository at this point in the history
  • Loading branch information
chaochenq committed Sep 16, 2016
1 parent 78e7c50 commit e1d48d8
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 82 deletions.
4 changes: 2 additions & 2 deletions bin/aws-kinesis-agent-babysit
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ $SERVICE status >/dev/null 2>&1
status=$?

if [ "$status" -eq "1" -o "$status" -eq "2" ]; then
start_agent
start_agent
fi

exit 0
exit 0
149 changes: 74 additions & 75 deletions bin/aws-kinesis-agent.RedHat
Original file line number Diff line number Diff line change
Expand Up @@ -68,25 +68,25 @@ do_start () {
export AWS_SECRET_ACCESS_KEY
export AWS_DEFAULT_REGION

DAEMON_NAME=$DAEMON_NAME nohup runuser $AGENT_USER -s /bin/sh -c "$DAEMON_EXEC -L $AGENT_LOG_LEVEL $AGENT_ARGS $@" > $INITLOGFILE 2>&1 &

pid=$!
echo $pid > $PIDFILE

# wait a bit and make sure process still there
sleep 2
checkpid $pid
RETVAL=$?
# create the lock file
if [[ $RETVAL != 0 && -s $INITLOGFILE ]]; then
echo "Initialization logs can be found in $INITLOGFILE" >&2
cat $INITLOGFILE >&2
else
rm -f $INITLOGFILE
fi
DAEMON_NAME=$DAEMON_NAME nohup runuser $AGENT_USER -s /bin/sh -c "$DAEMON_EXEC -L $AGENT_LOG_LEVEL $AGENT_ARGS $@" > $INITLOGFILE 2>&1 &

# output status message
[[ $RETVAL == 0 ]] && success "$DAEMON_NAME startup" || failure "$DAEMON_NAME startup"
pid=$!
echo $pid > $PIDFILE

# wait a bit and make sure process still there
sleep 2
checkpid $pid
RETVAL=$?
# create the lock file
if [[ $RETVAL != 0 && -s $INITLOGFILE ]]; then
echo "Initialization logs can be found in $INITLOGFILE" >&2
cat $INITLOGFILE >&2
else
rm -f $INITLOGFILE
fi

# output status message
[[ $RETVAL == 0 ]] && success "$DAEMON_NAME startup" || failure "$DAEMON_NAME startup"

RETVAL=$?
return $RETVAL
Expand All @@ -100,42 +100,42 @@ get_pids() {
}

do_stop () {
ppids=`get_pids | awk '{print $1}'`
if [[ $? == 0 ]]; then
for pid in $ppids; do
pkill -TERM -P $pid > /dev/null 2>&1
done

i=0
while [[ $i -lt $SHUTDOWN_TIME ]] && get_pids > /dev/null; do
sleep 1
(( i = i + 1 ))
done

ppids=`get_pids | awk '{print $1}'`
if [[ $? == 0 ]]; then
for pid in $ppids; do
pkill -TERM -P $pid > /dev/null 2>&1
done

i=0
while [[ $i -lt $SHUTDOWN_TIME ]] && get_pids > /dev/null; do
sleep 1
(( i = i + 1 ))
pkill -KILL -P $pid > /dev/null 2>&1
done

ppids=`get_pids | awk '{print $1}'`
if [[ $? == 0 ]]; then
for pid in $ppids; do
pkill -KILL -P $pid > /dev/null 2>&1
done
fi
procList=`get_pids`
if [ $? != 0 ]; then
RETVAL=0
else
RETVAL=1
fi
fi

# finally validate that the pid in the PIDFILE is gone
if [[ $RETVAL == 0 && -e $PIDFILE ]] && checkpid `cat $PIDFILE`; then
procList=`get_pids`
if [ $? != 0 ]; then
RETVAL=0
else
RETVAL=1
fi
fi

# all OK? cleanup
[[ $RETVAL == 0 ]] && rm -f $PIDFILE
# finally validate that the pid in the PIDFILE is gone
if [[ $RETVAL == 0 && -e $PIDFILE ]] && checkpid `cat $PIDFILE`; then
RETVAL=1
fi

# all OK? cleanup
[[ $RETVAL == 0 ]] && rm -f $PIDFILE

# print status message
[[ $RETVAL == 0 ]] && success "$DAEMON_NAME shutdown" || failure "$DAEMON_NAME shutdown"
# print status message
[[ $RETVAL == 0 ]] && success "$DAEMON_NAME shutdown" || failure "$DAEMON_NAME shutdown"

RETVAL=$?
return $RETVAL
Expand All @@ -147,8 +147,7 @@ function get_agent_pid() {

do_status () {
status -p $PIDFILE $DAEMON_NAME
RETVAL=$?

RETVAL=$?
if [[ $RETVAL = 0 && -z $(get_agent_pid) ]]; then
RETVAL=1
fi
Expand Down Expand Up @@ -197,31 +196,31 @@ if ! flock -w 50 200; then
exit 1
fi
(
command=$1
shift
case "$command" in
start)
do_start "$@"
;;
stop)
do_stop
;;
restart)
do_restart "$@"
;;
condrestart)
do_condrestart "$@"
;;
status)
do_status
;;
install)
do_install
;;
*)
echo "Usage: $0 {start|stop|restart|condrestart|status|install}"
exit 1
;;
esac
exit $RETVAL
) 200>&-
command=$1
shift
case "$command" in
start)
do_start "$@"
;;
stop)
do_stop
;;
restart)
do_restart "$@"
;;
condrestart)
do_condrestart "$@"
;;
status)
do_status
;;
install)
do_install
;;
*)
echo "Usage: $0 {start|stop|restart|condrestart|status|install}"
exit 1
;;
esac
exit $RETVAL
) 200>&-
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,4 @@
</plugins>
</build>

</project>
</project>
3 changes: 2 additions & 1 deletion setup
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ case "$dist" in
Ubuntu)
install_package="apt-get install -y"
init_dir=/etc/init.d
sysconfig_dir=/etc/defaults ;;
sysconfig_dir=/etc/default ;;
*)
install_package="yum install -y"
init_dir=/etc/rc.d/init.d
Expand Down Expand Up @@ -92,6 +92,7 @@ download_dependencies() {
remote_mvn_pkg="com.amazonaws:aws-java-sdk-core:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-kinesis:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-cloudwatch:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-sts:${aws_java_sdk_version} \
com.fasterxml.jackson.core:jackson-annotations:2.6.3 \
com.fasterxml.jackson.core:jackson-core:2.6.3 \
com.fasterxml.jackson.core:jackson-databind:2.6.3 \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@

import com.amazon.kinesis.streaming.agent.config.AgentConfiguration;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.auth.ContainerCredentialsProvider;
import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.auth.InstanceProfileCredentialsProvider;
import com.amazonaws.auth.SystemPropertiesCredentialsProvider;
import com.amazonaws.auth.ContainerCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;

public class AgentAWSCredentialsProviderChain extends AWSCredentialsProviderChain {
Expand Down
33 changes: 31 additions & 2 deletions src/com/amazon/kinesis/streaming/agent/AgentContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@
import com.amazon.kinesis.streaming.agent.tailing.FileFlowFactory;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

Expand Down Expand Up @@ -146,7 +148,7 @@ public String userAgent(ClientConfiguration config) {
public synchronized AmazonKinesisFirehose getFirehoseClient() {
if (firehoseClient == null) {
firehoseClient = new AmazonKinesisFirehoseClient(
getAwsCredentialsProvider(), getAwsClientConfiguration());
getAwsCredentialsProvider(), getAwsClientConfiguration());
if (!Strings.isNullOrEmpty(firehoseEndpoint()))
firehoseClient.setEndpoint(firehoseEndpoint());
}
Expand Down Expand Up @@ -181,7 +183,34 @@ public AmazonCloudWatch getCloudWatchClient() {
}

public AWSCredentialsProvider getAwsCredentialsProvider() {
return new AgentAWSCredentialsProviderChain(this);
AWSCredentialsProvider credentialsProvider = new AgentAWSCredentialsProviderChain(this);
final String assumeRoleARN = readString(ASSUME_ROLE_ARN, null);
if (!Strings.isNullOrEmpty(assumeRoleARN)) {
credentialsProvider =
getSTSAssumeRoleSessionCredentialsProvider(assumeRoleARN,
credentialsProvider);
}
return credentialsProvider;
}

public STSAssumeRoleSessionCredentialsProvider getSTSAssumeRoleSessionCredentialsProvider(
String roleARN, AWSCredentialsProvider credentialsProvider) {
Preconditions.checkNotNull(credentialsProvider);
final String stsEndpoint = stsEndpoint();
final String roleExternalId = readString(ASSUME_ROLE_EXTERNAL_ID, null);

STSAssumeRoleSessionCredentialsProvider.Builder builder =
new STSAssumeRoleSessionCredentialsProvider.Builder(roleARN, ASSUME_ROLE_SESSION)
.withLongLivedCredentialsProvider(credentialsProvider)
.withRoleSessionDurationSeconds(DEFAULT_ASSUME_ROLE_DURATION_SECONDS);
if (!Strings.isNullOrEmpty(roleExternalId)) {
builder = builder.withExternalId(roleExternalId);
}
if (!Strings.isNullOrEmpty(stsEndpoint)) {
builder = builder.withServiceEndpoint(stsEndpoint);
}

return builder.build();
}

public ClientConfiguration getAwsClientConfiguration() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import com.amazonaws.ClientConfiguration;

Expand All @@ -33,13 +34,18 @@ public class AgentConfiguration extends Configuration {
static final boolean DEFAULT_LOG_EMIT_INTERNAL_METRICS = false;
static final int DEFAULT_LOG_STATUS_REPORTING_PERIOD_SECONDS = 30;
static final int DEFAULT_CHECKPOINT_TTL_DAYS = 7;
protected static final int DEFAULT_ASSUME_ROLE_DURATION_SECONDS =
(int) TimeUnit.HOURS.toSeconds(1);

// NOTE: If changing the default make sure to change SHUTDOWN_TIME variable in `bin/aws-kinesis-agent` as well...
static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = 10_000L;

private static final String CW_DEFAULT_NAMESPACE = "AWSKinesisAgent";
public static final String CONFIG_ACCESS_KEY = "awsAccessKeyId";
public static final String CONFIG_SECRET_KEY = "awsSecretAccessKey";
public static final String ASSUME_ROLE_ARN = "assumeRoleARN";
public static final String ASSUME_ROLE_SESSION = "AWSKinesisAgent";
public static final String ASSUME_ROLE_EXTERNAL_ID = "assumeRoleExternalId";
public static final String SHUTDOWN_TIMEOUT_MILLIS_KEY = "shutdownTimeoutMillis";
public static final String ENDPOINT_KEY = "endpoint";
static final String LOG_FILE_KEY = "log.file";
Expand Down Expand Up @@ -191,4 +197,8 @@ public String firehoseEndpoint() {
public String cloudwatchEndpoint() {
return this.readString("cloudwatch." + ENDPOINT_KEY, null);
}

public String stsEndpoint() {
return this.readString("sts." + ENDPOINT_KEY, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,6 @@ public void testEndpointConfig() throws IOException {
Assert.assertEquals(config.firehoseEndpoint(), "https://firehose.us-east-1.amazonaws.com");
Assert.assertTrue(Strings.isNullOrEmpty(config.kinesisEndpoint()));
Assert.assertTrue(Strings.isNullOrEmpty(config.cloudwatchEndpoint()));
Assert.assertTrue(Strings.isNullOrEmpty(config.stsEndpoint()));
}
}

0 comments on commit e1d48d8

Please sign in to comment.