Skip to content

Commit

Permalink
Merge pull request #53 from exposr/improve-error-handling-on-ws-ingress
Browse files Browse the repository at this point in the history
Improve error handling on ws ingress
  • Loading branch information
fredriklindberg authored Aug 31, 2023
2 parents 6d482d2 + 4b7a678 commit b00bb9a
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 11 deletions.
47 changes: 41 additions & 6 deletions src/ingress/http-ingress.js
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,6 @@ class HttpIngress {
}

async handleUpgradeRequest(req, sock, head, baseUrl) {

const _canonicalHttpResponse = (sock, request, response) => {
sock.write(`HTTP/${request.httpVersion} ${response.status} ${response.statusLine}\r\n`);
sock.write('\r\n');
Expand Down Expand Up @@ -391,8 +390,33 @@ class HttpIngress {
port: this.httpListener.getPort(),
}
};
const target = this.tunnelService.createConnection(tunnel.id, ctx);
if (target === undefined) {
const target = this.tunnelService.createConnection(tunnel.id, ctx, (err) => {
if (!err) {
return;
}
let statusCode;
let statusLine;
let msg;
if (err.code === 'EMFILE') {
statusCode = 429;
statusLine = 'Too Many Requests';
msg = ERROR_TUNNEL_TRANSPORT_REQUEST_LIMIT;
} else if (err.code == 'ECONNRESET') {
statusCode = 503;
statusLine = 'Service Unavailable';
msg = ERROR_TUNNEL_TARGET_CON_REFUSED;
} else {
statusCode = 503;
statusLine = 'Service Unavailable';
msg = ERROR_TUNNEL_TARGET_CON_FAILED;
}
_canonicalHttpResponse(sock, req, {
status: statusCode,
statusLine,
body: JSON.stringify({error: msg}),
});
});
if (!target) {
_canonicalHttpResponse(sock, req, {
status: 503,
statusLine: 'Service Unavailable',
Expand All @@ -402,11 +426,22 @@ class HttpIngress {
}

const headers = this._requestHeaders(req, tunnel, baseUrl);
target.on('error', (err) => {
sock.end();
});

const close = (err) => {
target.off('error', close);
target.off('close', close);
sock.off('error', close);
sock.off('close', close);
sock.destroy();
target.destroy();
};

target.on('connect', () => {
target.on('error', close);
target.on('close', close);
sock.on('error', close);
sock.on('close', close);

target.pipe(sock);
sock.pipe(target);

Expand Down
1 change: 1 addition & 0 deletions src/listener/http-listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class HttpListener extends ListenerInterface {
}
} catch (e) {
this.logger.error(e.message);
this.logger.debug(e.stack);
ctx.res.statusCode = 500;
ctx.res.end();
}
Expand Down
5 changes: 3 additions & 2 deletions src/tunnel/tunnel-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,7 @@ class TunnelService {
return connection.transport.createConnection(ctx.opts, callback);
}

let prev;
do {
const node = this._clusterService.getNode(next.node);
if (node && !next.local) {
Expand All @@ -690,9 +691,9 @@ class TunnelService {
port: ctx.ingress.port,
}, callback);
}
const prev = next;
prev = next;
next = this._tunnels.getNextConnection(tunnelId);
} while (next != undefined && next.id != prev.id);
} while (next != undefined && next.id != prev?.id);

return false;
}
Expand Down
80 changes: 78 additions & 2 deletions test/system/ingress/test_http_ingress.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import EventBus from "../../../src/cluster/eventbus.js";
import Config from "../../../src/config.js";
import Ingress from "../../../src/ingress/index.js";
import TunnelService from "../../../src/tunnel/tunnel-service.js";
import { initClusterService, initStorageService, wsSocketPair, wsmPair } from "../../unit/test-utils.ts";
import WebSocketTransport from '../../../src/transport/ws/ws-transport.js';
import { createEchoHttpServer, initClusterService, initStorageService, wsSocketPair, wsmPair } from "../../unit/test-utils.ts";
import { setTimeout } from 'timers/promises';
import sinon from 'sinon';
import net from 'net'
import http from 'http';

describe('http ingress', () => {
let clock;
Expand Down Expand Up @@ -117,4 +118,79 @@ describe('http ingress', () => {
await sockPair.terminate();

}).timeout(2000);

it(`http ingress can handle websocket upgrades`, async () => {
const sockPair = await wsSocketPair.create(9000)
const [sock1, sock2] = wsmPair(sockPair)
const echoServer = await createEchoHttpServer(20000);

sock2.on('connection', (sock) => {
const targetSock = new net.Socket();
targetSock.connect({
host: 'localhost',
port: 20000
}, () => {
targetSock.pipe(sock);
sock.pipe(targetSock);
});

const close = () => {
targetSock.unpipe(sock);
sock.unpipe(targetSock);
sock.destroy();
targetSock.destroy();
};

targetSock.on('close', close);
sock.on('close', close);
sock.on('error', () => {
close();
});
targetSock.on('error', () => {
close();
});
});

let res = await tunnelService.connect(tunnel.id, account.id, sock1, {peer: "127.0.0.1"});
assert(res == true, "failed to connect tunnel");

let i = 0;
let tun;
do {
await setTimeout(100);
tun = await tunnelService._get(tunnel.id)
} while (tun.state().connected == false && i++ < 10);
assert(tun.state().connected == true, "tunnel not connected")

const req = http.request({
hostname: 'localhost',
port: 10000,
method: 'GET',
path: '/ws',
headers: {
"Host": `${tunnel.id}.localhost.example`,
"Connection": 'Upgrade',
"Upgrade": 'websocket',
"Origin": `http://${tunnel.id}.localhost.example`,
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
"Sec-WebSocket-Version": "13"
}
});

const done = (resolve) => {
req.on('upgrade', (res, socket, head) => {
const body = head.subarray(2);
resolve(body);
});
};
req.end();

const wsRes = await new Promise(done);
assert(wsRes.equals(Buffer.from("ws echo connected")), `got ${wsRes}`);

await sock1.destroy();
await sock2.destroy();
await sockPair.terminate();
echoServer.destroy();
});
});
88 changes: 87 additions & 1 deletion test/unit/test-utils.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import * as http from 'node:http';
import { Duplex } from 'node:stream';
import * as url from 'node:url';
import { WebSocket, WebSocketServer } from "ws";
import ClusterService from "../../src/cluster/index.js";
import { StorageService } from "../../src/storage/index.js";
import { Duplex } from 'stream';
import { WebSocketMultiplex } from "@exposr/ws-multiplex";

export const initStorageService = async () => {
Expand Down Expand Up @@ -89,4 +91,88 @@ export const wsmPair = (socketPair: wsSocketPair, options?: Object): Array<WebSo
reference: "wsm2"
});
return [wsm1, wsm2];
};

export const createEchoHttpServer = async (port = 20000) => {

const echoRequest = (request: http.IncomingMessage, response: http.ServerResponse) => {
let body: Array<Buffer> = [];
request.on('data', (chunk: Buffer) => {
body.push(chunk);
}).on('end', () => {
const buf = Buffer.concat(body).toString();
response.statusCode = 200;
response.end(buf);
});
};

const fileGenerator = (size: number, chunkSize: number, response: http.ServerResponse) => {
let sentBytes: number = 0;

response.statusCode = 200;
response.setHeader("Content-Type", "application/octet-stream");
response.setHeader('Content-Disposition', 'attachment; filename="file.bin"');
response.setHeader("Content-Length", size);

const writeChunk = () => {
if (sentBytes < size) {
const remainingBytes = size - sentBytes;
const chunkToSend = Math.min(chunkSize, remainingBytes);

const buffer = Buffer.alloc(chunkToSend);
response.write(buffer);

sentBytes += chunkToSend;

setTimeout(writeChunk, 0);
} else {
response.end();
}
}

writeChunk();
};

const wss = new WebSocketServer({ noServer: true });
const handleUpgrade = (async (request: http.IncomingMessage, socket: Duplex, head: Buffer) => {
const parsedUrl = url.parse(<string>request.url, true)
if (parsedUrl.pathname != '/ws') {
socket.write(`HTTP/${request.httpVersion} 404 Not found\r\n`);
socket.end();
socket.destroy();
}

wss.handleUpgrade(request, socket, head, (ws) => {
ws.send("ws echo connected");
ws.on('message', (data) => {
ws.send(data);
});
});
});

const handleRequest = (request: http.IncomingMessage, response: http.ServerResponse) => {

const parsedUrl = url.parse(<string>request.url, true)

if (request.method == "GET" && parsedUrl.pathname == '/file') {
const size = Number(parsedUrl.query["size"] || "32");
const chunkSize = Number(parsedUrl.query["chunk"] || "262144");
return fileGenerator(size, chunkSize, response);
} else {
return echoRequest(request, response);
}
}

const server = http.createServer();
server.on('request', handleRequest);
server.on('upgrade', handleUpgrade);

server.listen(port);
return {
destroy: () => {
server.removeAllListeners('request');
server.removeAllListeners('upgrade');
server.close();
}
};
};

0 comments on commit b00bb9a

Please sign in to comment.