Skip to content

Commit

Permalink
Fix #2: ByteLengthTruncateStream refactoring
Browse files Browse the repository at this point in the history
Setting 'error' event listener (as a part of byteLength getter)
without handling byteLength led to silenced errors.

1. Remove SafeEventEmitter, use EventEmitter inherited
from stream.Transform
2. Emit a 'byteLength' event instead
3. Remove byteLengthEvent promise
4. byteLength function/getter in FileIterator
5. Move abortOnFileByteLengthLimit logic to stream implementation
  • Loading branch information
rafasofizada committed Jul 12, 2023
1 parent c061235 commit 37029d9
Show file tree
Hide file tree
Showing 5 changed files with 348 additions and 67 deletions.
87 changes: 50 additions & 37 deletions src/ByteLengthTruncateStream.ts
Original file line number Diff line number Diff line change
@@ -1,67 +1,80 @@
import { Transform, TransformCallback } from 'stream';

import { SafeEventEmitter } from './SafeEventEmitter';
import { Readable, Transform, TransformCallback } from 'stream';
import { FieldLimitError } from './error';

export class ByteLengthTruncateStream extends Transform {
public readonly byteLengthEvent: Promise<FileByteLengthInfo>;
private _bytesWritten: number = 0;
private _truncated: boolean = false;

private readonly ee: SafeEventEmitter<ByteLengthStreamEvents>;
private truncated: boolean = false;
private readBytes: number = 0;
get bytesWritten(): number {
return this._bytesWritten;
}

constructor(private readonly maxByteLength: number) {
super();
get truncated(): boolean {
return this._truncated;
}

this.ee = new SafeEventEmitter();
public on(event: 'byteLength', listener: (bytesWritten: number) => void): this;
public on(event: 'close', listener: () => void): this;
public on(event: 'data', listener: (chunk: any) => void): this;
public on(event: 'end', listener: () => void): this;
public on(event: 'error', listener: (err: Error) => void): this;
public on(event: 'pause', listener: () => void): this;
public on(event: 'readable', listener: () => void): this;
public on(event: 'resume', listener: () => void): this;
public on(event: 'close', listener: () => void): this;
public on(event: 'drain', listener: () => void): this;
public on(event: 'finish', listener: () => void): this;
public on(event: 'pipe', listener: (src: Readable) => void): this;
public on(event: 'unpipe', listener: (src: Readable) => void): this;
public on(event: string | symbol, listener: (...args: any[]) => void): this {
return super.on(event, listener);
}

// Snapshot state on 'byteLength'. `this.truncated` and `this.readBytes` are primitives, so we don't need to worry about
// them changing after the event is emitted.
this.byteLengthEvent = new Promise((resolve) => {
this.ee.on('byteLength', () => {
return resolve({
truncated: this.truncated,
readBytes: this.readBytes,
});
});
});
constructor(
private readonly limit: number,
private readonly abortOnFileByteLengthLimit: boolean,
private readonly field: string,
) {
super();
}

// encoding = 'buffer': https://nodejs.org/api/stream.html#transform_transformchunk-encoding-callback
public _transform(chunk: Buffer | string, encoding: BufferEncoding | 'buffer', callback: TransformCallback): void {
if (this.truncated) {
if (this._truncated) {
return callback();
}

const chunkBuffer = encoding === 'buffer'
? chunk as Buffer
: Buffer.from(chunk as string, encoding);

if (this.readBytes + chunkBuffer.byteLength > this.maxByteLength) {
const truncatedChunk = chunkBuffer.subarray(0, this.maxByteLength - this.readBytes);
this.readBytes += truncatedChunk.byteLength;
this.truncated = true;
if (this._bytesWritten + chunkBuffer.byteLength > this.limit) {
const truncatedChunk = chunkBuffer.subarray(0, this.limit - this._bytesWritten);
this._bytesWritten += truncatedChunk.byteLength;
this.push(truncatedChunk);

this.ee.emit('byteLength', undefined);

return callback(null, truncatedChunk);
if (this.abortOnFileByteLengthLimit) {
return callback(new FieldLimitError('maxFileByteLength', this.field!, this.limit));
} else {
this._truncated = true;
return callback();
}
} else {
this.push(chunkBuffer);
this._bytesWritten += chunkBuffer.byteLength;

return callback();
}

this.readBytes += chunkBuffer.byteLength;
return callback(null, chunk);
}

public _flush(callback: TransformCallback): void {
this.ee.emit('byteLength', undefined);
this.emit('byteLength', this._bytesWritten);

return callback();
}
}

export type FileByteLengthInfo = {
truncated: boolean;
readBytes: number;
};

type ByteLengthStreamEvents = {
byteLength: void;
truncated: boolean;
};
56 changes: 45 additions & 11 deletions src/FileIterator.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import busboy from "busboy";
import { on, Readable } from "stream";

import { ByteLengthTruncateStream } from "./ByteLengthTruncateStream";
import { TotalLimitError, FieldLimitError } from "./error";
import { ByteLengthTruncateStream, FileByteLengthInfo } from "./ByteLengthTruncateStream";
import { TotalLimitError } from "./error";
import { FileCounter } from "./FileCounter";
import { Internal } from "./types";

Expand Down Expand Up @@ -118,23 +118,57 @@ function processBusboyFileEventPayload(
// FileCounter may throw, it's a Proxy!
fileCounter[field] += 1;

const truncatedStream = stream.pipe(new ByteLengthTruncateStream(maxFileByteLength));
const lengthLimiter = new ByteLengthTruncateStream(maxFileByteLength, abortOnFileByteLengthLimit, field);
const truncatedStream = stream.pipe(lengthLimiter);

return {
field,
stream: truncatedStream,
byteLength: truncatedStream.byteLengthEvent
.then((payload) => {
if (payload.truncated && abortOnFileByteLengthLimit) {
throw new FieldLimitError("maxFileByteLength", field, maxFileByteLength);
}

return payload;
}),
// Why getter and not a property?
// So that byteLength() is called lazily. byteLength() has a side effect: it sets
// the 'error' event listener on the stream. If we 'prematurely' set the 'error' event listener
// as a part of byteLength property, but don't error-handle byteLength, we will get silenced errors.
// https://github.com/rafasofizada/pechkin/issues/2
get byteLength() {
return byteLength(truncatedStream)
},
...info,
};
}

/**
 * Observes `stream` and settles with its final byte-length info.
 *
 * Called lazily (via the `byteLength` getter in processBusboyFileEventPayload)
 * so that the 'error' listener is only installed when the caller actually
 * consumes the promise — avoiding the silenced-error problem of the old
 * eagerly-created `byteLengthEvent` promise.
 * https://github.com/rafasofizada/pechkin/issues/2
 *
 * @param stream - the length-limiting Transform stream to observe.
 * @returns resolves with `{ readBytes, truncated }` on 'byteLength', 'finish'
 *          or 'close'; rejects with the stream's error on 'error'.
 */
export function byteLength(stream: ByteLengthTruncateStream): Promise<FileByteLengthInfo> {
  return new Promise((resolve, reject) => {
    // A 'close' with no prior 'error' / 'finish' / 'byteLength' means the
    // stream was most likely destroy()'ed; we still resolve, but warn.
    let eventBeforeCloseEmitted = false;

    // Snapshot the counters. Both are primitives, so the snapshot cannot
    // change after the promise settles.
    const payload = (stream: ByteLengthTruncateStream) => ({
      readBytes: stream.bytesWritten,
      truncated: stream.truncated,
    });

    stream
      .on('error', (error) => {
        eventBeforeCloseEmitted = true;
        return reject(error);
      })
      .on('close', () => {
        if (!eventBeforeCloseEmitted) {
          console.warn(`Pechkin::ByteLengthLimitStream closed, without 'finish' & 'byteLength' or 'error' events
firing beforehand. This should not happen. Probable cause: stream was destroy()'ed.`);
        }

        // No-op if the promise has already settled.
        resolve(payload(stream));
      })
      .on('finish', () => {
        eventBeforeCloseEmitted = true;
        return resolve(payload(stream));
      })
      .on('byteLength', () => {
        // Fix: mark the flag here as well. 'byteLength' is emitted in _flush,
        // i.e. BEFORE 'finish'; previously a 'close' arriving in between would
        // log a spurious "destroy()'ed" warning even though we had resolved.
        eventBeforeCloseEmitted = true;
        return resolve(payload(stream));
      });
  });
}

/*
> eIter = events.on(target, event, handler);
Expand Down
19 changes: 0 additions & 19 deletions src/SafeEventEmitter.ts

This file was deleted.

166 changes: 166 additions & 0 deletions test/ByteLengthLimitStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import { Readable } from 'stream';
import { describe, it, expect } from 'vitest';

import { FieldLimitError } from '../src';
import { ByteLengthTruncateStream } from '../src/ByteLengthTruncateStream';
import { byteLength } from '../src/FileIterator';

// Test suite for ByteLengthTruncateStream and the lazy byteLength() helper.
// NOTE(review): these tests settle hand-built Promises from stream event
// listeners, so they depend on Node's stream event ordering — presumably
// 'byteLength' (emitted in _flush) fires before 'finish', and 'close' last.
describe('ByteLengthTruncateStream', () => {
describe('_transform', () => {
// Data strictly under the limit must pass through byte-for-byte.
it('should pass through chunk if limit is not exceeded', () => new Promise<void>((resolve, reject) => {
const limit = 100;
const field = 'content';
const stream = new ByteLengthTruncateStream(limit, true, field);

const data = 'This is a short chunk of data.';

stream.on('data', (chunk) => {
expect(chunk.toString()).toBe(data);
resolve();
});

stream.write(data);
}));

// With abortOnFileByteLengthLimit = true, an oversized chunk is truncated
// to exactly `limit` bytes and the stream errors with FieldLimitError.
it('should truncate chunk and emit error if limit is exceeded', () => new Promise<void>((resolve, reject) => {
const limit = 10;
const field = 'content';
const stream = new ByteLengthTruncateStream(limit, true, field);

const data = 'This is a long chunk of data.';
const expectedTruncatedData = 'This is a ';
const expectedError = new FieldLimitError('maxFileByteLength', field, limit);

const chunks: any[] = [];

stream.on('data', (chunk) => {
chunks.push(chunk);
});

stream.on('error', (error) => {
expect(error).toEqual(expectedError);
// The truncated output must be exactly the first `limit` bytes.
expect(Buffer.concat(chunks).toString()).toEqual(expectedTruncatedData);
resolve();
});

stream.write(data);
}));
});

describe('byteLength event', () => {
// Happy path: 'byteLength' reports the full written byte count and must
// fire before 'finish' (it is emitted from _flush).
it('should return the correct byte length', () => new Promise<void>((resolve, reject) => {
const limit = 100;
const field = 'content';
const stream = new ByteLengthTruncateStream(limit, true, field);

const data = 'This is some data.';
const expectedByteLength = Buffer.byteLength(data);

stream
.on('byteLength', (byteLength) => {
expect(byteLength).toEqual(expectedByteLength);
resolve();
})
.on('error', (error) => {
reject(new Error('No error should be emitted: content < limit'));
console.error(error);
})
.on('finish', () => {
reject(new Error('\'byteLength\' should be emitted before \'finish\''));
});

stream.write(data);
stream.end();
}));

// When the limit is exceeded (abort mode), _flush is never reached:
// the stream must error instead of emitting 'byteLength'.
// NOTE(review): listeners are attached AFTER write()/end(); this relies on
// the 'error' event being emitted asynchronously — confirm for Transform.
it('should not be emitted if limit exceeded', () => new Promise<void>((resolve, reject) => {
const limit = 10;
const field = 'content';
const stream = new ByteLengthTruncateStream(limit, true, field);

const data = 'This is some data.';

stream.write(data);
stream.end();

stream
.on('byteLength', (byteLength) => {
reject(new Error('\'byteLength\' should not be emitted: content > limit, should error instead'))
})
.on('error', (error) => {
expect(error).toEqual(new FieldLimitError('maxFileByteLength', field, limit));
resolve();
});
}));

// Documents the motivation for the lazy byteLength() getter (issue #2):
// an unhandled 'error' on the stream must surface as uncaughtException
// rather than being silenced by a premature listener.
// NOTE(review): removeAllListeners('uncaughtException') would also strip
// listeners installed by other suites; the 500ms timeout makes this the
// slowest test here.
it('should not set .on(\'error\') listener and catch errors, process.uncaughtException should fire', () => new Promise<void>((resolve, reject) => {
const uncaughtExceptionEvents: Error[] = [];
process.on('uncaughtException', (error: Error) => {
uncaughtExceptionEvents.push(error);
});

// Create the ByteLengthTruncateStream instance
const limit = 10;
const field = 'content';
const stream = new ByteLengthTruncateStream(limit, true, field);

// Create a readable stream and pipe it to ByteLengthTruncateStream
const input = 'This is a long chunk of data.';
const readableStream = Readable.from(input);
readableStream.pipe(stream);

// Allow some time for the event loop to process data
setTimeout(() => {
try {
// Assert that the process.uncaughtException event was fired
expect(uncaughtExceptionEvents.length).toBeGreaterThan(0);
} catch (error) {
reject(error);
}

process.removeAllListeners('uncaughtException');
resolve();
}, 500);
}));

// Verifying expectations for stream behaviour

// destroy() skips _flush, so 'byteLength' must never fire — only 'close'.
it('is not available after the stream has been destroyed', () => new Promise<void>((resolve, reject) => {
const limit = 100;
const field = 'content';
const stream = new ByteLengthTruncateStream(limit, true, field);

const data = 'This is some data.';

stream.write(data);
stream.destroy();

stream
.on('byteLength', () => {
reject(new Error('\'byteLength\' should not be emitted: stream destroyed'))
})
.on('close', () => {
resolve();
});
}));

// After end(), _flush has already emitted 'byteLength'; a listener added
// afterwards must miss it and only see 'finish'.
it('is not available after the stream has been ended', () => new Promise<void>((resolve, reject) => {
const limit = 100;
const field = 'content';
const stream = new ByteLengthTruncateStream(limit, true, field);

const data = 'This is some data.';

stream.write(data);
stream.end();

stream
.on('byteLength', () => {
reject(new Error('\'byteLength\' should not be emitted: stream destroyed'))
})
.on('finish', () => {
resolve();
});
}));
});
});
Loading

0 comments on commit 37029d9

Please sign in to comment.