Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update StockTradesWriter.java #4

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
public class StockTradesWriter {

private static final Log LOG = LogFactory.getLog(StockTradesWriter.class);
private static boolean table_exist = false;

private static void checkUsage(String[] args) {
if (args.length != 2) {
Expand All @@ -49,29 +50,61 @@ private static void checkUsage(String[] args) {
}
}

/**
* Checks if the stream exists and is active
*
* @param kinesisClient Amazon Kinesis client instance
* @param streamName Name of stream
*/
private static void validateStream(AmazonKinesis kinesisClient, String streamName) {
try {
DescribeStreamResult result = kinesisClient.describeStream(streamName);
if(!"ACTIVE".equals(result.getStreamDescription().getStreamStatus())) {
System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again.");
System.exit(1);
}
} catch (ResourceNotFoundException e) {
System.err.println("Stream " + streamName + " does not exist. Please create it in the console.");
System.err.println(e);
System.exit(1);
} catch (Exception e) {
System.err.println("Error found while describing the stream " + streamName);
System.err.println(e);
System.exit(1);
}
}
/**
* Checks if the stream exists and is active and if not creates another stream
* Make sure the user has access for describe and List streams on kinesis
*
* @param shardCount
*
* @param kinesisClient
* Amazon Kinesis client instance
* @param streamName
* Name of stream
* @throws InterruptedException
*/
private static void validateStream(AmazonKinesis kinesisClient, String streamName) throws InterruptedException {

String str = streamName;
com.amazonaws.services.kinesis.model.ListStreamsResult list_stream_obj = kinesisClient.listStreams();


ArrayList<String> stream_names_list = (ArrayList<String>) list_stream_obj.getStreamNames();
Iterator<String> itr = stream_names_list.iterator();
while (itr.hasNext()) {
String streams = (String) itr.next();

if (streams.equals(str)) {
System.out.println("The stream specified exists and we will push sample data..");
table_exist = true;
}

}

if (table_exist == false) {
LOG.info("The stream specified does not exists..we will be creating it..");
CreateStreamRequest createRequest = new CreateStreamRequest();
createRequest.setShardCount(1);
createRequest.setStreamName(streamName);
kinesisClient.createStream(createRequest);
}

DescribeStreamResult result = kinesisClient.describeStream(streamName);
String tabl_stat = result.getStreamDescription().getStreamStatus();
//LOG.info("Table status is..." + tabl_stat);

while ((result.getStreamDescription().getStreamStatus()).equals("UPDATING")
|| (result.getStreamDescription().getStreamStatus()).equals("CREATING")) {
Thread.sleep(10000);
LOG.info("Sleeping as shard not active yet..");

result = kinesisClient.describeStream(streamName);
tabl_stat = result.getStreamDescription().getStreamStatus();

}
LOG.info("Table status is..." + tabl_stat);

}


/**
* Uses the Kinesis client to send the stock trade to the given stream.
Expand Down