Kinesis plugin eventually stops collecting messages, halting pipelines #35

Open
andrewrosezen opened this issue Nov 14, 2017 · 7 comments

Comments

@andrewrosezen

I've been using this plugin to process a few kinesis streams, and it has generally worked very well. However, I've noticed that every few days, my nodes will stop sending data to ES. When I check on containers that should be forwarding data from these streams, I see the following in the logs:

11/13/2017 12:19:26 PMNov 13, 2017 8:19:26 PM com.amazonaws.http.JsonErrorResponseHandler parseJsonContent
11/13/2017 12:19:26 PMINFO: Unable to parse HTTP response content
11/13/2017 12:19:26 PMcom.fasterxml.jackson.core.JsonParseException: Illegal length for VALUE_STRING: 2477946696968059424
11/13/2017 12:19:26 PM at [Source: [B@4c8238be; line: -1, column: 9]
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.dataformat.cbor.CBORParser._decodeExplicitLength(CBORParser.java:2625)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.dataformat.cbor.CBORParser._finishTextToken(CBORParser.java:1800)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.dataformat.cbor.CBORParser.getText(CBORParser.java:1216)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.databind.deser.std.BaseNodeDeserializer.deserializeAny(JsonNodeDeserializer.java:315)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:73)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.databind.deser.std.JsonNodeDeserializer.deserialize(JsonNodeDeserializer.java:15)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3807)
11/13/2017 12:19:26 PM	at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2355)
11/13/2017 12:19:26 PM	at com.amazonaws.http.JsonErrorResponseHandler$JsonContent.parseJsonContent(JsonErrorResponseHandler.java:179)
11/13/2017 12:19:26 PM	at com.amazonaws.http.JsonErrorResponseHandler$JsonContent.<init>(JsonErrorResponseHandler.java:169)
11/13/2017 12:19:26 PM	at com.amazonaws.http.JsonErrorResponseHandler$JsonContent.createJsonContent(JsonErrorResponseHandler.java:162)
11/13/2017 12:19:26 PM	at com.amazonaws.http.JsonErrorResponseHandler.handle(JsonErrorResponseHandler.java:64)
11/13/2017 12:19:26 PM	at com.amazonaws.http.JsonErrorResponseHandler.handle(JsonErrorResponseHandler.java:36)
11/13/2017 12:19:26 PM	at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1347)
11/13/2017 12:19:26 PM	at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:924)
11/13/2017 12:19:26 PM	at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:702)
11/13/2017 12:19:26 PM	at com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:454)
11/13/2017 12:19:26 PM	at com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:416)
11/13/2017 12:19:26 PM	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:365)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:1998)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:1968)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:967)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy.get(KinesisProxy.java:156)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.proxies.MetricsCollectingKinesisProxyDecorator.get(MetricsCollectingKinesisProxyDecorator.java:74)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher.getRecords(KinesisDataFetcher.java:68)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResultAndRecordMillisBehindLatest(ProcessTask.java:284)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.getRecordsResult(ProcessTask.java:249)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:120)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49)
11/13/2017 12:19:26 PM	at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24)
11/13/2017 12:19:26 PM	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
11/13/2017 12:19:26 PM	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
11/13/2017 12:19:26 PM	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
11/13/2017 12:19:26 PM	at java.lang.Thread.run(Thread.java:748)

Resource usage is minimal on affected nodes when they are in this state, and restarting the node resolves the issue. Left alone, affected nodes do not begin processing messages after any amount of time.

@codekitchen
Collaborator

Hm this looks like Amazon's KCL library getting an HTTP error response to a Kinesis request, and then getting another error trying to parse the error response. It's probably worth reporting to them, as well.

However, the theory is that the worker should recover after an error like this, so there's likely another problem on our end that needs to be fixed.

@codekitchen
Collaborator

Actually, while I'm being hindered here by not having much Java experience, it looks like this is purely an issue with the KCL library. The KCL only catches RuntimeException and the JsonParseException is not a RuntimeException, so the entire thread dies instead of backing off and retrying as it was designed to.

I think you'll need to report this to the KCL Project (or AWS premium support if you have that) so that they can debug with you.

@pfifer

pfifer commented Nov 20, 2017

Java has checked exceptions, and JsonParseException is one of them. A method that can throw a checked exception must declare it, so there isn't a way for JsonParseException to actually reach the method you linked to. In this case the AWS Java SDK converts the JsonParseException into an AmazonServiceException, which is a runtime exception. The stack trace above doesn't show the points where the exception is wrapped and rethrown as a different exception. That exception should be handled by the KCL, and the request would be retried after the normal backoff period.
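To illustrate the wrapping described here, below is a minimal plain-Java sketch. It does not use the AWS SDK or the KCL: `ParseFailure`, `ServiceFailure`, and every method name are hypothetical stand-ins, chosen only to mirror the roles of JsonParseException (checked) and AmazonServiceException (unchecked). It assumes a retry loop that catches only RuntimeException, as discussed above.

```java
import java.io.IOException;

class WrappedExceptionSketch {
    // Hypothetical stand-in for a checked parse failure (checked because it extends IOException).
    static class ParseFailure extends IOException {
        ParseFailure(String message) { super(message); }
    }

    // Hypothetical stand-in for an unchecked service exception.
    static class ServiceFailure extends RuntimeException {
        ServiceFailure(String message, Throwable cause) { super(message, cause); }
    }

    // Must declare the checked exception; this sketch always fails to parse.
    static String parseErrorBody(byte[] body) throws ParseFailure {
        throw new ParseFailure("Illegal length for VALUE_STRING");
    }

    // Converts the checked exception into an unchecked one, keeping the cause.
    static String handleErrorResponse(byte[] body) {
        try {
            return parseErrorBody(body);
        } catch (ParseFailure e) {
            throw new ServiceFailure("Unable to parse HTTP response content", e);
        }
    }

    // A loop that catches only RuntimeException still sees the wrapped failure
    // and can retry; returns the number of attempts made.
    static int attemptsBeforeGivingUp(int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            try {
                handleErrorResponse(new byte[0]);
                return attempts; // success, stop retrying
            } catch (RuntimeException e) {
                // a real worker would back off here before the next attempt
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println(attemptsBeforeGivingUp(3));
    }
}
```

Because `handleErrorResponse` does not declare `ParseFailure`, the compiler guarantees that only the unchecked wrapper can escape it, which is why a handler for RuntimeException is enough in this sketch.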

It seems something else could be causing the problem. One thing I would recommend is setting KinesisClientLibConfiguration#withCallProcessRecordsEvenForEmptyRecordList to true. This will cause the KCL to call the processRecords method every time it gets a response from Kinesis, regardless of whether it got any records. This can be used to log information about every request instead of only those that contain records.

@pfifer

pfifer commented Dec 18, 2017

The specific error you're seeing is related to aws/aws-sdk-java#1106. The error should be handled by the KCL, and automatically retried.

It's possible you're running into awslabs/amazon-kinesis-client#185. We do have some mitigations that can be applied to handle it, and we are still investigating the issue.

@codekitchen
Collaborator

Thanks for the analysis. I figured my lack of Java experience likely meant I was misunderstanding that exception issue.

@royshadmon

This bug is still occurring. What's the best way to fix it?
Also, how do you set KinesisClientLibConfiguration#withCallProcessRecordsEvenForEmptyRecordList to true?

Thanks in advance!

@w4 w4 mentioned this issue Jan 14, 2019
@jeffalder

jeffalder commented Jan 17, 2020

2477946696968059424 in hex is 0x22636F6465223A20, which is ASCII for "code":<space>
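The conversion can be checked with a few lines of Java. This is only a sketch: the class and method names are made up for illustration, and it assumes the eight bytes are read in big-endian order. If the reported "length" really is text from the response body, that would be consistent with the CBOR parser being handed a plain JSON error body, though that is an inference and not something the log confirms.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class LengthAsAscii {
    // Interprets the 8 bytes of a long (big-endian, ByteBuffer's default) as ASCII text.
    static String decode(long value) {
        byte[] bytes = ByteBuffer.allocate(Long.BYTES).putLong(value).array();
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // The "Illegal length for VALUE_STRING" value from the log above.
        long reported = 2477946696968059424L;
        System.out.println(Long.toHexString(reported));
        // Brackets make the trailing space visible.
        System.out.println("[" + decode(reported) + "]");
    }
}
```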
