Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for deaggregation for aggregated records #62

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

psanzay
Copy link

@psanzay psanzay commented Feb 18, 2021

This PR enables support for deaggregation of the records if the stream has aggregated records.

@@ -63,7 +64,10 @@ func getRecords(k kinesisiface.KinesisAPI, iterator string) (records []*kinesis.
return nil, "", 0, err
}

records = output.Records
records, err = deaggregator.DeaggregateRecords(output.Records)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about swallowing this error. What are the possible errors from deaggregator?

Copy link
Author

@psanzay psanzay Feb 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@garethlewin deaggregator throws error in case if it fails to unmarshal the aggregated record.
err := proto.Unmarshal(messageData, aggRecord) if err != nil { return nil, err }
Should we not return the actual payload without deaggregation in case of such error, as it can be possible the records have been aggregated using some custom logic not via amazon's aggregation format, so for those scenarios we should return the records as pushed. It should be up to user to deaggregate them in that case.

@psanzay psanzay requested a review from garethlewin February 26, 2021 12:29
@garethlewin
Copy link
Contributor

Hi Sorry I haven't been ignoring this, I'm just at a bit of a analysis paralysis option here.

This change would make #49 very difficult (or more accurately #49 makes this more difficult). I am really also not sure how to handle erroneous situations.

As I see it there are 3 options, and I dislike all 3:

A) On error just send in the entire blob, this means clients now have to anticipate this happening and deal with the situation, which means that they have to be aware of deaggregation.

B) On error swallow the record. This means data will be dropped, this seems very bad.

C) On error return an error from kinsumer and error. The problem with this is that a checkpoint won't be created (or we are basically back to option B) ) and thus kinsumer will never be able to handle that shard again until the record expires off it.

I am wondering what the benefits of implicit deaggregation are here vs having the clients do it on their side (which is what we do at Twitch, but then we use our own aggregation method and not the one that KCL supplies.

Created pull request template to comply with SOC2.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants