Skip to content

Commit

Permalink
EC2 metadata processor (#112)
Browse files Browse the repository at this point in the history
* Adding EC2 aws sdk package

* Adding EC2 aws sdk dependency

* Initial commit

* Fixing configuration example

* Adding support for the AddEC2MetadataConverter

* Adding support for metadata auto refresh

* Updated metadata timestamp to iso time format

* Fine tuning log messages
  • Loading branch information
buholzer authored and chaochenq committed Dec 21, 2017
1 parent 807ed63 commit aa78921
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
<artifactId>aws-java-sdk-sts</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-ec2</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down
1 change: 1 addition & 0 deletions setup
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ download_dependencies() {
com.amazonaws:aws-java-sdk-kinesis:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-cloudwatch:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-sts:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-ec2:${aws_java_sdk_version} \
com.fasterxml.jackson.core:jackson-annotations:2.6.3 \
com.fasterxml.jackson.core:jackson-core:2.6.3 \
com.fasterxml.jackson.core:jackson-databind:2.6.3 \
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/

package com.amazon.kinesis.streaming.agent.processing.processors;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.Date;
import java.text.SimpleDateFormat;

import com.amazon.kinesis.streaming.agent.ByteBuffers;
import com.amazon.kinesis.streaming.agent.config.Configuration;
import com.amazon.kinesis.streaming.agent.processing.exceptions.DataConversionException;
import com.amazon.kinesis.streaming.agent.processing.exceptions.LogParsingException;
import com.amazon.kinesis.streaming.agent.processing.interfaces.IDataConverter;
import com.amazon.kinesis.streaming.agent.processing.interfaces.IJSONPrinter;
import com.amazon.kinesis.streaming.agent.processing.interfaces.ILogParser;
import com.amazon.kinesis.streaming.agent.processing.utils.ProcessingUtilsFactory;
import com.amazonaws.util.EC2MetadataUtils;
import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
import com.amazonaws.services.ec2.model.DescribeTagsRequest;
import com.amazonaws.services.ec2.model.DescribeTagsResult;
import com.amazonaws.services.ec2.model.TagDescription;
import com.amazonaws.services.ec2.model.Filter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.type.TypeReference;
import org.slf4j.Logger;
import com.amazon.kinesis.streaming.agent.Logging;

/**
* Parse the log entries from log file, and convert the log entries into JSON.
*
* Configuration of this converter looks like:
* {
* "optionName": "ADDEC2METADATA",
* "logFormat": "RFC3339SYSLOG"
* }
*
* @author buholzer
*
*/
public class AddEC2MetadataConverter implements IDataConverter {

private static final Logger LOGGER = Logging.getLogger(AddEC2MetadataConverter.class);
private ILogParser logParser;
private IJSONPrinter jsonProducer;
private Map<String, Object> metadata;
private long metadataTimestamp;
private long metadataTTL = 1000 * 60 * 60; // Update metadata every hour

public AddEC2MetadataConverter(Configuration config) {
jsonProducer = ProcessingUtilsFactory.getPrinter(config);
logParser = ProcessingUtilsFactory.getLogParser(config);

if (config.containsKey("metadataTTL")) {
try {
metadataTTL = config.readInteger("metadataTTL") * 1000;
LOGGER.info("Setting metadata TTL to " + metadataTTL + " millis");
} catch(Exception ex) {
LOGGER.warn("Error converting metadataTTL, ignoring");
}
}

refreshEC2Metadata();
}

@Override
public ByteBuffer convert(ByteBuffer data) throws DataConversionException {

if ((metadataTimestamp + metadataTTL) < System.currentTimeMillis()) refreshEC2Metadata();

if (metadata == null || metadata.isEmpty()) {
LOGGER.warn("Unable to append metadata, no metadata found");
return data;
}

String dataStr = ByteBuffers.toString(data, StandardCharsets.UTF_8);

ObjectMapper mapper = new ObjectMapper();
TypeReference<LinkedHashMap<String,Object>> typeRef =
new TypeReference<LinkedHashMap<String,Object>>() {};

LinkedHashMap<String,Object> dataObj = null;
try {
dataObj = mapper.readValue(dataStr, typeRef);
} catch (Exception ex) {
throw new DataConversionException("Error converting json source data to map", ex);
}

// Appending EC2 metadata
dataObj.putAll(metadata);

String dataJson = jsonProducer.writeAsString(dataObj) + NEW_LINE;
return ByteBuffer.wrap(dataJson.getBytes(StandardCharsets.UTF_8));
}

private void refreshEC2Metadata() {
LOGGER.info("Refreshing EC2 metadata");

metadataTimestamp = System.currentTimeMillis();

try {
EC2MetadataUtils.InstanceInfo info = EC2MetadataUtils.getInstanceInfo();

metadata = new LinkedHashMap<String, Object>();
metadata.put("privateIp", info.getPrivateIp());
metadata.put("availabilityZone", info.getAvailabilityZone());
metadata.put("instanceId", info.getInstanceId());
metadata.put("instanceType", info.getInstanceType());
metadata.put("accountId", info.getAccountId());
metadata.put("amiId", info.getImageId());
metadata.put("region", info.getRegion());
metadata.put("metadataTimestamp",
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ")
.format(new Date(metadataTimestamp)));

final AmazonEC2 ec2 = AmazonEC2ClientBuilder.defaultClient();
DescribeTagsResult result = ec2.describeTags(
new DescribeTagsRequest().withFilters(
new Filter().withName("resource-id").withValues(info.getInstanceId())));
List<TagDescription> tags = result.getTags();

Map<String, Object> metadataTags = new LinkedHashMap<String, Object>();
for (TagDescription tag : tags) {
metadataTags.put(tag.getKey().toLowerCase(), tag.getValue());
}
metadata.put("tags", metadataTags);
} catch (Exception ex) {
LOGGER.warn("Error while updating EC2 metadata - " + ex.getMessage() + ", ignoring");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazon.kinesis.streaming.agent.processing.processors.LogToJSONDataConverter;
import com.amazon.kinesis.streaming.agent.processing.processors.SingleLineDataConverter;
import com.amazon.kinesis.streaming.agent.processing.processors.AddMetadataConverter;
import com.amazon.kinesis.streaming.agent.processing.processors.AddEC2MetadataConverter;

/**
* The factory to create:
Expand All @@ -42,6 +43,7 @@ public class ProcessingUtilsFactory {

public static enum DataConversionOption {
ADDMETADATA,
ADDEC2METADATA,
SINGLELINE,
CSVTOJSON,
LOGTOJSON,
Expand Down Expand Up @@ -117,6 +119,8 @@ private static IDataConverter buildConverter(DataConversionOption option, Config
switch (option) {
case ADDMETADATA:
return new AddMetadataConverter(config);
case ADDEC2METADATA:
return new AddEC2MetadataConverter(config);
case SINGLELINE:
return new SingleLineDataConverter();
case CSVTOJSON:
Expand Down

0 comments on commit aa78921

Please sign in to comment.