diff --git a/packages/local-relay/README.md b/packages/local-relay/README.md new file mode 100644 index 00000000..5af08ddd --- /dev/null +++ b/packages/local-relay/README.md @@ -0,0 +1,11 @@ +# `@fitbit/local-relay` + +> TODO: description + +## Usage + +``` +const localRelay = require('@fitbit/local-relay'); + +// TODO: DEMONSTRATE API +``` diff --git a/packages/local-relay/package.json b/packages/local-relay/package.json new file mode 100644 index 00000000..861ef366 --- /dev/null +++ b/packages/local-relay/package.json @@ -0,0 +1,50 @@ +{ + "name": "@fitbit/local-relay", + "version": "1.8.0-pre.0", + "description": "Local implementation of the Developer Relay", + "author": "Fitbit, Inc.", + "homepage": "https://github.com/Fitbit/developer-bridge/tree/main/packages/local-relay#readme", + "license": "BSD-3-Clause", + "main": "lib/local-relay.js", + "publishConfig": { + "registry": "http://registry.npmjs.org/" + }, + "repository": "github:Fitbit/developer-bridge", + "scripts": { + "build": "rm -rf lib tsconfig.tsbuildinfo && tsc -b", + "prepublishOnly": "yarn run build" + }, + "bugs": { + "url": "https://github.com/Fitbit/developer-bridge/issues" + }, + "devDependencies": { + "@babel/core": "^7.16.0", + "@babel/preset-env": "^7.16.4", + "@babel/preset-typescript": "^7.16.0", + "@types/express": "^4.17.13", + "@types/jest": "^27.0.2", + "@types/node": "^16.10.2", + "@types/supertest": "^2.0.11", + "@types/websocket": "^1.0.4", + "babel-jest": "^27.3.1", + "jest": "^27.3.1", + "supertest": "^6.1.6" + }, + "dependencies": { + "express": "^4.17.1", + "typescript": "^4.4.3", + "uuid": "^8.3.2", + "websocket": "^1.0.34" + }, + "bin": { + "fitbit": "./lib/cli.js" + }, + "files": [ + "/lib/!(*.test|*.spec).{js,d.ts}", + "/lib/!(testUtils)**/!(*.test|*.spec).{js,d.ts}", + "/lib/**/*.json" + ], + "engines": { + "node": ">=8.6.0" + } +} diff --git a/packages/local-relay/src/CloseCode.ts b/packages/local-relay/src/CloseCode.ts new file mode 100644 index 00000000..dfee7c0d --- /dev/null +++ b/packages/local-relay/src/CloseCode.ts @@ -0,0 +1,8 @@ +enum CloseCode { + // These codes are specified by the RFC + // https://tools.ietf.org/html/rfc6455#section-7.4.1 + GoingAway = 1001, + PolicyViolation = 1008, +} + +export default CloseCode; diff --git a/packages/local-relay/src/Config.ts b/packages/local-relay/src/Config.ts new file mode 100644 index 00000000..0c24f84a --- /dev/null +++ b/packages/local-relay/src/Config.ts @@ -0,0 +1,62 @@ +import * as fs from "fs"; +import * as os from "os"; +import { join } from "path"; + +export function loadConfiguration(configPath: string) { + const config: Record = {}; + + const configFile = fs.readFileSync(configPath).toString(); + for (const line of configFile.split(os.EOL)) { + let [key, value] = line.split("=", 2); + if (!key || !value) { + continue; + } + + // tslint:disable-next-line no-param-reassign + key = key.trim().toUpperCase(); + value = value.trim(); + config[key] = value; + } + + return config; +} + +export const CONTAINER_VERSION = process.env.DOCKER_TAG || "no-docker-tag"; + +export const maxPayloadSizeCap = 1024 * 1024; + +export function validateMaxPayloadSize(maxPayload: any) { + const parsedMaxPayload = parseInt(maxPayload, 10); + if (!maxPayload || parsedMaxPayload > maxPayloadSizeCap) { + return maxPayloadSizeCap; + } + + // tslint:disable-next-line triple-equals + if (parsedMaxPayload != maxPayload) { + throw new Error("MAX_PAYLOAD must be a valid number"); + } + + return parsedMaxPayload; +} + +const settingsPath = process.env.SETTINGS_FILE; +const config = settingsPath ? loadConfiguration(settingsPath) : {}; + +const propertiesPath = process.env.PROPERTIES_FILE; +Object.assign(config, propertiesPath ? loadConfiguration(propertiesPath) : {}); + +export const signalUrl = config.SIGNAL_URL || "http://localhost:9001"; +export const maxPayload = validateMaxPayloadSize(config.MAX_PAYLOAD); + +export const relayWsUrlPrefix = + config.RELAY_WS_URL_PREFIX || "wss://027-v3-api-soa.fitbit.com/dbridge"; + +export const relayPkgName = "@fitbit/local-developer-relay"; +export const relayDirectoryName = "fitbit-local-relay"; +export const relayDirectoryPath = join(os.tmpdir(), relayDirectoryName); + +export const relayPidFileName = "pid.json"; +export const relayPidFilePath = join(relayDirectoryPath, relayPidFileName); + +export const relayLogFileName = "logs.txt"; +export const relayLogFilePath = join(relayDirectoryPath, relayLogFileName); diff --git a/packages/local-relay/src/Connection.ts b/packages/local-relay/src/Connection.ts new file mode 100644 index 00000000..ac093868 --- /dev/null +++ b/packages/local-relay/src/Connection.ts @@ -0,0 +1,93 @@ +import * as websocket from "websocket"; + +import CloseCode from "./CloseCode"; +import { maxPayload } from "./Config"; + +export default class Connection { + private slavePeer?: websocket.connection; + + constructor(private masterPeer: websocket.connection) { + this.masterPeer.on("message", this.onMasterMessage); + this.masterPeer.on("close", this.onMasterClose); + } + + public isHalfOpen(): boolean { + return !this.slavePeer; + } + + public connectPeer(slavePeer: websocket.connection): void { + this.slavePeer = slavePeer; + this.slavePeer.on("message", this.onSlaveMessage); + this.slavePeer.on("close", this.onSlaveClose); + this.sendHostHello(); + } + + public close(code: CloseCode): void { + this.masterPeer.close(code); + if (this.slavePeer) { + this.slavePeer.close(code); + } + } + + private sendHostHello(): void { + this.masterPeer.send( + JSON.stringify({ + maxPayload, + relayEvent: "connection", + }) + ); + } + + private forwardMessage( + destination: websocket.connection | undefined, + data: websocket.Message + ) { + let payload; + if (data.type === "utf8") { + payload = data.utf8Data!; + } else if (data.type === "binary") { + payload = data.binaryData!; + } else { + console.error(`Invalid payload type: ${(data as any).type}`); + this.close(CloseCode.PolicyViolation); + return; + } + + if (destination) { + destination.send(payload); + } else { + this.close(CloseCode.PolicyViolation); + } + } + + private onMasterMessage = (data: websocket.Message) => { + this.forwardMessage(this.slavePeer, data); + }; + + private onSlaveMessage = (data: websocket.Message) => + this.forwardMessage(this.masterPeer, data); + + private forwardClose( + destination: websocket.connection | undefined, + code: number, + message: string + ) { + if (destination) { + const skipCloseFrame = + code === websocket.connection.CLOSE_REASON_ABNORMAL; + if (skipCloseFrame) { + destination.drop(code, message, true); + } else { + destination.close(code, message); + } + } + } + + private onMasterClose = (code: number, message: string) => { + this.forwardClose(this.slavePeer, code, message); + }; + + private onSlaveClose = (code: number, message: string) => { + this.forwardClose(this.masterPeer, code, message); + }; +} diff --git a/packages/local-relay/src/ConnectionManager.ts b/packages/local-relay/src/ConnectionManager.ts new file mode 100644 index 00000000..e55b987f --- /dev/null +++ b/packages/local-relay/src/ConnectionManager.ts @@ -0,0 +1,58 @@ +import * as websocket from "websocket"; + +import CloseCode from "./CloseCode"; +import Connection from "./Connection"; + +export default class ConnectionManager { + private connections = new Map(); + + public canConnectSlave(connectionID: string) { + const master = this.connections.get(connectionID); + return !!master && master.isHalfOpen(); + } + + public connectMaster(connectionID: string, ws: websocket.connection): void { + if (this.connections.has(connectionID)) { + throw new Error(`Connection ID ${connectionID} collided`); + } + + ws.on("close", this.onClose.bind(this, connectionID)); + + this.connections.set(connectionID, new Connection(ws)); + } + + public connectSlave( + connectionID: string, + webSocket: websocket.connection + ): void { + const existingConnection = this.connections.get(connectionID); + if (!existingConnection) { + throw new Error(`No connection ID '${connectionID}'`); + } + + existingConnection.connectPeer(webSocket); + } + + public close(connectionID: string) { + const connection = this.connections.get(connectionID); + if (connection) { + connection.close(CloseCode.GoingAway); + } else { + console.info(`Connection ID '${connectionID}' is unknown; cannot close`); + } + } + + private onClose(connectionID: string): void { + console.info(`Connection ID '${connectionID}' was closed`); + + const connection = this.connections.get(connectionID); + if (connection) { + } else { + console.error( + `Connection ID '${connectionID}' was not found while trying to close it.` + ); + } + + this.connections.delete(connectionID); + } +} diff --git a/packages/local-relay/src/Host.ts b/packages/local-relay/src/Host.ts new file mode 100644 index 00000000..0461b953 --- /dev/null +++ b/packages/local-relay/src/Host.ts @@ -0,0 +1,77 @@ +import * as websocket from "websocket"; +import CloseCode from "./CloseCode"; +import Connection from "./Connection"; + +export type HostInfo = { + id: string; + displayName: string; + roles: string[]; +}; + +export default class Host { + private _id: string; + + get id(): string { + return this._id; + } + + private _displayName: string; + + get displayName(): string { + return this._displayName; + } + + private _roles: string[]; + + get roles(): string[] { + return this._roles; + } + + private connection?: Connection; + + constructor({ id, displayName, roles }: HostInfo) { + this._id = id; + this._displayName = displayName; + this._roles = roles; + } + + connect(ws: websocket.connection): void { + ws.on("close", () => { + this.connection = undefined; + }); + + this.connection = new Connection(ws); + } + + isConnected(): boolean { + return Boolean(this.connection); + } + + disconnect(): void { + if (!this.connection) { + console.info( + `Host ${this._id} is not connected; cannot close connection` + ); + + return; + } + + this.connection.close(CloseCode.GoingAway); + } + + connectPeer(peerWs: websocket.connection) { + if (!this.connection) { + throw new Error(`Host ${this._id} is not connected`); + } + + if (!this.canConnectPeer()) { + throw new Error(`Host ${this._id} is already connected to a peer`); + } + + this.connection.connectPeer(peerWs); + } + + canConnectPeer() { + return this.connection.isHalfOpen(); + } +} diff --git a/packages/local-relay/src/HostStore.ts b/packages/local-relay/src/HostStore.ts new file mode 100644 index 00000000..3b57fd0c --- /dev/null +++ b/packages/local-relay/src/HostStore.ts @@ -0,0 +1,34 @@ +import Host, { HostInfo } from "./Host"; + +export default class HostStore { + private hosts = new Map(); + + get(id: string): Host { + return this.hosts.get(id); + } + + addOrReplace(hostInfo: HostInfo) { + const host = new Host(hostInfo); + this.hosts.set(hostInfo.id, host); + return host; + } + + delete(id: string): Host | undefined { + const host = this.hosts.get(id); + host.disconnect(); + + if (this.hosts.delete(id)) { + return host; + } + + return undefined; + } + + listAll() { + return [...this.hosts.values()]; + } + + clearAll() { + return this.hosts.clear(); + } +} diff --git a/packages/local-relay/src/RelayServer.test.ts b/packages/local-relay/src/RelayServer.test.ts new file mode 100644 index 00000000..be1c3d0b --- /dev/null +++ b/packages/local-relay/src/RelayServer.test.ts @@ -0,0 +1,77 @@ +import supertest from 'supertest'; +import * as websocket from 'websocket'; + +import RelayServer from './RelayServer'; + +let server: RelayServer; +let httpRequest: supertest.SuperTest; +let wsUrl: string; + +beforeAll(() => { + server = new RelayServer(); + const port = server.listen(); + + httpRequest = supertest(server.httpServer); + wsUrl = `ws://localhost:${port}`; +}); + +afterAll(() => { + server.close(); +}); + +describe('auth', () => { + it('rejects if both host and client auth attempted', () => + expectWsError( + wsConnection(wsUrl, { + 'x-relay-host-id': 'test host id', + 'x-relay-host-display-name': 'test display name', + 'x-relay-resource': '/test', + }), + 400, + 'Ambiguous connection, both authenticated and connection token provided', + )); + + it('rejects if neither host nor client auth attempted', () => + expectWsError( + wsConnection(wsUrl), + 401, + 'Unrecognized connection type, neither authenticated nor connection token provided', + )); +}); + +function wsConnection( + url: string, + headers?: {}, +): Promise { + return new Promise((resolve, reject) => { + const client = new websocket.client(); + client.connect(url, undefined, undefined, headers); + client.once('connect', resolve); + client.once('connectFailed', reject); + }); +} + +async function expectWsError( + wsRequest: Promise, + statusCode: number, + expectedReason?: string, +) { + try { + await wsRequest; + throw new Error('Expected the ws request to fail'); + } catch (error) { + const [statusLine, _, ...headerLines] = error.message.split('\n'); + + expect(statusLine).toMatch( + `Server responded with a non-101 status: ${statusCode}`, + ); + + if (expectedReason) { + const [__, rejectReason] = headerLines + .map((l) => l.split(':')) + .find(([header]) => header === 'x-websocket-reject-reason'); + + expect(rejectReason).toMatch(expectedReason); + } + } +} diff --git a/packages/local-relay/src/RelayServer.ts b/packages/local-relay/src/RelayServer.ts new file mode 100644 index 00000000..feab72ad --- /dev/null +++ b/packages/local-relay/src/RelayServer.ts @@ -0,0 +1,220 @@ +import express from 'express'; +import * as websocket from 'websocket'; +import * as http from 'http'; +import * as net from 'net'; + +import HostStore from './HostStore'; +import Host from './Host'; + +type RequestResponse = { + status: number; +}; + +export default class RelayServer { + private app = express(); + public readonly httpServer: http.Server; + public readonly websocketServer: websocket.server; + + private hostStore = new HostStore(); + + constructor() { + this.app.disable('x-powered-by'); + + this.httpServer = http.createServer(this.app); + this.websocketServer = new websocket.server({ + httpServer: this.httpServer, + }); + + this.app + .disable('x-powered-by') + .get('/', this.onAppGetRoot) + .get('/1/user/-/developer-relay/hosts.json', this.lookupHosts.bind(this)) + .post( + '/1/user/-/developer-relay/hosts/:id', + this.createConnectionURL.bind(this), + ) + .delete('/connections/:connectionId', this.closeConnection.bind(this)); + + this.websocketServer.on('request', this.onRequest.bind(this)); + } + + public listen(port = 0): number { + this.httpServer.listen(port); + return (this.httpServer.address() as net.AddressInfo).port; + } + + public close(): void { + this.httpServer.close(); + } + + public get port(): number { + return (this.httpServer.address() as net.AddressInfo).port; + } + + private reject( + request: websocket.request, + status: number, + reason?: string, + ): RequestResponse { + const fitbitRequestUUID = + request.httpRequest.headers['x-fitbit-request-uuid']; + + if (fitbitRequestUUID) { + request.reject(status, reason, { + 'x-fitbit-request-uuid': fitbitRequestUUID, + }); + } else { + request.reject(status, reason); + } + + return { status }; + } + + private rejectWithError( + request: websocket.request, + status: number, + message: string, + ) { + console.error(message); + return this.reject(request, status, message); + } + + private accept(request: websocket.request, status = 200): RequestResponse { + request.accept(); + return { status }; + } + + private async onRequest(request: websocket.request) { + const headers = request.httpRequest.headers; + + const hostId = headers['x-relay-host-id'] as string; + const displayName = headers['x-relay-host-display-name'] as string; + const rolesHeader = headers['x-relay-host-roles'] as string; + const isHostConnectionAttempted = hostId && displayName; + + const requestedResource = + (headers['x-relay-resource'] as string) || request.resource; + const peerHostId = requestedResource.split('/')?.[1]; + const isClientConnectionAttempted = Boolean(peerHostId); + + if (isHostConnectionAttempted && isClientConnectionAttempted) { + return this.rejectWithError( + request, + 400, + 'Ambiguous connection, both authenticated and connection token provided', + ); + } + + if (!isHostConnectionAttempted && !isClientConnectionAttempted) { + return this.rejectWithError( + request, + 401, + 'Unrecognized connection type, neither authenticated nor connection token provided', + ); + } + + if (isHostConnectionAttempted) { + const host: Host = this.hostStore.addOrReplace({ + id: hostId, + displayName, + roles: this.getHostRoles(rolesHeader), + }); + + // Emitted after request.accept() + request.on('requestAccepted', (ws: websocket.connection) => { + host.connect(ws); + console.info(`Accepting host connection ID ${host.id}`); + }); + // "else" == "else if isClientConnectionAttempted", because of the 2 if statements above + } else { + const host: Host = this.hostStore.get(hostId); + + if (!host) { + return this.reject(request, 404, 'Invalid or expired connection token'); + } + + if (!host.isConnected()) { + return this.reject(request, 500, `Host ${host.id} is not connected`); + } + + if (!host.canConnectPeer()) { + return this.reject( + request, + 403, + `Host ${host.id} is already connected to a peer`, + ); + } + + // Emitted after request.accept() + request.on('requestAccepted', (ws: websocket.connection) => { + try { + host.connectPeer(ws); + } catch (error) { + return this.rejectWithError(request, 500, (error as Error).message); + } + }); + } + + // TODO: Add comments + return this.accept(request); + } + + private onAppGetRoot(_: express.Request, response: express.Response) { + response.status(426); + response.send('Upgrade Required'); + } + + private async lookupHosts(_: express.Request, response: express.Response) { + const hosts = this.hostStore.listAll(); + + response.json({ + hosts: hosts.map((host) => ({ + id: host.id, + displayName: host.displayName, + roles: host.roles, + state: host.canConnectPeer() ? 'available' : 'busy', + })), + }); + } + + private async createConnectionURL( + request: express.Request, + response: express.Response, + ) { + const { id } = request.params; + response.setHeader('content-type', 'text/uri-list'); + response.send( + `ws://localhost:${ + (this.httpServer.address() as net.AddressInfo).port + }/${id}\r\n`, + ); + } + + private closeConnection( + request: express.Request, + response: express.Response, + ) { + const { connectionId: id } = request.params; + this.hostStore.get(id).disconnect(); + response.status(204).end(); + } + + private getHostRoles(rolesHeader: string) { + if (!rolesHeader) return []; + + const roles = rolesHeader + .split(',') + .map((role: string) => role.trim().toUpperCase()); + + const roleRegex = /^[a-zA-Z0-9_]+$/; + + for (const role of roles) { + if (!role.match(roleRegex)) { + // Should I throw? + throw new Error(`Invalid role specified: ${role}`); + } + } + + return roles; + } +} diff --git a/packages/local-relay/src/index.ts b/packages/local-relay/src/index.ts new file mode 100755 index 00000000..6d20c1d0 --- /dev/null +++ b/packages/local-relay/src/index.ts @@ -0,0 +1,25 @@ +import { writeFile, mkdir } from "fs"; +import RelayServer from "./RelayServer"; +import { relayDirectoryPath, relayPidFilePath } from "./Config"; + +const server = new RelayServer(); +const port = server.listen(); + +const info = { port, pid: process.pid }; + +mkdir(relayDirectoryPath, (error) => { + if (error) { + if (error.code !== "EEXIST") throw error; + } + + writeFile( + relayPidFilePath, + JSON.stringify({ port, pid: process.pid }), + { flag: "w" }, + (error) => { + if (error) throw error; + + console.log(`Wrote relay info to ${relayPidFilePath}`, info); + } + ); +});