Skip to content

Commit

Permalink
refactor: storage subsystem
Browse files Browse the repository at this point in the history
* Convert to typescript
* Convert storageservice into a static storagemanager
* Simplify storage provider interface
  • Loading branch information
fredriklindberg committed Jan 25, 2024
1 parent b012e07 commit 186c93a
Show file tree
Hide file tree
Showing 45 changed files with 2,209 additions and 1,758 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
"devDependencies": {
"@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0",
"@types/better-sqlite3": "^7.6.8",
"@types/koa-joi-router": "^8.0.5",
"@types/mocha": "^10.0.1",
"@types/node": "^20.5.0",
"@types/pg": "^8.10.9",
"@types/port-numbers": "^5.0.0",
"@types/sinon": "^10.0.17",
"@types/ssh2": "^1.11.14",
Expand Down
36 changes: 24 additions & 12 deletions src/account/account-service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Storage from '../storage/index.js';
import Account from './account.js';
import crypto from 'crypto';
import { Logger } from '../logger.js';
import TunnelService from '../tunnel/tunnel-service.js';
import Storage, { ListState } from '../storage/storage.js';

type AccountListResult = {
cursor: string | null,
Expand Down Expand Up @@ -60,19 +60,22 @@ class AccountService {
return undefined;
}

const account = await this._db.read(normalizedId, Account);
const account = await this._db.read<Account>(normalizedId, Account);
if (!(account instanceof Account)) {
return undefined
}
return account;
}

public async create(): Promise<undefined | Account> {
let maxTries = 100;
let created;
let account;
let created: boolean | null;
let account: Account;
do {
account = new Account(AccountService.generateId());
account.created_at = new Date().toISOString();
account.updated_at = account.created_at;
created = await this._db.create(account.id, account);
created = await this._db.create(<string>account.id, account);
} while (!created && maxTries-- > 0);

if (!created) {
Expand All @@ -93,39 +96,48 @@ class AccountService {
await Promise.allSettled(tunnels.map((tunnelId) => {
return this.tunnelService.delete(tunnelId, accountId)
}));

return await this._db.delete(<string>account.id);
} catch (e) {
this.logger.error({
message: `Failed to delete account`,
accountId
});
return false;
}

await this._db.delete(account.id);
return true;
}

async update(accountId: string, callback: (account: Account) => void): Promise<undefined | Account> {
const normalizedId = AccountService.normalizeId(accountId);
if (normalizedId == undefined) {
return undefined;
}
return this._db.update(AccountService.normalizeId(normalizedId), Account, (account: Account) => {

const updatedAccount = await this._db.update(normalizedId, Account, async (account: Account) => {
callback(account);
account.updated_at = new Date().toISOString();
return true;
});
return updatedAccount ?? undefined
}

public async list(cursor: string | undefined, count: number = 10, verbose: boolean = false): Promise<AccountListResult> {
const res = await this._db.list(<any>cursor, count);

const data: Array<Account> = verbose ? (await this._db.read(res.data, Account) || []) : res.data.map((id: string) => {
const listState: ListState | undefined = cursor ? { cursor } : undefined;
let res = await this._db.list(listState, count);

const keys = res.keys;
if (res.pending > 0) {
res = await this._db.list(res, res.pending);
keys.push(...res.keys);
}

const data: Array<Account | null> = verbose ? (await this._db.read(keys, Account) || []) : keys.map((id: string) => {
return new Account(id);
});
return {
cursor: res.cursor,
accounts: data,
accounts: data.filter((d) => d != null) as Array<Account>,
}
}

Expand Down
29 changes: 21 additions & 8 deletions src/account/account-tunnel-service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import Tunnel from "../tunnel/tunnel.js";
import Account from "./account.js";
import Storage from '../storage/index.js';
import AccountService from "./account-service.js";
import { TunnelConfig } from "../tunnel/tunnel-config.js";
import Storage from "../storage/storage.js";

export default class AccountTunnelService {

Expand All @@ -17,10 +17,14 @@ export default class AccountTunnelService {
}

public async assignTunnel(tunnelConfig: TunnelConfig): Promise<boolean> {
const normalizedId = AccountService.normalizeId(<string>tunnelConfig.account);
if (normalizedId == undefined) {
return false;
}

const res = await this.storage.update(AccountService.normalizeId(tunnelConfig.account), Account, (account: Account) => {
if (!account.tunnels.includes(tunnelConfig.account)) {
account.tunnels.push(tunnelConfig.id);
const res = await this.storage.update(normalizedId, Account, async (account: Account) => {
if (!account.tunnels.includes(<string>tunnelConfig.account)) {
account.tunnels.push(<string>tunnelConfig.id);
}
account.updated_at = new Date().toISOString();
return true;
Expand All @@ -29,9 +33,13 @@ export default class AccountTunnelService {
}

public async unassignTunnel(tunnelConfig: TunnelConfig): Promise<boolean> {
const normalizedId = AccountService.normalizeId(<string>tunnelConfig.account);
if (normalizedId == undefined) {
return false;
}

const res = await this.storage.update(AccountService.normalizeId(tunnelConfig.account), Account, (account: Account) => {
const pos = account.tunnels.indexOf(tunnelConfig.id);
const res = await this.storage.update(normalizedId, Account, async (account: Account) => {
const pos = account.tunnels.indexOf(<string>tunnelConfig.id);
if (pos >= 0) {
account.tunnels.splice(pos, 1);
}
Expand All @@ -42,11 +50,16 @@ export default class AccountTunnelService {
}

public async authorizedAccount(tunnel: Tunnel): Promise<Account> {
const account = await this.storage.read(AccountService.normalizeId(tunnel.account), Account);
const normalizedId = AccountService.normalizeId(<string>tunnel.account);
if (normalizedId == undefined) {
throw new Error("no_account_on_tunnel");
}

const account = await this.storage.read(normalizedId, Account);
if (!(account instanceof Account)) {
throw new Error("dangling_account");
}
if (!account.tunnels.includes(tunnel.id)) {
if (!account.tunnels.includes(<string>tunnel.id)) {
this.assignTunnel(tunnel.config);
}
return account;
Expand Down
6 changes: 3 additions & 3 deletions src/account/account.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ type AccountStatus = {
}

class Account implements Serializable {
public accountId: string;
public id: string;
public accountId?: string;
public id?: string;
public created_at?: string;
public updated_at?: string;
public tunnels: Array<string>;
public status: AccountStatus;

constructor(accountId: string) {
constructor(accountId?: string) {
this.accountId = accountId;
this.id = accountId;
this.created_at = undefined;
Expand Down
6 changes: 3 additions & 3 deletions src/controller/admin-api-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class AdminApiController extends KoaController {
const accountProps = (account: Account) => {
return {
account_id: account.id,
account_id_hr: AccountService.formatId(account.id),
account_id_hr: AccountService.formatId(<string>account.id),
tunnels: account.tunnels,
status: account.status,
created_at: account.created_at,
Expand Down Expand Up @@ -298,7 +298,7 @@ class AdminApiController extends KoaController {
handler: [handleAdminAuth, handleError, async (ctx, next) => {
try {
const tunnel = await this._tunnelService.lookup(ctx.params.tunnel_id);
const result = await this._tunnelService.delete(tunnel.id, tunnel.account);
const result = await this._tunnelService.delete(<string>tunnel.id, <string>tunnel.account);
if (result) {
ctx.status = 204;
} else {
Expand Down Expand Up @@ -327,7 +327,7 @@ class AdminApiController extends KoaController {
handler: [handleAdminAuth, handleError, async (ctx, next) => {
try {
const tunnel = await this._tunnelService.lookup(ctx.params.tunnel_id);
const res = await this._tunnelService.disconnect(tunnel.id, tunnel.account);
const res = await this._tunnelService.disconnect(<string>tunnel.id, <string>tunnel.account);
ctx.status = 200;
ctx.body = {
result: res
Expand Down
4 changes: 2 additions & 2 deletions src/controller/api-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ class ApiController extends KoaController {
const accountProps = (account: Account) => {
return {
account_id: account.id,
account_id_hr: AccountService.formatId(account.id),
account_id_hr: AccountService.formatId(<string>account.id),
}
};

Expand Down Expand Up @@ -363,7 +363,7 @@ class ApiController extends KoaController {
}
ctx.status = 201;
ctx.body = {
token: Buffer.from(account.id).toString('base64'),
token: Buffer.from(<string>account.id).toString('base64'),
};
}]
});
Expand Down
89 changes: 41 additions & 48 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ import AdminController from './controller/admin-controller.js';
import ApiController from './controller/api-controller.js';
import ClusterManager from './cluster/cluster-manager.js';
import IngressManager from './ingress/ingress-manager.js';
import StorageManager from './storage/storage-manager.js';
import { Logger } from './logger.js';
import { StorageService } from './storage/index.js';
import TransportService from './transport/transport-service.js';
import Version from './version.js';
import Node from './cluster/cluster-node.js';
import TunnelService from './tunnel/tunnel-service.js';
import TunnelConnectionManager from './tunnel/tunnel-connection-manager.js';

export default async (argv) => {
const config = new Config(argv);
const logger = Logger();

const exitError = (err) => {
logger.error(`Failed to start up: ${err.message}`);
logger.debug(err.stack);
process.exit(-1);
}

logger.info(`exposrd ${Version.version.version}`);
logger.info({
node_id: Node.identifier,
Expand All @@ -28,55 +34,42 @@ export default async (argv) => {
process.exit(-1);
});

// Initialize storage and cluster service
const storageServiceReady = new Promise((resolve, reject) => {
try {
const storage = new StorageService({
callback: (err) => {
err ? reject(err) : resolve(storage);
},
url: config.get('storage-url'),
pgsql: {
poolSize: config.get('storage-pgsql-connection-pool-size'),
}
});
} catch (e) {
reject(e);
}
});

const clusterType = config.get('cluster');
const clusterServiceReady = ClusterManager.init(clusterType, {
key: config.get('cluster-key'),
redis: {
redisUrl: config.get('cluster-redis-url'),
},
udp: {
port: config.get('cluster-udp-port'),
discoveryMethod: config.get('cluster-udp-discovery') != 'auto' ? config.get('cluster-udp-discovery'): undefined,
multicast: {
group: config.get('cluster-udp-discovery-multicast-group')
try {
StorageManager.init(config.get('storage-url'), {
pgsql: {
poolSize: config.get('storage-pgsql-connection-pool-size'),
}
});
} catch (e) {
exitError(e);
}

try {
const clusterType = config.get('cluster');
ClusterManager.init(clusterType, {
key: config.get('cluster-key'),
redis: {
redisUrl: config.get('cluster-redis-url'),
},
kubernetes: {
serviceNameEnv: config.get('cluster-udp-discovery-kubernetes-service-env'),
namespaceEnv: config.get('cluster-udp-discovery-kubernetes-namespace-env'),
serviceName: config.get('cluster-udp-discovery-kubernetes-service'),
namespace: config.get('cluster-udp-discovery-kubernetes-namespace'),
clusterDomain: config.get('cluster-udp-discovery-kubernetes-cluster-domain'),
udp: {
port: config.get('cluster-udp-port'),
discoveryMethod: config.get('cluster-udp-discovery') != 'auto' ? config.get('cluster-udp-discovery'): undefined,
multicast: {
group: config.get('cluster-udp-discovery-multicast-group')
},
kubernetes: {
serviceNameEnv: config.get('cluster-udp-discovery-kubernetes-service-env'),
namespaceEnv: config.get('cluster-udp-discovery-kubernetes-namespace-env'),
serviceName: config.get('cluster-udp-discovery-kubernetes-service'),
namespace: config.get('cluster-udp-discovery-kubernetes-namespace'),
clusterDomain: config.get('cluster-udp-discovery-kubernetes-cluster-domain'),
}
}
}
});

const [storageService, _] = await Promise
.all([
storageServiceReady,
clusterServiceReady
])
.catch((err) => {
logger.error(`Failed to start up: ${err.message}`);
logger.debug(err.stack);
process.exit(-1);
});
} catch (e) {
exitError(e);
}

// Setup tunnel data ingress (incoming tunnel data)
const ingressReady = IngressManager.listen({
Expand Down Expand Up @@ -219,8 +212,8 @@ export default async (argv) => {
adminController.destroy(),
transport.destroy(),
IngressManager.close(),
storageService.destroy(),
ClusterManager.close(),
StorageManager.close(),
config.destroy(),
]);

Expand Down
6 changes: 3 additions & 3 deletions src/ingress/http-ingress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ export default class HttpIngress implements IngressBase {
headers[HTTP_HEADER_EXPOSR_VIA] = Node.identifier;
}

if (TunnelConnectionManager.isLocalConnected(tunnel.id)) {
if (TunnelConnectionManager.isLocalConnected(<string>tunnel.id)) {
// Delete connection header if tunnel is
// locally connected and it's not an upgrade request
if (!isUpgrade) {
Expand Down Expand Up @@ -344,7 +344,7 @@ export default class HttpIngress implements IngressBase {
method: req.method,
};

const agent = opt.agent = this._getAgent(tunnel.id, req);
const agent = opt.agent = this._getAgent(<string>tunnel.id, req);
opt.headers = this._requestHeaders(req, tunnel, baseUrl, false);

this.logger.isTraceEnabled() &&
Expand Down Expand Up @@ -430,7 +430,7 @@ export default class HttpIngress implements IngressBase {
port: this.httpListener.getPort(),
}
};
const target = TunnelConnectionManager.createConnection(tunnel.id, ctx, (err) => {
const target = TunnelConnectionManager.createConnection(<string>tunnel.id, ctx, (err) => {
if (!err) {
return;
}
Expand Down
2 changes: 1 addition & 1 deletion src/ingress/sni-ingress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ export default class SNIIngress implements IngressBase {
},
};

const targetSock = TunnelConnectionManager.createConnection(tunnel.id, ctx, (err, sock) => {
const targetSock = TunnelConnectionManager.createConnection(<string>tunnel.id, ctx, (err, sock) => {
if (err) {
logError(err);
return;
Expand Down
Loading

0 comments on commit 186c93a

Please sign in to comment.