Skip to content

Commit

Permalink
Close and destroy underlying socket after any connection error
Browse files Browse the repository at this point in the history
  • Loading branch information
Xabier Napal committed Aug 21, 2020
1 parent e3e1016 commit dac924f
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 8 deletions.
11 changes: 9 additions & 2 deletions lib/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,11 @@ function connect(url, socketOptions, openCallback) {
if (timeout) sock.setTimeout(0);
if (err === null) {
openCallback(null, c);
} else {
sock.end();
sock.destroy();
openCallback(err);
}
else openCallback(err);
});
}

Expand All @@ -174,7 +177,11 @@ function connect(url, socketOptions, openCallback) {
}

sock.once('error', function(err) {
if (!sockok) openCallback(err);
if (!sockok) {
sock.end();
sock.destroy();
openCallback(err);
}
});

}
Expand Down
33 changes: 27 additions & 6 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,15 @@ C.open = function(allFields, openCallback0) {
function send(Method) {
// This can throw an exception if there's some problem with the
// options; e.g., something is a string instead of a number.
try { self.sendMethod(0, Method, tunedOptions); }
catch (err) { bail(err); }
try {
self.sendMethod(0, Method, tunedOptions);
} catch (err) {
bail(err);

// We are in an inconsistent state so we can't continue,
// rethrowing the exception will stop further execution.
throw err;
}
}

function negotiate(server, desired) {
Expand All @@ -206,8 +213,13 @@ C.open = function(allFields, openCallback0) {
return;
}
self.serverProperties = start.fields.serverProperties;
send(defs.ConnectionStartOk);
wait(afterStartOk);
try {
send(defs.ConnectionStartOk);
wait(afterStartOk);
} catch {
// Exit, callback with error already called,
return;
}
}

function afterStartOk(reply) {
Expand All @@ -228,8 +240,13 @@ C.open = function(allFields, openCallback0) {
negotiate(fields.channelMax, allFields.channelMax);
tunedOptions.heartbeat =
negotiate(fields.heartbeat, allFields.heartbeat);
send(defs.ConnectionTuneOk);
send(defs.ConnectionOpen);
try {
send(defs.ConnectionTuneOk);
send(defs.ConnectionOpen);
} catch {
// Exit, callback with error already called,
return;
}
expect(defs.ConnectionOpenOk, onOpenOk);
break;
default:
Expand Down Expand Up @@ -257,6 +274,10 @@ C.open = function(allFields, openCallback0) {
// If the server closes the connection, it's probably because of
// something we did
function endWhileOpening(err) {
self.stream.removeListener('end', endWhileOpening);
self.stream.removeListener('error', endWhileOpening);
self.stream.end();

bail(err || new Error('Socket closed abruptly ' +
'during opening handshake'));
}
Expand Down

0 comments on commit dac924f

Please sign in to comment.