feat(bufferCountWithDebounce): add new operator #380
+142
−21
This implementation makes a lot more sense to me. Instead of a fixed interval that runs in the background, completely unrelated to the source, a new timeout is started for the specified time whenever an item is received and no timer is running. Either the buffer runs full, in which case the timeout is cleared and the buffer is yielded, or the timeout fires and the partially filled buffer is yielded.
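To make the mechanics concrete, here is a minimal sketch of that debounce-style behavior as a standalone async generator. This is not the PR's actual code; the name `bufferCountWithDebounce` and the parameter names are assumptions for illustration. The timer is armed when the first item of a buffer arrives, and a full buffer races against it:

```typescript
// Hypothetical sketch: buffer up to `count` items, but flush a partial
// buffer if `maxWaitMs` elapses after the first item of that buffer arrived.
async function* bufferCountWithDebounce<T>(
  source: AsyncIterable<T>,
  count: number,
  maxWaitMs: number,
): AsyncGenerator<T[], void> {
  const it = source[Symbol.asyncIterator]();
  let pending: Promise<IteratorResult<T>> | null = null;
  let buffer: T[] = [];
  let deadline = 0; // armed when the first item of a buffer arrives

  while (true) {
    pending ??= it.next();
    let winner: IteratorResult<T> | 'timeout';
    if (buffer.length === 0) {
      // No timer is running until an item arrives, so just wait.
      winner = await pending;
    } else {
      // Race the next item against the remaining window.
      const remaining = Math.max(0, deadline - Date.now());
      let timer!: ReturnType<typeof setTimeout>;
      const timeout = new Promise<'timeout'>((resolve) => {
        timer = setTimeout(() => resolve('timeout'), remaining);
      });
      winner = await Promise.race([pending, timeout]);
      clearTimeout(timer);
    }
    if (winner === 'timeout') {
      // Timer fired first: yield the partially filled buffer.
      yield buffer;
      buffer = [];
      continue; // `pending` stays in flight and is awaited next round
    }
    pending = null;
    if (winner.done) {
      if (buffer.length > 0) yield buffer; // flush the tail
      return;
    }
    if (buffer.length === 0) deadline = Date.now() + maxWaitMs; // arm timer
    buffer.push(winner.value);
    if (buffer.length === count) {
      // Buffer ran full first: "clear the timeout" and yield.
      yield buffer;
      buffer = [];
    }
  }
}
```

Holding the in-flight `pending` promise across iterations matters: when the timeout wins the race, the already-requested item must not be dropped, so it is awaited on the next round instead of calling `next()` again.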
With the current implementation I get a lot of partially filled buffers. For example, with buffer count 50, buffer window 5 s, a source producing 10 msg/s, and a processor consuming 12.5 msg/s, in a pipeline like

source -> buffer (5s) -> processor (4s)

you get a full buffer on the first run. But because the interval keeps running, by the time another batch of items is requested (after 4 seconds), the timeout has almost expired and only collects another 10 items (in 1 second). It makes more sense to me to start the timer once an item arrives and essentially race a full buffer against the timer firing.
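The numbers above can be checked with a quick back-of-the-envelope model (the variable names here are mine, not from the PR):

```typescript
// Scenario from the comment: buffer count 50, 5 s window,
// source at 10 msg/s, processor taking 4 s per batch (12.5 msg/s).
const bufferCount = 50;
const windowSec = 5;
const sourceRate = 10; // msg/s
const processSec = 4;  // time spent processing one batch

// Fixed background interval: while the processor works for 4 s,
// the 5 s window keeps ticking, so only 1 s of collection remains
// when the next batch is requested.
const remainingSec = windowSec - processSec;
const intervalBatch = remainingSec * sourceRate; // 10 items

// Timer started on first item: the full 5 s window is available
// again, so the buffer can fill completely (capped at bufferCount).
const debouncedBatch = Math.min(bufferCount, windowSec * sourceRate); // 50 items
```

So after the first batch, the interval-based version settles into 10-item buffers, while the debounce-based version keeps producing full 50-item buffers.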
Lastly, I think it would make the most sense for this operator to keep batching items in the background, so that once the consumer requests the next batch it is already present. But that's a bigger change than this one, so I thought I'd start here.