Added support for deaggregation of aggregated records #62
base: master
Conversation
```diff
@@ -63,7 +64,10 @@ func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.
 		return nil, "", 0, err
 	}

-	records = output.Records
+	records, err = deaggregator.DeaggregateRecords(output.Records)
+	if err != nil {
```
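For orientation, here is a minimal sketch of how the modified fetch path might read. The function signature comes from the hunk header above; the `GetRecords` call, the iterator and lag bookkeeping, and the import path (assumed here to be `github.com/awslabs/kinesis-aggregation/go/deaggregator`) are reconstructions, and the diff does not show how the PR actually handles the deaggregation error:

```go
package kinsumer

import (
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"

	// Assumed import; the diff only shows the package name "deaggregator".
	"github.com/awslabs/kinesis-aggregation/go/deaggregator"
)

func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.Record, nextIterator string, lag time.Duration, err error) {
	output, err := k.GetRecords(&kinesis.GetRecordsInput{
		ShardIterator: aws.String(iterator),
	})
	if err != nil {
		return nil, "", 0, err
	}

	// DeaggregateRecords expands KPL-aggregated records into their
	// constituent user records and passes plain records through unchanged.
	records, err = deaggregator.DeaggregateRecords(output.Records)
	if err != nil {
		// One possible treatment; the review thread below debates
		// whether to surface, swallow, or fall back on this error.
		return nil, "", 0, err
	}

	nextIterator = aws.StringValue(output.NextShardIterator)
	lag = time.Duration(aws.Int64Value(output.MillisBehindLatest)) * time.Millisecond
	return records, nextIterator, lag, nil
}
```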
Not sure about swallowing this error. What are the possible errors from deaggregator?
@garethlewin deaggregator returns an error if it fails to unmarshal the aggregated record:

```go
err := proto.Unmarshal(messageData, aggRecord)
if err != nil {
	return nil, err
}
```

Should we instead return the actual payload without deaggregation on such an error? The records may have been aggregated using custom logic rather than Amazon's aggregation format, so in those scenarios we should return the records as pushed. It should be up to the user to deaggregate them in that case.
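A rough sketch of that fallback, assuming the same deaggregator package: try each record individually and, if deaggregation fails, hand the record back exactly as it was pushed. This is illustrative only and not code from the PR:

```go
// Hypothetical per-record fallback: records that fail deaggregation
// (e.g. aggregated with a custom format rather than the KPL one) are
// returned as pushed, leaving deaggregation to the consumer.
deaggregated := make([]*kinesis.Record, 0, len(output.Records))
for _, record := range output.Records {
	expanded, err := deaggregator.DeaggregateRecords([]*kinesis.Record{record})
	if err != nil {
		deaggregated = append(deaggregated, record)
		continue
	}
	deaggregated = append(deaggregated, expanded...)
}
records = deaggregated
```

One trade-off: a single per-batch call fails the whole batch on one bad record, while per-record calls isolate the failure at the cost of an extra loop.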
Hi, sorry, I haven't been ignoring this; I'm just at a bit of an analysis-paralysis point here. This change would make #49 very difficult (or, more accurately, #49 makes this more difficult). I am also really not sure how to handle erroneous situations. As I see it there are three options, and I dislike all of them:

A) On error, just send in the entire blob. Clients now have to anticipate this happening and deal with it, which means they have to be aware of deaggregation.

B) On error, swallow the record. This means data will be dropped, which seems very bad.

C) On error, return an error from kinsumer. The problem with this is that a checkpoint won't be created (or we are basically back to option B), and thus kinsumer will never be able to handle that shard again until the record expires off it.

I am wondering what the benefits of implicit deaggregation are here versus having the clients do it on their side (which is what we do at Twitch, though we use our own aggregation method and not the one that the KCL supplies).
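For comparison, client-side deaggregation might look roughly like the following. It assumes kinsumer's `Next()` hands back the raw record payload and that the consumer wraps the bytes back into a `kinesis.Record` for the deaggregator; `process` and the partition key value are illustrative placeholders, not part of either library's API:

```go
package example

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
	"github.com/awslabs/kinesis-aggregation/go/deaggregator"
	"github.com/twitchscience/kinsumer"
)

func consume(k *kinsumer.Kinsumer, process func([]byte)) error {
	for {
		// Next blocks until a record is available and returns the
		// record body exactly as it was pushed to the stream.
		data, err := k.Next()
		if err != nil {
			return err
		}

		// Wrap the payload so the KPL deaggregator can inspect it; the
		// partition key is required by the record shape but unused here.
		wrapped := []*kinesis.Record{{
			Data:         data,
			PartitionKey: aws.String("unused"),
		}}

		userRecords, err := deaggregator.DeaggregateRecords(wrapped)
		if err != nil {
			// Custom aggregation format: let the application decode it.
			userRecords = wrapped
		}
		for _, r := range userRecords {
			process(r.Data)
		}
	}
}
```

This keeps kinsumer agnostic about aggregation formats, at the cost of every consumer repeating the unwrap step.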
This PR adds support for deaggregating records when the stream contains aggregated records.