Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Adds cachedFlatMap operator #20

Merged
merged 15 commits into from
May 7, 2024
Merged
5 changes: 5 additions & 0 deletions .changeset/tame-geese-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"windpipe": minor
---

Adds the `cachedFlatMap` operator
48 changes: 48 additions & 0 deletions src/stream/higher-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,54 @@ export class HigherOrderStream<T, E> extends StreamTransforms<T, E> {
);
}

/**
* Map over each value in the stream, produce a stream from it, cache the resultant stream
* and flatten all the value streams together
*
* @group Higher Order
*/
cachedFlatMap<U>(
cb: (value: T) => MaybePromise<Stream<U, E>>,
keyFn: (value: T) => string | number | symbol,
): Stream<U, E> {
const trace = this.trace("cachedFlatMap");

return this.consume(async function* (it) {
const cache = new Map<string | number | symbol, Atom<U, E>[]>();
mdboon marked this conversation as resolved.
Show resolved Hide resolved

for await (const atom of it) {
if (!isOk(atom)) {
yield atom;
continue;
}

const key = keyFn(atom.value);
const cachedValues = cache.get(key);

if (cachedValues !== undefined) {
yield* cachedValues;
continue;
}

// Run the flat map handler
const streamAtom = await run(() => cb(atom.value), trace);

// If an error was emitted whilst initialising the new stream, return it
if (!isOk(streamAtom)) {
yield streamAtom;
continue;
}

// Otherwise, consume the iterator
const values = await streamAtom.value.toArray({ atoms: true });

cache.set(key, values);

yield* values;
}
});
}

/**
* Produce a new stream from the stream that has any nested streams flattened
*
Expand Down
105 changes: 105 additions & 0 deletions test/higher-order.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,111 @@ describe.concurrent("higher order streams", () => {
});
});

describe.concurrent("cachedFlatMap", () => {
test("lookup non-repeating strings returning single atom", async ({ expect }) => {
expect.assertions(2);

const lookup = vi.fn((param: string) => $.of(param));

const s = $.from(["a", "b", "c"]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([$.ok("a"), $.ok("b"), $.ok("c")]);
expect(lookup).toBeCalledTimes(3);
});

test("lookup repeating strings returning single atom", async ({ expect }) => {
expect.assertions(2);

const lookup = vi.fn((param: string) => $.of(param));

const s = $.from(["a", "b", "c", "a", "a"]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok("a"),
$.ok("b"),
$.ok("c"),
$.ok("a"),
$.ok("a"),
]);
expect(lookup).toBeCalledTimes(3);
});

test("lookup repeating numbers returning multiple atoms", async ({ expect }) => {
expect.assertions(2);

const lookup = vi.fn((n: number) => $.fromArray([n, n * 2, n * 4]));

const s = $.from([1, 100, 200, 1, 10]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(2),
$.ok(4),
$.ok(100),
$.ok(200),
$.ok(400),
$.ok(200),
$.ok(400),
$.ok(800),
$.ok(1),
$.ok(2),
$.ok(4),
$.ok(10),
$.ok(20),
$.ok(40),
]);
expect(lookup).toBeCalledTimes(4);
});

test("lookup repeating numbers returning multiple atoms", async ({ expect }) => {
expect.assertions(2);

const oneHundredDividedBy = vi.fn((n: number) => {
if (n === 0) {
throw "Cannot divide by zero!";
}

return $.of(100 / n);
});

const s = $.from([5, 0, 50, 5, 5]).cachedFlatMap(oneHundredDividedBy, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(20),
$.unknown("Cannot divide by zero!", ["cachedFlatMap"]),
$.ok(2),
$.ok(20),
$.ok(20),
]);
expect(oneHundredDividedBy).toBeCalledTimes(3);
});

test("lookup repeating numbers, including an error, returning multiple atoms", async ({
expect,
}) => {
expect.assertions(2);

const lookup = vi.fn((n: number) => $.of(n));

const s = $.from<number, unknown>([
$.ok(1),
$.ok(2),
$.error("oh no!"),
$.ok(2),
$.ok(1),
]).cachedFlatMap(lookup, (v) => v);

expect(await s.toArray({ atoms: true })).toEqual([
$.ok(1),
$.ok(2),
$.error("oh no!"),
$.ok(2),
$.ok(1),
]);
expect(lookup).toBeCalledTimes(2);
});
});

describe.concurrent("flatten", () => {
test("simple nested stream", async ({ expect }) => {
expect.assertions(1);
Expand Down
Loading