Skip to content

Commit

Permalink
performance - request bodu is resent in chuncks
Browse files Browse the repository at this point in the history
all parameters are taken from command line now
code cleanup
  • Loading branch information
Robin Gottfried committed Feb 18, 2020
1 parent 996428f commit 9f6a525
Show file tree
Hide file tree
Showing 10 changed files with 472 additions and 257 deletions.
142 changes: 142 additions & 0 deletions client/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
'use strict';
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2
const path = require('path');
const http = require('http');
const WebSocket = require('ws');
const WsJsonProtocol = require('../lib/ws-json');


class RequestForwarder extends Object {
constructor(ws, forward_base_uri) {
super();
this.maxChannelLivespan = 5000; // in milliseconds
if (!forward_base_uri) throw new Error("Missing the base uri to forward to.");
let parsed_uri = new URL(forward_base_uri);
if (parsed_uri.search) throw new Error("Search path is not implemented yet for forward base uri.");
if (!parsed_uri.protocol.match(/^https?:$/i)) throw new Error(`Only HTTP(s) protocol is implemented for forward base uri (got ${parsed_uri.protocol}).`);
console.log(forward_base_uri);
this._forward_base_uri = parsed_uri;
this._ws = ws;
this._activeChannels = {};
}

handle_request(message) {
const eventId = message.event;
let req;
switch(eventId) {
case 'headers':
const ireq = message.data;
console.log(`< ${message.channel}: ${ireq.method} ${ireq.url}`);
let oreq_uri = new URL(this._forward_base_uri.toString()); // clone the original uri
oreq_uri.href = path.posix.join(oreq_uri.href, ireq.url);
const req_params = {
method: ireq.method,
headers: ireq.headers,
}
let _send = this._send.bind(this);
let sender = function sender(event_id) {
return function (data) {
if (event_id != 'data')
console.log(`<: ${message.channel}: ${event_id} ${ireq.method} ${oreq_uri.pathname}`);
_send({
channel: message.channel,
id: message.id,
event: event_id,
data: data,
})
}
}
console.log(` :> ${message.channel}: ${ireq.method} ${oreq_uri.toString()}`);
req = http.request(oreq_uri.toString(), req_params, function handleResponse(res) {
res.setEncoding('utf8');
console.log(`<: ${message.channel}: ${res.statusCode} ${res.statusMessage} / ${ireq.method} ${oreq_uri.pathname}`);
sender('headers')({
statusCode: res.statusCode,
statusMessage: res.statusMessage,
headers: res.headers,
});
res.on('data', sender('data'));
res.on('end', sender('end'));
});
req.on('error', sender('error'));
this._registerChannel(message.channel, req);
break;
case 'data':
req = this._activeChannels[message.channel];
if (req) {
try {
console.log(` :> DATA[${message.data}]`);
req.write(message.data);
} catch(err) {
console.log('data is object', message);
throw err;
}
} else {
console.error(`Channel ${message.channel} not found. Did it expire?`);
}
break;
case 'end':
req = this._activeChannels[message.channel];
if (req) {
req.end();
this._destroyChannel(message.channel);
} else {
console.error(`Channel ${message.channel} not found. Did it expire?`);
}
break;
default:
throw new Error(`Invalid message event ${eventId}.`);
}
}

_registerChannel(channelUrl, request) {
this._activeChannels[channelUrl] = request;
let self=this;
setTimeout(()=>self._destroyChannel(channelUrl), this.maxChannelLivespan);
}

_destroyChannel(channelUrl) {
if (this._activeChannels[channelUrl]) delete this._activeChannels[channelUrl];
}

_send(data) {
this._ws.send(data);
}

on_message(message) {
if (!message.channel || message.channel.indexOf('/req/') != 0) {
return;
} else {
this.handle_request(message);
}
}
}

class WebSockProxyClient extends Object {

constructor(client_key) {
super();
this.key = client_key;
}

connect(host_port, {forward_to = 'http://localhost', websocket_path = '/ws'}={}) {
const ws_ = new WebSocket(`ws://${host_port}${websocket_path}/${this.key}`);
const ws = new WsJsonProtocol(ws_);

ws.on('open', function open() {
const request_forwarder = new RequestForwarder(ws, forward_to);
console.log("Client connection openned.");

ws.send({data:"Hallo."});
ws.on("message", function (message) {
request_forwarder.on_message(message);
});
ws.on("close", function onClose() {
console.log("Client connection closed.");
});
});
return ws;
}
}

exports.WebSockProxyClient = WebSockProxyClient
114 changes: 33 additions & 81 deletions client/index.js
Original file line number Diff line number Diff line change
@@ -1,94 +1,46 @@
'use strict';
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2
const path = require('path');
const http = require('http');
const WebSocket = require('ws');
const WsJsonProtocol = require('../lib/ws-json');
const WebSockProxyClient = require('./client').WebSockProxyClient;

const client_key = process.argv[2];
const server_host_port = process.argv[3];
const forward_base_uri = process.argv[4];

if (client_key === undefined) {
throw new Error("Missing client key.");
}

const ws_ = new WebSocket(`ws://${server_host_port}/ws/${client_key}`);
const ws = new WsJsonProtocol(ws_);
function usage() {
console.log(`USAGE:
class RequestForwarder extends Object {
constructor(ws, forward_base_uri) {
super();
if (!forward_base_uri) throw new Error("Missing the base uri to forward to.");
let parsed_uri = new URL(forward_base_uri);
if (parsed_uri.search) throw new Error("Search path is not implemented yet for forward base uri.");
if (!parsed_uri.protocol.match(/^https?:$/i)) throw new Error(`Only HTTP(s) protocol is implemented for forward base uri (got ${parsed_uri.protocol}).`);
this._forward_base_uri = parsed_uri;
this._ws = ws;
}
client <client-key> <server_host>[:<server_port>] [forward_to]
fire_request(message, ) {
const ireq = message.request;
console.log(`< ${message.channel}: ${ireq.method} ${ireq.url}`);
let oreq_uri = new URL(this._forward_base_uri.toString()); // clone the original uri
oreq_uri.href = path.posix.join(oreq_uri.href, ireq.url);
const req_params = {
method: ireq.method,
headers: ireq.headers,
}
let _send = this._send.bind(this);
let sender = function sender(event_id) {
return function (data) {
if (event_id != 'data')
console.log(`<: ${message.channel}: ${event_id} ${ireq.method} ${oreq_uri.pathname}`);
_send({
channel: message.channel,
id: message.id,
event: event_id,
data: data,
})
}
}
console.log(` :> ${message.channel}: ${ireq.method} ${oreq_uri.pathname}`);
const req = http.request(oreq_uri.toString(), req_params, function handleResponse(res) {
res.setEncoding('utf8');
console.log(`<: ${message.channel}: ${res.statusCode} ${res.statusMessage} / ${ireq.method} ${oreq_uri.pathname}`);
sender('headers')({
statusCode: res.statusCode,
statusMessage: res.statusMessage,
headers: res.headers,
});
res.on('data', sender('data'));
res.on('end', sender('end'));
});
req.on('error', sender('error'));
if (ireq.body) {
req.write(ireq.body);
}
req.end();
}
client-key ... unique key to identify client on server
server_host ... hostname or ip address of websocket proxy server
server_port ... port of websocket proxy server
forward_to ... base uri to forward all requests to (defaults to
http://localhost)
_send(data) {
this._ws.send(data);
}
`);
}

on_message(message) {
if (!message.channel || message.channel.indexOf('/req/') != 0) return;
else this.fire_request(message);
}
function die(message, {edify=true}={}) {
if (edify) usage();
if (message) console.log(message);
process.exit();
}

const client_key = process.argv[2];
const server_host_port = process.argv[3];
const forward_to = process.argv[4];

ws.on('open', function open() {
const request_forwarder = new RequestForwarder(ws, forward_base_uri);
console.log("Client connection openned.");

ws.send({data:"Hallo."});
ws.on("message", function (message) {
request_forwarder.on_message(message);
});
ws.on("close", function onClose() {
console.log("Client connection closed.");
process.exit()
client_key || die("Missing client key.");
server_host_port || die("Missing server host:port.");


new WebSockProxyClient(client_key)
.connect(server_host_port, {forward_to: forward_to })
.on('error', function clientError() {
console.error(`Could not connect to remote server ${server_host_port}.
Make sure the server is running on the address port specified?
`);
})
.on('close', function clientOnClose() {
console.log('Connection closed, exitting.');
process.exit();
});
});
5 changes: 5 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"author": "",
"license": "proprietary",
"dependencies": {
"daemonize2": "^0.4.2",
"uuid": "^3.4.0",
"ws": "^7.2.1"
}
Expand Down
26 changes: 25 additions & 1 deletion server/HttpError.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,47 @@
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2

class HttpError extends Error {
constructor(code, message) {
constructor(description, code, message) {
super();
this.code = code ? code : this._code;
this.message = message ? message : this._msg;
this.description = description;
}

get _code() { return 500; }
get _msg() { return "Internal server error"; }

toString() {
return `${this.code} ${this.message}`;
}

toResponse(res) {
res.writeHead(this.code, {'content-type': 'application/json; charset=utf-8'});
res.write(this.description ? this.description : this.message);
res.end();
}
}

class BadGateway extends HttpError {
get _code() {return 502;}
get _msg() { return "Bad gateway"; }
}

class BadRequest extends HttpError {
get _code() {return 400;}
get _msg() { return "Bad request"; }
}

class Unauthorized extends HttpError {
get _code() {return 401;}
get _msg() { return "Unauthorized"; }
}

class NotFound extends HttpError {
get _code() {return 404;}
get _msg() { return "Not found"; }
}

exports.HttpError = HttpError;
exports.BadGateway = BadGateway;
exports.NotFound = NotFound;
Loading

0 comments on commit 9f6a525

Please sign in to comment.