From 8c0c78de952ac755f882ff43b343ab24d2ec9bf9 Mon Sep 17 00:00:00 2001 From: "stainless-app[bot]" <142633134+stainless-app[bot]@users.noreply.github.com> Date: Tue, 21 Jan 2025 20:59:00 +0000 Subject: [PATCH] chore(internal): minor restructuring (#143) --- src/internal/decoders/line.ts | 2 +- src/internal/stream-utils.ts | 32 +++++++++++++++++++++++++++++ src/streaming.ts | 38 +++-------------------------------- 3 files changed, 36 insertions(+), 36 deletions(-) create mode 100644 src/internal/stream-utils.ts diff --git a/src/internal/decoders/line.ts b/src/internal/decoders/line.ts index cc0b99f..5f563b3 100644 --- a/src/internal/decoders/line.ts +++ b/src/internal/decoders/line.ts @@ -1,6 +1,6 @@ import { WriterError } from '../../error'; -type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined; +export type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined; /** * A re-implementation of httpx's `LineDecoder` in Python that handles incrementally diff --git a/src/internal/stream-utils.ts b/src/internal/stream-utils.ts new file mode 100644 index 0000000..37f7793 --- /dev/null +++ b/src/internal/stream-utils.ts @@ -0,0 +1,32 @@ +/** + * Most browsers don't yet have async iterable support for ReadableStream, + * and Node has a very different way of reading bytes from its "ReadableStream". + * + * This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 + */ +export function ReadableStreamToAsyncIterable(stream: any): AsyncIterableIterator { + if (stream[Symbol.asyncIterator]) return stream; + + const reader = stream.getReader(); + return { + async next() { + try { + const result = await reader.read(); + if (result?.done) reader.releaseLock(); // release lock when stream becomes closed + return result; + } catch (e) { + reader.releaseLock(); // release lock when stream becomes errored + throw e; + } + }, + async return() { + const cancelPromise = reader.cancel(); + reader.releaseLock(); + await cancelPromise; + return { done: true, value: undefined }; + }, + [Symbol.asyncIterator]() { + return this; + }, + }; +} diff --git a/src/streaming.ts b/src/streaming.ts index c71a75c..d9718ed 100644 --- a/src/streaming.ts +++ b/src/streaming.ts @@ -1,6 +1,7 @@ import { ReadableStream, type Response } from './_shims/index'; import { WriterError } from './error'; import { LineDecoder } from './internal/decoders/line'; +import { ReadableStreamToAsyncIterable } from './internal/stream-utils'; import { createResponseHeaders } from './core'; import { APIError } from './error'; @@ -84,7 +85,7 @@ export class Stream implements AsyncIterable { async function* iterLines(): AsyncGenerator { const lineDecoder = new LineDecoder(); - const iter = readableStreamAsyncIterable(readableStream); + const iter = ReadableStreamToAsyncIterable(readableStream); for await (const chunk of iter) { for (const line of lineDecoder.decode(chunk)) { yield line; @@ -198,7 +199,7 @@ export async function* _iterSSEMessages( const sseDecoder = new SSEDecoder(); const lineDecoder = new LineDecoder(); - const iter = readableStreamAsyncIterable(response.body); + const iter = ReadableStreamToAsyncIterable(response.body); for await (const sseChunk of iterSSEChunks(iter)) { for (const line of lineDecoder.decode(sseChunk)) { const sse = sseDecoder.decode(line); @@ -351,36 +352,3 @@ function partition(str: string, delimiter: string): [string, string, string] { return [str, '', '']; } - -/** - * Most browsers don't yet have async iterable support for ReadableStream, - * and Node has a very different way of reading bytes from its "ReadableStream". - * - * This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490 - */ -export function readableStreamAsyncIterable(stream: any): AsyncIterableIterator { - if (stream[Symbol.asyncIterator]) return stream; - - const reader = stream.getReader(); - return { - async next() { - try { - const result = await reader.read(); - if (result?.done) reader.releaseLock(); // release lock when stream becomes closed - return result; - } catch (e) { - reader.releaseLock(); // release lock when stream becomes errored - throw e; - } - }, - async return() { - const cancelPromise = reader.cancel(); - reader.releaseLock(); - await cancelPromise; - return { done: true, value: undefined }; - }, - [Symbol.asyncIterator]() { - return this; - }, - }; -}