Skip to content

Commit

Permalink
chore(internal): minor restructuring (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
stainless-app[bot] committed Jan 21, 2025
1 parent ff0f7bd commit 8c0c78d
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/internal/decoders/line.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WriterError } from '../../error';

type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;
export type Bytes = string | ArrayBuffer | Uint8Array | Buffer | null | undefined;

/**
* A re-implementation of httpx's `LineDecoder` in Python that handles incrementally
Expand Down
32 changes: 32 additions & 0 deletions src/internal/stream-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/**
* Most browsers don't yet have async iterable support for ReadableStream,
* and Node has a very different way of reading bytes from its "ReadableStream".
*
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
export function ReadableStreamToAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
if (stream[Symbol.asyncIterator]) return stream;

const reader = stream.getReader();
return {
async next() {
try {
const result = await reader.read();
if (result?.done) reader.releaseLock(); // release lock when stream becomes closed
return result;
} catch (e) {
reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
},
async return() {
const cancelPromise = reader.cancel();
reader.releaseLock();
await cancelPromise;
return { done: true, value: undefined };
},
[Symbol.asyncIterator]() {
return this;
},
};
}
38 changes: 3 additions & 35 deletions src/streaming.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ReadableStream, type Response } from './_shims/index';
import { WriterError } from './error';
import { LineDecoder } from './internal/decoders/line';
import { ReadableStreamToAsyncIterable } from './internal/stream-utils';

import { createResponseHeaders } from './core';
import { APIError } from './error';
Expand Down Expand Up @@ -84,7 +85,7 @@ export class Stream<Item> implements AsyncIterable<Item> {
async function* iterLines(): AsyncGenerator<string, void, unknown> {
const lineDecoder = new LineDecoder();

const iter = readableStreamAsyncIterable<Bytes>(readableStream);
const iter = ReadableStreamToAsyncIterable<Bytes>(readableStream);
for await (const chunk of iter) {
for (const line of lineDecoder.decode(chunk)) {
yield line;
Expand Down Expand Up @@ -198,7 +199,7 @@ export async function* _iterSSEMessages(
const sseDecoder = new SSEDecoder();
const lineDecoder = new LineDecoder();

const iter = readableStreamAsyncIterable<Bytes>(response.body);
const iter = ReadableStreamToAsyncIterable<Bytes>(response.body);
for await (const sseChunk of iterSSEChunks(iter)) {
for (const line of lineDecoder.decode(sseChunk)) {
const sse = sseDecoder.decode(line);
Expand Down Expand Up @@ -351,36 +352,3 @@ function partition(str: string, delimiter: string): [string, string, string] {

return [str, '', ''];
}

/**
* Most browsers don't yet have async iterable support for ReadableStream,
* and Node has a very different way of reading bytes from its "ReadableStream".
*
* This polyfill was pulled from https://github.com/MattiasBuelens/web-streams-polyfill/pull/122#issuecomment-1627354490
*/
export function readableStreamAsyncIterable<T>(stream: any): AsyncIterableIterator<T> {
if (stream[Symbol.asyncIterator]) return stream;

const reader = stream.getReader();
return {
async next() {
try {
const result = await reader.read();
if (result?.done) reader.releaseLock(); // release lock when stream becomes closed
return result;
} catch (e) {
reader.releaseLock(); // release lock when stream becomes errored
throw e;
}
},
async return() {
const cancelPromise = reader.cancel();
reader.releaseLock();
await cancelPromise;
return { done: true, value: undefined };
},
[Symbol.asyncIterator]() {
return this;
},
};
}

0 comments on commit 8c0c78d

Please sign in to comment.