Skip to content

Commit

Permalink
Local relay implementation
Browse files Browse the repository at this point in the history
Signed-off-by: rafasofizada <[email protected]>
  • Loading branch information
rafasofizada committed Nov 17, 2021
1 parent 9dcf9f7 commit 91d1135
Show file tree
Hide file tree
Showing 11 changed files with 715 additions and 0 deletions.
11 changes: 11 additions & 0 deletions packages/local-relay/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# `@fitbit/local-relay`

> TODO: description
## Usage

```
const localRelay = require('@fitbit/local-relay');
// TODO: DEMONSTRATE API
```
50 changes: 50 additions & 0 deletions packages/local-relay/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"name": "@fitbit/local-relay",
"version": "1.8.0-pre.0",
"description": "Local implementation of the Developer Relay",
"author": "Fitbit, Inc.",
"homepage": "https://github.com/Fitbit/developer-bridge/tree/main/packages/local-relay#readme",
"license": "BSD-3-Clause",
"main": "lib/local-relay.js",
"publishConfig": {
"registry": "http://registry.npmjs.org/"
},
"repository": "github:Fitbit/developer-bridge",
"scripts": {
"build": "rm -rf lib tsconfig.tsbuildinfo && tsc -b",
"prepublishOnly": "yarn run build"
},
"bugs": {
"url": "https://github.com/Fitbit/developer-bridge/issues"
},
"devDependencies": {
"@babel/core": "^7.16.0",
"@babel/preset-env": "^7.16.4",
"@babel/preset-typescript": "^7.16.0",
"@types/express": "^4.17.13",
"@types/jest": "^27.0.2",
"@types/node": "^16.10.2",
"@types/supertest": "^2.0.11",
"@types/websocket": "^1.0.4",
"babel-jest": "^27.3.1",
"jest": "^27.3.1",
"supertest": "^6.1.6"
},
"dependencies": {
"express": "^4.17.1",
"typescript": "^4.4.3",
"uuid": "^8.3.2",
"websocket": "^1.0.34"
},
"bin": {
"fitbit": "./lib/cli.js"
},
"files": [
"/lib/!(*.test|*.spec).{js,d.ts}",
"/lib/!(testUtils)**/!(*.test|*.spec).{js,d.ts}",
"/lib/**/*.json"
],
"engines": {
"node": ">=8.6.0"
}
}
8 changes: 8 additions & 0 deletions packages/local-relay/src/CloseCode.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
enum CloseCode {
// These codes are specified by the RFC
// https://tools.ietf.org/html/rfc6455#section-7.4.1
GoingAway = 1001,
PolicyViolation = 1008,
}

export default CloseCode;
62 changes: 62 additions & 0 deletions packages/local-relay/src/Config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import * as fs from "fs";
import * as os from "os";
import { join } from "path";

export function loadConfiguration(configPath: string) {
const config: Record<string, string> = {};

const configFile = fs.readFileSync(configPath).toString();
for (const line of configFile.split(os.EOL)) {
let [key, value] = line.split("=", 2);
if (!key || !value) {
continue;
}

// tslint:disable-next-line no-param-reassign
key = key.trim().toUpperCase();
value = value.trim();
config[key] = value;
}

return config;
}

export const CONTAINER_VERSION = process.env.DOCKER_TAG || "no-docker-tag";

export const maxPayloadSizeCap = 1024 * 1024;

export function validateMaxPayloadSize(maxPayload: any) {
const parsedMaxPayload = parseInt(maxPayload, 10);
if (!maxPayload || parsedMaxPayload > maxPayloadSizeCap) {
return maxPayloadSizeCap;
}

// tslint:disable-next-line triple-equals
if (parsedMaxPayload != maxPayload) {
throw new Error("MAX_PAYLOAD must be a valid number");
}

return parsedMaxPayload;
}

const settingsPath = process.env.SETTINGS_FILE;
const config = settingsPath ? loadConfiguration(settingsPath) : {};

const propertiesPath = process.env.PROPERTIES_FILE;
Object.assign(config, propertiesPath ? loadConfiguration(propertiesPath) : {});

export const signalUrl = config.SIGNAL_URL || "http://localhost:9001";
export const maxPayload = validateMaxPayloadSize(config.MAX_PAYLOAD);

export const relayWsUrlPrefix =
config.RELAY_WS_URL_PREFIX || "wss://027-v3-api-soa.fitbit.com/dbridge";

export const relayPkgName = "@fitbit/local-developer-relay";
export const relayDirectoryName = "fitbit-local-relay";
export const relayDirectoryPath = join(os.tmpdir(), relayDirectoryName);

export const relayPidFileName = "pid.json";
export const relayPidFilePath = join(relayDirectoryPath, relayPidFileName);

export const relayLogFileName = "logs.txt";
export const relayLogFilePath = join(relayDirectoryPath, relayLogFileName);
93 changes: 93 additions & 0 deletions packages/local-relay/src/Connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import * as websocket from "websocket";

import CloseCode from "./CloseCode";
import { maxPayload } from "./Config";

export default class Connection {
private slavePeer?: websocket.connection;

constructor(private masterPeer: websocket.connection) {
this.masterPeer.on("message", this.onMasterMessage);
this.masterPeer.on("close", this.onMasterClose);
}

public isHalfOpen(): boolean {
return !this.slavePeer;
}

public connectPeer(slavePeer: websocket.connection): void {
this.slavePeer = slavePeer;
this.slavePeer.on("message", this.onSlaveMessage);
this.slavePeer.on("close", this.onSlaveClose);
this.sendHostHello();
}

public close(code: CloseCode): void {
this.masterPeer.close(code);
if (this.slavePeer) {
this.slavePeer.close(code);
}
}

private sendHostHello(): void {
this.masterPeer.send(
JSON.stringify({
maxPayload,
relayEvent: "connection",
})
);
}

private forwardMessage(
destination: websocket.connection | undefined,
data: websocket.Message
) {
let payload;
if (data.type === "utf8") {
payload = data.utf8Data!;
} else if (data.type === "binary") {
payload = data.binaryData!;
} else {
console.error(`Invalid payload type: ${(data as any).type}`);
this.close(CloseCode.PolicyViolation);
return;
}

if (destination) {
destination.send(payload);
} else {
this.close(CloseCode.PolicyViolation);
}
}

private onMasterMessage = (data: websocket.Message) => {
this.forwardMessage(this.slavePeer, data);
};

private onSlaveMessage = (data: websocket.Message) =>
this.forwardMessage(this.masterPeer, data);

private forwardClose(
destination: websocket.connection | undefined,
code: number,
message: string
) {
if (destination) {
const skipCloseFrame =
code === websocket.connection.CLOSE_REASON_ABNORMAL;
if (skipCloseFrame) {
destination.drop(code, message, true);
} else {
destination.close(code, message);
}
}
}

private onMasterClose = (code: number, message: string) => {
this.forwardClose(this.slavePeer, code, message);
};

private onSlaveClose = (code: number, message: string) => {
this.forwardClose(this.masterPeer, code, message);
};
}
58 changes: 58 additions & 0 deletions packages/local-relay/src/ConnectionManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import * as websocket from "websocket";

import CloseCode from "./CloseCode";
import Connection from "./Connection";

export default class ConnectionManager {
private connections = new Map<string, Connection>();

public canConnectSlave(connectionID: string) {
const master = this.connections.get(connectionID);
return !!master && master.isHalfOpen();
}

public connectMaster(connectionID: string, ws: websocket.connection): void {
if (this.connections.has(connectionID)) {
throw new Error(`Connection ID ${connectionID} collided`);
}

ws.on("close", this.onClose.bind(this, connectionID));

this.connections.set(connectionID, new Connection(ws));
}

public connectSlave(
connectionID: string,
webSocket: websocket.connection
): void {
const existingConnection = this.connections.get(connectionID);
if (!existingConnection) {
throw new Error(`No connection ID '${connectionID}'`);
}

existingConnection.connectPeer(webSocket);
}

public close(connectionID: string) {
const connection = this.connections.get(connectionID);
if (connection) {
connection.close(CloseCode.GoingAway);
} else {
console.info(`Connection ID '${connectionID}' is unknown; cannot close`);
}
}

private onClose(connectionID: string): void {
console.info(`Connection ID '${connectionID}' was closed`);

const connection = this.connections.get(connectionID);
if (connection) {
} else {
console.error(
`Connection ID '${connectionID}' was not found while trying to close it.`
);
}

this.connections.delete(connectionID);
}
}
77 changes: 77 additions & 0 deletions packages/local-relay/src/Host.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import * as websocket from "websocket";
import CloseCode from "./CloseCode";
import Connection from "./Connection";

export type HostInfo = {
id: string;
displayName: string;
roles: string[];
};

export default class Host {
private _id: string;

get id(): string {
return this._id;
}

private _displayName: string;

get displayName(): string {
return this._displayName;
}

private _roles: string[];

get roles(): string[] {
return this._roles;
}

private connection?: Connection;

constructor({ id, displayName, roles }: HostInfo) {
this._id = id;
this._displayName = displayName;
this._roles = roles;
}

connect(ws: websocket.connection): void {
ws.on("close", () => {
this.connection = undefined;
});

this.connection = new Connection(ws);
}

isConnected(): boolean {
return Boolean(this.connection);
}

disconnect(): void {
if (!this.connection) {
console.info(
`Host ${this._id} is not connected; cannot close connection`
);

return;
}

this.connection.close(CloseCode.GoingAway);
}

connectPeer(peerWs: websocket.connection) {
if (!this.connection) {
throw new Error(`Host ${this._id} is not connected`);
}

if (!this.canConnectPeer()) {
throw new Error(`Host ${this._id} is already connected to a peer`);
}

this.connection.connectPeer(peerWs);
}

canConnectPeer() {
return this.connection.isHalfOpen();
}
}
Loading

0 comments on commit 91d1135

Please sign in to comment.