Skip to content

Commit

Permalink
ADDMETADATA feature to allow addition of arbitrary metadata in JSON f…
Browse files Browse the repository at this point in the history
…ormat (#68)

* Adding ADDMETADATA configuration option to insert JSON metadata inline with record

* Adding ADDMETADATA configuration option to insert JSON metadata inline with record

* Fixing metadata field name
  • Loading branch information
zacharya authored and chaochenq committed Jun 13, 2017
1 parent 69f7b41 commit b47dbe5
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazon.kinesis.streaming.agent.processing.processors;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.LinkedHashMap;

import com.amazon.kinesis.streaming.agent.ByteBuffers;
import com.amazon.kinesis.streaming.agent.config.Configuration;
import com.amazon.kinesis.streaming.agent.processing.exceptions.DataConversionException;
import com.amazon.kinesis.streaming.agent.processing.interfaces.IDataConverter;
import com.amazon.kinesis.streaming.agent.processing.interfaces.IJSONPrinter;
import com.amazon.kinesis.streaming.agent.processing.utils.ProcessingUtilsFactory;

/**
* Build record as JSON object with a "metadata" key for arbitrary KV pairs
* and "message" key with the raw data
*
* Remove leading and trailing spaces for each line
*
* Configuration looks like:
*
* {
* "optionName": "ADDMETADATA",
* "metadata": {
* "key": "value",
* "foo": {
* "bar": "baz"
* }
* }
* }
*
* @author zacharya
*
*/
public class AddMetadataConverter implements IDataConverter {

private Object metadata;
private final IJSONPrinter jsonProducer;

public AddMetadataConverter(Configuration config) {
metadata = config.getConfigMap().get("metadata");
jsonProducer = ProcessingUtilsFactory.getPrinter(config);
}

@Override
public ByteBuffer convert(ByteBuffer data) throws DataConversionException {

final Map<String, Object> recordMap = new LinkedHashMap<String, Object>();
String dataStr = ByteBuffers.toString(data, StandardCharsets.UTF_8);

if (dataStr.endsWith(NEW_LINE)) {
dataStr = dataStr.substring(0, (dataStr.length() - NEW_LINE.length()));
}

recordMap.put("metadata", metadata);
recordMap.put("data", dataStr);

String dataJson = jsonProducer.writeAsString(recordMap) + NEW_LINE;
return ByteBuffer.wrap(dataJson.getBytes(StandardCharsets.UTF_8));
}

@Override
public String toString() {
return getClass().getSimpleName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.amazon.kinesis.streaming.agent.processing.processors.CSVToJSONDataConverter;
import com.amazon.kinesis.streaming.agent.processing.processors.LogToJSONDataConverter;
import com.amazon.kinesis.streaming.agent.processing.processors.SingleLineDataConverter;
import com.amazon.kinesis.streaming.agent.processing.processors.AddMetadataConverter;

/**
* The factory to create:
Expand All @@ -40,6 +41,7 @@
public class ProcessingUtilsFactory {

public static enum DataConversionOption {
ADDMETADATA,
SINGLELINE,
CSVTOJSON,
LOGTOJSON,
Expand Down Expand Up @@ -113,6 +115,8 @@ private static ILogParser buildLogParser(LogFormat format, String matchPattern,

private static IDataConverter buildConverter(DataConversionOption option, Configuration config) throws ConfigurationException {
switch (option) {
case ADDMETADATA:
return new AddMetadataConverter(config);
case SINGLELINE:
return new SingleLineDataConverter();
case CSVTOJSON:
Expand Down

0 comments on commit b47dbe5

Please sign in to comment.