Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement count windows #709

Closed
wants to merge 13 commits into from
Closed

Implement count windows #709

wants to merge 13 commits into from

Conversation

quentin-quix
Copy link
Contributor

@quentin-quix quentin-quix commented Jan 14, 2025

Implement sdf.tumbling_count_window. It creates a count-based tumbling window transformation on the StreamingDataFrame.

Tumbling windows divide messages into fixed-batch, non-overlapping windows. They allow performing stateful aggregations like sum, reduce, etc. on top of the data and emit results downstream.

This type of window close after a set count of messages.

app = Application()
sdf = app.dataframe(...)

sdf = (
    # Define a tumbling window of 10 messages
    sdf.tumbling_count_window(count=10)

    # Specify the aggregation function
    .sum()

    # Specify how the results should be emitted downstream.
    # "current()" will emit results as they come for each updated window,
    # possibly producing multiple messages per key-window pair
    # "final()" will emit windows only when they are closed and cannot
    # receive any updates anymore.
    .current()
)

@quentin-quix quentin-quix force-pushed the quent/couting-window branch 2 times, most recently from d9075c8 to 65bf164 Compare January 16, 2025 16:41
@quentin-quix quentin-quix marked this pull request as ready for review January 20, 2025 12:01
quixstreams/dataframe/dataframe.py Show resolved Hide resolved
if timestamp_ms > data["end"]:
data["end"] = timestamp_ms

data["value"].append(value)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will perform similarly to reduce because the whole list must be deserialized and re-serialized on every incoming message. You must use state.add_to_collection(value) instead.

Then to collect the value for windows closure you will use state._get_values method so it will have to be made "public" for this purpose.

quixstreams/dataframe/windows/count_based.py Outdated Show resolved Hide resolved
# window is full, closing ...
state.delete(key=self.STATE_KEY)

values = state.get_from_collection(start=1, end=10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain why is this constant 1 and 10?

return [], []

# window is full, closing ...
state.delete(key=self.STATE_KEY)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this should be moved by the end of the method, together with state.delete_from_collection

# window is full, closing ...
state.delete(key=self.STATE_KEY)

values = state.get_from_collection(start=1, end=10)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also maybe lets name the method get_collection and variable also collection because value in this context has slightly different meaning.

Suggested change
values = state.get_from_collection(start=1, end=10)
collection = state.get_collection(start=1, end=10)

Given that, renaming _get_values to get_collection is a good move.

@quentin-quix quentin-quix changed the title Implement tumbling_count_window Implement count windows Jan 28, 2025
@quentin-quix quentin-quix marked this pull request as draft January 29, 2025 16:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants