-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathio.ts
34 lines (32 loc) · 1.01 KB
/
io.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import { TextLineStream, toTransformStream } from "@std/streams";
/**
 * Streams the body of a fetch `Response` into a file on disk.
 *
 * The file is created when missing and truncated when it already exists, so
 * the saved file contains exactly the response body and no leftover bytes
 * from a previous, longer file.
 *
 * When the response has no body (`r.body` is `null`), nothing is opened or
 * written and the function resolves without touching the file system.
 *
 * @param r Response whose body should be persisted.
 * @param path Destination file path.
 */
export const saveResponse = async (r: Response, path: string | URL) => {
  if (r.body) {
    const file = await Deno.open(path, {
      write: true,
      create: true,
      // Without truncation, overwriting a longer existing file would leave
      // its old tail after the newly written content.
      truncate: true,
    });
    await r.body.pipeTo(file.writable);
  }
};
/**
 * Turns a stream of newline-delimited JSON into a stream of parsed values.
 *
 * The input is decoded to text, split into lines, and every line that is not
 * empty after trimming is passed through `JSON.parse`. Parsed values are
 * typed as `In` but are not validated against that type at runtime.
 *
 * @typeParam In Type each parsed line is assumed to have.
 * @param stream Source stream carrying the NDJSON payload.
 * @returns A readable stream emitting one parsed value per non-blank line.
 */
export const ndjsonStream = <In>(
  stream: ReadableStream,
) => {
  // Stage 1: raw chunks -> text -> individual lines.
  const lines = stream
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(new TextLineStream());

  // Stage 2: lines -> parsed JSON values, ignoring blank lines.
  const parseLines = toTransformStream<string, In>(
    async function* (src: AsyncIterable<string>) {
      for await (const line of src) {
        if (line.trim().length === 0) {
          continue;
        }
        const record: In = JSON.parse(line);
        yield record;
      }
    },
  );

  return lines.pipeThrough<In>(parseLines);
};
/**
 * Fetches a JSON document and yields the `value` property of each entry.
 *
 * The response is expected to be a JSON array of `{ value: T }` objects; the
 * payload is not validated at runtime. Entries that are `null`/`undefined`
 * yield `undefined` instead of throwing.
 *
 * When the response status is not OK, nothing is yielded. The unread body is
 * cancelled in that case so the response does not stay open.
 *
 * @typeParam T Assumed type of each entry's `value`.
 * @param url Address to fetch.
 */
export async function* fetchAndStreamJson<T>(url: URL | string) {
  const r = await fetch(url);
  if (!r.ok) {
    // Release the body we are not going to read.
    await r.body?.cancel();
    return;
  }
  // The whole document is parsed up front, so a plain `for...of` suffices.
  const entries = await r.json() as { value: T }[];
  for (const entry of entries) {
    yield entry?.value;
  }
}
/**
 * Fetches a URL and exposes its body as a stream of parsed NDJSON values.
 *
 * The response status is not inspected; whatever body comes back is handed
 * to `ndjsonStream`.
 *
 * @typeParam T Type each parsed line is assumed to have.
 * @param url Address to fetch.
 * @returns A readable stream of values parsed from the response body.
 * @throws Error when the response carries no body to stream.
 */
export const fetchAndStreamNdjson = async <T>(url: URL | string) => {
  const r = await fetch(url);
  // A response may legitimately have a null body; fail with a clear message
  // instead of a TypeError from calling `pipeThrough` on null.
  if (r.body === null) {
    throw new Error(
      `Response from ${String(url)} has no body to stream as NDJSON`,
    );
  }
  return ndjsonStream<T>(r.body);
};