A collection of helper methods for WebStreams, inspired by ReactiveExtensions. Because it is built on top of ReadableStream, we get a reactive pipeline with non-blocking back-pressure built in.
Requires support for ReadableStream; use a polyfill if it is not available. Subjects additionally require support for WritableStream. Also requires support for async / await.
turns an iterable source into a readable stream. It will not try to create an iterator until the result stream is read from.
from([1,2,3,4])
from(function* () { yield 1; yield 2; yield 3; yield 4; })
from(async function* () { yield 1; yield 2; yield 3; yield await Promise.resolve(4); })
creates a ReadableStream whose chunks are the arguments passed to it, in order
of(1, "foo", ()=>"bar", {})
concatenates several streams together in the order given.
It will not read from the streams until the result stream is read from.
let inputA = [1,2];
let inputB = [3,4];
let expected = [1,2,3,4];
let stream = concat(from(inputA), from(inputB));
let result = await toArray(stream);
awaits a callback method that returns a readable stream (or a promise of one)
let input = [1,2,3,4];
let expected = [1,2,3,4];
let result = await toArray(defer(() => Promise.resolve(from(input))));
awaits exhaustion of the stream and returns all chunks as an array
let input = [1,2,3,4];
let expected = [1,2,3,4];
let result = await toArray(from([1,2,3,4]))
awaits exhaustion of the stream and returns the last chunk
let input = [1,2,3,4];
let expected = 4;
let result = await toPromise(from([1,2,3,4]));
immediately begins to read from src, passing each chunk to the next callback and awaiting it if it returns a promise. Once the source signals the end of the stream, the complete callback is called. If the source stream errors, the error is passed to the error callback. Returns a disposer method to stop reading.
let src = from(function* () { yield 1; yield 2; yield 3; });
subscribe(src,
  (next) => { console.log("Next:", next); },
  () => { console.log("Complete"); },
  (err) => { console.log("Error:", err); }
);
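Since subscribe returns a disposer, you can capture it to stop reading early (a sketch; calling the disposer as a plain function is assumed from the description above):
let source = from(function* () { yield 1; yield 2; yield 3; });
let dispose = subscribe(source,
  (next) => { console.log("Next:", next); },
  () => { console.log("Complete"); },
  (err) => { console.log("Error:", err); }
);
dispose(); // stop reading from the source before it is exhausted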
Given inconsistencies in browser support for anything other than ReadableStream, we opted to make an Operator a function of the form:
type Op<T, R> = (src: ReadableStream<T>) => ReadableStream<R>
This only requires ReadableStream to be implemented/available with getReader support. To help pipeline these operators, a pipe
method is available:
let input = [1, 2, 3, 4];
let expected = { "1": 1, "2": 2, "4": 4 };
let result = await toPromise(
  pipe(
    from(input),
    filter(x => x != 3),
    buffer(Infinity),
    map(x => {
      return x.reduce((p, c) => { p[c.toString()] = c; return p; }, {});
    }),
    first()
  ));
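As an illustration of the Op<T, R> shape, a hand-written operator might look like the following sketch (not part of the library; it relies only on getReader, as noted above):
// hypothetical custom operator, written against the Op<T, R> shape for illustration
function double(): Op<number, number> {
  return (src: ReadableStream<number>) => {
    const reader = src.getReader();
    return new ReadableStream<number>({
      async pull(controller) {
        const { done, value } = await reader.read();
        if (done) {
          controller.close();
        } else {
          controller.enqueue(value * 2); // transform each chunk as it is pulled
        }
      },
      cancel(reason) {
        return reader.cancel(reason);
      }
    });
  };
}
let doubled = await toArray(pipe(from([1, 2, 3]), double())); // [2, 4, 6]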
buffers chunks until the buffer reaches count in length, then enqueues the buffer and starts a new one
let input = [1,2,3,4];
let expected = [[1,2],[3,4]];
let stream = buffer(2)(from(input));
let result = await toArray(stream);
given a ReadableStream of ReadableStreams, concatenates the output of each stream.
let input = [from([1,2]), from([3,4]), from([5])];
let expected = [1,2,3,4,5];
let stream = concatAll()(from(input));
let result = await toArray(stream);
filter out chunks that fail a predicate
let input = [1,2,3,4];
let expected = [1,2,4];
let stream = filter(x=>x!=3)(from(input));
let result = await toArray(stream);
returns a stream of one chunk, the first to return true when passed to the predicate, or simply the first chunk if no predicate is supplied
let input = [1,2,3,4];
let expected = 3;
let stream = first(x=>x>=3)(from(input));
let result = await toPromise(stream);
returns a stream of one chunk, the last to return true when passed to the predicate, or simply the last if no predicate is supplied.
let input = [1,2,3,4];
let expected = 3;
let stream = last(x=>x<4)(from(input));
let result = await toPromise(stream);
given a stream of T and a selector f(T) -> R, returns a stream of R for all f(T) != undefined
let input = [1,2,3,4];
let expected = [2,4,6,8];
let stream = map(x=>x*2)(from(input));
let result = await toArray(stream);
skips count elements, then streams the rest to the output
let input = [1,2,3,4,5];
let expected = [3,4,5];
let stream = pipe(from(input), skip(2));
let result = await toArray(stream);
takes count elements, then closes the stream
let input = [1,2,3,4,5];
let expected = [1,2];
let stream = pipe(from(input), take(2));
let result = await toArray(stream);
allows observing each chunk; the output is exactly the same as the input.
let input = [1,2,3,4];
let expected = [1,2,3,4];
let result = [];
let stream = tap(x => result.push(x))(from(input));
await toPromise(stream); // execute the pipeline; result is now [1,2,3,4]
throws an error if the time between chunks exceeds the given duration (in milliseconds)
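Assuming this operator is exported as timeout (the name is an assumption here; only the behavior is stated above), it composes like the other operators:
// hypothetical usage, assuming the operator is named `timeout`
let slow = from(async function* () {
  yield 1;
  await new Promise(resolve => setTimeout(resolve, 100)); // gap longer than allowed
  yield 2;
});
let outcome = await toPromise(pipe(slow, timeout(50))).catch(err => err); // rejects with a timeout error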
Subjects are duplex streams with automatic tee'ing of the readable, i.e. each access of subject.readable returns a new ReadableStream.
This is a proof of concept - it is likely there are cases not covered by the tests.
a Subject instance has the following members:
readable: ReadableStream<T>;
writable: WritableStream<T>;
next(value:T): number;
complete(): void;
error(err): void;
you can pipeTo the subject's writable:
let input = [1, 2, 3, 4];
let subject = new Subject<number>();
let resultPromise = toArray(subject.readable);
from(input).pipeTo(subject.writable);
let result = await resultPromise;//[1,2,3,4]
or pipeThrough the subject:
let input = [1, 2, 3, 4];
let subject = new Subject<number>();
let result = await toArray(from(input).pipeThrough(subject)); // [1,2,3,4]
or manually call next, complete, and error:
let subject = new Subject<number>();
let resultPromise = toArray(subject.readable);
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.complete();
let result = await resultPromise; // [1,2,3,4]
Mixing these approaches, however, is not advised - it can lead to unpredictable behavior.
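Because each access of subject.readable returns a new tee'd ReadableStream, several consumers can each receive every chunk (a sketch based on the tee'ing behavior described above):
let subject = new Subject<number>();
let firstConsumer = toArray(subject.readable);  // one tee'd readable
let secondConsumer = toArray(subject.readable); // an independent second readable
subject.next(1);
subject.next(2);
subject.complete();
let results = await Promise.all([firstConsumer, secondConsumer]); // [[1,2],[1,2]]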