Skip to content

Commit

Permalink
cleanup, lint
Browse files Browse the repository at this point in the history
  • Loading branch information
Robin Gottfried committed Feb 28, 2020
1 parent d18ff4d commit 46645cb
Show file tree
Hide file tree
Showing 24 changed files with 2,140 additions and 950 deletions.
26 changes: 26 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module.exports = {
"env": {
"browser": false,
"node": true,
"commonjs": true,
"es6": true
},
"extends": "eslint:recommended",
"globals": {
"Atomics": "readonly",
"SharedArrayBuffer": "readonly"
},
"parserOptions": {
"ecmaVersion": 2018
},
"rules": {
"indent": [
"error", 2, {
"VariableDeclarator": { "var": 2, "let": 2, "const": 3 },
"ObjectExpression": "first",
"ArrayExpression": "first",
"SwitchCase": 1,
},
]
}
};
1 change: 1 addition & 0 deletions .project.vim
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
set wildignore+=node_modules,.nyc_output
276 changes: 131 additions & 145 deletions client/client.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2
'use strict';
const path = require('path');
const http = require('http');
const { checksum } = require('../lib');
const WebSocket = require('ws');
const { Messanger } = require('../server/ws-message');
const { getLogger } = require('../lib/logger');
const path = require('path'),
http = require('http'),
WebSocket = require('ws'),
{ Messanger } = require('../server/ws-message'),
{ getLogger } = require('../lib/logger'),


const debug = getLogger.debug({prefix: '\u001b[34mC:d', postfix: '\u001b[0m\n'}),
debug = getLogger.debug({prefix: '\u001b[34mC:d', postfix: '\u001b[0m\n'}),
info = getLogger.info({prefix: '\u001b[34mC:i', postfix: '\u001b[0m\n'}),
warning = getLogger.info({prefix: '\u001b[35mC:i', postfix: '\u001b[0m\n'}),
error = getLogger.error({prefix: '\u001b[31mC:!', postfix: '\u001b[0m\n'});
Expand All @@ -23,161 +21,49 @@ class RequestForwarder extends Object {
let parsed_uri = new URL(forward_base_uri);
if (parsed_uri.search) throw new Error("Search path is not implemented yet for forward base uri.");
if (!parsed_uri.protocol.match(/^https?:$/i)) throw new Error(`Only HTTP(s) protocol is implemented for forward base uri (got ${parsed_uri.protocol}).`);
debug(forward_base_uri);
debug(forward_base_uri);
this._forward_base_uri = parsed_uri;
this._ws = ws;
this._activeChannels = {};
this.state = ['> headers', '-'];
}

setState(ws, http) {
if (ws) this.state[0] = ws;
if (http) this.state[1] = http;
}

testResponse(message) {
this._send({
channel: message.channel,
event: 'headers',
data: {
statusCode: 200,
statusMessage: 'OK',
headers: {'content-type': 'text/plain; charset=utf-8'},
}
});
this._send({
channel: message.channel,
event: 'data',
data: new Buffer.from('Pong'),
});
this._send({
channel: message.channel,
event: 'end',
});
}

handle_request(message) {
const self = this;
const eventId = message.event;
let req;
const eventId = message.event,
channelId = message.channel;

switch(eventId) {
case 'headers':
this.setState('> headers');
const ireq = message.data;
if (ireq.url == '/__ws_proxy_test__') {
return this.testResponse(message);
}
debug(`< ${message.channel}: ${ireq.method} ${ireq.url}`);
let oreq_uri = new URL(this._forward_base_uri.toString()); // clone the original uri
oreq_uri.href = path.posix.join(oreq_uri.href, ireq.url);
const req_params = {
method: ireq.method,
headers: ireq.headers,
search: ireq.search,
}
let _send = this._send.bind(this);
let sender = function sender(event_id) {
return function (data) {
if (event_id == 'data') {
debug(`<: ${message.channel}: ${event_id} ${ireq.method} ${oreq_uri.pathname} ${data.length}`);
} else if (event_id == 'error') {
info(`<: ${message.channel}: ${event_id} ${ireq.method} ${oreq_uri.pathname}`);
} else {
debug(`<: ${message.channel}: ${event_id} ${ireq.method} ${oreq_uri.pathname}`);
}

if (event_id == 'end' || event_id == 'error') {
self._destroyChannel(message.channel);
}

self.setState('< ' + event_id);
_send({
channel: message.channel,
event: event_id,
data: data,
})
}
}
info(` > ${message.channel}: ${ireq.method} ${ireq.url} -> ${oreq_uri.toString()}`);
this.setState('> headers', '> headers ');
let reqTimeout;
req = this.__http.request(oreq_uri.toString(), req_params, function handleResponse(res) {
// res.setEncoding('utf8');
self.setState(null, '< headers ');
debug(`<: ${message.channel}: ${res.statusCode} ${res.statusMessage} / ${ireq.method} ${oreq_uri.pathname}`);
sender('headers')({
statusCode: res.statusCode,
statusMessage: res.statusMessage,
headers: res.headers,
});
res.on('data', sender('data'));
res.on('end', ()=> {
sender('end')();
clearTimeout(reqTimeout);
});
});
this._registerChannel(message.channel, req);
req.on('error', sender('error'));
reqTimeout = setTimeout(() => {
req.emit('error', 'The request timed out.');
}, this.maxChannelLivespan);
this._registerChannel(
new Channel(channelId, this, (channel)=> {
this._destroyChannel(channel);
}).onHeader(message)
);
break;
case 'data':
this.setState('> data');
req = this._activeChannels[message.channel];
if (req) {
debug(` :> data`);
req.write(message.data);
} else {
error(`Channel ${message.channel} not found. Did it expire (on data sent)?`);
}
this._activeChannels[channelId].onData(message);
break;
case 'end':
debug(` :> ${message.channel} end`);
this.setState('> end');
req = this._activeChannels[message.channel];
if (req) {
req.end();
this._destroyChannel(message.channel);
} else {
error(`Channel ${message.channel} not found. Did it expire (on response end)?`);
}
this._activeChannels[channelId].onEnd(message);
break;
default:
throw new Error(`Invalid message event ${eventId}.`);
}
}

_registerChannel(channelUrl, request) {
this._activeChannels[channelUrl] = request;
let self=this;
setTimeout(()=>self._onChannelTimeout(channelUrl), this.maxChannelLivespan+10);
_registerChannel(channel) {
this._activeChannels[channel.id] = channel;
channel.destructorCallack
}

_onChannelTimeout(channelUrl) {
if (this._destroyChannel(channelUrl)) {
// channel exists aftert timeout
info(`Connection timeout ${channelUrl}`);
this._send({
channel: channelUrl,
event: 'error',
data: 'Connection timeout',
});
}
}

_destroyChannel(channelUrl) {
if (this._activeChannels[channelUrl]) {
debug(`destroying channel ${channelUrl}`);
delete this._activeChannels[channelUrl];
return true;
} else {
return false;
}
}

_send(data) {
this._ws.send(data.channel, data.event, data.data);
if (this._activeChannels[channelUrl]) {
debug(`destroying channel ${channelUrl}`);
delete this._activeChannels[channelUrl];
return true;
} else {
return false;
}
}

on_message(message) {
Expand All @@ -189,6 +75,106 @@ class RequestForwarder extends Object {

exports.RequestForwarder = RequestForwarder;

class Channel extends Object {
constructor(id, handler, destructorCallack) {
super()
this.id = id;
this.forwardTo = handler._forward_base_uri.toString();
this.http = handler.__http;
this.ws = handler._ws;
this.destructorCallack = destructorCallack;


this.timeout = handler.maxChannelLivespan;
this._timeoutTimer = setTimeout(() => {
this.request.emit('error', 'The request timed out.');
}, this.timeout);
}

_send(event, data) {
if (event == 'data') {
debug(`<: ${this.id}: ${event} ${this.id} ${this.url} ${data.length}`);
} else if (event == 'error') {
info(`<: ${this.id}: ${event} ${this.id} ${this.url}`);
} else {
debug(`<: ${this.id}: ${event} ${this.id} ${this.url}`);
}

if (event == 'end' || event == 'error') {
this.destructor();
}
this.ws.send(this.id, event, data);
}

onMessage(message) {
const event = message.event;
if (this[event]) {
this[event](message);
} else {
error(`Invalid event received for url ${this.url}, channel ${this.id}.`);
}
return this;
}

onHeader(message) {
const ireq = message.data;
debug(`< ${this.id}: ${ireq.method} ${ireq.url}`);
const forwardToUrl = new URL(this.forwardTo); // clone the original uri
forwardToUrl.href = path.posix.join(forwardToUrl.href, ireq.url);
const requestParameters = {
method: ireq.method,
headers: ireq.headers,
search: ireq.search,
}
info(` > ${this.id}: ${ireq.method} ${ireq.url} -> ${forwardToUrl.toString()}`);
this.url = forwardToUrl.toString();
this.request = this.http.request(this.url, requestParameters, (res) => {
// res.setEncoding('utf8');
debug(`<: ${this.id}: ${res.statusCode} ${res.statusMessage} / ${ireq.method} ${forwardToUrl.pathname}`);
this._send('headers', {
statusCode: res.statusCode,
statusMessage: res.statusMessage,
headers: res.headers,
});
res.on('data', this.onHttpData.bind(this));
res.on('end', this.onHttpEnd.bind(this));
});
this.request.on('error', this.onHttpError.bind(this));
return this;
}

onHttpData(chunk) {
this._send('data', chunk);
}

onHttpEnd() {
this._send('end');
this.destructor();
}

onHttpError(error) {
this._send('error', error);
this.destructor();
}

onData(message) {
debug(` :> data`);
this.request.write(message.data);
}

onEnd() {
debug(` :> ${this.id} end`);
this.request.end();
}

destructor() {
// TODO: cleanup
clearTimeout(this._timeoutTimer);
if (this.destructorCallack) this.destructorCallack(this);
}

}


class WebSockProxyClient extends Object {

Expand All @@ -202,10 +188,10 @@ class WebSockProxyClient extends Object {
}

connect(wsServer, {
forwardTo='http://localhost',
websocketPath='/ws',
requestTimeout=5000,
}={}) {
forwardTo='http://localhost',
websocketPath='/ws',
requestTimeout=5000,
}) {
if (this.ws_) {
throw new Error('Attemt to open connection while there is active socket already.');
}
Expand Down
8 changes: 4 additions & 4 deletions client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//-- vim: ft=javascript tabstop=2 softtabstop=2 expandtab shiftwidth=2
const WebSockProxyClient = require('./client').WebSockProxyClient,
config = require('../config'),
{ debug, info, error } = require('../lib/logger');
{ info, error } = require('../lib/logger');


function usage() {
Expand Down Expand Up @@ -32,14 +32,14 @@ function Client(key, forwardTo) {
this.connect = function connect(serverUrl, config={forwardTo:forwardTo}) {
return new Promise((resolve, reject) => {
const connection = this.wsProxy.connect(serverUrl, config);
connection.on('error', function clientError(err) {
error(`An error occured while connecting to ${serverUrl}.
connection.on('error', function clientError(err) {
error(`An error occured while connecting to ${serverUrl}.
Error: ${err}.
Make sure the server is running on the address port specified?
`);
})
})
.on('open', function clientOnConnect() {
info(`Tunnel ${serverUrl} -> ${config.forwardTo} set up and ready.`);
})
Expand Down
Loading

0 comments on commit 46645cb

Please sign in to comment.