Skip to content

Commit

Permalink
feat(stream): make BufferedTransformStream propagate cancel signal im…
Browse files Browse the repository at this point in the history
…mediately
  • Loading branch information
yume-chan committed Feb 20, 2025
1 parent 24b65fd commit cb21cd2
Showing 1 changed file with 19 additions and 9 deletions.
28 changes: 19 additions & 9 deletions libraries/stream-extra/src/buffered-transform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import { StructEmptyError } from "@yume-chan/struct";
import { BufferedReadableStream } from "./buffered.js";
import type { PushReadableStreamController } from "./push-readable.js";
import { PushReadableStream } from "./push-readable.js";
import type { ReadableWritablePair } from "./stream.js";
import type {
ReadableWritablePair,
WritableStreamDefaultController,
} from "./stream.js";
import { ReadableStream, WritableStream } from "./stream.js";

// TODO: BufferedTransformStream: find better implementation
Expand All @@ -25,11 +28,13 @@ export class BufferedTransformStream<T>
transform: (stream: BufferedReadableStream) => MaybePromiseLike<T>,
) {
// Convert incoming chunks to a `BufferedReadableStream`
let sourceStreamController!: PushReadableStreamController<Uint8Array>;
let bufferedStreamController!: PushReadableStreamController<Uint8Array>;

let writableStreamController!: WritableStreamDefaultController;

const buffered = new BufferedReadableStream(
new PushReadableStream<Uint8Array>((controller) => {
sourceStreamController = controller;
bufferedStreamController = controller;
}),
);

Expand All @@ -50,21 +55,26 @@ export class BufferedTransformStream<T>
}
},
cancel: (reason) => {
// Propagate cancel to the source stream
// So future writes will be rejected
return buffered.cancel(reason);
// If a `ReadableStream` is piping into `#writable`,
// This will cancel the `ReadableStream` immediately.
// If upstream is writing using `#writable`'s writer, this will
// throw errors for any future writes
return writableStreamController.error(reason);
},
});

this.#writable = new WritableStream({
start(controller) {
writableStreamController = controller;
},
async write(chunk) {
await sourceStreamController.enqueue(chunk);
await bufferedStreamController.enqueue(chunk);
},
abort() {
sourceStreamController.close();
bufferedStreamController.close();
},
close() {
sourceStreamController.close();
bufferedStreamController.close();
},
});
}
Expand Down

0 comments on commit cb21cd2

Please sign in to comment.