Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle server closing connection after response body is sent #223

Merged
merged 7 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 73 additions & 48 deletions src/connection/HttpConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ export default class HttpConnection extends BaseConnection {
}

debug('Starting a new request', params)

// tracking response.end, request.finish and the value of the returnable response object here is necessary:
// we only know a request is truly finished when one of the following is true:
// - request.finish and response.end have both fired (success)
// - request.error has fired (failure)
// - response.close has fired (failure)
let responseEnded = false
let requestFinished = false
let connectionRequestResponse: ConnectionRequestResponse | ConnectionRequestResponseAsStream

let request: http.ClientRequest
try {
request = this.makeRequest(requestParams)
Expand All @@ -135,7 +145,6 @@ export default class HttpConnection extends BaseConnection {

const onResponse = (response: http.IncomingMessage): void => {
cleanListeners()
this._openRequests--

if (options.asStream === true) {
return resolve({
Expand Down Expand Up @@ -189,29 +198,25 @@ export default class HttpConnection extends BaseConnection {
}
}

const onEnd = (err: Error): void => {
const onEnd = (): void => {
response.removeListener('data', onData)
response.removeListener('end', onEnd)
response.removeListener('error', onEnd)
request.removeListener('error', noop)

if (err != null) {
// @ts-expect-error
if (err.message === 'aborted' && err.code === 'ECONNRESET') {
response.destroy()
return reject(new ConnectionError('Response aborted while reading the body'))
}
if (err.name === 'RequestAbortedError') {
return reject(err)
}
return reject(new ConnectionError(err.message))
}

resolve({
responseEnded = true

connectionRequestResponse = {
body: isCompressed || bodyIsBinary ? Buffer.concat(payload as Buffer[]) : payload as string,
statusCode: response.statusCode as number,
headers: response.headers
})
}

if (requestFinished) {
return resolve(connectionRequestResponse)
}
}

const onResponseClose = (): void => {
return reject(new ConnectionError('Response aborted while reading the body'))
}

if (!isCompressed && !bodyIsBinary) {
Expand All @@ -220,30 +225,36 @@ export default class HttpConnection extends BaseConnection {

this.diagnostic.emit('deserialization', null, options)
response.on('data', onData)
response.on('error', onEnd)
response.on('end', onEnd)
response.on('close', onResponseClose)
}

const onTimeout = (): void => {
cleanListeners()
this._openRequests--
request.once('error', () => {}) // we need to catch the request aborted error
request.once('error', noop) // we need to catch the request aborted error
request.destroy()
reject(new TimeoutError('Request timed out'))
return reject(new TimeoutError('Request timed out'))
}

const onError = (err: Error): void => {
// @ts-expect-error
let { name, message, code } = err

// ignore this error, it means we got a response body for a request that didn't expect a body (e.g. HEAD)
// rather than failing, let it return a response with an empty string as body
if (code === 'HPE_INVALID_CONSTANT' && message.startsWith('Parse Error: Expected HTTP/')) return

cleanListeners()
this._openRequests--
let message = err.message
if (err.name === 'RequestAbortedError') {
if (name === 'RequestAbortedError') {
return reject(err)
}
// @ts-expect-error
if (err.code === 'ECONNRESET') {

if (code === 'ECONNRESET') {
message += ` - Local: ${request.socket?.localAddress ?? 'unknown'}:${request.socket?.localPort ?? 'unknown'}, Remote: ${request.socket?.remoteAddress ?? 'unknown'}:${request.socket?.remotePort ?? 'unknown'}`
} else if (code === 'EPIPE') {
message = 'Response aborted while reading the body'
}
reject(new ConnectionError(message))
return reject(new ConnectionError(message))
}

const onSocket = (socket: TLSSocket): void => {
Expand All @@ -269,9 +280,42 @@ export default class HttpConnection extends BaseConnection {
}
}

const onFinish = (): void => {
requestFinished = true

if (responseEnded) {
if (connectionRequestResponse != null) {
return resolve(connectionRequestResponse)
} else {
return reject(new Error('No response body received'))
}
}
}

const cleanListeners = (): void => {
if (cleanedListeners) return

this._openRequests--

// we do NOT stop listening to request.error here
// all errors we care about in the request/response lifecycle will bubble up to request.error, and may occur even after the request has been sent
request.removeListener('response', onResponse)
request.removeListener('timeout', onTimeout)
request.removeListener('socket', onSocket)
if (options.signal != null) {
if ('removeEventListener' in options.signal) {
options.signal.removeEventListener('abort', abortListener)
} else {
options.signal.removeListener('abort', abortListener)
}
}
cleanedListeners = true
}

request.on('response', onResponse)
request.on('timeout', onTimeout)
request.on('error', onError)
request.on('finish', onFinish)
if (this[kCaFingerprint] != null && requestParams.protocol === 'https:') {
request.on('socket', onSocket)
}
Expand All @@ -285,31 +329,12 @@ export default class HttpConnection extends BaseConnection {
/* istanbul ignore if */
if (err != null && !cleanedListeners) {
cleanListeners()
this._openRequests--
reject(err)
return reject(err)
}
})
} else {
request.end(params.body)
}

return request

function cleanListeners (): void {
request.removeListener('response', onResponse)
request.removeListener('timeout', onTimeout)
request.removeListener('error', onError)
request.on('error', noop)
request.removeListener('socket', onSocket)
if (options.signal != null) {
if ('removeEventListener' in options.signal) {
options.signal.removeEventListener('abort', abortListener)
} else {
options.signal.removeListener('abort', abortListener)
}
}
cleanedListeners = true
}
})
}

Expand Down
90 changes: 90 additions & 0 deletions test/unit/http-connection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,96 @@ test('Socket destroyed while reading the body', async t => {
server.stop()
})

test('Connection closed while sending the request body as stream (EPIPE)', async t => {
t.plan(2)

let dataCounter = 0
function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
dataCounter++
if (dataCounter === 1) {
res.writeHead(413, { 'Connection': 'close' });
res.end('Payload Too Large');
} else {
t.fail('Request should stop trying to send data')
}
}
const [{ port }, server] = await buildServer(handler)

const connection = new HttpConnection({
url: new URL(`http://localhost:${port}`),
})

const body = new Readable({
async read (_size: number) {
await setTimeout(500)
// run one large request where data will be received by socket in multiple chunks
this.push('x'.repeat(99999999))
await setTimeout(500)
this.push(null) // EOF
}
})

try {
// run one large request where data will be received by socket in multiple chunks
await connection.request({
path: '/hello',
method: 'POST',
body
}, options)
t.fail('ConnectionError should have been caught')
} catch (err: any) {
t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`)
t.ok(
err.message === 'Response aborted while reading the body' ||
err.message.startsWith('write ECONNRESET - Local:') ||
err.message.startsWith('read ECONNRESET - Local:'),
`Unexpected error message: ${err.message}`
)
}

server.stop()
})

test('Connection closed while sending the request body as string (EPIPE)', async t => {
t.plan(2)

let dataCounter = 0
function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
dataCounter++
if (dataCounter === 1) {
res.writeHead(413, { 'Connection': 'close' });
res.end('Payload Too Large');
} else {
t.fail('Request should stop trying to send data')
}
}
const [{ port }, server] = await buildServer(handler)

const connection = new HttpConnection({
url: new URL(`http://localhost:${port}`),
})

try {
// run one large request where data will be received by socket in multiple chunks
await connection.request({
path: '/hello',
method: 'POST',
body: 'x'.repeat(99999999)
}, options)
t.fail('ConnectionError should have been caught')
} catch (err: any) {
t.ok(err instanceof ConnectionError, `Not a ConnectionError: ${err}`)
t.ok(
err.message === 'Response aborted while reading the body' ||
err.message.startsWith('write ECONNRESET - Local:') ||
err.message.startsWith('read ECONNRESET - Local:'),
`Unexpected error message: ${err.message}`
)
}

server.stop()
})

test('Compressed response should return a buffer as body (gzip)', async t => {
t.plan(2)

Expand Down
Loading