From 6e2d39149882d620768642708e9abf3abd4e1c9b Mon Sep 17 00:00:00 2001 From: Fredrik Lindberg Date: Thu, 21 Dec 2023 07:14:47 +0100 Subject: [PATCH] refactor: cluster sub-system to typescript --- src/cluster/cluster-manager.ts | 84 +++++++++--------- src/cluster/discovery-method.ts | 13 +++ src/cluster/eventbus-interface.ts | 35 ++++++++ src/cluster/eventbus.ts | 31 ++++--- ...s-discovery.js => kubernetes-discovery.ts} | 63 +++++++++----- src/cluster/memory-eventbus.js | 29 ------- src/cluster/memory-eventbus.ts | 38 +++++++++ ...st-discovery.js => multicast-discovery.ts} | 28 ++++-- .../{redis-eventbus.js => redis-eventbus.ts} | 84 +++++++++--------- .../{udp-eventbus.js => udp-eventbus.ts} | 85 ++++++++++++------- src/index.js | 10 +-- test/system/eventbus/test_redis.js | 10 +-- test/unit/cluster/test_cluster-eventbus.js | 8 +- test/unit/cluster/test_cluster-service.js | 6 +- 14 files changed, 317 insertions(+), 207 deletions(-) create mode 100644 src/cluster/discovery-method.ts create mode 100644 src/cluster/eventbus-interface.ts rename src/cluster/{kubernetes-discovery.js => kubernetes-discovery.ts} (58%) delete mode 100644 src/cluster/memory-eventbus.js create mode 100644 src/cluster/memory-eventbus.ts rename src/cluster/{multicast-discovery.js => multicast-discovery.ts} (66%) rename src/cluster/{redis-eventbus.js => redis-eventbus.ts} (72%) rename src/cluster/{udp-eventbus.js => udp-eventbus.ts} (63%) diff --git a/src/cluster/cluster-manager.ts b/src/cluster/cluster-manager.ts index 4cc7bc1..7e5af1c 100644 --- a/src/cluster/cluster-manager.ts +++ b/src/cluster/cluster-manager.ts @@ -1,19 +1,19 @@ -import { strict as assert } from 'assert'; import crypto from 'node:crypto'; import MemoryEventBus from './memory-eventbus.js'; -import RedisEventBus from './redis-eventbus.js'; +import RedisEventBus, { RedisEventBusOptions } from './redis-eventbus.js'; import Node from './cluster-node.js'; import { Logger } from '../logger.js'; -import UdpEventBus from './udp-eventbus.js'; -import EventBus from './eventbus.js'; +import UdpEventBus, { UdpEventBusOptions } from './udp-eventbus.js'; +import { EmitMeta } from './eventbus.js'; +import EventBusInterface from './eventbus-interface.js'; export type ClusterManagerOptions = { key?: string, staleTimeout?: number, removalTimeout?: number, heartbeatInterval?: number, - redis? : any, - udp: any, + redis?: RedisEventBusOptions, + udp?: UdpEventBusOptions, } export enum ClusterManagerType { @@ -38,20 +38,19 @@ type ClusterServiceNode = ClusterNode & { removalTimer?: NodeJS.Timeout, } -class ClusterManager { - //private static instance: ClusterManager | undefined; - //private static ref: number; +export type EmitCallback = (event: string, message: any, meta: EmitMeta) => void; +class ClusterManager { private static logger: any; private static _key: string = ''; private static _nodes: { [key: string]: ClusterServiceNode } = {}; - private static _listeners: Array = []; + private static _listeners: Array = []; private static _seq: number = 0; private static _window_size: number; private static _staleTimeout: number; private static _removalTimeout: number; private static _heartbeatInterval: number; - private static _bus: any; + private static _bus: EventBusInterface; private static multiNode: boolean = false; private static _heartbeat: NodeJS.Timeout | undefined; @@ -59,14 +58,6 @@ class ClusterManager { private static ready: boolean = false; public static async init(type: ClusterManagerType, opts?: ClusterManagerOptions): Promise { - //if (ClusterManager.instance instanceof ClusterManager) { - // ClusterManager.ref++; - // return ClusterManager.instance; - //} - //assert(type != null, "type not given"); - //ClusterManager.instance = this; - //ClusterManager.ref = 1; - this.logger = Logger("cluster-service"); this._key = opts?.key || ''; this._nodes = {}; @@ -87,13 +78,6 @@ class ClusterManager { this._heartbeatInterval = opts?.heartbeatInterval || 9500; this._listeners = []; - const onMessage = (payload: string) => { - this.receive(payload) - }; - - const getLearntPeers = () => { - return this.getLearntPeers(); - }; try { await new Promise((resolve, reject) => { @@ -106,18 +90,15 @@ class ClusterManager { case ClusterManagerType.REDIS: this.multiNode = true; this._bus = new RedisEventBus({ - ...opts?.redis, + ...opts?.redis, callback: ready, - handler: onMessage, }) break; case ClusterManagerType.UDP: this.multiNode = true; this._bus = new UdpEventBus({ - ...opts?.udp, + ...opts?.udp, callback: ready, - handler: onMessage, - getLearntPeers, }); break; case ClusterManagerType.SINGLE_NODE: @@ -125,19 +106,18 @@ class ClusterManager { this.multiNode = false; this._bus = new MemoryEventBus({ callback: ready, - handler: onMessage, }); break; default: reject(new Error(`no_such_cluster_type`)); } }); - } - catch (e: any) { + } catch (e: any) { throw e; } this.initialized = true; + this.ready = false; this.logger.info(`Clustering mode ${type} initialized`); } @@ -145,16 +125,23 @@ class ClusterManager { if (!this.initialized) { return; } + this.stop(); await this._bus.destroy(); - this._bus = undefined; + this._bus = undefined; + this.initialized = false; } - public static async setReady(ready: boolean = true): Promise { - if (!ready) { - clearInterval(this._heartbeat); - return this.multiNode; + public static isMultinode(): boolean { + return this.multiNode; + } + + public static async start(): Promise { + if (this.ready) { + return; } + this.ready = true; + const heartbeat = () => { this.publish("cluster:heartbeat"); }; @@ -162,7 +149,7 @@ class ClusterManager { this._heartbeat = setInterval(heartbeat, this._heartbeatInterval); if (!this.multiNode) { - return this.multiNode; + return; } const rapidHeartbeat = setInterval(heartbeat, 2000); @@ -175,15 +162,20 @@ class ClusterManager { setTimeout(resolve, waitTime); }); clearInterval(rapidHeartbeat); - return this.multiNode; } - public static attach(bus: EventBus): void { - this._listeners.push(bus); + public static stop(): void { + this.ready = false; + clearInterval(this._heartbeat); + this._heartbeat = undefined; + } + + public static attach(callback: EmitCallback): void { + this._listeners.push(callback); } - public static detach(bus: EventBus): void { - this._listeners = this._listeners.filter((x) => x != bus); + public static detach(callback: EmitCallback): void { + this._listeners = this._listeners.filter((x) => x != callback); } public static getLearntPeers(): Array { @@ -354,7 +346,7 @@ class ClusterManager { } csnode.seq_win |= (1 << rel_seq); - this._listeners.forEach((l) => l._emit(event, message, { + this._listeners.forEach((cb) => cb(event, message, { node: { id: cnode.id, ip: cnode.ip, diff --git a/src/cluster/discovery-method.ts b/src/cluster/discovery-method.ts new file mode 100644 index 0000000..9fe0d9c --- /dev/null +++ b/src/cluster/discovery-method.ts @@ -0,0 +1,13 @@ +import dgram from 'dgram'; + +export default abstract class DiscoveryMethod { + + public abstract readonly name: string; + + public abstract eligible(): number; + + public abstract init(socket: dgram.Socket | undefined, socket6: dgram.Socket | undefined): void; + + public abstract getPeers(): Promise>; + +} \ No newline at end of file diff --git a/src/cluster/eventbus-interface.ts b/src/cluster/eventbus-interface.ts new file mode 100644 index 0000000..b95cae8 --- /dev/null +++ b/src/cluster/eventbus-interface.ts @@ -0,0 +1,35 @@ +import ClusterManager from './cluster-manager.js'; + +export type EventBusInterfaceOptions = { + callback: (error?: Error) => void +} + +export default abstract class EventBusInterface { + private destroyed: boolean = false; + + constructor(opts: EventBusInterfaceOptions) { + } + + public async destroy(): Promise { + if (this.destroyed) { + return; + } + await this._destroy(); + this.destroyed = true; + } + + public async publish(message: string): Promise { + return this._publish(message); + } + + protected abstract _publish(message: string): Promise; + + protected abstract _destroy(): Promise; + + protected receive(message: string): void { + const res: Boolean | Error = ClusterManager.receive(message); + if (res instanceof Error) { + throw res; + } + } +} \ No newline at end of file diff --git a/src/cluster/eventbus.ts b/src/cluster/eventbus.ts index 789c9e9..d7609ca 100644 --- a/src/cluster/eventbus.ts +++ b/src/cluster/eventbus.ts @@ -1,6 +1,6 @@ import { EventEmitter } from 'events'; import { Logger } from '../logger.js'; -import ClusterManager from './cluster-manager.js'; +import ClusterManager, { EmitCallback } from './cluster-manager.js'; export type EmitMeta = { node: { @@ -13,7 +13,7 @@ export type EmitMeta = { class EventBus extends EventEmitter { private logger: any; - private clusterService: ClusterManager; + private emitCallback: EmitCallback; constructor() { super(); @@ -27,24 +27,23 @@ class EventBus extends EventEmitter { this.setMaxListeners(this.getMaxListeners() - 1); }); - const clusterService = this.clusterService = new ClusterManager(); - ClusterManager.attach(this); + const emitCallback: EmitCallback = this.emitCallback = (event, message, meta) => { + super.emit(event, message, meta); + this.logger.isTraceEnabled() && + this.logger.trace({ + operation: 'emit', + event, + message, + meta + }); + }; + + ClusterManager.attach(emitCallback); } public async destroy(): Promise { this.removeAllListeners(); - ClusterManager.detach(this); - } - - public _emit(event: string, message: any, meta: EmitMeta) { - super.emit(event, message, meta); - this.logger.isTraceEnabled() && - this.logger.trace({ - operation: 'emit', - event, - message, - meta - }); + ClusterManager.detach(this.emitCallback); } async publish(event: string, message: any) { diff --git a/src/cluster/kubernetes-discovery.js b/src/cluster/kubernetes-discovery.ts similarity index 58% rename from src/cluster/kubernetes-discovery.js rename to src/cluster/kubernetes-discovery.ts index f347038..986f7db 100644 --- a/src/cluster/kubernetes-discovery.js +++ b/src/cluster/kubernetes-discovery.ts @@ -1,10 +1,30 @@ import fs from 'fs'; import dns from 'dns/promises'; import { Logger } from '../logger.js'; +import DiscoveryMethod from './discovery-method.js'; +import ClusterManager from './cluster-manager.js'; -class KubernetesDiscovery { - constructor(opts) { - this.logger = opts?.logger || Logger("kubernetes-discovery"); +export type KubernetesDiscoveryOptions = { + serviceNameEnv?: string, + namespaceEnv?: string, + serviceName?: string, + namespace?: string, + clusterDomain?: string, +} + +class KubernetesDiscovery implements DiscoveryMethod { + public readonly name: string; + + private logger: any; + private _serviceName: string; + private _namespace: string; + private _clusterDomain: string; + private _serviceHost: string; + private _cacheTime: number; + private _cachedPeers: Array | undefined; + + constructor(opts: KubernetesDiscoveryOptions) { + this.logger = Logger("kubernetes-discovery"); const serviceNameEnv = opts?.serviceNameEnv || 'SERVICE_NAME'; const namespaceEnv = opts?.namespaceEnv || 'POD_NAMESPACE'; @@ -15,13 +35,11 @@ class KubernetesDiscovery { this._serviceHost = `${this._serviceName}.${this._namespace}.svc.${this._clusterDomain}`; - this._getLearntPeers = opts.getLearntPeers; - this.name = `kubernetes service ${this._serviceHost}`; this._cacheTime = Date.now() - 1000; } - eligible() { + public eligible(): number { const namespaceFile = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'; if (!fs.existsSync(namespaceFile)) { @@ -34,30 +52,29 @@ class KubernetesDiscovery { return 10; } - init() { + public init(): void { this.logger.debug({ message: `using ${this._serviceHost} headless service for pod discovery`, }); } - async getPeers() { + public async getPeers(): Promise> { if (this._cachedPeers && (Date.now() - this._cacheTime) < 1000) { return this._cachedPeers; } - const peers = await this._resolvePeers() - .then((p) => { - this._cachedPeers = p; - this._cacheTime = Date.now(); - return p; - }) - .catch((err) => { - this.logger.warn({ - message: `failed to resolve ${this._serviceHost}: ${err.message}` - }); - return []; + + let peers: Array = []; + try { + peers = await this._resolvePeers(); + this._cachedPeers = peers; + this._cacheTime = Date.now(); + } catch (err: any) { + this.logger.warn({ + message: `failed to resolve ${this._serviceHost}: ${err.message}` }); + } - const learntPeers = this._getLearntPeers(); + const learntPeers = ClusterManager.getLearntPeers(); for (let i = 0; i < learntPeers.length; i++) { if (peers.indexOf(learntPeers[i]) === -1) { peers.push(learntPeers[i]); @@ -77,8 +94,12 @@ class KubernetesDiscovery { return result4.value; } else if (result6.status == 'fulfilled' && result6.value?.length > 0) { return result6.value; + } else if (result4.status == 'rejected') { + throw result4.reason; + } else if (result6.status == 'rejected') { + throw result6.reason; } else { - throw result4?.reason || result6?.reason || new Error('unknown'); + throw new Error('unknown'); } }); } diff --git a/src/cluster/memory-eventbus.js b/src/cluster/memory-eventbus.js deleted file mode 100644 index ee25ee6..0000000 --- a/src/cluster/memory-eventbus.js +++ /dev/null @@ -1,29 +0,0 @@ -import { Logger } from '../logger.js'; - -class MemoryEventBus { - constructor(opts) { - this._handler = opts.handler; - this.logger = Logger("memory-eventbus"); - typeof opts.callback === 'function' && process.nextTick(opts.callback); - } - - async destroy() { - return true; - } - - async publish(message) { - return new Promise((resolve) => { - process.nextTick(() => { - this._handler(message); - this.logger.debug({ - operation: 'publish', - channel: message.event, - message, - }); - resolve(); - }); - }); - } -} - -export default MemoryEventBus; \ No newline at end of file diff --git a/src/cluster/memory-eventbus.ts b/src/cluster/memory-eventbus.ts new file mode 100644 index 0000000..c195f6b --- /dev/null +++ b/src/cluster/memory-eventbus.ts @@ -0,0 +1,38 @@ +import { Logger } from '../logger.js'; +import EventBusInterface, { EventBusInterfaceOptions } from './eventbus-interface.js'; + +export type MemoryEventBusOptions = EventBusInterfaceOptions; + +class MemoryEventBus extends EventBusInterface { + private logger: any; + + constructor(opts: EventBusInterfaceOptions) { + super(opts) + this.logger = Logger("memory-eventbus"); + typeof opts.callback === 'function' && process.nextTick(opts.callback); + } + + protected async _destroy(): Promise { + } + + protected async _publish(message: string): Promise { + return new Promise((resolve) => { + process.nextTick(() => { + try { + this.receive(message); + this.logger.debug({ + operation: 'publish', + message, + }); + } catch (e: any) { + this.logger.error({ + message: `failed to receive message ${message}: ${e.message}`, + }); + } + resolve(); + }); + }); + } +} + +export default MemoryEventBus; \ No newline at end of file diff --git a/src/cluster/multicast-discovery.js b/src/cluster/multicast-discovery.ts similarity index 66% rename from src/cluster/multicast-discovery.js rename to src/cluster/multicast-discovery.ts index 58d2683..4c12da3 100644 --- a/src/cluster/multicast-discovery.js +++ b/src/cluster/multicast-discovery.ts @@ -1,4 +1,8 @@ -function inCidr(ipAddress, cidrPrefix) { +import dgram from 'dgram'; +import DiscoveryMethod from "./discovery-method.js"; +import { Logger } from '../logger.js'; + +function inCidr(ipAddress: string, cidrPrefix: string): boolean { const [subnet, prefixLength] = cidrPrefix.split('/'); const subnetOctets = subnet.split('.').map(Number); const ipOctets = ipAddress.split('.')?.map(Number); @@ -13,27 +17,35 @@ function inCidr(ipAddress, cidrPrefix) { (ipOctets[2] << 8) | ipOctets[3]; - const mask = (0xffffffff << (32 - prefixLength)) >>> 0; + const mask = (0xffffffff << (32 - Number.parseInt(prefixLength))) >>> 0; return (subnetInt & mask) === (ipInt & mask); } -class MulticastDiscovery { +export type MulticastDiscoveryOptions = { + group: string, +} + +class MulticastDiscovery implements DiscoveryMethod { + public readonly name: string; + + private _multicastgroup: string; + private logger: any; - constructor(opts) { + constructor(opts: MulticastDiscoveryOptions) { this._multicastgroup = opts.group || '239.0.0.1'; if (!inCidr(this._multicastgroup, "239.0.0.0/8")) { throw new Error(`${this._multicastgroup} is not within the private multicast range 239.0.0.0/8`); } - this.logger = opts.logger; + this.logger = Logger('multicast-discovery'); this.name = `multicast group ${this._multicastgroup}`; } - eligible() { + public eligible(): number { return 0; } - init(socket) { + public init(socket: dgram.Socket): void { if (!socket) { this.logger.error({ message: `Unable to initialize multicast discovery, no IPv4 socket available` @@ -47,7 +59,7 @@ class MulticastDiscovery { }); } - async getPeers() { + public async getPeers(): Promise> { return [this._multicastgroup]; } } diff --git a/src/cluster/redis-eventbus.js b/src/cluster/redis-eventbus.ts similarity index 72% rename from src/cluster/redis-eventbus.js rename to src/cluster/redis-eventbus.ts index 61f06f5..95d38ac 100644 --- a/src/cluster/redis-eventbus.js +++ b/src/cluster/redis-eventbus.ts @@ -1,9 +1,30 @@ import assert from 'assert/strict'; -import Redis from 'redis'; +import Redis, { RedisClientType } from 'redis'; import { Logger } from '../logger.js'; +import EventBusInterface, { EventBusInterfaceOptions } from './eventbus-interface.js'; -class RedisEventBus { - constructor(opts) { +export type RedisEventBusOptions = { + redisUrl: URL +} + +type _RedisEventBusOptions = EventBusInterfaceOptions & RedisEventBusOptions & { + callback: (error?: Error) => void +} + +class RedisEventBus extends EventBusInterface{ + private logger: any; + private _channel: string; + + private _subscriber: RedisClientType; + private _subscriber_error: Error | undefined; + private _subscriber_was_ready: boolean = false; + + private _publisher: RedisClientType; + private _publisher_error: Error | undefined; + private _publisher_was_ready: boolean = false; + + constructor(opts: _RedisEventBusOptions) { + super(opts); this.logger = Logger("redis-eventbus"); this._channel = "exposr"; @@ -21,10 +42,11 @@ class RedisEventBus { }); this._publisher = this._subscriber.duplicate(); - const readyHandler = (client) => { - const clientProp = `_${client}`; - const errorProp = `_${client}_error`; - const wasReadyProp = `_${client}_was_ready`; + const readyHandler = (client: "subscriber" | "publisher") => { + + const clientProp: "_subscriber" | "_publisher" = `_${client}`; + const errorProp: "_subscriber_error" | "_publisher_error" = `_${client}_error`; + const wasReadyProp: "_subscriber_was_ready" | "_publisher_was_ready" = `_${client}_was_ready`; this[clientProp].hello() .catch(() => {}) @@ -44,10 +66,11 @@ class RedisEventBus { }); }; - const errorHandler = (err, client) => { - const clientProp = `_${client}`; - const errorProp = `_${client}_error`; - const wasReadyProp = `_${client}_was_ready`; + const errorHandler = (err: Error, client: "subscriber" | "publisher") => { + + const clientProp: "_subscriber" | "_publisher" = `_${client}`; + const errorProp: "_subscriber_error" | "_publisher_error" = `_${client}_error`; + const wasReadyProp: "_subscriber_was_ready" | "_publisher_was_ready" = `_${client}_was_ready`; if (this[errorProp]?.message != err?.message) { this.logger.error({ @@ -105,8 +128,8 @@ class RedisEventBus { this._subscriber.subscribe(this._channel, (message) => { try { - opts.handler(message); - } catch (e) { + this.receive(message); + } catch (e: any) { this.logger.debug({ message: `failed to receive message: ${e.message}`, operation: 'redis_channel_error', @@ -128,7 +151,7 @@ class RedisEventBus { errorHandler(err, 'publisher'); }); }), - ]).catch((err) => { + ]).catch((err: any) => { this.logger.error({ message: `failed to connect to ${redisUrl}: ${err.message}`, operation: 'connect', @@ -142,13 +165,7 @@ class RedisEventBus { }); } - async destroy() { - if (this.destroyed) { - return; - } - - this.destroyed = true; - + protected async _destroy(): Promise { this.logger.trace({ operation: 'destroy', message: 'initiated' @@ -156,7 +173,7 @@ class RedisEventBus { try { await this._subscriber.unsubscribe(this._channel); - } catch (err) { + } catch (err: any) { this.logger.error({ operation: 'destroy', msg: 'could not unsubscribe', @@ -164,26 +181,15 @@ class RedisEventBus { }); } - const quit = (client) => { - return client.quit((res) => { - this.logger.trace({ - client: client.name, - operation: 'destroy', - message: 'complete', - res, - }); - }); - }; - - return Promise.allSettled([ - quit(this._publisher), - quit(this._subscriber) + await Promise.allSettled([ + this._publisher.quit(), + this._subscriber.quit(), ]); } - async publish(message) { - return this._publisher.publish(this._channel, message) - .catch((err) => { + protected async _publish(message: any): Promise { + await this._publisher.publish(this._channel, message) + .catch((err: any) => { this.logger.error({ message: `failed to publish message ${message.event}: ${err.message}`, operation: 'publish', diff --git a/src/cluster/udp-eventbus.js b/src/cluster/udp-eventbus.ts similarity index 63% rename from src/cluster/udp-eventbus.js rename to src/cluster/udp-eventbus.ts index 296d0e1..ea63cc6 100644 --- a/src/cluster/udp-eventbus.js +++ b/src/cluster/udp-eventbus.ts @@ -2,12 +2,33 @@ import assert from 'assert/strict'; import dgram from 'dgram'; import net from 'net'; import { Logger } from '../logger.js'; -import MulticastDiscovery from './multicast-discovery.js'; -import KubernetesDiscovery from './kubernetes-discovery.js'; +import MulticastDiscovery, { MulticastDiscoveryOptions } from './multicast-discovery.js'; +import KubernetesDiscovery, { KubernetesDiscoveryOptions } from './kubernetes-discovery.js'; +import EventBusInterface, { EventBusInterfaceOptions } from './eventbus-interface.js'; +import DiscoveryMethod from './discovery-method.js'; + +export type UdpEventBusOptions = { + port: number, + discoveryMethod: "multicast" | "kubernetes" | undefined, + multicast: MulticastDiscoveryOptions, + kubernetes: KubernetesDiscoveryOptions, +} + +type _UdpEventBusOptions = EventBusInterfaceOptions & UdpEventBusOptions & { + callback: (error?: Error) => void +} + +class UdpEventBus extends EventBusInterface { + private logger: any; + private _port: number; + private _discoveryMethods: { [key: string]: DiscoveryMethod } = {}; + private _discoveryMethod: DiscoveryMethod | undefined; + private _socket: dgram.Socket | undefined; + private _socket6: dgram.Socket | undefined; -class UdpEventBus { + constructor(opts: _UdpEventBusOptions) { + super(opts); - constructor(opts) { this.logger = Logger("udp-eventbus"); this._port = opts.port || 1025; @@ -15,17 +36,13 @@ class UdpEventBus { try { this._discoveryMethods = { multicast: new MulticastDiscovery({ - logger: this.logger, - getLearntPeers: opts.getLearntPeers, ...opts.multicast, }), kubernetes: new KubernetesDiscovery({ - logger: this.logger, - getLearntPeers: opts.getLearntPeers, ...opts.kubernetes }), }; - } catch (e) { + } catch (e: any) { if (typeof opts.callback === 'function') { process.nextTick(() => { opts.callback(e) }); } else { @@ -58,10 +75,13 @@ class UdpEventBus { this._discoveryMethod = eligibleMethods.length > 0 ? eligibleMethods[0].method : undefined; } + if (this._discoveryMethod == undefined) { + process.nextTick(() => { opts.callback(new Error('No working discovery methods available'))}); + return + } assert(this._discoveryMethod != undefined); - assert(opts.handler != undefined); - const onMessage = (data, rinfo) => { + const onMessage = (data: Buffer, rinfo: dgram.RemoteInfo) => { if (data.length <= 5) { return; } @@ -73,15 +93,21 @@ class UdpEventBus { return; } - opts.handler(msg.toString('utf-8')); + try { + this.receive(msg.toString('utf-8')); + } catch (e: any) { + this.logger.error({ + message: `Failed to receive message ${msg}` + }); + } }; - const createSocket = (type) => { + const createSocket = (type: dgram.SocketType): Promise => { return new Promise((resolve, reject) => { - const sock = dgram.createSocket({ type: 'udp4', reuseAddr: true }); + const sock = dgram.createSocket({ type, reuseAddr: true }); sock.on('message', onMessage); - const connectError = (err) => { + const connectError = (err: Error) => { sock.close(); reject(err); }; @@ -109,47 +135,48 @@ class UdpEventBus { this._socket6 = result6.value; mode += `${mode != '' ? '/' : ''}IPv6`; } - if (!this._socket && !this._socket6) { + + if (result4.status == 'rejected' && result6.status == 'rejected') { typeof opts.callback === 'function' && process.nextTick(() => { opts.callback(result4.reason) }); return; } - this._discoveryMethod.init(this._socket, this._socket6); + this._discoveryMethod?.init(this._socket, this._socket6); this.logger.info({ - message: `Cluster interface on ${this._port} (${mode}) using discovery method ${this._discoveryMethod.name}`, + message: `Cluster interface on ${this._port} (${mode}) using discovery method ${this._discoveryMethod?.name}`, }); typeof opts.callback === 'function' && process.nextTick(opts.callback); }); } - async destroy() { - return Promise.allSettled([ + protected async _destroy(): Promise { + await Promise.allSettled([ new Promise((resolve, reject) => { if (this._socket) { - this._socket.close(resolve); + this._socket.close(() => { resolve(undefined) }); } else { - resolve(); + resolve(undefined); } }), new Promise((resolve, reject) => { if (this._socket6) { - this._socket6.close(resolve); + this._socket6.close(() => { resolve(undefined) }); } else { - resolve(); + resolve(undefined); } }) ]) } - async publish(message) { - this._receivers = await this._discoveryMethod.getPeers(); + protected async _publish(message: any): Promise { + const receivers = (await this._discoveryMethod?.getPeers()) || []; const header = Buffer.allocUnsafe(4); header.writeUInt8(0xE0, 0); header.writeUInt8(0x05, 1); header.writeUInt16BE(0, 2); - const promises = this._receivers.map((receiver) => { + const promises = receivers.map((receiver: string) => { return new Promise((resolve, reject) => { const sock = net.isIPv6(receiver) ? this._socket6 : this._socket; if (!sock) { @@ -161,13 +188,13 @@ class UdpEventBus { if (err) { reject(err); } else { - resolve(); + resolve(undefined); } }); }); }); - return Promise.allSettled(promises); + await Promise.allSettled(promises); } } export default UdpEventBus; \ No newline at end of file diff --git a/src/index.js b/src/index.js index 4c77d3c..e0cb3d1 100644 --- a/src/index.js +++ b/src/index.js @@ -176,7 +176,7 @@ export default async (argv) => { process.exit(-1); }); - await ClusterManager.setReady(); + await ClusterManager.start(); adminController.setReady(); logger.info("exposrd ready"); @@ -200,13 +200,13 @@ export default async (argv) => { let result; try { - const multiNode = ClusterManager.setReady(false); - adminController.setReady(false); - // Drain and block new tunnel connections await Promise.race([TunnelConnectionManager.stop() , timeout, force]); - if (multiNode) { + adminController.setReady(false); + ClusterManager.stop(); + + if (ClusterManager.isMultinode()) { logger.info("Waiting for connections to drain..."); await Promise.race([new Promise((resolve) => { setTimeout(resolve, drainTimeout); diff --git a/test/system/eventbus/test_redis.js b/test/system/eventbus/test_redis.js index d7ad611..b2814f2 100644 --- a/test/system/eventbus/test_redis.js +++ b/test/system/eventbus/test_redis.js @@ -38,11 +38,10 @@ describe('redis eventbus', () => { }); await bus.publish('test2', {data: 10}); - let res = await bus.publish('test', {data: 42}); - assert(res == true, `failed to publish message, got ${res}`); + await bus.publish('test', {data: 42}); - res = await recv; - assert(res.data == 42); + let res = await recv; + assert(res.data == 42, `did not get expected message, got ${res.data}`); }); it('redis bus waitfor', async () => { @@ -52,8 +51,6 @@ describe('redis eventbus', () => { let res = await bus.publish('test', {data: 42}); - assert(res == true, `failed to publish message, got ${res}`); - res = await recv; assert(res.data == 42); assert(bus.listenerCount('test') == 0, 'listener still attached'); @@ -65,7 +62,6 @@ describe('redis eventbus', () => { }, 100); let res = await bus.publish('test', {data: 10}); - assert(res == true, `failed to publish message, got ${res}`); try { await recv; diff --git a/test/unit/cluster/test_cluster-eventbus.js b/test/unit/cluster/test_cluster-eventbus.js index 322518d..879a8f9 100644 --- a/test/unit/cluster/test_cluster-eventbus.js +++ b/test/unit/cluster/test_cluster-eventbus.js @@ -325,7 +325,7 @@ describe('UDP eventbus', () => { discoveryMethod: 'kubernetes', }); - sinon.stub(ClusterManager._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns([]); sinon.stub(dns, 'resolve4') @@ -344,7 +344,7 @@ describe('UDP eventbus', () => { discoveryMethod: 'kubernetes' }); - sinon.stub(ClusterManager._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns([]); sinon.stub(dns, 'resolve4') @@ -431,7 +431,7 @@ describe('UDP eventbus', () => { discoveryMethod: 'kubernetes' }); - sinon.stub(ClusterManager._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns(["127.0.0.2"]); sinon.stub(dns, 'resolve4') @@ -452,7 +452,7 @@ describe('UDP eventbus', () => { discoveryMethod: 'kubernetes' }); - sinon.stub(ClusterManager._bus._discoveryMethod, '_getLearntPeers') + sinon.stub(ClusterManager, 'getLearntPeers') .returns(["127.0.0.1"]); sinon.stub(dns, 'resolve4') diff --git a/test/unit/cluster/test_cluster-service.js b/test/unit/cluster/test_cluster-service.js index c26ef4d..1a4d405 100644 --- a/test/unit/cluster/test_cluster-service.js +++ b/test/unit/cluster/test_cluster-service.js @@ -273,9 +273,9 @@ describe('cluster service', () => { it(`are sending heartbeat`, async () => { const spy = sinon.spy(ClusterManager, "publish"); - // Heartbeat sent on ready - await ClusterManager.setReady(); - assert(spy.calledOnceWithExactly("cluster:heartbeat"), "initial onready heartbeat not sent"); + // Heartbeat sent on start + await ClusterManager.start(); + assert(spy.calledOnceWithExactly("cluster:heartbeat"), "initial start heartbeat not sent"); // Heartbeat sent after interval await clock.tickAsync(ClusterManager._heartbeatInterval + 1);