diff --git a/src/cluster/index.ts b/src/cluster/cluster-manager.ts similarity index 63% rename from src/cluster/index.ts rename to src/cluster/cluster-manager.ts index 6c71926..b90ae6c 100644 --- a/src/cluster/index.ts +++ b/src/cluster/cluster-manager.ts @@ -1,12 +1,27 @@ -import { strict as assert } from 'assert'; import crypto from 'node:crypto'; import MemoryEventBus from './memory-eventbus.js'; -import RedisEventBus from './redis-eventbus.js'; +import RedisEventBus, { RedisEventBusOptions } from './redis-eventbus.js'; import Node from './cluster-node.js'; import { Logger } from '../logger.js'; -import UdpEventBus from './udp-eventbus.js'; -import EventBus from './eventbus.js'; -import EventEmitter from 'node:events'; +import UdpEventBus, { UdpEventBusOptions } from './udp-eventbus.js'; +import { EmitMeta } from './eventbus.js'; +import EventBusInterface from './eventbus-interface.js'; + +export type ClusterManagerOptions = { + key?: string, + staleTimeout?: number, + removalTimeout?: number, + heartbeatInterval?: number, + redis?: RedisEventBusOptions, + udp?: UdpEventBusOptions, +} + +export enum ClusterManagerType { + REDIS = 'redis', + UDP = 'udp', + SINGLE_NODE = 'single-node', + MEM = 'mem', +} export type ClusterNode = { id: string, @@ -23,33 +38,26 @@ type ClusterServiceNode = ClusterNode & { removalTimer?: NodeJS.Timeout, } -class ClusterService extends EventEmitter { - private static instance: ClusterService | undefined; - private static ref: number; - - private logger: any; - private _key: string = ''; - private _nodes: { [key: string]: ClusterServiceNode } = {}; - private _listeners: Array = []; - private _seq: number = 0; - private _window_size!: number; - private _staleTimeout!: number; - private _removalTimeout!: number; - private _heartbeatInterval!: number; - private _bus: any; - private multiNode: boolean = false; - private _heartbeat: NodeJS.Timeout | undefined; - - constructor(type?: 'redis' | 'udp' | 'single-node' | 'mem', opts?: any) { - super(); - if (ClusterService.instance instanceof ClusterService) { - ClusterService.ref++; - return ClusterService.instance; - } - assert(type != null, "type not given"); - ClusterService.instance = this; - ClusterService.ref = 1; - +export type EmitCallback = (event: string, message: any, meta: EmitMeta) => void; + +class ClusterManager { + private static logger: any; + private static _key: string = ''; + private static _nodes: { [key: string]: ClusterServiceNode } = {}; + private static _listeners: Array = []; + private static _seq: number = 0; + private static _window_size: number; + private static _staleTimeout: number; + private static _removalTimeout: number; + private static _heartbeatInterval: number; + private static _bus: EventBusInterface; + private static multiNode: boolean = false; + private static _heartbeat: NodeJS.Timeout | undefined; + + private static initialized: boolean = false; + private static ready: boolean = false; + + public static async init(type: ClusterManagerType, opts?: ClusterManagerOptions): Promise { this.logger = Logger("cluster-service"); this._key = opts?.key || ''; this._nodes = {}; @@ -70,59 +78,70 @@ class ClusterService extends EventEmitter { this._heartbeatInterval = opts?.heartbeatInterval || 9500; this._listeners = []; - const onMessage = (payload: string) => { - this._receive(payload) - }; - const ready = async (err: Error) => { - if (err) { - await this.destroy(); - } - this.logger.info(`Clustering mode ${type} initialized`); - typeof opts.callback === 'function' && process.nextTick(() => opts.callback(err)); - }; + try { + await new Promise((resolve, reject) => { + + const ready = (err?: Error) => { + err ? reject(err) : resolve(undefined); + }; + + switch (type) { + case ClusterManagerType.REDIS: + this.multiNode = true; + this._bus = new RedisEventBus({ + ...opts?.redis, + callback: ready, + }) + break; + case ClusterManagerType.UDP: + this.multiNode = true; + this._bus = new UdpEventBus({ + ...opts?.udp, + callback: ready, + }); + break; + case ClusterManagerType.SINGLE_NODE: + case ClusterManagerType.MEM: + this.multiNode = false; + this._bus = new MemoryEventBus({ + callback: ready, + }); + break; + default: + reject(new Error(`no_such_cluster_type`)); + } + }); + } catch (e: any) { + throw e; + } - const getLearntPeers = () => { - return this._getLearntPeers(); - }; + this.initialized = true; + this.ready = false; + this.logger.info(`Clustering mode ${type} initialized`); + } - switch (type) { - case 'redis': - this.multiNode = true; - this._bus = new RedisEventBus({ - ...opts.redis, - callback: ready, - handler: onMessage, - }) - break; - case 'udp': - this.multiNode = true; - this._bus = new UdpEventBus({ - ...opts.udp, - callback: ready, - handler: onMessage, - getLearntPeers, - }); - break; - case 'single-node': - case 'mem': - this.multiNode = false; - this._bus = new MemoryEventBus({ - callback: ready, - handler: onMessage, - }); - break; - default: - assert.fail(`unknown type ${type}`); + public static async close(): Promise { + if (!this.initialized) { + return; } + this.stop(); + await this._bus.destroy(); + this._bus = undefined; + this.initialized = false; } - public async setReady(ready: boolean = true): Promise { - if (!ready) { - clearInterval(this._heartbeat); - return this.multiNode; + public static isMultinode(): boolean { + return this.multiNode; + } + + public static async start(): Promise { + if (this.ready) { + return; } + this.ready = true; + const heartbeat = () => { this.publish("cluster:heartbeat"); }; @@ -130,7 +149,7 @@ class ClusterService extends EventEmitter { this._heartbeat = setInterval(heartbeat, this._heartbeatInterval); if (!this.multiNode) { - return this.multiNode; + return; } const rapidHeartbeat = setInterval(heartbeat, 2000); @@ -143,24 +162,29 @@ class ClusterService extends EventEmitter { setTimeout(resolve, waitTime); }); clearInterval(rapidHeartbeat); - return this.multiNode; } - public attach(bus: EventBus): void { - this._listeners.push(bus); + public static stop(): void { + this.ready = false; + clearInterval(this._heartbeat); + this._heartbeat = undefined; + } + + public static attach(callback: EmitCallback): void { + this._listeners.push(callback); } - public detach(bus: EventBus): void { - this._listeners = this._listeners.filter((x) => x != bus); + public static detach(callback: EmitCallback): void { + this._listeners = this._listeners.filter((x) => x != callback); } - private _getLearntPeers(): Array { + public static getLearntPeers(): Array { return Array.from(new Set(Object.keys(this._nodes) .filter((k) => !this._nodes[k].stale) .map((k) => this._nodes[k].ip))); } - private _learnNode(node: ClusterNode): ClusterServiceNode | undefined { + private static _learnNode(node: ClusterNode): ClusterServiceNode | undefined { if (node?.id == undefined) { return undefined; } @@ -209,16 +233,15 @@ class ClusterService extends EventEmitter { return cnode; } - private _forgetNode(node: ClusterServiceNode): void { + private static _forgetNode(node: ClusterServiceNode): void { delete this._nodes[node.id]; this.logger.info({ message: `Node ${node.id} ${node.host} (${node.ip}) permanently removed from peer list`, total_nodes: Object.keys(this._nodes).length, }); - this.emit('removed', {nodeId: node.id}); } - private _staleNode(node: ClusterServiceNode): void { + private static _staleNode(node: ClusterServiceNode): void { if (!this._nodes[node?.id]) { return; } @@ -227,10 +250,9 @@ class ClusterService extends EventEmitter { message: `marking ${node.id} as stale` }); - this.emit('stale', {nodeId: node.id}); } - public getSelf(): ClusterNode { + public static getSelf(): ClusterNode { return { id: Node.identifier, host: Node.hostname, @@ -240,7 +262,7 @@ class ClusterService extends EventEmitter { }; } - public getNode(id: string): ClusterNode | undefined { + public static getNode(id: string): ClusterNode | undefined { const node: ClusterServiceNode = this._nodes[id]; if (node?.stale === false) { return { @@ -255,7 +277,7 @@ class ClusterService extends EventEmitter { } } - public getNodes(): Array { + public static getNodes(): Array { return Object.keys(this._nodes).map((k) => { return { id: this._nodes[k].id, @@ -267,7 +289,7 @@ class ClusterService extends EventEmitter { }) } - private _receive(payload: string): boolean | Error { + public static receive(payload: string): boolean | Error { try { const msg = JSON.parse(payload); const {s, ...data} = msg; @@ -297,7 +319,7 @@ class ClusterService extends EventEmitter { let csnode: ClusterServiceNode | undefined = this._nodes[cnode.id]; if (event == 'cluster:heartbeat' || csnode == undefined) { - csnode = this._learnNode(node); + csnode = this._learnNode(cnode); } if (csnode == undefined) { @@ -324,7 +346,7 @@ class ClusterService extends EventEmitter { } csnode.seq_win |= (1 << rel_seq); - this._listeners.forEach((l) => l._emit(event, message, { + this._listeners.forEach((cb) => cb(event, message, { node: { id: cnode.id, ip: cnode.ip, @@ -342,7 +364,7 @@ class ClusterService extends EventEmitter { } } - public async publish(event: any, message: any = {}) { + public static async publish(event: any, message: any = {}) { const payload = { event, message, @@ -366,14 +388,6 @@ class ClusterService extends EventEmitter { return this._bus.publish(JSON.stringify(msg)); } - public async destroy() { - if (--ClusterService.ref == 0) { - await this._bus.destroy(); - this._bus = undefined; - this.removeAllListeners(); - ClusterService.instance = undefined; - } - } } -export default ClusterService; \ No newline at end of file +export default ClusterManager; \ No newline at end of file diff --git a/src/cluster/cluster-node.js b/src/cluster/cluster-node.js deleted file mode 100644 index 2f45e9d..0000000 --- a/src/cluster/cluster-node.js +++ /dev/null @@ -1,59 +0,0 @@ -import crypto from 'crypto'; -import os from 'os'; - -class Node { - static hostname = `${process.pid}@${os.hostname}`; - static identifier = crypto.createHash('sha1').update(`${Date.now() + Math.random()}`).digest('hex'); - static interface = Node.getNetworkInterface(); - - static address4 = Node._getIP(Node.interface, 'IPv4'); - static address6 = Node._getIP(Node.interface, 'IPv6'); - static address = Node.address4 || Node.address6 || '0.0.0.0'; - - static getIP() { - return Node._getIP(Node.interface, 'IPv4') || Node._getIP(Node.interface, 'IPv6'); - } - - static _getIP(iface, family) { - const addresses = os.networkInterfaces()[iface]; - if (!addresses) { - return undefined; - } - return addresses.filter((addr) => { return addr.family == family; })[0]?.address; - } - - static getNetworkInterface(iface) { - const interfaces = os.networkInterfaces(); - - if (iface != undefined && interfaces[iface]) { - return iface; - } - - Object.keys(interfaces).forEach((element) => { - const addresses = interfaces[element].filter(entry => !entry.internal); - if (addresses.length == 0) { - delete interfaces[element]; - } - }); - - const names = Object.keys(interfaces); - names.sort((a, b) => { - - const haveProperty = (array, predicate) => { - return array.filter(predicate).length; - } - - const score = (element) => { - const addresses = interfaces[element]; - return haveProperty(addresses, (e) => {return e.family == 'IPv4'}) + - haveProperty(addresses, (e) => {return e.family == 'IPv6'}); - } - - return 2*score(b) - 2*score(a) - a.localeCompare(b); - }); - - return names[0]; - } -} - -export default Node; \ No newline at end of file diff --git a/src/cluster/cluster-node.ts b/src/cluster/cluster-node.ts new file mode 100644 index 0000000..27a4907 --- /dev/null +++ b/src/cluster/cluster-node.ts @@ -0,0 +1,70 @@ +import crypto from 'crypto'; +import os, { NetworkInterfaceInfo } from 'os'; + +class Node { + public static readonly hostname = `${process.pid}@${os.hostname}`; + public static readonly identifier = crypto.createHash('sha1').update(`${Date.now() + Math.random()}`).digest('hex'); + public static readonly interface = Node.getNetworkInterface(); + + public static readonly address4 = Node._getIP(Node.interface, 'IPv4'); + public static readonly address6 = Node._getIP(Node.interface, 'IPv6'); + public static readonly address = Node.address4 || Node.address6 || '0.0.0.0'; + + public static getIP() { + return Node._getIP(Node.interface, 'IPv4') || Node._getIP(Node.interface, 'IPv6'); + } + + private static _getIP(iface: string, family: string): string | undefined { + const addresses = os.networkInterfaces()[iface]; + if (!addresses) { + return undefined; + } + return addresses.filter((addr) => { return addr.family == family; })[0]?.address; + } + + public static getNetworkInterface(iface?: string): string { + const interfaces = os.networkInterfaces(); + + if (iface != undefined) { + if (interfaces[iface]) { + return iface; + } else { + throw new Error('no_such_network_interface'); + } + } + + if (Object.keys(interfaces).length == 0) { + throw new Error('no_network_interfaces'); + } + + Object.keys(interfaces).forEach((element) => { + const addresses = interfaces[element]?.filter(entry => !entry.internal); + if (addresses?.length == 0) { + delete interfaces[element]; + } + }); + + const names = Object.keys(interfaces); + names.sort((a: string, b: string) => { + + const haveProperty = (array: Array, predicate: (x: NetworkInterfaceInfo) => boolean) => { + return array.filter(predicate).length; + } + + const score = (element: string) => { + const addresses = interfaces[element]; + if (!addresses) { + return -1; + } + return haveProperty(addresses, (e) => {return e.family == 'IPv4'}) + + haveProperty(addresses, (e) => {return e.family == 'IPv6'}); + } + + return 2*score(b) - 2*score(a) - a.localeCompare(b); + }); + + return names[0]; + } +} + +export default Node; \ No newline at end of file diff --git a/src/cluster/discovery-method.ts b/src/cluster/discovery-method.ts new file mode 100644 index 0000000..9fe0d9c --- /dev/null +++ b/src/cluster/discovery-method.ts @@ -0,0 +1,13 @@ +import dgram from 'dgram'; + +export default abstract class DiscoveryMethod { + + public abstract readonly name: string; + + public abstract eligible(): number; + + public abstract init(socket: dgram.Socket | undefined, socket6: dgram.Socket | undefined): void; + + public abstract getPeers(): Promise>; + +} \ No newline at end of file diff --git a/src/cluster/eventbus-interface.ts b/src/cluster/eventbus-interface.ts new file mode 100644 index 0000000..b95cae8 --- /dev/null +++ b/src/cluster/eventbus-interface.ts @@ -0,0 +1,35 @@ +import ClusterManager from './cluster-manager.js'; + +export type EventBusInterfaceOptions = { + callback: (error?: Error) => void +} + +export default abstract class EventBusInterface { + private destroyed: boolean = false; + + constructor(opts: EventBusInterfaceOptions) { + } + + public async destroy(): Promise { + if (this.destroyed) { + return; + } + await this._destroy(); + this.destroyed = true; + } + + public async publish(message: string): Promise { + return this._publish(message); + } + + protected abstract _publish(message: string): Promise; + + protected abstract _destroy(): Promise; + + protected receive(message: string): void { + const res: Boolean | Error = ClusterManager.receive(message); + if (res instanceof Error) { + throw res; + } + } +} \ No newline at end of file diff --git a/src/cluster/eventbus.ts b/src/cluster/eventbus.ts index a728bc6..d7609ca 100644 --- a/src/cluster/eventbus.ts +++ b/src/cluster/eventbus.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; import { Logger } from '../logger.js'; -import ClusterService from './index.js'; +import ClusterManager, { EmitCallback } from './cluster-manager.js'; export type EmitMeta = { node: { @@ -13,7 +13,7 @@ export type EmitMeta = { class EventBus extends EventEmitter { private logger: any; - private clusterService: ClusterService; + private emitCallback: EmitCallback; constructor() { super(); @@ -27,29 +27,27 @@ class EventBus extends EventEmitter { this.setMaxListeners(this.getMaxListeners() - 1); }); - const clusterService = this.clusterService = new ClusterService(); - clusterService.attach(this); - } + const emitCallback: EmitCallback = this.emitCallback = (event, message, meta) => { + super.emit(event, message, meta); + this.logger.isTraceEnabled() && + this.logger.trace({ + operation: 'emit', + event, + message, + meta + }); + }; - public async destroy() { - this.removeAllListeners(); - this.clusterService.detach(this); - return this.clusterService.destroy(); + ClusterManager.attach(emitCallback); } - public _emit(event: string, message: any, meta: EmitMeta) { - super.emit(event, message, meta); - this.logger.isTraceEnabled() && - this.logger.trace({ - operation: 'emit', - event, - message, - meta - }); + public async destroy(): Promise { + this.removeAllListeners(); + ClusterManager.detach(this.emitCallback); } async publish(event: string, message: any) { - return this.clusterService.publish(event, message); + return ClusterManager.publish(event, message); } async waitFor(channel: string, predicate: (message: any, meta: EmitMeta) => boolean, timeout: number | undefined) { diff --git a/src/cluster/kubernetes-discovery.js b/src/cluster/kubernetes-discovery.ts similarity index 58% rename from src/cluster/kubernetes-discovery.js rename to src/cluster/kubernetes-discovery.ts index f347038..986f7db 100644 --- a/src/cluster/kubernetes-discovery.js +++ b/src/cluster/kubernetes-discovery.ts @@ -1,10 +1,30 @@ import fs from 'fs'; import dns from 'dns/promises'; import { Logger } from '../logger.js'; +import DiscoveryMethod from './discovery-method.js'; +import ClusterManager from './cluster-manager.js'; -class KubernetesDiscovery { - constructor(opts) { - this.logger = opts?.logger || Logger("kubernetes-discovery"); +export type KubernetesDiscoveryOptions = { + serviceNameEnv?: string, + namespaceEnv?: string, + serviceName?: string, + namespace?: string, + clusterDomain?: string, +} + +class KubernetesDiscovery implements DiscoveryMethod { + public readonly name: string; + + private logger: any; + private _serviceName: string; + private _namespace: string; + private _clusterDomain: string; + private _serviceHost: string; + private _cacheTime: number; + private _cachedPeers: Array | undefined; + + constructor(opts: KubernetesDiscoveryOptions) { + this.logger = Logger("kubernetes-discovery"); const serviceNameEnv = opts?.serviceNameEnv || 'SERVICE_NAME'; const namespaceEnv = opts?.namespaceEnv || 'POD_NAMESPACE'; @@ -15,13 +35,11 @@ class KubernetesDiscovery { this._serviceHost = `${this._serviceName}.${this._namespace}.svc.${this._clusterDomain}`; - this._getLearntPeers = opts.getLearntPeers; - this.name = `kubernetes service ${this._serviceHost}`; this._cacheTime = Date.now() - 1000; } - eligible() { + public eligible(): number { const namespaceFile = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'; if (!fs.existsSync(namespaceFile)) { @@ -34,30 +52,29 @@ class KubernetesDiscovery { return 10; } - init() { + public init(): void { this.logger.debug({ message: `using ${this._serviceHost} headless service for pod discovery`, }); } - async getPeers() { + public async getPeers(): Promise> { if (this._cachedPeers && (Date.now() - this._cacheTime) < 1000) { return this._cachedPeers; } - const peers = await this._resolvePeers() - .then((p) => { - this._cachedPeers = p; - this._cacheTime = Date.now(); - return p; - }) - .catch((err) => { - this.logger.warn({ - message: `failed to resolve ${this._serviceHost}: ${err.message}` - }); - return []; + + let peers: Array = []; + try { + peers = await this._resolvePeers(); + this._cachedPeers = peers; + this._cacheTime = Date.now(); + } catch (err: any) { + this.logger.warn({ + message: `failed to resolve ${this._serviceHost}: ${err.message}` }); + } - const learntPeers = this._getLearntPeers(); + const learntPeers = ClusterManager.getLearntPeers(); for (let i = 0; i < learntPeers.length; i++) { if (peers.indexOf(learntPeers[i]) === -1) { peers.push(learntPeers[i]); @@ -77,8 +94,12 @@ class KubernetesDiscovery { return result4.value; } else if (result6.status == 'fulfilled' && result6.value?.length > 0) { return result6.value; + } else if (result4.status == 'rejected') { + throw result4.reason; + } else if (result6.status == 'rejected') { + throw result6.reason; } else { - throw result4?.reason || result6?.reason || new Error('unknown'); + throw new Error('unknown'); } }); } diff --git a/src/cluster/memory-eventbus.js b/src/cluster/memory-eventbus.js deleted file mode 100644 index ee25ee6..0000000 --- a/src/cluster/memory-eventbus.js +++ /dev/null @@ -1,29 +0,0 @@ -import { Logger } from '../logger.js'; - -class MemoryEventBus { - constructor(opts) { - this._handler = opts.handler; - this.logger = Logger("memory-eventbus"); - typeof opts.callback === 'function' && process.nextTick(opts.callback); - } - - async destroy() { - return true; - } - - async publish(message) { - return new Promise((resolve) => { - process.nextTick(() => { - this._handler(message); - this.logger.debug({ - operation: 'publish', - channel: message.event, - message, - }); - resolve(); - }); - }); - } -} - -export default MemoryEventBus; \ No newline at end of file diff --git a/src/cluster/memory-eventbus.ts b/src/cluster/memory-eventbus.ts new file mode 100644 index 0000000..c195f6b --- /dev/null +++ b/src/cluster/memory-eventbus.ts @@ -0,0 +1,38 @@ +import { Logger } from '../logger.js'; +import EventBusInterface, { EventBusInterfaceOptions } from './eventbus-interface.js'; + +export type MemoryEventBusOptions = EventBusInterfaceOptions; + +class MemoryEventBus extends EventBusInterface { + private logger: any; + + constructor(opts: EventBusInterfaceOptions) { + super(opts) + this.logger = Logger("memory-eventbus"); + typeof opts.callback === 'function' && process.nextTick(opts.callback); + } + + protected async _destroy(): Promise { + } + + protected async _publish(message: string): Promise { + return new Promise((resolve) => { + process.nextTick(() => { + try { + this.receive(message); + this.logger.debug({ + operation: 'publish', + message, + }); + } catch (e: any) { + this.logger.error({ + message: `failed to receive message ${message}: ${e.message}`, + }); + } + resolve(); + }); + }); + } +} + +export default MemoryEventBus; \ No newline at end of file diff --git a/src/cluster/multicast-discovery.js b/src/cluster/multicast-discovery.ts similarity index 66% rename from src/cluster/multicast-discovery.js rename to src/cluster/multicast-discovery.ts index 58d2683..4c12da3 100644 --- a/src/cluster/multicast-discovery.js +++ b/src/cluster/multicast-discovery.ts @@ -1,4 +1,8 @@ -function inCidr(ipAddress, cidrPrefix) { +import dgram from 'dgram'; +import DiscoveryMethod from "./discovery-method.js"; +import { Logger } from '../logger.js'; + +function inCidr(ipAddress: string, cidrPrefix: string): boolean { const [subnet, prefixLength] = cidrPrefix.split('/'); const subnetOctets = subnet.split('.').map(Number); const ipOctets = ipAddress.split('.')?.map(Number); @@ -13,27 +17,35 @@ function inCidr(ipAddress, cidrPrefix) { (ipOctets[2] << 8) | ipOctets[3]; - const mask = (0xffffffff << (32 - prefixLength)) >>> 0; + const mask = (0xffffffff << (32 - Number.parseInt(prefixLength))) >>> 0; return (subnetInt & mask) === (ipInt & mask); } -class MulticastDiscovery { +export type MulticastDiscoveryOptions = { + group: string, +} + +class MulticastDiscovery implements DiscoveryMethod { + public readonly name: string; + + private _multicastgroup: string; + private logger: any; - constructor(opts) { + constructor(opts: MulticastDiscoveryOptions) { this._multicastgroup = opts.group || '239.0.0.1'; if (!inCidr(this._multicastgroup, "239.0.0.0/8")) { throw new Error(`${this._multicastgroup} is not within the private multicast range 239.0.0.0/8`); } - this.logger = opts.logger; + this.logger = Logger('multicast-discovery'); this.name = `multicast group ${this._multicastgroup}`; } - eligible() { + public eligible(): number { return 0; } - init(socket) { + public init(socket: dgram.Socket): void { if (!socket) { this.logger.error({ message: `Unable to initialize multicast discovery, no IPv4 socket available` @@ -47,7 +59,7 @@ class MulticastDiscovery { }); } - async getPeers() { + public async getPeers(): Promise> { return [this._multicastgroup]; } } diff --git a/src/cluster/redis-eventbus.js b/src/cluster/redis-eventbus.ts similarity index 72% rename from src/cluster/redis-eventbus.js rename to src/cluster/redis-eventbus.ts index 61f06f5..95d38ac 100644 --- a/src/cluster/redis-eventbus.js +++ b/src/cluster/redis-eventbus.ts @@ -1,9 +1,30 @@ import assert from 'assert/strict'; -import Redis from 'redis'; +import Redis, { RedisClientType } from 'redis'; import { Logger } from '../logger.js'; +import EventBusInterface, { EventBusInterfaceOptions } from './eventbus-interface.js'; -class RedisEventBus { - constructor(opts) { +export type RedisEventBusOptions = { + redisUrl: URL +} + +type _RedisEventBusOptions = EventBusInterfaceOptions & RedisEventBusOptions & { + callback: (error?: Error) => void +} + +class RedisEventBus extends EventBusInterface{ + private logger: any; + private _channel: string; + + private _subscriber: RedisClientType; + private _subscriber_error: Error | undefined; + private _subscriber_was_ready: boolean = false; + + private _publisher: RedisClientType; + private _publisher_error: Error | undefined; + private _publisher_was_ready: boolean = false; + + constructor(opts: _RedisEventBusOptions) { + super(opts); this.logger = Logger("redis-eventbus"); this._channel = "exposr"; @@ -21,10 +42,11 @@ class RedisEventBus { }); this._publisher = this._subscriber.duplicate(); - const readyHandler = (client) => { - const clientProp = `_${client}`; - const errorProp = `_${client}_error`; - const wasReadyProp = `_${client}_was_ready`; + const readyHandler = (client: "subscriber" | "publisher") => { + + const clientProp: "_subscriber" | "_publisher" = `_${client}`; + const errorProp: "_subscriber_error" | "_publisher_error" = `_${client}_error`; + const wasReadyProp: "_subscriber_was_ready" | "_publisher_was_ready" = `_${client}_was_ready`; this[clientProp].hello() .catch(() => {}) @@ -44,10 +66,11 @@ class RedisEventBus { }); }; - const errorHandler = (err, client) => { - const clientProp = `_${client}`; - const errorProp = `_${client}_error`; - const wasReadyProp = `_${client}_was_ready`; + const errorHandler = (err: Error, client: "subscriber" | "publisher") => { + + const clientProp: "_subscriber" | "_publisher" = `_${client}`; + const errorProp: "_subscriber_error" | "_publisher_error" = `_${client}_error`; + const wasReadyProp: "_subscriber_was_ready" | "_publisher_was_ready" = `_${client}_was_ready`; if (this[errorProp]?.message != err?.message) { this.logger.error({ @@ -105,8 +128,8 @@ class RedisEventBus { this._subscriber.subscribe(this._channel, (message) => { try { - opts.handler(message); - } catch (e) { + this.receive(message); + } catch (e: any) { this.logger.debug({ message: `failed to receive message: ${e.message}`, operation: 'redis_channel_error', @@ -128,7 +151,7 @@ class RedisEventBus { errorHandler(err, 'publisher'); }); }), - ]).catch((err) => { + ]).catch((err: any) => { this.logger.error({ message: `failed to connect to ${redisUrl}: ${err.message}`, operation: 'connect', @@ -142,13 +165,7 @@ class RedisEventBus { }); } - async destroy() { - if (this.destroyed) { - return; - } - - this.destroyed = true; - + protected async _destroy(): Promise { this.logger.trace({ operation: 'destroy', message: 'initiated' @@ -156,7 +173,7 @@ class RedisEventBus { try { await this._subscriber.unsubscribe(this._channel); - } catch (err) { + } catch (err: any) { this.logger.error({ operation: 'destroy', msg: 'could not unsubscribe', @@ -164,26 +181,15 @@ class RedisEventBus { }); } - const quit = (client) => { - return client.quit((res) => { - this.logger.trace({ - client: client.name, - operation: 'destroy', - message: 'complete', - res, - }); - }); - }; - - return Promise.allSettled([ - quit(this._publisher), - quit(this._subscriber) + await Promise.allSettled([ + this._publisher.quit(), + this._subscriber.quit(), ]); } - async publish(message) { - return this._publisher.publish(this._channel, message) - .catch((err) => { + protected async _publish(message: any): Promise { + await this._publisher.publish(this._channel, message) + .catch((err: any) => { this.logger.error({ message: `failed to publish message ${message.event}: ${err.message}`, operation: 'publish', diff --git a/src/cluster/udp-eventbus.js b/src/cluster/udp-eventbus.ts similarity index 63% rename from src/cluster/udp-eventbus.js rename to src/cluster/udp-eventbus.ts index 296d0e1..ea63cc6 100644 --- a/src/cluster/udp-eventbus.js +++ b/src/cluster/udp-eventbus.ts @@ -2,12 +2,33 @@ import assert from 'assert/strict'; import dgram from 'dgram'; import net from 'net'; import { Logger } from '../logger.js'; -import MulticastDiscovery from './multicast-discovery.js'; -import KubernetesDiscovery from './kubernetes-discovery.js'; +import MulticastDiscovery, { MulticastDiscoveryOptions } from './multicast-discovery.js'; +import KubernetesDiscovery, { KubernetesDiscoveryOptions } from './kubernetes-discovery.js'; +import EventBusInterface, { EventBusInterfaceOptions } from './eventbus-interface.js'; +import DiscoveryMethod from './discovery-method.js'; + +export type UdpEventBusOptions = { + port: number, + discoveryMethod: "multicast" | "kubernetes" | undefined, + multicast: MulticastDiscoveryOptions, + kubernetes: KubernetesDiscoveryOptions, +} + +type _UdpEventBusOptions = EventBusInterfaceOptions & UdpEventBusOptions & { + callback: (error?: Error) => void +} + +class UdpEventBus extends EventBusInterface { + private logger: any; + private _port: number; + private _discoveryMethods: { [key: string]: DiscoveryMethod } = {}; + private _discoveryMethod: DiscoveryMethod | undefined; + private _socket: dgram.Socket | undefined; + private _socket6: dgram.Socket | undefined; -class UdpEventBus { + constructor(opts: _UdpEventBusOptions) { + super(opts); - constructor(opts) { this.logger = Logger("udp-eventbus"); this._port = opts.port || 1025; @@ -15,17 +36,13 @@ class UdpEventBus { try { this._discoveryMethods = { multicast: new MulticastDiscovery({ - logger: this.logger, - getLearntPeers: opts.getLearntPeers, ...opts.multicast, }), kubernetes: new KubernetesDiscovery({ - logger: this.logger, - getLearntPeers: opts.getLearntPeers, ...opts.kubernetes }), }; - } catch (e) { + } catch (e: any) { if (typeof opts.callback === 'function') { process.nextTick(() => { opts.callback(e) }); } else { @@ -58,10 +75,13 @@ class UdpEventBus { this._discoveryMethod = eligibleMethods.length > 0 ? eligibleMethods[0].method : undefined; } + if (this._discoveryMethod == undefined) { + process.nextTick(() => { opts.callback(new Error('No working discovery methods available'))}); + return + } assert(this._discoveryMethod != undefined); - assert(opts.handler != undefined); - const onMessage = (data, rinfo) => { + const onMessage = (data: Buffer, rinfo: dgram.RemoteInfo) => { if (data.length <= 5) { return; } @@ -73,15 +93,21 @@ class UdpEventBus { return; } - opts.handler(msg.toString('utf-8')); + try { + this.receive(msg.toString('utf-8')); + } catch (e: any) { + this.logger.error({ + message: `Failed to receive message ${msg}` + }); + } }; - const createSocket = (type) => { + const createSocket = (type: dgram.SocketType): Promise => { return new Promise((resolve, reject) => { - const sock = dgram.createSocket({ type: 'udp4', reuseAddr: true }); + const sock = dgram.createSocket({ type, reuseAddr: true }); sock.on('message', onMessage); - const connectError = (err) => { + const connectError = (err: Error) => { sock.close(); reject(err); }; @@ -109,47 +135,48 @@ class UdpEventBus { this._socket6 = result6.value; mode += `${mode != '' ? '/' : ''}IPv6`; } - if (!this._socket && !this._socket6) { + + if (result4.status == 'rejected' && result6.status == 'rejected') { typeof opts.callback === 'function' && process.nextTick(() => { opts.callback(result4.reason) }); return; } - this._discoveryMethod.init(this._socket, this._socket6); + this._discoveryMethod?.init(this._socket, this._socket6); this.logger.info({ - message: `Cluster interface on ${this._port} (${mode}) using discovery method ${this._discoveryMethod.name}`, + message: `Cluster interface on ${this._port} (${mode}) using discovery method ${this._discoveryMethod?.name}`, }); typeof opts.callback === 'function' && process.nextTick(opts.callback); }); } - async destroy() { - return Promise.allSettled([ + protected async _destroy(): Promise { + await Promise.allSettled([ new Promise((resolve, reject) => { if (this._socket) { - this._socket.close(resolve); + this._socket.close(() => { resolve(undefined) }); } else { - resolve(); + resolve(undefined); } }), new Promise((resolve, reject) => { if (this._socket6) { - this._socket6.close(resolve); + this._socket6.close(() => { resolve(undefined) }); } else { - resolve(); + resolve(undefined); } }) ]) } - async publish(message) { - this._receivers = await this._discoveryMethod.getPeers(); + protected async _publish(message: any): Promise { + const receivers = (await this._discoveryMethod?.getPeers()) || []; const header = Buffer.allocUnsafe(4); header.writeUInt8(0xE0, 0); header.writeUInt8(0x05, 1); header.writeUInt16BE(0, 2); - const promises = this._receivers.map((receiver) => { + const promises = receivers.map((receiver: string) => { return new Promise((resolve, reject) => { const sock = net.isIPv6(receiver) ? this._socket6 : this._socket; if (!sock) { @@ -161,13 +188,13 @@ class UdpEventBus { if (err) { reject(err); } else { - resolve(); + resolve(undefined); } }); }); }); - return Promise.allSettled(promises); + await Promise.allSettled(promises); } } export default UdpEventBus; \ No newline at end of file diff --git a/src/controller/admin-api-controller.ts b/src/controller/admin-api-controller.ts index 0be9f70..ee66d48 100644 --- a/src/controller/admin-api-controller.ts +++ b/src/controller/admin-api-controller.ts @@ -9,7 +9,7 @@ import { ERROR_UNKNOWN_ERROR, } from '../utils/errors.js'; import KoaController from './koa-controller.js'; -import ClusterService from '../cluster/index.js'; +import ClusterManager from '../cluster/cluster-manager.js'; import Account from '../account/account.js'; import Tunnel from '../tunnel/tunnel.js'; @@ -21,7 +21,6 @@ class AdminApiController extends KoaController { private accountService!: AccountService; private _tunnelService!: TunnelService; private _transportService!: TransportService; - private _clusterService!: ClusterService; constructor(opts: any) { const logger: any = Logger("admin-api"); @@ -41,7 +40,6 @@ class AdminApiController extends KoaController { this.accountService = new AccountService(); this._tunnelService = new TunnelService(); this._transportService = new TransportService(); - this._clusterService = new ClusterService(); if (this.apiKey != undefined) { logger.info("Admin API resource enabled with API key"); @@ -378,7 +376,7 @@ class AdminApiController extends KoaController { }, handler: [handleAdminAuth, handleError, async (ctx, next) => { const now = new Date().getTime(); - const nodes = this._clusterService.getNodes().map((node) => { + const nodes = ClusterManager.getNodes().map((node) => { return { node_id: node.id, host: node.host, @@ -402,7 +400,6 @@ class AdminApiController extends KoaController { this.accountService.destroy(), this._tunnelService.destroy(), this._transportService.destroy(), - this._clusterService.destroy(), ]); } } diff --git a/src/index.js b/src/index.js index 1125fcb..e0cb3d1 100644 --- a/src/index.js +++ b/src/index.js @@ -2,7 +2,7 @@ import Config from './config.js'; import AdminApiController from './controller/admin-api-controller.js'; import AdminController from './controller/admin-controller.js'; import ApiController from './controller/api-controller.js'; -import ClusterService from './cluster/index.js'; +import ClusterManager from './cluster/cluster-manager.js'; import IngressManager from './ingress/ingress-manager.js'; import { Logger } from './logger.js'; import { StorageService } from './storage/index.js'; @@ -45,39 +45,29 @@ export default async (argv) => { } }); - const clusterServiceReady = new Promise((resolve, reject) => { - try { - const type = config.get('cluster'); - - const clusterService = new ClusterService(type, { - callback: (err) => { - err ? reject(err) : resolve(clusterService); - }, - key: config.get('cluster-key'), - redis: { - redisUrl: config.get('cluster-redis-url'), - }, - udp: { - port: config.get('cluster-udp-port'), - discoveryMethod: config.get('cluster-udp-discovery') != 'auto' ? config.get('cluster-udp-discovery'): undefined, - multicast: { - group: config.get('cluster-udp-discovery-multicast-group') - }, - kubernetes: { - serviceNameEnv: config.get('cluster-udp-discovery-kubernetes-service-env'), - namespaceEnv: config.get('cluster-udp-discovery-kubernetes-namespace-env'), - serviceName: config.get('cluster-udp-discovery-kubernetes-service'), - namespace: config.get('cluster-udp-discovery-kubernetes-namespace'), - clusterDomain: config.get('cluster-udp-discovery-kubernetes-cluster-domain'), - } - } - }); - } catch (e) { - reject(e); + const clusterType = config.get('cluster'); + const clusterServiceReady = ClusterManager.init(clusterType, { + key: config.get('cluster-key'), + redis: { + redisUrl: config.get('cluster-redis-url'), + }, + udp: { + port: config.get('cluster-udp-port'), + discoveryMethod: config.get('cluster-udp-discovery') != 'auto' ? config.get('cluster-udp-discovery'): undefined, + multicast: { + group: config.get('cluster-udp-discovery-multicast-group') + }, + kubernetes: { + serviceNameEnv: config.get('cluster-udp-discovery-kubernetes-service-env'), + namespaceEnv: config.get('cluster-udp-discovery-kubernetes-namespace-env'), + serviceName: config.get('cluster-udp-discovery-kubernetes-service'), + namespace: config.get('cluster-udp-discovery-kubernetes-namespace'), + clusterDomain: config.get('cluster-udp-discovery-kubernetes-cluster-domain'), + } } }); - const [storageService, clusterService] = await Promise + const [storageService, _] = await Promise .all([ storageServiceReady, clusterServiceReady @@ -186,7 +176,7 @@ export default async (argv) => { process.exit(-1); }); - await clusterService.setReady(); + await ClusterManager.start(); adminController.setReady(); logger.info("exposrd ready"); @@ -210,13 +200,13 @@ export default async (argv) => { let result; try { - const multiNode = clusterService.setReady(false); - adminController.setReady(false); - // Drain and block new tunnel connections await Promise.race([TunnelConnectionManager.stop() , timeout, force]); - if (multiNode) { + adminController.setReady(false); + ClusterManager.stop(); + + if (ClusterManager.isMultinode()) { logger.info("Waiting for connections to drain..."); await Promise.race([new Promise((resolve) => { setTimeout(resolve, drainTimeout); @@ -230,7 +220,7 @@ export default async (argv) => { transport.destroy(), IngressManager.close(), storageService.destroy(), - clusterService.destroy(), + ClusterManager.close(), config.destroy(), ]); diff --git a/src/transport/cluster/cluster-transport.ts b/src/transport/cluster/cluster-transport.ts index ae9c67c..fd0f3ff 100644 --- a/src/transport/cluster/cluster-transport.ts +++ b/src/transport/cluster/cluster-transport.ts @@ -2,7 +2,7 @@ import { Duplex } from "stream"; import tls from "tls"; import net from "net"; import Transport, { TransportConnectionOptions, TransportOptions } from "../transport.js"; -import ClusterService from "../../cluster/index.js"; +import ClusterManager from "../../cluster/cluster-manager.js"; export interface ClusterTransportOptions extends TransportOptions { nodeId: string, @@ -10,16 +10,14 @@ export interface ClusterTransportOptions extends TransportOptions { export default class ClusterTransport extends Transport { private nodeId: string; - private clusterService: ClusterService; constructor(opts: ClusterTransportOptions) { super(opts); this.nodeId = opts.nodeId; - this.clusterService = new ClusterService(); } public createConnection(opts: TransportConnectionOptions, callback: (err: Error | undefined, sock: Duplex) => void): Duplex { - const clusterNode = this.clusterService.getNode(this.nodeId); + const clusterNode = ClusterManager.getNode(this.nodeId); if (!clusterNode) { const sock = new net.Socket(); sock.destroy(new Error('node_does_not_exist')); @@ -63,6 +61,5 @@ export default class ClusterTransport extends Transport { } protected async _destroy(): Promise { - await this.clusterService.destroy(); } } \ No newline at end of file diff --git a/src/tunnel/tunnel-service.ts b/src/tunnel/tunnel-service.ts index f39cc0d..272ddb7 100644 --- a/src/tunnel/tunnel-service.ts +++ b/src/tunnel/tunnel-service.ts @@ -1,6 +1,5 @@ import crypto from 'node:crypto'; import EventBus, { EmitMeta } from "../cluster/eventbus.js"; -import ClusterService from "../cluster/index.js"; import { Logger } from "../logger.js"; import Storage from "../storage/index.js"; import { TunnelConfig, TunnelHttpIngressConfig, TunnelIngressConfig, TunnelIngressTypeConfig, cloneTunnelConfig } from "./tunnel-config.js"; @@ -45,7 +44,6 @@ export default class TunnelService { private storage!: Storage; private ingressService!: IngressService; private eventBus!: EventBus; - private clusterService!: ClusterService; private altNameService!: AltNameService; private accountTunnelService!: AccountTunnelService; @@ -61,7 +59,6 @@ export default class TunnelService { this.storage = new Storage("tunnel"); this.ingressService = new IngressService(); this.eventBus = new EventBus(); - this.clusterService = new ClusterService(); this.altNameService = new AltNameService(); this.accountTunnelService = new AccountTunnelService(); } @@ -76,7 +73,6 @@ export default class TunnelService { await Promise.allSettled([ this.storage.destroy(), this.eventBus.destroy(), - this.clusterService.destroy(), this.ingressService.destroy(), this.altNameService.destroy(), this.accountTunnelService.destroy(), diff --git a/test/system/eventbus/test_redis.js b/test/system/eventbus/test_redis.js index 4624148..b2814f2 100644 --- a/test/system/eventbus/test_redis.js +++ b/test/system/eventbus/test_redis.js @@ -1,28 +1,24 @@ import assert from 'assert/strict'; import Config from '../../../src/config.js'; -import ClusterService from '../../../src/cluster/index.js'; import EventBus from '../../../src/cluster/eventbus.js'; import { REDIS_URL } from '../../env.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('redis eventbus', () => { - let clusterService; let bus; let config; before(async () => { config = new Config(); - return new Promise((resolve) => { - clusterService = new ClusterService('redis', { - redis: { - redisUrl: REDIS_URL, - }, - callback: (err) => err ? rejects(err) : resolve() - }); + await ClusterManager.init(ClusterManagerType.REDIS, { + redis: { + redisUrl: REDIS_URL, + } }); }); after(async () => { - await clusterService.destroy(); + await ClusterManager.close(); await config.destroy(); }); @@ -42,11 +38,10 @@ describe('redis eventbus', () => { }); await bus.publish('test2', {data: 10}); - let res = await bus.publish('test', {data: 42}); - assert(res == true, `failed to publish message, got ${res}`); + await bus.publish('test', {data: 42}); - res = await recv; - assert(res.data == 42); + let res = await recv; + assert(res.data == 42, `did not get expected message, got ${res.data}`); }); it('redis bus waitfor', async () => { @@ -56,8 +51,6 @@ describe('redis eventbus', () => { let res = await bus.publish('test', {data: 42}); - assert(res == true, `failed to publish message, got ${res}`); - res = await recv; assert(res.data == 42); assert(bus.listenerCount('test') == 0, 'listener still attached'); @@ -69,7 +62,6 @@ describe('redis eventbus', () => { }, 100); let res = await bus.publish('test', {data: 10}); - assert(res == true, `failed to publish message, got ${res}`); try { await recv; diff --git a/test/unit/account/test_account-service.ts b/test/unit/account/test_account-service.ts index 65872f1..f1eba42 100644 --- a/test/unit/account/test_account-service.ts +++ b/test/unit/account/test_account-service.ts @@ -2,12 +2,12 @@ import assert from 'assert/strict'; import crypto from 'crypto'; import sinon from 'sinon'; import AccountService from '../../../src/account/account-service.js'; -import { initClusterService, initStorageService } from '../test-utils.js'; +import { initStorageService } from '../test-utils.js'; import Config from '../../../src/config.js'; import { StorageService } from '../../../src/storage/index.js'; import Account from '../../../src/account/account.js'; import TunnelService from '../../../src/tunnel/tunnel-service.js'; -import ClusterService from '../../../src/cluster/index.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('account service', () => { it('can generate account ids', async () => { @@ -48,14 +48,13 @@ describe('account service', () => { let config: Config; let storageService: StorageService; - let clusterService: ClusterService; let accountService: AccountService; let tunnelService: TunnelService; beforeEach(async () => { config = new Config(); storageService = await initStorageService(); - clusterService = initClusterService(); + await ClusterManager.init(ClusterManagerType.MEM); accountService = new AccountService(); tunnelService = new TunnelService(); }); @@ -64,7 +63,7 @@ describe('account service', () => { await accountService.destroy(); await tunnelService.destroy(); await storageService.destroy(); - await clusterService.destroy(); + await ClusterManager.close(); await config.destroy(); sinon.restore(); }) diff --git a/test/unit/cluster/test_cluster-eventbus.js b/test/unit/cluster/test_cluster-eventbus.js index b5080a9..879a8f9 100644 --- a/test/unit/cluster/test_cluster-eventbus.js +++ b/test/unit/cluster/test_cluster-eventbus.js @@ -3,24 +3,21 @@ import sinon from 'sinon'; import dgram from 'dgram'; import dns from 'dns/promises'; import fs from 'fs'; -import ClusterService from "../../../src/cluster/index.js"; import EventBus from "../../../src/cluster/eventbus.js"; import Config from "../../../src/config.js"; import UdpEventBus from '../../../src/cluster/udp-eventbus.js'; import MulticastDiscovery from '../../../src/cluster/multicast-discovery.js'; import KubernetesDiscovery from '../../../src/cluster/kubernetes-discovery.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('UDP eventbus', () => { - const createClusterService = async (opts = {}) => { - return new Promise((resolve, reject) => { - const res = new ClusterService('udp', { - udp: { - port: 10250, - ...opts, - }, - callback: (err) => { err ? reject(err) : resolve(res); } - }); + const initClusterManager = async (opts = {}) => { + await ClusterManager.init(ClusterManagerType.UDP, { + udp: { + port: 10250, + ...opts, + } }); }; @@ -116,7 +113,7 @@ describe('UDP eventbus', () => { it('published messages are received', async () => { const membershipSpy = sinon.spy(dgram.Socket.prototype, 'addMembership'); - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'multicast' }); @@ -136,16 +133,16 @@ describe('UDP eventbus', () => { assert(recv?.data == 42, "did not receive published message"); await bus.destroy(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it('invalid multicast message is rejected', async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'multicast' }); const bus = new EventBus(); - const spy = sinon.spy(ClusterService.prototype, "_receive"); + const spy = sinon.spy(ClusterManager, "receive"); const sock = dgram.createSocket({ type: 'udp4', reuseAddr: true }); sock.send("foo", 1025, '239.0.0.1'); @@ -153,21 +150,21 @@ describe('UDP eventbus', () => { sock.close(); await bus.destroy(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it('multicast group can be configured', async () => { const membershipSpy = sinon.spy(dgram.Socket.prototype, 'addMembership'); const loopbackSpy = sinon.spy(dgram.Socket.prototype, 'setMulticastLoopback'); - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'multicast', multicast: { group: '239.0.0.2' }, }); - assert(clusterservice._bus._discoveryMethod._multicastgroup == '239.0.0.2', "group not set to 239.0.0.2"); + assert(ClusterManager._bus._discoveryMethod._multicastgroup == '239.0.0.2', "group not set to 239.0.0.2"); assert(membershipSpy.calledWithExactly("239.0.0.2"), "group not set on socket"); assert(loopbackSpy.calledWithExactly(true), "multicast loopback not set"); @@ -184,13 +181,13 @@ describe('UDP eventbus', () => { assert(recv?.data == 42, "did not receive published message"); await bus.destroy(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it('invalid multicast group can not be configured', async () => { let error; try { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'multicast', multicast: { group: '127.0.0.1' @@ -220,7 +217,7 @@ describe('UDP eventbus', () => { }); it(`published messages are received`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); bus = new EventBus(); @@ -240,7 +237,7 @@ describe('UDP eventbus', () => { assert(recv?.data == 42, "did not receive published message"); await bus.destroy(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`headless service name can be controlled with SERVICE_NAME and POD_NAMESPACE`, async () => { @@ -250,14 +247,14 @@ describe('UDP eventbus', () => { 'SERVICE_NAME': 'my-service' }); - const clusterservice = new ClusterService('udp', { + await ClusterManager.init('udp', { discoveryMethod: 'kubernetes' }); - const serviceHost = clusterservice._bus._discoveryMethod._serviceHost; + const serviceHost = ClusterManager._bus._discoveryMethod._serviceHost; assert(serviceHost == 'my-service.my-space.svc.cluster.local', `did not get expected service host, got ${serviceHost}`); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`headless service name can be controlled with custom environment names`, async () => { @@ -267,7 +264,7 @@ describe('UDP eventbus', () => { 'MY_SERVICE_NAME': 'my-service' }); - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes', kubernetes: { serviceNameEnv: 'MY_SERVICE_NAME', @@ -275,14 +272,14 @@ describe('UDP eventbus', () => { } }); - const serviceHost = clusterservice._bus._discoveryMethod._serviceHost; + const serviceHost = ClusterManager._bus._discoveryMethod._serviceHost; assert(serviceHost == 'my-service.my-space.svc.cluster.local', `did not get expected service host, got ${serviceHost}`); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`headless service name can be set explicitly`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes', kubernetes: { serviceName: 'my-service', @@ -290,14 +287,14 @@ describe('UDP eventbus', () => { } }); - const serviceHost = clusterservice._bus._discoveryMethod._serviceHost; + const serviceHost = ClusterManager._bus._discoveryMethod._serviceHost; assert(serviceHost == 'my-service.my-space.svc.cluster.local', `did not get expected service host, got ${serviceHost}`); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers is cached`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); @@ -305,7 +302,7 @@ describe('UDP eventbus', () => { .withArgs('exposr-headless.default.svc.cluster.local') .resolves(["127.0.0.1"]); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers[0] == "127.0.0.1", "did not get expected peer"); sinon.restore(); @@ -313,48 +310,48 @@ describe('UDP eventbus', () => { .withArgs('exposr-headless.default.svc.cluster.local') .resolves(["127.0.0.2"]); - const peers2 = await clusterservice._bus._discoveryMethod.getPeers(); + const peers2 = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers2[0] == "127.0.0.1", "did not get expected peer"); const clock = sinon.useFakeTimers(Date.now() + 1000); - const peers3 = await clusterservice._bus._discoveryMethod.getPeers(); + const peers3 = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers3[0] == "127.0.0.2", "did not get expected peer"); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers with no peers available returns empty list`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes', }); - sinon.stub(clusterservice._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns([]); sinon.stub(dns, 'resolve4') .withArgs('exposr-headless.default.svc.cluster.local') .rejects(new Error('error')); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers?.length == 0, "expected empty list of peers"); sinon.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers errors are not cached`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); - sinon.stub(clusterservice._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns([]); sinon.stub(dns, 'resolve4') .withArgs('exposr-headless.default.svc.cluster.local') .rejects(new Error('error')); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers?.length == 0, "expected empty list of peers"); sinon.restore(); @@ -362,15 +359,15 @@ describe('UDP eventbus', () => { .withArgs('exposr-headless.default.svc.cluster.local') .resolves(["127.0.0.1"]); - const peers2 = await clusterservice._bus._discoveryMethod.getPeers(); + const peers2 = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers2[0] == "127.0.0.1", "did not get expected peer"); sinon.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers prefers IPv4`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); @@ -382,15 +379,15 @@ describe('UDP eventbus', () => { .withArgs('exposr-headless.default.svc.cluster.local') .resolves(["fe80::11:22:ee:ff"]); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers[0] == "127.0.0.1", `did not get expected peer, got ${peers}`); sinon.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers handles IPv4 only`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); @@ -402,15 +399,15 @@ describe('UDP eventbus', () => { .withArgs('exposr-headless.default.svc.cluster.local') .rejects(new Error('error')); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers[0] == "127.0.0.1", `did not get expected peer, got ${peers}`); sinon.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers handles IPv6 only`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); @@ -422,54 +419,52 @@ describe('UDP eventbus', () => { .withArgs('exposr-headless.default.svc.cluster.local') .resolves(["fe80::11:22:ee:ff"]); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers[0] == "fe80::11:22:ee:ff", `did not get expected peer, got ${peers}`); sinon.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers returns learnt nodes`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); - sinon.stub(clusterservice._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns(["127.0.0.2"]); sinon.stub(dns, 'resolve4') .withArgs('exposr-headless.default.svc.cluster.local') .resolves(["127.0.0.1"]); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers.length == 2, "got more peers than expected"); assert(peers[0] == '127.0.0.1', `did not get expected peer, got ${peers}`); assert(peers[1] == '127.0.0.2', `did not get expected peer, got ${peers}`); sinon.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); }); it(`getPeers do not return duplicated nodes`, async () => { - const clusterservice = await createClusterService({ + await initClusterManager({ discoveryMethod: 'kubernetes' }); - sinon.stub(clusterservice._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns(["127.0.0.1"]); sinon.stub(dns, 'resolve4') .withArgs('exposr-headless.default.svc.cluster.local') .resolves(["127.0.0.1"]); - const peers = await clusterservice._bus._discoveryMethod.getPeers(); + const peers = await ClusterManager._bus._discoveryMethod.getPeers(); assert(peers.length == 1, "got more peers than expected"); assert(peers[0] == '127.0.0.1', `did not get expected peer, got ${peers}`); sinon.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); }); - - }); }); \ No newline at end of file diff --git a/test/unit/cluster/test_cluster-service.js b/test/unit/cluster/test_cluster-service.js index 22a05de..9d55b60 100644 --- a/test/unit/cluster/test_cluster-service.js +++ b/test/unit/cluster/test_cluster-service.js @@ -1,26 +1,25 @@ import assert from 'assert/strict'; import sinon from 'sinon'; import crypto from 'crypto'; -import ClusterService from "../../../src/cluster/index.js"; import EventBus from "../../../src/cluster/eventbus.js"; import Config from "../../../src/config.js"; import Node from '../../../src/cluster/cluster-node.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('cluster service', () => { const sendingNode = crypto.createHash('sha1').update(new Date().getTime().toString()).digest('hex'); - let clusterservice; let config; let clock; - beforeEach(() => { + beforeEach(async () => { config = new Config(); clock = sinon.useFakeTimers({shouldAdvanceTime: true}); - clusterservice = new ClusterService('mem', {}); + await ClusterManager.init(ClusterManagerType.MEM); }); afterEach(async () => { clock.restore(); - await clusterservice.destroy(); + await ClusterManager.close(); config.destroy(); sinon.restore(); }) @@ -52,14 +51,14 @@ describe('cluster service', () => { }); it(`messages with invalid signatures are rejected`, async () => { - const spy = sinon.spy(ClusterService.prototype, "_receive"); + const spy = sinon.spy(ClusterManager, "receive"); const bus = new EventBus(); const recv = bus.waitFor('test', (message) => { return message.data == 42; }, 100); - clusterservice._bus.publish(JSON.stringify({ + ClusterManager._bus.publish(JSON.stringify({ event: 'test', message: { data: 42 }, node: { @@ -101,24 +100,24 @@ describe('cluster service', () => { }; let recv; - for (let i = 0; i < (clusterservice._window_size * 2) + 3; i++) { + for (let i = 0; i < (ClusterManager._window_size * 2) + 3; i++) { recv = waitmsg(); await publish(bus, 'foo', {data: 42}); assert((await recv)?.data == 42, "did not receive published message"); } - let prev_seq = clusterservice._seq; - clusterservice._seq += 2; + let prev_seq = ClusterManager._seq; + ClusterManager._seq += 2; await publish(bus, 'foo', {data: 42}); assert((await recv)?.data == 42, "did not receive published message"); - assert(clusterservice._nodes[sendingNode].seq_win.toString(2) == '1111111111111001', - `got ${clusterservice._nodes[sendingNode].seq_win.toString(2)}`); + assert(ClusterManager._nodes[sendingNode].seq_win.toString(2) == '1111111111111001', + `got ${ClusterManager._nodes[sendingNode].seq_win.toString(2)}`); - clusterservice._seq = prev_seq; + ClusterManager._seq = prev_seq; await publish(bus, 'foo', {data: 42}); assert((await recv)?.data == 42, "did not receive published message"); - assert(clusterservice._nodes[sendingNode].seq_win.toString(2) == '1111111111111101', - `got ${clusterservice._nodes[sendingNode].seq_win.toString(2)}`); + assert(ClusterManager._nodes[sendingNode].seq_win.toString(2) == '1111111111111101', + `got ${ClusterManager._nodes[sendingNode].seq_win.toString(2)}`); await bus.destroy(); }); @@ -134,32 +133,32 @@ describe('cluster service', () => { }); }; - clusterservice._seq = 2; + ClusterManager._seq = 2; let recv = waitmsg(); await publish(bus, 'foo', {data: 42}); assert((await recv)?.data == 42, "did not receive published message"); - assert(clusterservice._nodes[sendingNode].seq_win.toString(2) == '1', - `got ${clusterservice._nodes[sendingNode].seq_win.toString(2)}`); + assert(ClusterManager._nodes[sendingNode].seq_win.toString(2) == '1', + `got ${ClusterManager._nodes[sendingNode].seq_win.toString(2)}`); - clusterservice._seq = 0; + ClusterManager._seq = 0; recv = waitmsg(); await publish(bus, 'foo', {data: 43}); assert((await recv)?.data == 43, "did not receive published message"); - assert(clusterservice._nodes[sendingNode].seq_win.toString(2) == '101', - `got ${clusterservice._nodes[sendingNode].seq_win.toString(2)}`); + assert(ClusterManager._nodes[sendingNode].seq_win.toString(2) == '101', + `got ${ClusterManager._nodes[sendingNode].seq_win.toString(2)}`); - clusterservice._seq = 1; + ClusterManager._seq = 1; recv = waitmsg(); await publish(bus, 'foo', {data: 44}); assert((await recv)?.data == 44, "did not receive published message"); - assert(clusterservice._nodes[sendingNode].seq_win.toString(2) == '111', - `got ${clusterservice._nodes[sendingNode].seq_win.toString(2)}`); + assert(ClusterManager._nodes[sendingNode].seq_win.toString(2) == '111', + `got ${ClusterManager._nodes[sendingNode].seq_win.toString(2)}`); await bus.destroy(); }); it(`repeated messages are rejected`, async () => { - const spy = sinon.spy(ClusterService.prototype, "_receive"); + const spy = sinon.spy(ClusterManager, "receive"); const bus = new EventBus(); const waitmsg = () => { @@ -178,7 +177,7 @@ describe('cluster service', () => { await publish(bus, 'foo', {data: 42}); assert((await recv)?.data == 42, "did not receive published message"); - clusterservice._seq = 1; + ClusterManager._seq = 1; recv = bus.waitFor('test', (message) => { return message.data == 43; }, 100); @@ -195,94 +194,94 @@ describe('cluster service', () => { describe('cluster nodes', () => { it(`are learned when messages are received`, async () => { - const spy = sinon.spy(ClusterService.prototype, "_learnNode"); + const spy = sinon.spy(ClusterManager, "_learnNode"); const bus = new EventBus(); await publish(bus, 'foo', {data: 42}); assert(spy.calledOnce == true, "_learnNode not called"); - const node = clusterservice.getNode(sendingNode); + const node = ClusterManager.getNode(sendingNode); assert(node?.id == sendingNode, "node not learnt"); - assert(clusterservice._nodes[sendingNode].stale == false, "node marked as stale"); + assert(ClusterManager._nodes[sendingNode].stale == false, "node marked as stale"); sinon.restore(); await bus.destroy(); }); it(`are marked stale after stale timeout`, async () => { - const spy = sinon.spy(ClusterService.prototype, "_staleNode"); + const spy = sinon.spy(ClusterManager, "_staleNode"); const bus = new EventBus(); await publish(bus, 'foo', {data: 42}); - let node = clusterservice.getNode(sendingNode); + let node = ClusterManager.getNode(sendingNode); assert(node?.id == sendingNode, "node not learnt"); - await clock.tickAsync(clusterservice._staleTimeout + 1); + await clock.tickAsync(ClusterManager._staleTimeout + 1); assert(spy.calledOnce == true, "_staleNode not called"); - node = clusterservice.getNode(sendingNode) + node = ClusterManager.getNode(sendingNode) assert(node == undefined, "getNode returned stale node"); - node = clusterservice._nodes[sendingNode] - assert(clusterservice._nodes[sendingNode].stale == true, "node marked as stale"); + node = ClusterManager._nodes[sendingNode] + assert(ClusterManager._nodes[sendingNode].stale == true, "node marked as stale"); await bus.destroy(); }); it(`are not marked stale when heartbeat is received`, async () => { - const spy = sinon.spy(ClusterService.prototype, "_staleNode"); + const spy = sinon.spy(ClusterManager, "_staleNode"); const bus = new EventBus(); await publish(bus, 'cluster:heartbeat', {}); - let node = clusterservice.getNode(sendingNode); + let node = ClusterManager.getNode(sendingNode); assert(node?.id == sendingNode, "node not learnt"); - await clock.tickAsync(clusterservice._staleTimeout / 2); + await clock.tickAsync(ClusterManager._staleTimeout / 2); await publish(bus, 'cluster:heartbeat', {}); - await clock.tickAsync((clusterservice._staleTimeout / 2) + 1); + await clock.tickAsync((ClusterManager._staleTimeout / 2) + 1); assert(spy.calledOnce == false, "_staleNode called"); await bus.destroy(); }); it(`are deleted after removal timeout`, async () => { - const spy = sinon.spy(ClusterService.prototype, "_forgetNode"); + const spy = sinon.spy(ClusterManager, "_forgetNode"); const bus = new EventBus(); await publish(bus, 'foo', {data: 42}); - let node = clusterservice.getNode(sendingNode); + let node = ClusterManager.getNode(sendingNode); assert(node?.id == sendingNode, "node not learnt"); - await clock.tickAsync(clusterservice._removalTimeout + 1); + await clock.tickAsync(ClusterManager._removalTimeout + 1); assert(spy.calledOnce == true, "_forgetNode not called"); - node = clusterservice.getNode(sendingNode) + node = ClusterManager.getNode(sendingNode) assert(node == undefined, "getNode returned node, should be deleted"); - node = clusterservice._nodes[sendingNode] + node = ClusterManager._nodes[sendingNode] assert(node == undefined, "node not removed"); await bus.destroy(); }); it(`are sending heartbeat`, async () => { - const spy = sinon.spy(ClusterService.prototype, "publish"); + const spy = sinon.spy(ClusterManager, "publish"); - // Heartbeat sent on ready - clusterservice.setReady(); - assert(spy.calledOnceWithExactly("cluster:heartbeat"), "initial onready heartbeat not sent"); + // Heartbeat sent on start + await ClusterManager.start(); + assert(spy.calledOnceWithExactly("cluster:heartbeat"), "initial start heartbeat not sent"); // Heartbeat sent after interval - await clock.tickAsync(clusterservice._heartbeatInterval + 1); + await clock.tickAsync(ClusterManager._heartbeatInterval + 1); assert(spy.getCall(1)?.calledWithExactly("cluster:heartbeat"), "heartbeat not sent"); - await clock.tickAsync(clusterservice._heartbeatInterval + 1); + await clock.tickAsync(ClusterManager._heartbeatInterval + 1); assert(spy.getCall(2)?.calledWithExactly("cluster:heartbeat"), "heartbeat not sent"); sinon.restore(); @@ -292,29 +291,45 @@ describe('cluster service', () => { const bus = new EventBus(); await publish(bus, 'foo', {data: 42}); - const nodes = clusterservice._getLearntPeers(); + const nodes = ClusterManager.getLearntPeers(); assert(nodes.length == 2, "unexpected numbers of peers"); await bus.destroy(); }); it(`are not returned by _getLearntPeers if stale`, async () => { - const spy = sinon.spy(ClusterService.prototype, "_staleNode"); + const spy = sinon.spy(ClusterManager, "_staleNode"); const bus = new EventBus(); await publish(bus, 'foo', {data: 42}); - let node = clusterservice.getNode(sendingNode); + let node = ClusterManager.getNode(sendingNode); assert(node?.id == sendingNode, "node not learnt"); - await clock.tickAsync(clusterservice._staleTimeout + 1); + await clock.tickAsync(ClusterManager._staleTimeout + 1); - const nodes = clusterservice._getLearntPeers(); + const nodes = ClusterManager.getLearntPeers(); assert(nodes.length == 1, "unexpected numbers of peers"); await bus.destroy(); }); + it(`getNodes() returns all nodes`, async () => { + const bus = new EventBus(); + + await clock.tickAsync(1); + await publish(bus, 'foo', {data: 42}); + await clock.tickAsync(1); + + const nodes = ClusterManager.getNodes(); + assert(nodes.length == 2, "unexpected number of nodes"); + + assert(nodes[0].last_ts == 2); + assert(nodes[1].ip == "127.0.0.127"); + assert(nodes[1].last_ts == 1); + + await bus.destroy(); + }); }); }); \ No newline at end of file diff --git a/test/unit/cluster/test_cluster-transport.ts b/test/unit/cluster/test_cluster-transport.ts index a9e1cfc..8059d1c 100644 --- a/test/unit/cluster/test_cluster-transport.ts +++ b/test/unit/cluster/test_cluster-transport.ts @@ -4,9 +4,9 @@ import tls from 'tls'; import fs from 'fs'; import sinon from 'sinon'; import ClusterTransport from '../../../src/transport/cluster/cluster-transport.js'; -import ClusterService, { ClusterNode } from '../../../src/cluster/index.js'; import { Duplex } from 'stream'; import Config from '../../../src/config.js'; +import ClusterManager, { ClusterManagerType, ClusterNode } from '../../../src/cluster/cluster-manager.js'; describe('cluster transport', () => { it('can be created and connected', async () => { @@ -19,9 +19,9 @@ describe('cluster transport', () => { server.listen(10000, () => { resolve(undefined); }); }); - const clusterService = new ClusterService("mem"); + await ClusterManager.init(ClusterManagerType.MEM); - sinon.stub(ClusterService.prototype, "getNode").returns({ + sinon.stub(ClusterManager, "getNode").returns({ id: "some-node-id", host: "some-node-host", ip: "127.0.0.1", @@ -48,7 +48,7 @@ describe('cluster transport', () => { }); }); - await clusterService.destroy(); + await ClusterManager.close(); sock.destroy(); await new Promise((resolve) => { server.close(() => { @@ -79,9 +79,9 @@ describe('cluster transport', () => { server.listen(11000, () => { resolve(undefined); }); }); - const clusterService = new ClusterService("mem"); + await ClusterManager.init(ClusterManagerType.MEM); - sinon.stub(ClusterService.prototype, "getNode").returns({ + sinon.stub(ClusterManager, "getNode").returns({ id: "some-node-id", host: "some-node-host", ip: "127.0.0.1", @@ -113,7 +113,7 @@ describe('cluster transport', () => { }); }); - await clusterService.destroy(); + await ClusterManager.close(); sock.destroy(); await new Promise((resolve) => { server.close(() => { diff --git a/test/unit/ingress/test_http_ingress.ts b/test/unit/ingress/test_http_ingress.ts index b016895..ce57d93 100644 --- a/test/unit/ingress/test_http_ingress.ts +++ b/test/unit/ingress/test_http_ingress.ts @@ -5,14 +5,13 @@ import AccountService from "../../../src/account/account-service.js"; import Config from "../../../src/config.js"; import IngressManager, { IngressType } from "../../../src/ingress/ingress-manager.js"; import TunnelService from "../../../src/tunnel/tunnel-service.js"; -import { createEchoHttpServer, initClusterService, initStorageService, wsSocketPair, wsmPair } from "../test-utils.js"; +import { createEchoHttpServer, initStorageService, wsSocketPair, wsmPair } from "../test-utils.js"; import sinon from 'sinon'; import net from 'net' import http from 'http'; import Tunnel from '../../../src/tunnel/tunnel.js'; import Account from '../../../src/account/account.js'; import { StorageService } from '../../../src/storage/index.js'; -import ClusterService from '../../../src/cluster/index.js'; import { WebSocketMultiplex } from '@exposr/ws-multiplex'; import WebSocketTransport from '../../../src/transport/ws/ws-transport.js'; import { Duplex } from 'stream'; @@ -20,16 +19,16 @@ import CustomError, { ERROR_TUNNEL_INGRESS_BAD_ALT_NAMES } from '../../../src/ut import HttpIngress from '../../../src/ingress/http-ingress.js'; import { httpRequest } from './utils.js'; import TunnelConnectionManager from '../../../src/tunnel/tunnel-connection-manager.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('http ingress', () => { let clock: sinon.SinonFakeTimers; let storageService: StorageService; - let clusterService: ClusterService; let config: Config; before(async () => { config = new Config(); - clusterService = initClusterService(); + await ClusterManager.init(ClusterManagerType.MEM); storageService = await initStorageService(); clock = sinon.useFakeTimers({shouldAdvanceTime: true}); @@ -48,8 +47,8 @@ describe('http ingress', () => { after(async () => { await TunnelConnectionManager.stop(); await IngressManager.close(); - await clusterService.destroy(); await storageService.destroy(); + await ClusterManager.close(); await config.destroy(); await echoServer.destroy(); clock.restore(); diff --git a/test/unit/ingress/test_sni-ingress.ts b/test/unit/ingress/test_sni-ingress.ts index ebc3c10..ff3ff0b 100644 --- a/test/unit/ingress/test_sni-ingress.ts +++ b/test/unit/ingress/test_sni-ingress.ts @@ -6,11 +6,10 @@ import fs from 'fs'; import crypto from 'crypto'; import net from 'net'; import tls, { TLSSocket } from 'tls'; -import { createEchoHttpServer, initClusterService, initStorageService, wsSocketPair, wsmPair } from '../test-utils.js' +import { createEchoHttpServer, initStorageService, wsSocketPair, wsmPair } from '../test-utils.js' import Config from '../../../src/config.js'; import IngressManager, { IngressType } from '../../../src/ingress/ingress-manager.js'; import { StorageService } from '../../../src/storage/index.js'; -import ClusterService from '../../../src/cluster/index.js'; import TunnelService from '../../../src/tunnel/tunnel-service.js'; import Account from '../../../src/account/account.js'; import AccountService from '../../../src/account/account-service.js'; @@ -20,6 +19,7 @@ import WebSocketTransport from '../../../src/transport/ws/ws-transport.js'; import { Duplex } from 'stream'; import { httpRequest } from './utils.js'; import TunnelConnectionManager from '../../../src/tunnel/tunnel-connection-manager.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('sni', () => { @@ -89,7 +89,6 @@ describe('sni', () => { urlTests.forEach(({args, expected}) => { it(`baseurl for ${JSON.stringify(args)} returns ${expected}`, async () => { let storageService: StorageService; - let clusterService: ClusterService; let tunnelService: TunnelService; let accountService: AccountService; let config: Config; @@ -98,7 +97,7 @@ describe('sni', () => { config = new Config(); storageService = await initStorageService(); - clusterService = initClusterService(); + await ClusterManager.init(ClusterManagerType.MEM); await IngressManager.listen({ sni: { enabled: true, @@ -126,7 +125,7 @@ describe('sni', () => { await tunnelService.destroy(); await IngressManager.close(); await storageService.destroy(); - await clusterService.destroy(); + await ClusterManager.close(); config.destroy(); assert(url?.href == expected, `expected ${expected}, got ${url?.href}`); @@ -138,7 +137,6 @@ describe('sni', () => { describe('ingress', () => { let storageService: StorageService; - let clusterService: ClusterService; let config: Config; let echoServer: { destroy: () => Promise; }; @@ -150,7 +148,7 @@ describe('sni', () => { clock = sinon.useFakeTimers({shouldAdvanceTime: true}); config = new Config(); storageService = await initStorageService(); - clusterService = initClusterService(); + await ClusterManager.init(ClusterManagerType.MEM); await TunnelConnectionManager.start(); await IngressManager.listen({ sni: { @@ -166,7 +164,7 @@ describe('sni', () => { after(async () => { await storageService.destroy(); - await clusterService.destroy(); + await ClusterManager.close(); await TunnelConnectionManager.stop(); await IngressManager.close(); config.destroy() diff --git a/test/unit/test-utils.ts b/test/unit/test-utils.ts index 2441c31..adf6eb7 100644 --- a/test/unit/test-utils.ts +++ b/test/unit/test-utils.ts @@ -4,7 +4,6 @@ import fs from 'node:fs'; import { Duplex } from 'node:stream'; import * as url from 'node:url'; import { WebSocket, WebSocketServer } from "ws"; -import ClusterService from "../../src/cluster/index.js"; import { StorageService } from "../../src/storage/index.js"; import WebSocketTransport from "../../src/transport/ws/ws-transport.js"; import { WebSocketMultiplex } from "@exposr/ws-multiplex"; @@ -18,10 +17,6 @@ export const initStorageService = async (): Promise => { }); }; -export const initClusterService = () => { - return new ClusterService('mem', {}); -} - export const socketPair = () => { const sock1 = new Duplex({read(size) {}}); const sock2 = new Duplex({read(size) {}}); diff --git a/test/unit/transport/test_ssh-endpoint.ts b/test/unit/transport/test_ssh-endpoint.ts index 3f4873e..51f2e3a 100644 --- a/test/unit/transport/test_ssh-endpoint.ts +++ b/test/unit/transport/test_ssh-endpoint.ts @@ -1,10 +1,11 @@ import assert from 'assert/strict'; import Tunnel from '../../../src/tunnel/tunnel.js'; import SSHEndpoint from '../../../src/transport/ssh/ssh-endpoint.js'; -import { initClusterService, initStorageService } from '../test-utils.js' +import { initStorageService } from '../test-utils.js' import Config from '../../../src/config.js'; import IngressManager from '../../../src/ingress/ingress-manager.js'; import { TunnelConfig } from '../../../src/tunnel/tunnel-config.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('ssh endpoint', () => { @@ -40,7 +41,7 @@ describe('ssh endpoint', () => { it(`getEndpoint() for ${JSON.stringify(args)}, ${baseUrl} returns ${expected}`, async () => { const config = new Config(); const storageService = await initStorageService(); - const clusterService = initClusterService(); + await ClusterManager.init(ClusterManagerType.MEM); await IngressManager.listen({ http: { enabled: true, @@ -65,9 +66,9 @@ describe('ssh endpoint', () => { assert(ep.url == expected, `got ${ep.url}`); await storageService.destroy(); - await clusterService.destroy(); - await config.destroy(); + await ClusterManager.close(); await IngressManager.close(); + await config.destroy(); }); }); }); \ No newline at end of file diff --git a/test/unit/transport/test_ssh_transport.ts b/test/unit/transport/test_ssh_transport.ts index b00336e..0fa8208 100644 --- a/test/unit/transport/test_ssh_transport.ts +++ b/test/unit/transport/test_ssh_transport.ts @@ -6,7 +6,6 @@ import Config from '../../../src/config.js'; import TransportService from '../../../src/transport/transport-service.js' import { createEchoHttpServer, initStorageService } from '../test-utils.js'; import ssh, { PasswordAuthMethod } from 'ssh2'; -import ClusterService from '../../../src/cluster/index.js'; import { StorageService } from '../../../src/storage/index.js'; import AccountService from '../../../src/account/account-service.js'; import TunnelService from '../../../src/tunnel/tunnel-service.js'; @@ -15,12 +14,12 @@ import Account from '../../../src/account/account.js'; import sinon from 'sinon'; import IngressManager from '../../../src/ingress/ingress-manager.js'; import TunnelConnectionManager from '../../../src/tunnel/tunnel-connection-manager.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('SSH transport', () => { let clock: sinon.SinonFakeTimers; let config: Config; let storageservice: StorageService; - let clusterservice: ClusterService; let accountService: AccountService; let tunnelService: TunnelService; let echoServer: any; @@ -34,7 +33,7 @@ describe('SSH transport', () => { "--log-level", "debug" ]); storageservice = await initStorageService(); - clusterservice = new ClusterService('mem', {}); + await ClusterManager.init(ClusterManagerType.MEM); await TunnelConnectionManager.start(); await IngressManager.listen({ http: { @@ -58,7 +57,7 @@ describe('SSH transport', () => { await accountService.destroy(); await IngressManager.close(); await TunnelConnectionManager.stop(); - await clusterservice.destroy(); + await ClusterManager.close(); await storageservice.destroy(); await config.destroy(); await echoServer.destroy(); diff --git a/test/unit/transport/test_ws_transport.ts b/test/unit/transport/test_ws_transport.ts index 841da17..ea87b7b 100644 --- a/test/unit/transport/test_ws_transport.ts +++ b/test/unit/transport/test_ws_transport.ts @@ -6,7 +6,6 @@ import WebSocket from 'ws'; import Config from '../../../src/config.js'; import TransportService from '../../../src/transport/transport-service.js' import { createEchoHttpServer, initStorageService } from '../test-utils.js'; -import ClusterService from '../../../src/cluster/index.js'; import { StorageService } from '../../../src/storage/index.js'; import AccountService from '../../../src/account/account-service.js'; import TunnelService from '../../../src/tunnel/tunnel-service.js'; @@ -17,12 +16,12 @@ import { WebSocketMultiplex } from '@exposr/ws-multiplex'; import { Duplex } from 'stream'; import IngressManager from '../../../src/ingress/ingress-manager.js'; import TunnelConnectionManager from '../../../src/tunnel/tunnel-connection-manager.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('WS transport', () => { let clock: sinon.SinonFakeTimers; let config: Config; let storageservice: StorageService; - let clusterservice: ClusterService; let accountService: AccountService; let tunnelService: TunnelService; let echoServer: any; @@ -36,8 +35,7 @@ describe('WS transport', () => { "--log-level", "debug" ]); storageservice = await initStorageService(); - clusterservice = new ClusterService('mem', {}); - + await ClusterManager.init(ClusterManagerType.MEM); await TunnelConnectionManager.start(); await IngressManager.listen({ http: { @@ -61,7 +59,7 @@ describe('WS transport', () => { await accountService.destroy(); await IngressManager.close(); await TunnelConnectionManager.stop(); - await clusterservice.destroy(); + await ClusterManager.close(); await storageservice.destroy(); await config.destroy(); await echoServer.destroy(); diff --git a/test/unit/tunnel/test_tunnel-connection-manager.ts b/test/unit/tunnel/test_tunnel-connection-manager.ts index b85871e..9eef255 100644 --- a/test/unit/tunnel/test_tunnel-connection-manager.ts +++ b/test/unit/tunnel/test_tunnel-connection-manager.ts @@ -6,12 +6,11 @@ import EventBus from '../../../src/cluster/eventbus.js'; import Config from '../../../src/config.js'; import { initStorageService } from '../test-utils.js'; import { StorageService } from '../../../src/storage/index.js'; -import ClusterService from '../../../src/cluster/index.js'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('tunnel connection manager', () => { let config: Config; let storageService: StorageService; - let clusterService: ClusterService; let clock: sinon.SinonFakeTimers; beforeEach(async () => { @@ -19,13 +18,13 @@ describe('tunnel connection manager', () => { config = new Config(); storageService = await initStorageService(); - clusterService = new ClusterService('mem', {}); + await ClusterManager.init(ClusterManagerType.MEM); await TunnelConnectionManager.start(); }); afterEach(async () => { await TunnelConnectionManager.stop(); - await clusterService.destroy(); + await ClusterManager.close(); await storageService.destroy(); clock.restore(); sinon.restore(); diff --git a/test/unit/tunnel/test_tunnel-service.ts b/test/unit/tunnel/test_tunnel-service.ts index bf2ebba..fb9a1e0 100644 --- a/test/unit/tunnel/test_tunnel-service.ts +++ b/test/unit/tunnel/test_tunnel-service.ts @@ -5,7 +5,6 @@ import net from 'net'; import Config from '../../../src/config.js'; import { StorageService } from '../../../src/storage/index.js'; import { initStorageService, wsSocketPair } from '../test-utils.js'; -import ClusterService from '../../../src/cluster/index.js'; import AccountService from '../../../src/account/account-service.js'; import TunnelConnectionManager from '../../../src/tunnel/tunnel-connection-manager.js'; import IngressManager from '../../../src/ingress/ingress-manager.js'; @@ -14,19 +13,19 @@ import Tunnel from '../../../src/tunnel/tunnel.js'; import EventBus from '../../../src/cluster/eventbus.js'; import WebSocketTransport from '../../../src/transport/ws/ws-transport.js'; import { WebSocketMultiplex } from '@exposr/ws-multiplex'; +import ClusterManager, { ClusterManagerType } from '../../../src/cluster/cluster-manager.js'; describe('tunnel service', () => { let clock: sinon.SinonFakeTimers; let config: Config; let storageService: StorageService; - let clusterService: ClusterService; let accountService: AccountService; beforeEach(async () => { clock = sinon.useFakeTimers({shouldAdvanceTime: true, now: 10000}); config = new Config(); storageService = await initStorageService(); - clusterService = new ClusterService('mem', {}); + await ClusterManager.init(ClusterManagerType.MEM); await TunnelConnectionManager.start(); await IngressManager.listen({ http: { @@ -42,7 +41,7 @@ describe('tunnel service', () => { await accountService.destroy(); await IngressManager.close(); await TunnelConnectionManager.stop(); - await clusterService.destroy(); + await ClusterManager.close(); await storageService.destroy(); await config.destroy(); clock.restore(); @@ -378,7 +377,7 @@ describe('tunnel service', () => { assert(tunnel instanceof Tunnel, `tunnel not created, got ${tunnel}`); assert(tunnel?.id == tunnelId, `expected id ${tunnelId}, got ${tunnel?.id}`); - sinon.stub(ClusterService.prototype, 'getNode').returns({ + sinon.stub(ClusterManager, 'getNode').returns({ id: "node-1", host: "some-node-host", ip: "127.0.0.1",