From 991fc64b3585120b0a6fadf88073d78c9b9cf789 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Fri, 21 Feb 2025 10:24:56 -0600 Subject: [PATCH 1/7] Handle EPIPE error when server unexpectedly closes the connection --- src/connection/HttpConnection.ts | 124 +++++++++++++++++++------------ 1 file changed, 76 insertions(+), 48 deletions(-) diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index 5b0929c..aec5297 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -113,6 +113,17 @@ export default class HttpConnection extends BaseConnection { } debug('Starting a new request', params) + + // tracking response.end, request.finish and the value of the returnable response object here is necessary: + // we only know a request is truly finished when one of the following is true: + // - request.finish and response.end have both fired (success) + // - request.error has fired (failure) + // - request.close has fired before its listener has been removed (failure) + // - response.close has fired (failure) + let responseEnded = false + let requestFinished = false + let connectionRequestResponse: ConnectionRequestResponse | ConnectionRequestResponseAsStream + let request: http.ClientRequest try { request = this.makeRequest(requestParams) @@ -135,7 +146,6 @@ export default class HttpConnection extends BaseConnection { const onResponse = (response: http.IncomingMessage): void => { cleanListeners() - this._openRequests-- if (options.asStream === true) { return resolve({ @@ -189,29 +199,25 @@ export default class HttpConnection extends BaseConnection { } } - const onEnd = (err: Error): void => { + const onEnd = (): void => { response.removeListener('data', onData) response.removeListener('end', onEnd) - response.removeListener('error', onEnd) - request.removeListener('error', noop) - - if (err != null) { - // @ts-expect-error - if (err.message === 'aborted' && err.code === 'ECONNRESET') { - response.destroy() - return reject(new ConnectionError('Response aborted while reading the body')) - } - if (err.name === 'RequestAbortedError') { - return reject(err) - } - return reject(new ConnectionError(err.message)) - } - resolve({ + responseEnded = true + + connectionRequestResponse = { body: isCompressed || bodyIsBinary ? Buffer.concat(payload as Buffer[]) : payload as string, statusCode: response.statusCode as number, headers: response.headers - }) + } + + if (requestFinished) { + return resolve(connectionRequestResponse) + } + } + + const onResponseClose = (): void => { + return reject(new ConnectionError('Response aborted while reading the body')) } if (!isCompressed && !bodyIsBinary) { @@ -220,30 +226,38 @@ export default class HttpConnection extends BaseConnection { this.diagnostic.emit('deserialization', null, options) response.on('data', onData) - response.on('error', onEnd) response.on('end', onEnd) + response.on('close', onResponseClose) } const onTimeout = (): void => { cleanListeners() - this._openRequests-- - request.once('error', () => {}) // we need to catch the request aborted error + request.once('error', noop) // we need to catch the request aborted error request.destroy() - reject(new TimeoutError('Request timed out')) + return reject(new TimeoutError('Request timed out')) } const onError = (err: Error): void => { + // @ts-expect-error + let { name, message, code } = err + + // ignore this error, it means we got a response body for a request that didn't expect a body (e.g. HEAD) + // rather than failing, let it return a response with an empty string as body + if (code === 'HPE_INVALID_CONSTANT' || message.startsWith('Parse Error: Expected HTTP/')) { + return + } + cleanListeners() - this._openRequests-- - let message = err.message - if (err.name === 'RequestAbortedError') { + if (name === 'RequestAbortedError') { return reject(err) } - // @ts-expect-error - if (err.code === 'ECONNRESET') { + + if (code === 'ECONNRESET') { message += ` - Local: ${request.socket?.localAddress ?? 'unknown'}:${request.socket?.localPort ?? 'unknown'}, Remote: ${request.socket?.remoteAddress ?? 'unknown'}:${request.socket?.remotePort ?? 'unknown'}` + } else if (code === 'EPIPE') { + message = 'Connection closed early by server' } - reject(new ConnectionError(message)) + return reject(new ConnectionError(message)) } const onSocket = (socket: TLSSocket): void => { @@ -269,9 +283,42 @@ export default class HttpConnection extends BaseConnection { } } + const onFinish = (): void => { + requestFinished = true + + if (responseEnded) { + if (connectionRequestResponse != null) { + return resolve(connectionRequestResponse) + } else { + return reject(new Error('No response body received')) + } + } + } + + const cleanListeners = (): void => { + if (cleanedListeners) return + + this._openRequests-- + + // we do NOT stop listening to request.error here + // all errors we care about in the request/response lifecycle will bubble up to request.error, and may occur even after the request has been sent + request.removeListener('response', onResponse) + request.removeListener('timeout', onTimeout) + request.removeListener('socket', onSocket) + if (options.signal != null) { + if ('removeEventListener' in options.signal) { + options.signal.removeEventListener('abort', abortListener) + } else { + options.signal.removeListener('abort', abortListener) + } + } + cleanedListeners = true + } + request.on('response', onResponse) request.on('timeout', onTimeout) request.on('error', onError) + request.on('finish', onFinish) if (this[kCaFingerprint] != null && requestParams.protocol === 'https:') { request.on('socket', onSocket) } @@ -285,31 +332,12 @@ export default class HttpConnection extends BaseConnection { /* istanbul ignore if */ if (err != null && !cleanedListeners) { cleanListeners() - this._openRequests-- - reject(err) + return reject(err) } }) } else { request.end(params.body) } - - return request - - function cleanListeners (): void { - request.removeListener('response', onResponse) - request.removeListener('timeout', onTimeout) - request.removeListener('error', onError) - request.on('error', noop) - request.removeListener('socket', onSocket) - if (options.signal != null) { - if ('removeEventListener' in options.signal) { - options.signal.removeEventListener('abort', abortListener) - } else { - options.signal.removeListener('abort', abortListener) - } - } - cleanedListeners = true - } }) } From 120b3c608e15bcd7447bf5ce6ce1d2b9bccb4d48 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Fri, 21 Feb 2025 10:28:09 -0600 Subject: [PATCH 2/7] Add tests for EPIPE server disconnect edge case --- test/unit/http-connection.test.ts | 80 +++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/test/unit/http-connection.test.ts b/test/unit/http-connection.test.ts index 4eaacc9..db67d13 100644 --- a/test/unit/http-connection.test.ts +++ b/test/unit/http-connection.test.ts @@ -1053,6 +1053,86 @@ test('Socket destroyed while reading the body', async t => { server.stop() }) +test('Connection closed while sending the request body as stream (EPIPE)', async t => { + t.plan(2) + + let dataCounter = 0 + function handler (_req: http.IncomingMessage, res: http.ServerResponse) { + dataCounter++ + if (dataCounter === 1) { + res.writeHead(413, { 'Connection': 'close' }); + res.end('Payload Too Large'); + } else { + t.fail('Request should stop trying to send data') + } + } + const [{ port }, server] = await buildServer(handler) + + const connection = new HttpConnection({ + url: new URL(`http://localhost:${port}`), + }) + + const body = new Readable({ + async read (_size: number) { + await setTimeout(500) + // run one large request where data will be received by socket in multiple chunks + this.push('x'.repeat(99999999)) + await setTimeout(500) + this.push(null) // EOF + } + }) + + try { + // run one large request where data will be received by socket in multiple chunks + await connection.request({ + path: '/hello', + method: 'POST', + body + }, options) + t.fail('ConnectionError should have been caught') + } catch (err: any) { + t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`) + t.equal(err.message, 'Response aborted while reading the body') + } + + server.stop() +}) + +test('Connection closed while sending the request body as string (EPIPE)', async t => { + t.plan(2) + + let dataCounter = 0 + function handler (_req: http.IncomingMessage, res: http.ServerResponse) { + dataCounter++ + if (dataCounter === 1) { + res.writeHead(413, { 'Connection': 'close' }); + res.end('Payload Too Large'); + } else { + t.fail('Request should stop trying to send data') + } + } + const [{ port }, server] = await buildServer(handler) + + const connection = new HttpConnection({ + url: new URL(`http://localhost:${port}`), + }) + + try { + // run one large request where data will be received by socket in multiple chunks + await connection.request({ + path: '/hello', + method: 'POST', + body: 'x'.repeat(99999999) + }, options) + t.fail('ConnectionError should have been caught') + } catch (err: any) { + t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`) + t.equal(err.message, 'Response aborted while reading the body') + } + + server.stop() +}) + test('Compressed response should return a buffer as body (gzip)', async t => { t.plan(2) From 1a2f6096ce09b2cd9df98a4eda48802a8ef35ea1 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Fri, 21 Feb 2025 10:55:00 -0600 Subject: [PATCH 3/7] Drop irrelevant comment --- src/connection/HttpConnection.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index aec5297..aa036a7 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -118,7 +118,6 @@ export default class HttpConnection extends BaseConnection { // we only know a request is truly finished when one of the following is true: // - request.finish and response.end have both fired (success) // - request.error has fired (failure) - // - request.close has fired before its listener has been removed (failure) // - response.close has fired (failure) let responseEnded = false let requestFinished = false From a1c7abb5b97e2dd0047a6f574d4f5d367345b360 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Fri, 21 Feb 2025 11:14:31 -0600 Subject: [PATCH 4/7] Cleanup of unneeded logic --- src/connection/HttpConnection.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index aa036a7..41633ef 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -242,9 +242,7 @@ export default class HttpConnection extends BaseConnection { // ignore this error, it means we got a response body for a request that didn't expect a body (e.g. HEAD) // rather than failing, let it return a response with an empty string as body - if (code === 'HPE_INVALID_CONSTANT' || message.startsWith('Parse Error: Expected HTTP/')) { - return - } + if (code === 'HPE_INVALID_CONSTANT' && message.startsWith('Parse Error: Expected HTTP/')) return cleanListeners() if (name === 'RequestAbortedError') { @@ -253,8 +251,6 @@ export default class HttpConnection extends BaseConnection { if (code === 'ECONNRESET') { message += ` - Local: ${request.socket?.localAddress ?? 'unknown'}:${request.socket?.localPort ?? 'unknown'}, Remote: ${request.socket?.remoteAddress ?? 'unknown'}:${request.socket?.remotePort ?? 'unknown'}` - } else if (code === 'EPIPE') { - message = 'Connection closed early by server' } return reject(new ConnectionError(message)) } From 717ac1a77bc5344ae75cc9c34be2996350261414 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Fri, 21 Feb 2025 11:19:16 -0600 Subject: [PATCH 5/7] Explicitly handle EPIPE error response 'close' event catches it implicitly on Linux, but apparently not on Windows or Mac. --- src/connection/HttpConnection.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/connection/HttpConnection.ts b/src/connection/HttpConnection.ts index 41633ef..ec746d3 100644 --- a/src/connection/HttpConnection.ts +++ b/src/connection/HttpConnection.ts @@ -251,6 +251,8 @@ export default class HttpConnection extends BaseConnection { if (code === 'ECONNRESET') { message += ` - Local: ${request.socket?.localAddress ?? 'unknown'}:${request.socket?.localPort ?? 'unknown'}, Remote: ${request.socket?.remoteAddress ?? 'unknown'}:${request.socket?.remotePort ?? 'unknown'}` + } else if (code === 'EPIPE') { + message = 'Response aborted while reading the body' } return reject(new ConnectionError(message)) } From 82ef90551c8786ad0061e2f39e950c1ab24ae8f4 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Fri, 21 Feb 2025 11:30:59 -0600 Subject: [PATCH 6/7] Make error message check more permissive --- test/unit/http-connection.test.ts | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/unit/http-connection.test.ts b/test/unit/http-connection.test.ts index db67d13..d0c3a65 100644 --- a/test/unit/http-connection.test.ts +++ b/test/unit/http-connection.test.ts @@ -1092,7 +1092,10 @@ test('Connection closed while sending the request body as stream (EPIPE)', async t.fail('ConnectionError should have been caught') } catch (err: any) { t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`) - t.equal(err.message, 'Response aborted while reading the body') + t.ok( + err.message === 'Response aborted while reading the body' || err.message.startsWith('write ECONNRESET - Local:'), + `Unexpected error message: ${err.message}` + ) } server.stop() @@ -1127,7 +1130,10 @@ test('Connection closed while sending the request body as string (EPIPE)', async t.fail('ConnectionError should have been caught') } catch (err: any) { t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`) - t.equal(err.message, 'Response aborted while reading the body') + t.ok( + err.message === 'Response aborted while reading the body' || err.message.startsWith('write ECONNRESET - Local:'), + `Unexpected error message: ${err.message}` + ) } server.stop() From 7465395fa8f2c79690bfb4f734b938a7f0b7f7db Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Fri, 21 Feb 2025 11:35:28 -0600 Subject: [PATCH 7/7] Make error message check even more permissive --- test/unit/http-connection.test.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/test/unit/http-connection.test.ts b/test/unit/http-connection.test.ts index d0c3a65..6a34d71 100644 --- a/test/unit/http-connection.test.ts +++ b/test/unit/http-connection.test.ts @@ -1093,7 +1093,9 @@ test('Connection closed while sending the request body as stream (EPIPE)', async } catch (err: any) { t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`) t.ok( - err.message === 'Response aborted while reading the body' || err.message.startsWith('write ECONNRESET - Local:'), + err.message === 'Response aborted while reading the body' || + err.message.startsWith('write ECONNRESET - Local:') || + err.message.startsWith('read ECONNRESET - Local:'), `Unexpected error message: ${err.message}` ) } @@ -1131,7 +1133,9 @@ test('Connection closed while sending the request body as string (EPIPE)', async } catch (err: any) { t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`) t.ok( - err.message === 'Response aborted while reading the body' || err.message.startsWith('write ECONNRESET - Local:'), + err.message === 'Response aborted while reading the body' || + err.message.startsWith('write ECONNRESET - Local:') || + err.message.startsWith('read ECONNRESET - Local:'), `Unexpected error message: ${err.message}` ) }