Implement count windows #709
if timestamp_ms > data["end"]:
    data["end"] = timestamp_ms

data["value"].append(value)
This will perform similarly to `reduce`, because the whole list must be deserialized and re-serialized on every incoming message. You must use `state.add_to_collection(value)` instead.
Then, to collect the values when the window closes, you will use the `state._get_values` method, so it will have to be made "public" for this purpose.
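To illustrate the cost difference described in this comment, here is a minimal sketch. `ToyStore`, `append_to_list`, `add_to_collection` and `read_collection` are hypothetical stand-ins written for this example; they are not the Quix Streams state API, and the JSON encoding is only an assumption used to make the serialization cost visible.

```python
import json


class ToyStore:
    """Hypothetical byte-oriented key-value store (not the real state API)."""

    def __init__(self):
        self._data = {}
        self.bytes_written = 0  # total serialized bytes written so far

    def set(self, key, value):
        raw = json.dumps(value).encode()
        self.bytes_written += len(raw)
        self._data[key] = raw

    def get(self, key, default=None):
        raw = self._data.get(key)
        return default if raw is None else json.loads(raw)

    def keys_with_prefix(self, prefix):
        return sorted(k for k in self._data if k.startswith(prefix))


def append_to_list(store, value):
    # Pattern from the diff: load the whole list, append, store it all again.
    items = store.get("window", default=[])
    items.append(value)
    store.set("window", items)


def add_to_collection(store, index, value):
    # Collection pattern: one small write per message under its own key.
    store.set(f"item:{index:08d}", value)


def read_collection(store):
    # Read every item back in insertion order (keys are zero-padded).
    return [store.get(key) for key in store.keys_with_prefix("item:")]


list_store, item_store = ToyStore(), ToyStore()
for i in range(1000):
    append_to_list(list_store, i)
    add_to_collection(item_store, i, i)

print(list_store.bytes_written, item_store.bytes_written)
```

In this toy model the list pattern rewrites an ever-growing payload on each message, so the total bytes written grow roughly quadratically with the window size, while the per-item pattern grows linearly.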
# window is full, closing ...
state.delete(key=self.STATE_KEY)

values = state.get_from_collection(start=1, end=10)
Can you explain why these bounds are the constants 1 and 10?
    return [], []

# window is full, closing ...
state.delete(key=self.STATE_KEY)
Maybe this should be moved to the end of the method, together with `state.delete_from_collection`.
# window is full, closing ...
state.delete(key=self.STATE_KEY)

values = state.get_from_collection(start=1, end=10)
Also, maybe let's name the method `get_collection` and the variable `collection`, because `value` has a slightly different meaning in this context.
- values = state.get_from_collection(start=1, end=10)
+ collection = state.get_collection(start=1, end=10)
Given that, renaming `_get_values` to `get_collection` is a good move.
Implement `sdf.tumbling_count_window`. It creates a count-based tumbling window transformation on the StreamingDataFrame.
Tumbling windows divide messages into fixed-size, non-overlapping batches. They allow performing stateful aggregations like `sum`, `reduce`, etc. on top of the data and emit results downstream.
This type of window closes after a set count of messages.
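The semantics described above can be sketched in plain Python. This is only an illustration of count-based tumbling behavior, not the implementation in this PR: `tumbling_count_windows` is a hypothetical helper that keeps the open window in memory rather than in state.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def tumbling_count_windows(
    messages: Iterable[T],
    count: int,
    aggregate: Callable[[List[T]], R],
) -> Iterator[R]:
    """Yield one aggregated result each time `count` messages have arrived."""
    if count < 1:
        raise ValueError("count must be at least 1")

    window: List[T] = []
    for message in messages:
        window.append(message)
        if len(window) == count:
            # The window is full: emit the result and start a fresh window,
            # so consecutive windows never share a message.
            yield aggregate(window)
            window = []
    # Messages left in a partially filled window are not emitted here,
    # because a count window only closes once it is full.


results = list(tumbling_count_windows([1, 2, 3, 4, 5, 6, 7], count=3, aggregate=sum))
print(results)  # [6, 15]
```

With seven input messages and `count=3`, two windows close; the seventh message stays in an open window and produces no output.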