Skip to content

Commit

Permalink
Generate Web ReadableStream by converting Node.js Readable
Browse files Browse the repository at this point in the history
  • Loading branch information
Borewit committed Jan 20, 2025
1 parent 75096eb commit 51c6bea
Showing 1 changed file with 40 additions and 37 deletions.
77 changes: 40 additions & 37 deletions test/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import { Readable } from 'node:stream';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
import { createReadStream } from 'node:fs';
import fs from 'node:fs/promises';
import { ReadableStream } from 'node:stream/web';

const filename = fileURLToPath(import.meta.url);
const dirname = path.dirname(filename);
Expand All @@ -24,52 +24,55 @@ export class SourceStream extends Readable {
}
}

export async function makeReadableByteFileStream(filename: string, delay = 0): Promise<{ fileSize: number, stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {
export function makeWebReadableStreamUseDefaultReader(stream: ReadableStream): ReadableStream {
const _getReader = stream.getReader.bind(stream);

// @ts-ignore
stream.getReader = options => {
return _getReader(); // Call without options for a default reader
};
return stream;
}

export async function makeReadableByteFileStream(filename: string): Promise<{ fileSize: number, stream: ReadableStream<Uint8Array>, closeFile: () => Promise<void> }> {

let position = 0;
const fileInfo = await fs.stat(filename);
const fileHandle = await fs.open(filename, 'r');
const nodeStream = createReadStream(filename);

return {
fileSize: fileInfo.size,
stream: new ReadableStream({
type: 'bytes',

async pull(controller) {
stream: fileAsByteStream(nodeStream),
closeFile: () => Promise.resolve()
};
}

// @ts-ignore
const view = controller.byobRequest.view;
function fileAsByteStream(nodeStream: Readable) {

setTimeout(async () => {
try {
const {bytesRead} = await fileHandle.read(view, 0, view.byteLength, position);
if (bytesRead === 0) {
await fileHandle.close();
controller.close();
// @ts-ignore
controller.byobRequest.respond(0);
} else {
position += bytesRead;
// @ts-ignore
controller.byobRequest.respond(bytesRead);
}
} catch (err) {
controller.error(err);
await fileHandle.close();
}
}, delay);
},
// Wrap it into a WHATWG ReadableStream with type 'bytes'
return new ReadableStream({
type: 'bytes',
start(controller) {
// Event listener for data from the Node.js stream
nodeStream.on('data', chunk => {
// Enqueue each chunk into the ReadableStream
controller.enqueue(new Uint8Array(chunk));
});

cancel() {
return fileHandle.close();
},
// Handle end of the stream
nodeStream.on('end', () => {
controller.close();
});

autoAllocateChunkSize: 1024
}),
closeFile: () => {
return fileHandle.close();
// Handle errors
nodeStream.on('error', err => {
controller.error(err);
});
},
cancel(reason) {
// If canceled, destroy the Node.js stream
nodeStream.destroy(reason);
}
};
});
}

export const samplePath = path.join(dirname, 'samples');

0 comments on commit 51c6bea

Please sign in to comment.