Skip to content

Commit

Permalink
Agent config directory for flow configurations (#113)
Browse files Browse the repository at this point in the history
* Adding support for configuration directory

* Fixing ident

* Adding config dir to the install

* Fixing try catch flow issue
  • Loading branch information
buholzer authored and chaochenq committed Dec 21, 2017
1 parent aa78921 commit 817ddb8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
2 changes: 2 additions & 0 deletions setup
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ agent_user_name=aws-kinesis-agent-user
bin_dir=/usr/bin
cron_dir=/etc/cron.d
config_dir=/etc/aws-kinesis
config_flow_dir=/etc/aws-kinesis/agent.d
jar_dir=/usr/share/${daemon_name}/lib
dependencies_dir=./dependencies
log_dir=/var/log/${daemon_name}
Expand Down Expand Up @@ -180,6 +181,7 @@ do_install () {
do_build

install -d ${config_dir}
install -d ${config_flow_dir}
install -d ${jar_dir}
install -d ${init_dir}
install -d ${cron_dir}
Expand Down
45 changes: 45 additions & 0 deletions src/com/amazon/kinesis/streaming/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@

package com.amazon.kinesis.streaming.agent;

import java.io.File;
import java.io.FileInputStream;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Map;
import java.util.HashMap;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -87,6 +92,8 @@ public void uncaughtException(Thread t, Throwable e) {
if (config == null) {
config = readConfigurationFile(Paths.get(opts.getConfigFile()));
}
// Read the config directory
config = readConfigurationDirectory(config);
// Initialize and start the agent
AgentContext agentContext = new AgentContext(config);
if (agentContext.flows().isEmpty()) {
Expand Down Expand Up @@ -133,6 +140,44 @@ private static AgentConfiguration tryReadConfigurationFile(Path configFile) {
return null;
}
}

private static AgentConfiguration readConfigurationDirectory(AgentConfiguration agentConfiguration) {
final String DEFAULT_CONFIG_DIRECTORY = "/etc/aws-kinesis/agent.d/";
final Logger logger = Logging.getLogger(Agent.class);

File configDir = new File(DEFAULT_CONFIG_DIRECTORY);

if (!configDir.exists() || !configDir.isDirectory()) return agentConfiguration;

// Add flows from the main configuration
List<Configuration> flows = new LinkedList();
flows.addAll((List<Configuration>) agentConfiguration.getConfigMap().get("flows"));

// Read all configuration files
File[] configFiles = configDir.listFiles();

Configuration config = null;

for (File file : configFiles) {
if (file.isFile()) {
try {
logger.info("Reading flow configuration from file: " + file.getName());
config = Configuration.get(new FileInputStream(file));
if ((config != null) && config.containsKey("flows"))
flows.addAll((List<Configuration>) config.getConfigMap().get("flows"));
} catch (Exception ex) {
logger.warn("Error reading configuration file, ignoring - " + file.getName());
}
}
}

logger.info("Found " + flows.size() + " configured flow(s)");

// Append flows
HashMap<String, Object> newConfig = new HashMap<String,Object>(agentConfiguration.getConfigMap());
newConfig.put("flows", flows);
return new AgentConfiguration(newConfig);
}

private final AgentContext agentContext;
private final HeartbeatService heartbeat;
Expand Down

0 comments on commit 817ddb8

Please sign in to comment.