
Agent extensions library, plus data pre-processing extension support (#11)

* Add amazon-kinesis-agent-extension dependency.
Add gitignore file.

* Add data conversion functionality to parser classes.

* Implement converter test.

* Fix example converter class name.

* Fix build scripts (setup and ant).

* Add script to manually run Kinesis Agent on older Linux versions.
Add script to read the latest N records from a stream, a simplified wrapper around the aws cli kinesis commands.
Update Agent config with "initialPosition" options.

* Fix Kinesis stream reader script.

* Improve output of the Kinesis stream reader script.

* Fix Kinesis Agent manual runner: remove the Java library directory from the classpath.
chupakabr authored and chaochenq committed Apr 11, 2016
1 parent 7bb13d4 commit 919f853
Showing 14 changed files with 348 additions and 20 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
amazon-kinesis-agent.iml
target
21 changes: 21 additions & 0 deletions bin/aws-kinesis-agent-manual-run.sh
@@ -0,0 +1,21 @@
#!/bin/bash

# Log files are in /var/log/aws-kinesis-agent/

JAVA_START_HEAP="256m"
JAVA_MAX_HEAP="512m"

JAVA_DIR="/usr/share/java"
LIB_DIR="/usr/share/aws-kinesis-agent/lib"
#CLASSPATH="$JAVA_DIR"/*:"$LIB_DIR":$(find "$LIB_DIR" -type f -name \*.jar | paste -s -d:):"$CLASSPATH"
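# Build the classpath from the agent lib directory and every JAR beneath it, joined with ':' via paste.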
CLASSPATH="$LIB_DIR":$(find "$LIB_DIR" -type f -name \*.jar | paste -s -d:):"$CLASSPATH"

JAVACMD="java"
JVM_ARGS="-server -Xms${JAVA_START_HEAP} -Xmx${JAVA_MAX_HEAP} $JVM_ARGS"

MAIN_CLASS="com.amazon.kinesis.streaming.agent.Agent"

exec $JAVACMD $JVM_ARGS \
-cp "$CLASSPATH" \
$MAIN_CLASS "$@"

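A typical invocation of this runner might look like the following sketch. The -c configuration flag is an assumption about the Agent's command line (arguments are simply forwarded to the main class via "$@"), and the JVM flag is only an example; exported JVM_ARGS are appended to the script's defaults.

# Forward an extra JVM flag and an assumed config-file option to the agent
JVM_ARGS="-verbose:gc" bin/aws-kinesis-agent-manual-run.sh -c /etc/aws-kinesis/agent.json
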
54 changes: 54 additions & 0 deletions bin/kstream-read-latest.sh
@@ -0,0 +1,54 @@
#!/bin/bash

# Stream name parameter (mandatory)
streamName="$1"
if [ -z "$streamName" ]; then
echo "Usage: $0 <stream-name> [latest records number to display]"
exit 1
fi

# Number of records to return (optional, default 10)
recordsNumber=$2
if [ -z "$recordsNumber" ]; then
recordsNumber=10
fi
recordsNumber=$(( recordsNumber ))  # coerce to a number

iteratorType="LATEST"
iteratorType="TRIM_HORIZON"

# Get shard ID, first shard in the list (relies on the CLI's text output format)
shardId=$(aws kinesis describe-stream \
--stream-name "$streamName" | grep SHARDS | awk '{print $2}')
if [ -z "$shardId" ]; then
echo "No shards found for stream [$streamName]"
exit 2
fi

# Get a shard iterator of the selected type
shardIter=$(aws kinesis get-shard-iterator \
--stream-name "$streamName" \
--shard-id "$shardId" \
--shard-iterator-type "$iteratorType")

# Read up to recordsNumber records, making at most kMaxReadDepth get-records calls
tmpFile=$(mktemp)
kMaxReadDepth=1
kRecordsRead=0
while [ $kRecordsRead -lt $recordsNumber ] && [ $kMaxReadDepth -gt 0 ]; do
    aws kinesis get-records --shard-iterator "$shardIter" --limit $recordsNumber > "$tmpFile"
    foundRowsNum=$(awk 'NR==1{print $1; exit}' "$tmpFile")  # reported by the CLI; currently unused
    shardIter=$(awk 'NR==1{print $2; exit}' "$tmpFile")
    # Feed the inner loop via process substitution rather than a pipe, so the
    # kRecordsRead counter updated inside it stays visible to the outer loop condition.
    while read -r line; do
        echo ""
        echo -n ">> $kRecordsRead: "
        echo "$line" | base64 --decode
        kRecordsRead=$(( kRecordsRead + 1 ))
    done < <(awk '{print $3}' "$tmpFile")
    kMaxReadDepth=$(( kMaxReadDepth - 1 ))
done

rm -f "$tmpFile"

echo ""

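Usage sketch (the stream name matches the example agent.json below; the count falls back to 10 when omitted):

# Print up to 5 records from the stream's first shard
bin/kstream-read-latest.sh aws-kinesis-agent-test1 5
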
1 change: 0 additions & 1 deletion build.xml
@@ -13,7 +13,6 @@
<property name="build.dependencies" location="dependencies" />

<path id="classpath">
<fileset dir="/usr/share/java" includes="**/*.jar" />
<fileset dir="${build.dependencies}" includes="**/*.jar" />
</path>

21 changes: 21 additions & 0 deletions configuration/example/agent.json
@@ -0,0 +1,21 @@
{
"checkpointFile": "/tmp/aws-kinesis-agent-checkpoints/main.log",
"cloudwatch.emitMetrics": true,
"cloudwatch.endpoint": "https://monitoring.us-west-2.amazonaws.com",
"kinesis.endpoint": "https://kinesis.us-west-2.amazonaws.com",
"awsAccessKeyId": "ACCESSKEY",
"awsSecretAccessKey": "SECRETKEY",
"flows": [
{
"filePattern": "/tmp/aws-kinesis-agent-test1.log*",
"initialPosition": "END_OF_FILE",
"kinesisStream": "aws-kinesis-agent-test1",
"converterClass": "com.amazon.kinesis.streaming.agent.extension.BracketsDataConverter"
},
{
"filePattern": "/tmp/aws-kinesis-agent-test2.log*",
"initialPosition": "START_OF_FILE",
"kinesisStream": "aws-kinesis-agent-test2"
}
]
}
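The new "converterClass" key plugs a data converter into a flow: each record passes through the converter before it is sent to the stream (see AbstractParser below). The IDataConverter interface lives in the extension sub-project and is not part of this diff; the sketch below assumes it declares a single ByteBuffer convert(ByteBuffer) method throwing DataConversionException, matching how AbstractParser calls it, and the bracket-wrapping behavior is guessed from the example class name.

package com.amazon.kinesis.streaming.agent.extension;

import java.nio.ByteBuffer;

// Hypothetical sketch of a converter like BracketsDataConverter:
// wraps each record's bytes in '[' and ']'. A public no-arg
// constructor is required because AbstractParser instantiates
// converters reflectively (see its constructor below).
public class BracketsDataConverter implements IDataConverter {
    @Override
    public ByteBuffer convert(ByteBuffer data) throws DataConversionException {
        byte[] payload = new byte[data.remaining()];
        data.get(payload); // copy the record bytes out of the buffer
        ByteBuffer result = ByteBuffer.allocate(payload.length + 2);
        result.put((byte) '[');
        result.put(payload);
        result.put((byte) ']');
        result.flip(); // switch the buffer from writing to reading
        return result;
    }
}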
45 changes: 45 additions & 0 deletions pom.xml
@@ -37,6 +37,11 @@
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>${aws-java-sdk.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-agent-extension</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatch</artifactId>
@@ -157,6 +162,12 @@
<artifactId>lombok</artifactId>
<version>1.16.6</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>

<developers>
@@ -206,6 +217,40 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.10</version>
<executions>
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>test/java</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-resource</id>
<phase>generate-test-resources</phase>
<goals>
<goal>add-test-resource</goal>
</goals>
<configuration>
<resources>
<resource>
<directory>test/resources</directory>
<targetPath>resources</targetPath>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

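With these executions registered, the converter test added under test/java should run as part of the standard Maven lifecycle, e.g.:

mvn clean test
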
16 changes: 13 additions & 3 deletions setup
@@ -40,15 +40,17 @@ download_jar() {
path=${1//.//}
jar_name=$artifact_id-$version.jar
url=${prefix}${path}/${artifact_id}/${version}/${jar_name}
[[ -f ${dependencies_dir}/${jar_name} ]] || wget -P ${dependencies_dir} $url
[[ -f ${dependencies_dir}/${jar_name} ]] || wget -O "${dependencies_dir}/${jar_name}" -P ${dependencies_dir} $url
}

download_dependencies() {
install -d ${dependencies_dir}

echo "Downloading dependencies ..."
aws_java_sdk_version="1.10.26"


# TODO Add "com.amazonaws:amazon-kinesis-agent-extension:1.0" once it is published to Maven Central;
# use the local amazon-kinesis-agent-extension JAR as a temporary workaround.
remote_mvn_pkg="com.amazonaws:aws-java-sdk-core:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-kinesis:${aws_java_sdk_version} \
com.amazonaws:aws-java-sdk-cloudwatch:${aws_java_sdk_version} \
@@ -75,11 +77,19 @@ download_dependencies() {
org.xerial:sqlite-jdbc:3.8.11.2 \
joda-time:joda-time:2.8.2 \
org.projectlombok:lombok:1.16.6"

for package in ${remote_mvn_pkg}
do
download_jar $(echo $package | tr : " ")
done

# TODO Replace this with a Maven dependency once the extension sub-project is published to Maven Central.
# Build the extension sub-project first, then copy its JAR if present.
local_jars="../amazon-kinesis-agent-extension/target/amazon-kinesis-agent-extension-1.0.jar"
for local_jar in ${local_jars}
do
    if [[ -f "${local_jar}" ]]; then
        cp -f "${local_jar}" "${dependencies_dir}/"
    fi
done
}

do_uninstall () {
27 changes: 14 additions & 13 deletions src/com/amazon/kinesis/streaming/agent/Agent.java
Expand Up @@ -14,18 +14,6 @@

package com.amazon.kinesis.streaming.agent;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;

import com.amazon.kinesis.streaming.agent.config.AgentConfiguration;
import com.amazon.kinesis.streaming.agent.config.AgentOptions;
import com.amazon.kinesis.streaming.agent.config.Configuration;
@@ -41,6 +29,17 @@
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* Main class for AWS Firehose Agent.
@@ -86,11 +85,13 @@ public void uncaughtException(Thread t, Throwable e) {
if (config == null) {
config = readConfigurationFile(Paths.get(opts.getConfigFile()));
}
// Initialize and start the agent
// Initialize agent context
AgentContext agentContext = new AgentContext(config);
if (agentContext.flows().isEmpty()) {
throw new ConfigurationException("There are no flows configured in configuration file.");
}

// Start the agent
final Agent agent = new Agent(agentContext);

// Make sure everything terminates cleanly when process is killed
3 changes: 2 additions & 1 deletion src/com/amazon/kinesis/streaming/agent/AgentContext.java
@@ -85,8 +85,9 @@ public AgentContext(Configuration configuration, FileFlowFactory fileFlowFactory
if (containsKey("flows")) {
for (Configuration c : readList("flows", Configuration.class)) {
FileFlow<?> flow = fileFlowFactory.getFileFlow(this, c);
if (flows.containsKey(flow.getId()))
if (flows.containsKey(flow.getId())) {
throw new ConfigurationException("Duplicate flow: " + flow.getId());
}
flows.put(flow.getId(), flow);
}
}
47 changes: 45 additions & 2 deletions src/com/amazon/kinesis/streaming/agent/tailing/AbstractParser.java
@@ -14,6 +14,7 @@
package com.amazon.kinesis.streaming.agent.tailing;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
@@ -22,6 +23,9 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.amazon.kinesis.streaming.agent.extension.DataConversionException;
import com.amazon.kinesis.streaming.agent.extension.DummyDataConverter;
import com.amazon.kinesis.streaming.agent.extension.IDataConverter;
import lombok.Getter;

import org.slf4j.Logger;
@@ -49,6 +53,7 @@ public abstract class AbstractParser<R extends IRecord> implements IParser<R> {
@Getter protected final String name;
@Getter protected final ISplitter recordSplitter;
@Getter protected final int bufferSize;
@Getter protected final IDataConverter dataConverter;

@Getter protected TrackedFile currentFile;
@VisibleForTesting
@@ -70,6 +75,7 @@
private final AtomicLong totalRecordsParsed = new AtomicLong();
private final AtomicLong totalRecordsLargerThanBuffer = new AtomicLong();
private final AtomicLong totalUnhandledErrors = new AtomicLong();
private final AtomicLong totalDataProcessingErrors = new AtomicLong();

public AbstractParser(FileFlow<R> flow) {
this(flow, flow.getParserBufferSize());
@@ -83,6 +89,32 @@ public AbstractParser(FileFlow<R> flow, int bufferSize) {
this.recordSplitter = this.flow.getRecordSplitter();
this.bufferSize = bufferSize;
this.logger = Logging.getLogger(getClass());

if (flow.hasConverter()) {
// Make sure we can create the converter class; fail fast with an error otherwise
try {
this.dataConverter = flow.buildConverter();
} catch (NoSuchMethodException e) {
logger.error("No public constructor defined for data converter " + IDataConverter.class, e);
throw new IllegalArgumentException(e);
} catch (ClassNotFoundException e) {
logger.error("Data converter implementation class not found for " + IDataConverter.class, e);
throw new IllegalArgumentException(e);
} catch (IllegalAccessException e) {
logger.error("Constructor of data converter " + IDataConverter.class + " is not accessible", e);
throw new IllegalArgumentException(e);
} catch (InvocationTargetException e) {
logger.error("Cannot call constructor of data converter implementing " + IDataConverter.class, e);
throw new IllegalArgumentException(e);
} catch (InstantiationException e) {
logger.error("Cannot instantiate data converter implementing " + IDataConverter.class, e);
throw new IllegalArgumentException(e);
}
} else {
// Fall back to a dummy converter when none is specified; it returns
// the data unchanged, without applying any conversion.
this.dataConverter = new DummyDataConverter();
}
}

@Override
@@ -460,11 +492,21 @@ private R buildRecord(int offset, int length) {
ByteBuffer data = ByteBuffers.getPartialView(currentBuffer, offset, length);
++recordsFromCurrentBuffer;
Preconditions.checkNotNull(currentBufferFile);
R record = buildRecord(currentBufferFile, data, toChannelOffset(offset));
totalRecordsParsed.incrementAndGet();
R record = null;
try {
record = buildRecord(currentBufferFile, convertData(data), toChannelOffset(offset));
totalRecordsParsed.incrementAndGet();
} catch (DataConversionException e) {
totalDataProcessingErrors.incrementAndGet();
logger.warn("Cannot process input data: " + e.getMessage());
}
return record;
}

private ByteBuffer convertData(ByteBuffer data) throws DataConversionException {
return dataConverter.convert(data);
}

private long toChannelOffset(int bufferOffset) {
Preconditions.checkState(currentBufferStartOffset >= 0, "Buffer start offset (%s) is expected to be non-negative!", currentBufferStartOffset);
Preconditions.checkState(bufferOffset >= 0, "Buffer offset (%s) is expected to be non-negative!", bufferOffset);
@@ -491,6 +533,7 @@ public Map<String, Object> getMetrics() {
put(className + ".TotalRecordsParsed", totalRecordsParsed);
put(className + ".TotalBytesDiscarded", totalBytesDiscarded);
put(className + ".TotalRecordsLargerThanBuffer", totalRecordsLargerThanBuffer);
put(className + ".TotalDataProcessingErrors", totalDataProcessingErrors);
put(className + ".TotalUnhandledErrors", totalUndhandledErrors);
}};
}
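FileFlow.hasConverter() and buildConverter() are not part of this diff. Judging by the exception types caught in the constructor above, buildConverter() presumably resolves the configured "converterClass" reflectively; a hypothetical sketch, where converterClassName is assumed to hold the "converterClass" value from the flow configuration:

// Hypothetical reconstruction of FileFlow.buildConverter(), inferred from
// the exceptions AbstractParser handles; the real implementation is outside
// this diff.
public IDataConverter buildConverter()
        throws ClassNotFoundException, NoSuchMethodException,
        InstantiationException, IllegalAccessException, InvocationTargetException {
    // Load the configured class and invoke its public no-arg constructor
    Class<?> clazz = Class.forName(converterClassName);
    return (IDataConverter) clazz.getConstructor().newInstance();
}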