Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bufferCountWithDebounce): add new operator #380

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions spec/asynciterable/buffercountortime-spec.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
import '../asynciterablehelpers.js';
import { of, concat } from 'ix/asynciterable/index.js';
import { bufferCountOrTime, delay } from 'ix/asynciterable/operators/index.js';
import { of, concat, toArray, interval } from 'ix/asynciterable/index.js';
import { bufferCountOrTime, delay, take } from 'ix/asynciterable/operators/index.js';

test('buffer count behaviour', async () => {
const result: number[][] = [];
const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);

await of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
.pipe(bufferCountOrTime(5, 10))
.forEach((buf) => {
result.push(buf);
});
const res = source.pipe(bufferCountOrTime(5, 10));

expect(result).toEqual([
expect(await toArray(res)).toEqual([
[1, 2, 3, 4, 5],
[6, 7, 8, 9, 0],
]);
});

test('buffer time behaviour', async () => {
const result: number[][] = [];
const seq = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11)));
await seq.pipe(bufferCountOrTime(5, 10)).forEach((buf) => {
result.push(buf);
});
const source = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11)));

expect(result).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]);
const res = source.pipe(bufferCountOrTime(5, 10));

expect(await toArray(res)).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]);
});

test('continues the timer in the background', async () => {
const source = interval(10);

const res = source.pipe(bufferCountOrTime(3, 45));

expect(await toArray(res.pipe(take(2)))).toEqual([[0, 1, 2], [3]]);
});
33 changes: 33 additions & 0 deletions spec/asynciterable/buffercountwithdebounce-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import '../asynciterablehelpers.js';
import { of, concat, interval, toArray } from 'ix/asynciterable/index.js';
import { bufferCountWithDebounce, delay, take } from 'ix/asynciterable/operators/index.js';

test('buffer count behaviour', async () => {
const source = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0);

const res = source.pipe(bufferCountWithDebounce(5, 10));

expect(await toArray(res)).toEqual([
[1, 2, 3, 4, 5],
[6, 7, 8, 9, 0],
]);
});

test('buffer time behaviour', async () => {
const source = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11)));

const res = source.pipe(bufferCountWithDebounce(5, 10));

expect(await toArray(res)).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]);
});

test('fills buffers', async () => {
const source = interval(10);

const res = source.pipe(bufferCountWithDebounce(3, 45));

expect(await toArray(res.pipe(take(2)))).toEqual([
[0, 1, 2],
[3, 4, 5],
]);
});
12 changes: 6 additions & 6 deletions src/asynciterable/operators/buffercountortime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ const ended = {};

class BufferCountOrTime<TSource> extends AsyncIterableX<TSource[]> {
constructor(
private readonly source: AsyncIterable<TSource>,
private readonly bufferSize: number,
private readonly maxWaitTime: number
private readonly _source: AsyncIterable<TSource>,
private readonly _bufferSize: number,
private readonly _maxWaitTime: number
) {
super();
}

async *[Symbol.asyncIterator](signal?: AbortSignal) {
const buffer: TSource[] = [];
const timer = interval(this.maxWaitTime).pipe(map(() => timerEvent));
const source = concat(this.source, of(ended));
const timer = interval(this._maxWaitTime).pipe(map(() => timerEvent));
const source = concat(this._source, of(ended));
const merged = merge(source, timer);

for await (const item of wrapWithAbort(merged, signal)) {
Expand All @@ -29,7 +29,7 @@ class BufferCountOrTime<TSource> extends AsyncIterableX<TSource[]> {
if (item !== timerEvent) {
buffer.push(item as TSource);
}
if (buffer.length >= this.bufferSize || (buffer.length && item === timerEvent)) {
if (buffer.length >= this._bufferSize || (buffer.length && item === timerEvent)) {
yield buffer.slice();
buffer.length = 0;
}
Expand Down
85 changes: 85 additions & 0 deletions src/asynciterable/operators/buffercountwithdebounce.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { OperatorAsyncFunction } from '../../interfaces.js';
import { AsyncIterableX, concat, of } from '../index.js';
import { merge } from '../merge.js';
import { wrapWithAbort } from './withabort.js';
import { AsyncSink } from '../asyncsink.js';
import type { bufferCountOrTime } from './buffercountortime.js'; // Used only in jsdoc

const timerEvent = Symbol('BufferCountWithDebounce:TimerEvent');
const endedEvent = Symbol('BufferCountWithDebounce:EndedEvent');

class BufferCountWithDebounce<TSource> extends AsyncIterableX<TSource[]> {
constructor(
private readonly _source: AsyncIterable<TSource>,
private readonly _bufferSize: number,
private readonly _maxWaitTime: number
) {
super();
}

async *[Symbol.asyncIterator](signal?: AbortSignal) {
const buffer: TSource[] = [];

const timeoutSink = new AsyncSink<typeof timerEvent>();
const merged = merge(concat(this._source, of(endedEvent)), timeoutSink);

let timeout: NodeJS.Timeout | undefined;

try {
for await (const item of wrapWithAbort(merged, signal)) {
if (!timeout) {
timeout = setTimeout(() => {
timeoutSink.write(timerEvent);
}, this._maxWaitTime);
}

if (item === endedEvent) {
break;
}

if (item !== timerEvent) {
buffer.push(item);
}

if (buffer.length >= this._bufferSize || (item === timerEvent && buffer.length > 0)) {
clearTimeout(timeout);
timeout = undefined;

yield buffer.slice();
buffer.length = 0;
}
}

if (buffer.length) {
yield buffer;
}
} finally {
if (timeout) {
clearTimeout(timeout);
}
}
}
}

/**
* Projects each element of an async-iterable sequence into consecutive buffers
* which are emitted when either the threshold count or time is met.
*
* @see https://github.com/ReactiveX/IxJS/pull/380#issuecomment-2637936141 for the difference between {@link bufferCountOrTime} and {@link bufferCountWithDebounce}.
*
* @template TSource The type of elements in the source sequence.
* @param {number} count The size of the buffer.
* @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer
* @returns {OperatorAsyncFunction<TSource, TSource[]>} An operator which returns an async-iterable sequence
* of buffers
*/
export function bufferCountWithDebounce<TSource>(
count: number,
time: number
): OperatorAsyncFunction<TSource, TSource[]> {
return function bufferOperatorFunction(
source: AsyncIterable<TSource>
): AsyncIterableX<TSource[]> {
return new BufferCountWithDebounce<TSource>(source, count, time);
};
}
1 change: 1 addition & 0 deletions src/asynciterable/operators/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
export * from './batch.js';
export * from './buffer.js';
export * from './buffercountortime.js';
export * from './buffercountwithdebounce.js';
export * from './catcherror.js';
export * from './combinelatestwith.js';
export * from './concatall.js';
Expand Down