Skip to content

Commit

Permalink
Merge pull request #66 from exposr/refactor-cluster
Browse files Browse the repository at this point in the history
Refactor cluster sub-system
  • Loading branch information
fredriklindberg authored Jan 6, 2024
2 parents aa0fb8b + fd757fe commit b012e07
Show file tree
Hide file tree
Showing 29 changed files with 662 additions and 547 deletions.
224 changes: 119 additions & 105 deletions src/cluster/index.ts → src/cluster/cluster-manager.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
import { strict as assert } from 'assert';
import crypto from 'node:crypto';
import MemoryEventBus from './memory-eventbus.js';
import RedisEventBus from './redis-eventbus.js';
import RedisEventBus, { RedisEventBusOptions } from './redis-eventbus.js';
import Node from './cluster-node.js';
import { Logger } from '../logger.js';
import UdpEventBus from './udp-eventbus.js';
import EventBus from './eventbus.js';
import EventEmitter from 'node:events';
import UdpEventBus, { UdpEventBusOptions } from './udp-eventbus.js';
import { EmitMeta } from './eventbus.js';
import EventBusInterface from './eventbus-interface.js';

export type ClusterManagerOptions = {
key?: string,
staleTimeout?: number,
removalTimeout?: number,
heartbeatInterval?: number,
redis?: RedisEventBusOptions,
udp?: UdpEventBusOptions,
}

export enum ClusterManagerType {
REDIS = 'redis',
UDP = 'udp',
SINGLE_NODE = 'single-node',
MEM = 'mem',
}

export type ClusterNode = {
id: string,
Expand All @@ -23,33 +38,26 @@ type ClusterServiceNode = ClusterNode & {
removalTimer?: NodeJS.Timeout,
}

class ClusterService extends EventEmitter {
private static instance: ClusterService | undefined;
private static ref: number;

private logger: any;
private _key: string = '';
private _nodes: { [key: string]: ClusterServiceNode } = {};
private _listeners: Array<EventBus> = [];
private _seq: number = 0;
private _window_size!: number;
private _staleTimeout!: number;
private _removalTimeout!: number;
private _heartbeatInterval!: number;
private _bus: any;
private multiNode: boolean = false;
private _heartbeat: NodeJS.Timeout | undefined;

constructor(type?: 'redis' | 'udp' | 'single-node' | 'mem', opts?: any) {
super();
if (ClusterService.instance instanceof ClusterService) {
ClusterService.ref++;
return ClusterService.instance;
}
assert(type != null, "type not given");
ClusterService.instance = this;
ClusterService.ref = 1;

export type EmitCallback = (event: string, message: any, meta: EmitMeta) => void;

class ClusterManager {
private static logger: any;
private static _key: string = '';
private static _nodes: { [key: string]: ClusterServiceNode } = {};
private static _listeners: Array<EmitCallback> = [];
private static _seq: number = 0;
private static _window_size: number;
private static _staleTimeout: number;
private static _removalTimeout: number;
private static _heartbeatInterval: number;
private static _bus: EventBusInterface;
private static multiNode: boolean = false;
private static _heartbeat: NodeJS.Timeout | undefined;

private static initialized: boolean = false;
private static ready: boolean = false;

public static async init(type: ClusterManagerType, opts?: ClusterManagerOptions): Promise<void> {
this.logger = Logger("cluster-service");
this._key = opts?.key || '';
this._nodes = {};
Expand All @@ -70,67 +78,78 @@ class ClusterService extends EventEmitter {
this._heartbeatInterval = opts?.heartbeatInterval || 9500;

this._listeners = [];
const onMessage = (payload: string) => {
this._receive(payload)
};

const ready = async (err: Error) => {
if (err) {
await this.destroy();
}
this.logger.info(`Clustering mode ${type} initialized`);
typeof opts.callback === 'function' && process.nextTick(() => opts.callback(err));
};
try {
await new Promise((resolve, reject) => {

const ready = (err?: Error) => {
err ? reject(err) : resolve(undefined);
};

switch (type) {
case ClusterManagerType.REDIS:
this.multiNode = true;
this._bus = new RedisEventBus({
...<RedisEventBusOptions>opts?.redis,
callback: ready,
})
break;
case ClusterManagerType.UDP:
this.multiNode = true;
this._bus = new UdpEventBus({
...<UdpEventBusOptions>opts?.udp,
callback: ready,
});
break;
case ClusterManagerType.SINGLE_NODE:
case ClusterManagerType.MEM:
this.multiNode = false;
this._bus = new MemoryEventBus({
callback: ready,
});
break;
default:
reject(new Error(`no_such_cluster_type`));
}
});
} catch (e: any) {
throw e;
}

const getLearntPeers = () => {
return this._getLearntPeers();
};
this.initialized = true;
this.ready = false;
this.logger.info(`Clustering mode ${type} initialized`);
}

switch (type) {
case 'redis':
this.multiNode = true;
this._bus = new RedisEventBus({
...opts.redis,
callback: ready,
handler: onMessage,
})
break;
case 'udp':
this.multiNode = true;
this._bus = new UdpEventBus({
...opts.udp,
callback: ready,
handler: onMessage,
getLearntPeers,
});
break;
case 'single-node':
case 'mem':
this.multiNode = false;
this._bus = new MemoryEventBus({
callback: ready,
handler: onMessage,
});
break;
default:
assert.fail(`unknown type ${type}`);
public static async close(): Promise<void> {
if (!this.initialized) {
return;
}
this.stop();
await this._bus.destroy();
this._bus = <any>undefined;
this.initialized = false;
}

public async setReady(ready: boolean = true): Promise<boolean> {
if (!ready) {
clearInterval(this._heartbeat);
return this.multiNode;
public static isMultinode(): boolean {
return this.multiNode;
}

public static async start(): Promise<void> {
if (this.ready) {
return;
}

this.ready = true;

const heartbeat = () => {
this.publish("cluster:heartbeat");
};
heartbeat();
this._heartbeat = setInterval(heartbeat, this._heartbeatInterval);

if (!this.multiNode) {
return this.multiNode;
return;
}

const rapidHeartbeat = setInterval(heartbeat, 2000);
Expand All @@ -143,24 +162,29 @@ class ClusterService extends EventEmitter {
setTimeout(resolve, waitTime);
});
clearInterval(rapidHeartbeat);
return this.multiNode;
}

public attach(bus: EventBus): void {
this._listeners.push(bus);
public static stop(): void {
this.ready = false;
clearInterval(this._heartbeat);
this._heartbeat = undefined;
}

public static attach(callback: EmitCallback): void {
this._listeners.push(callback);
}

public detach(bus: EventBus): void {
this._listeners = this._listeners.filter((x) => x != bus);
public static detach(callback: EmitCallback): void {
this._listeners = this._listeners.filter((x) => x != callback);
}

private _getLearntPeers(): Array<string> {
public static getLearntPeers(): Array<string> {
return Array.from(new Set(Object.keys(this._nodes)
.filter((k) => !this._nodes[k].stale)
.map((k) => this._nodes[k].ip)));
}

private _learnNode(node: ClusterNode): ClusterServiceNode | undefined {
private static _learnNode(node: ClusterNode): ClusterServiceNode | undefined {
if (node?.id == undefined) {
return undefined;
}
Expand Down Expand Up @@ -209,16 +233,15 @@ class ClusterService extends EventEmitter {
return cnode;
}

private _forgetNode(node: ClusterServiceNode): void {
private static _forgetNode(node: ClusterServiceNode): void {
delete this._nodes[node.id];
this.logger.info({
message: `Node ${node.id} ${node.host} (${node.ip}) permanently removed from peer list`,
total_nodes: Object.keys(this._nodes).length,
});
this.emit('removed', {nodeId: node.id});
}

private _staleNode(node: ClusterServiceNode): void {
private static _staleNode(node: ClusterServiceNode): void {
if (!this._nodes[node?.id]) {
return;
}
Expand All @@ -227,10 +250,9 @@ class ClusterService extends EventEmitter {
message: `marking ${node.id} as stale`
});

this.emit('stale', {nodeId: node.id});
}

public getSelf(): ClusterNode {
public static getSelf(): ClusterNode {
return {
id: Node.identifier,
host: Node.hostname,
Expand All @@ -240,7 +262,7 @@ class ClusterService extends EventEmitter {
};
}

public getNode(id: string): ClusterNode | undefined {
public static getNode(id: string): ClusterNode | undefined {
const node: ClusterServiceNode = this._nodes[id];
if (node?.stale === false) {
return {
Expand All @@ -255,7 +277,7 @@ class ClusterService extends EventEmitter {
}
}

public getNodes(): Array<ClusterNode> {
public static getNodes(): Array<ClusterNode> {
return Object.keys(this._nodes).map((k) => {
return {
id: this._nodes[k].id,
Expand All @@ -267,7 +289,7 @@ class ClusterService extends EventEmitter {
})
}

private _receive(payload: string): boolean | Error {
public static receive(payload: string): boolean | Error {
try {
const msg = JSON.parse(payload);
const {s, ...data} = msg;
Expand Down Expand Up @@ -297,7 +319,7 @@ class ClusterService extends EventEmitter {

let csnode: ClusterServiceNode | undefined = this._nodes[cnode.id];
if (event == 'cluster:heartbeat' || csnode == undefined) {
csnode = this._learnNode(node);
csnode = this._learnNode(cnode);
}

if (csnode == undefined) {
Expand All @@ -324,7 +346,7 @@ class ClusterService extends EventEmitter {
}
csnode.seq_win |= (1 << rel_seq);

this._listeners.forEach((l) => l._emit(event, message, {
this._listeners.forEach((cb) => cb(event, message, {
node: {
id: cnode.id,
ip: cnode.ip,
Expand All @@ -342,7 +364,7 @@ class ClusterService extends EventEmitter {
}
}

public async publish(event: any, message: any = {}) {
public static async publish(event: any, message: any = {}) {
const payload = {
event,
message,
Expand All @@ -366,14 +388,6 @@ class ClusterService extends EventEmitter {
return this._bus.publish(JSON.stringify(msg));
}

public async destroy() {
if (--ClusterService.ref == 0) {
await this._bus.destroy();
this._bus = undefined;
this.removeAllListeners();
ClusterService.instance = undefined;
}
}
}

export default ClusterService;
export default ClusterManager;
Loading

0 comments on commit b012e07

Please sign in to comment.