feat(bufferCountWithDebounce): add new operator #380

Open: jeengbe wants to merge 3 commits into master
@jeengbe commented Feb 4, 2025

This implementation makes a lot more sense to me. Instead of a fixed interval that runs in the background, completely unrelated to the source, a new timeout is started for the specified time whenever an item is received and no timer is running. Then either the buffer fills up, in which case the timeout is cleared and the buffer is yielded, or the timeout fires and the partially filled buffer is yielded.
With the current implementation, I get a lot of partially filled buffers. For example:

Buffer count: 50, buffer window: 5s, source: 10 msg/s, processor: 12.5 msg/s

With a pipeline like source -> buffer (5s) -> processor (4s), the first run yields a full buffer. But because the interval keeps running, by the time the next batch of items is requested (after 4 seconds), the timeout has almost expired, and the buffer collects only another 10 items (in 1 second). The consumer actually has nothing to do with this; see #380 (comment).

It makes more sense to me to start the timer once an item arrives and essentially race a full buffer versus the timer firing.
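
The race described above can be sketched in plain TypeScript without any library. This is a minimal illustration and not the code from this pull request; the name `bufferCountWithDebounceSketch` and its parameters `count` and `maxWaitMs` are assumptions made up for the example.

```typescript
// Hypothetical sketch: buffers items from `source` and yields a batch when
// either `count` items have arrived or `maxWaitMs` has elapsed since the
// first item of the current batch, whichever happens first.
async function* bufferCountWithDebounceSketch<T>(
  source: AsyncIterable<T>,
  count: number,
  maxWaitMs: number
): AsyncGenerator<T[]> {
  const it = source[Symbol.asyncIterator]();
  let buffer: T[] = [];
  // In-flight request to the source; kept across loop turns if the timer wins.
  let pending: Promise<IteratorResult<T>> | undefined;
  let timer: ReturnType<typeof setTimeout> | undefined;
  let timeout: Promise<'timeout'> | undefined;

  try {
    while (true) {
      if (!pending) pending = it.next();
      // Only race against the timer while one is running.
      const winner = timeout
        ? await Promise.race([pending, timeout])
        : await pending;

      if (winner === 'timeout') {
        // Timer fired first: yield the partially filled buffer.
        timer = undefined;
        timeout = undefined;
        const batch = buffer;
        buffer = [];
        yield batch;
        continue;
      }

      pending = undefined;
      if (winner.done) break;
      buffer.push(winner.value);

      if (buffer.length >= count) {
        // Buffer filled first: cancel the timer and yield the full buffer.
        clearTimeout(timer);
        timer = undefined;
        timeout = undefined;
        const batch = buffer;
        buffer = [];
        yield batch;
      } else if (!timeout) {
        // First item of a new batch: start the timer now.
        timeout = new Promise<'timeout'>((resolve) => {
          timer = setTimeout(() => resolve('timeout'), maxWaitMs);
        });
      }
    }
    // Source completed: flush whatever is left.
    if (buffer.length > 0) yield buffer;
  } finally {
    clearTimeout(timer);
    await it.return?.();
  }
}
```

Because the timer is only created when an item lands in an empty buffer, no time elapses against the next batch while the generator is suspended between batches.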


Lastly, I think it would make most sense for this operator to keep batching items in the background so that once the consumer requests the next batch, it is already present. But that's a bigger change than this one, so I thought I'd start here.

@trxcllnt (Member) commented Feb 5, 2025

Rather than redefining bufferCountOrTime, why not make this a new operator? From your description, it sounds like you'd like a buffer operator with auditTime semantics?

@jeengbe (Author) commented Feb 5, 2025

I think this is how bufferCountOrTime should have worked from the beginning. To me, this looks more like a bug in the current implementation: the timer keeps running during the downstream operation (i.e. while the bufferCountOrTime generator is suspended), yet the source is not pulled any further in the meantime.

The following test should pass, but currently doesn't.

it('should fill buffers', async () => {
  const sourceDelay = 900;
  const bufferSize = 3;
  const maxWaitTime = 4000;

  // Precondition: maxWaitTime > sourceDelay * (bufferSize + 1), here 4000 > 3600.
  //
  // With 'maxWaitTime > sourceDelay * bufferSize', the timeout finishes while the
  // 'bufferCountOrTime' generator is suspended, so the next value polled is the
  // timer event. Because a timer event only yields when 'buffer.length > 0', the
  // wait has to cover 'bufferSize + 1' source items, so that the buffer holds one
  // more element by the time it is yielded.
  //
  // In effect, the timeout finishes right in the middle of the second "filling"
  // of the buffer, so the second buffer is yielded only partially filled.

  const source = interval(sourceDelay);

  const res = source.pipe(bufferCountOrTime(bufferSize, maxWaitTime));

  await expect(toArray(res.pipe(take(2)))).resolves.toEqual([
    [0, 1, 2],
    [3, 4, 5],
  ]); // Actually gives [[0, 1, 2], [3]]
});

This is my opinion on how the semantics of bufferCountOrTime should work, though it's not a strong opinion. If you still think it makes more sense to move this to a separate operator, I will gladly do so.

@trxcllnt (Member) commented Feb 5, 2025

Ah, I see the confusion. The bufferCountOrTime operator yields a value either when its buffer is full, or on each dueTime interval. I do think the behavior you're looking for is valuable; it's just a different operator, something like bufferCountWithDebounce?
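
The difference between the two semantics can be traced with a small timeline model that uses the numbers from the test above (items every 900 ms, buffer size 3, wait time 4000 ms). This is a simplified sketch, not either operator's implementation: `simulateBuffers` and `Mode` are hypothetical names, the fixed interval is assumed to start at time 0, and pull-based suspension of the generator is ignored.

```typescript
// Hypothetical timeline model; all times are in milliseconds.
type Mode = 'fixedInterval' | 'startOnFirstItem';

function simulateBuffers(
  arrivals: number[],
  count: number,
  maxWait: number,
  mode: Mode
): number[][] {
  const out: number[][] = [];
  let buffer: number[] = [];
  // Next time the timer fires; Infinity means no timer is running.
  let deadline = mode === 'fixedInterval' ? maxWait : Infinity;

  arrivals.forEach((time, index) => {
    // Process every timer tick that happens before this item arrives.
    while (deadline <= time) {
      if (buffer.length > 0) {
        out.push(buffer);
        buffer = [];
      }
      // A fixed interval keeps ticking; a per-batch timer stops after firing.
      deadline = mode === 'fixedInterval' ? deadline + maxWait : Infinity;
    }

    buffer.push(index);

    if (buffer.length === count) {
      // Full buffer: emit it, and cancel the per-batch timer if there is one.
      out.push(buffer);
      buffer = [];
      if (mode === 'startOnFirstItem') deadline = Infinity;
    } else if (mode === 'startOnFirstItem' && deadline === Infinity) {
      // First item of a batch starts the per-batch timer.
      deadline = time + maxWait;
    }
  });

  // Flush the remainder once the source ends.
  if (buffer.length > 0) out.push(buffer);
  return out;
}

const arrivals = [900, 1800, 2700, 3600, 4500, 5400];
const fixed = simulateBuffers(arrivals, 3, 4000, 'fixedInterval');
const onItem = simulateBuffers(arrivals, 3, 4000, 'startOnFirstItem');
console.log(fixed.slice(0, 2)); // [[0, 1, 2], [3]]
console.log(onItem.slice(0, 2)); // [[0, 1, 2], [3, 4, 5]]
```

In the fixed-interval mode, the tick at 4000 ms lands 400 ms after item 3 arrives, which is why the second buffer holds a single element; in the start-on-first-item mode, the timer for the second batch would not fire until 7600 ms, so the buffer fills first.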

@jeengbe force-pushed the je-buffercountortime-start-with-item branch from ceff6a7 to ecf009b on February 6, 2025 07:04
@jeengbe changed the title from "fix(bufferCountOrTime): start timer only with first item" to "feat(bufferCountWithDebounce): add new operator" on Feb 9, 2025
@jeengbe (Author) commented Feb 9, 2025

Fixed!


2 participants