Skip to content

Commit

Permalink
Changed the ordering of configuration loading, added additional debug (
Browse files Browse the repository at this point in the history
…#90)

* Changed the ordering of configuration loading, added additional debug logs to LogToJSONDataConverter and included extra dependencies in the pom file

* Added logging to CSVToJSONDataConverter and removed debug messages on each parse in LogToJSONDataConverter

* Changed logging to use a single method call instead of making it a property of the class
  • Loading branch information
cyb3rd0g1 authored and chaochenq committed Dec 22, 2017
1 parent 62c45be commit 2062ecb
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 17 deletions.
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@
</properties>

<dependencies>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.11</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-core</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion src/com/amazon/kinesis/streaming/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static void main(String[] args) throws Exception {
String configFile = opts.getConfigFile();
AgentConfiguration config = tryReadConfigurationFile(Paths.get(opts.getConfigFile()));
Path logFile = opts.getLogFile() != null ? Paths.get(opts.getLogFile()) : (config != null ? config.logFile() : null);
String logLevel = opts.getLogLevel() != null ? opts.getLogLevel() : (config != null ? config.logLevel() : null);
String logLevel = config != null ? config.logLevel() : (opts.getLogLevel() != null ? opts.getLogLevel() : null );
int logMaxBackupFileIndex = (config != null ? config.logMaxBackupIndex() : -1);
long logMaxFileSize = (config != null ? config.logMaxFileSize() : -1L);
Logging.initialize(logFile, logLevel, logMaxBackupFileIndex, logMaxFileSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.amazon.kinesis.streaming.agent.processing.interfaces.IDataConverter;
import com.amazon.kinesis.streaming.agent.processing.interfaces.IJSONPrinter;
import com.amazon.kinesis.streaming.agent.processing.utils.ProcessingUtilsFactory;
import org.slf4j.Logger;
import com.amazon.kinesis.streaming.agent.Logging;

/**
* Convert a CSV record into JSON record.
Expand All @@ -44,14 +46,17 @@
*
*/
public class CSVToJSONDataConverter implements IDataConverter {


protected final Logger logger;
private static String FIELDS_KEY = "customFieldNames";
private static String DELIMITER_KEY = "delimiter";
private final List<String> fieldNames;
private final String delimiter;
private final IJSONPrinter jsonProducer;


public CSVToJSONDataConverter(Configuration config) {
this.logger = Logging.getLogger(getClass());
fieldNames = config.readList(FIELDS_KEY, String.class);
delimiter = config.readString(DELIMITER_KEY, ",");
jsonProducer = ProcessingUtilsFactory.getPrinter(config);
Expand All @@ -73,6 +78,7 @@ public ByteBuffer convert(ByteBuffer data) throws DataConversionException {
try {
recordMap.put(fieldNames.get(i), columns[i]);
} catch (ArrayIndexOutOfBoundsException e) {
Logging.getLogger(getClass()).debug("Null field in CSV detected");
recordMap.put(fieldNames.get(i), null);
} catch (Exception e) {
throw new DataConversionException("Unable to create the column map", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2014-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* or in the "license" file accompanying this file.
* This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and limitations under the License.
*/
package com.amazon.kinesis.streaming.agent.processing.processors;
Expand All @@ -26,28 +26,32 @@
import com.amazon.kinesis.streaming.agent.processing.interfaces.IJSONPrinter;
import com.amazon.kinesis.streaming.agent.processing.interfaces.ILogParser;
import com.amazon.kinesis.streaming.agent.processing.utils.ProcessingUtilsFactory;
import org.slf4j.Logger;
import com.amazon.kinesis.streaming.agent.Logging;

/**
* Parse the log entries from log file, and convert the log entries into JSON.
*
*
* Configuration of this converter looks like:
* {
* "optionName": "LOGTOJSON",
* "logFormat": "COMMONAPACHELOG",
* "matchPattern": "OPTIONAL_REGEX",
* "customFieldNames": [ "column1", "column2", ... ]
* }
*
*
* @author chaocheq
*
*/
public class LogToJSONDataConverter implements IDataConverter {


protected final Logger logger;
private List<String> fields;
private ILogParser logParser;
private IJSONPrinter jsonProducer;

public LogToJSONDataConverter(Configuration config) {
this.logger = Logging.getLogger(getClass());
jsonProducer = ProcessingUtilsFactory.getPrinter(config);
logParser = ProcessingUtilsFactory.getLogParser(config);
if (config.containsKey(ProcessingUtilsFactory.CUSTOM_FIELDS_KEY)) {
Expand All @@ -58,23 +62,25 @@ public LogToJSONDataConverter(Configuration config) {
@Override
public ByteBuffer convert(ByteBuffer data) throws DataConversionException {
    String dataStr = ByteBuffers.toString(data, StandardCharsets.UTF_8);

    // Strip the trailing NEW_LINE before parsing; it is re-appended to the
    // JSON output below so the record framing is preserved.
    if (dataStr.endsWith(NEW_LINE)) {
        dataStr = dataStr.substring(0, (dataStr.length() - NEW_LINE.length()));
    }

    Map<String, Object> recordMap;

    try {
        recordMap = logParser.parseLogRecord(dataStr, fields);
    } catch (LogParsingException e) {
        // Deliberate best-effort: an unparseable record is filtered out
        // (null return) rather than failing the whole stream.
        // Parameterized logging avoids eager string concatenation when
        // DEBUG is disabled; passing the exception last logs its details.
        Logging.getLogger(getClass()).debug("Exception while parsing log: {}:", dataStr, e);
        return null;
    }

    String dataJson = jsonProducer.writeAsString(recordMap) + NEW_LINE;
    return ByteBuffer.wrap(dataJson.getBytes(StandardCharsets.UTF_8));
}
}
}

0 comments on commit 2062ecb

Please sign in to comment.