Skip to content

Commit

Permalink
refactor: cluster sub-system to typescript
Browse files Browse the repository at this point in the history
  • Loading branch information
fredriklindberg committed Jan 6, 2024
1 parent dc57837 commit 6e2d391
Show file tree
Hide file tree
Showing 14 changed files with 317 additions and 207 deletions.
84 changes: 38 additions & 46 deletions src/cluster/cluster-manager.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import { strict as assert } from 'assert';
import crypto from 'node:crypto';
import MemoryEventBus from './memory-eventbus.js';
import RedisEventBus from './redis-eventbus.js';
import RedisEventBus, { RedisEventBusOptions } from './redis-eventbus.js';
import Node from './cluster-node.js';
import { Logger } from '../logger.js';
import UdpEventBus from './udp-eventbus.js';
import EventBus from './eventbus.js';
import UdpEventBus, { UdpEventBusOptions } from './udp-eventbus.js';
import { EmitMeta } from './eventbus.js';
import EventBusInterface from './eventbus-interface.js';

export type ClusterManagerOptions = {
key?: string,
staleTimeout?: number,
removalTimeout?: number,
heartbeatInterval?: number,
redis? : any,
udp: any,
redis?: RedisEventBusOptions,
udp?: UdpEventBusOptions,
}

export enum ClusterManagerType {
Expand All @@ -38,35 +38,26 @@ type ClusterServiceNode = ClusterNode & {
removalTimer?: NodeJS.Timeout,
}

class ClusterManager {
//private static instance: ClusterManager | undefined;
//private static ref: number;
export type EmitCallback = (event: string, message: any, meta: EmitMeta) => void;

class ClusterManager {
private static logger: any;
private static _key: string = '';
private static _nodes: { [key: string]: ClusterServiceNode } = {};
private static _listeners: Array<EventBus> = [];
private static _listeners: Array<EmitCallback> = [];
private static _seq: number = 0;
private static _window_size: number;
private static _staleTimeout: number;
private static _removalTimeout: number;
private static _heartbeatInterval: number;
private static _bus: any;
private static _bus: EventBusInterface;
private static multiNode: boolean = false;
private static _heartbeat: NodeJS.Timeout | undefined;

private static initialized: boolean = false;
private static ready: boolean = false;

public static async init(type: ClusterManagerType, opts?: ClusterManagerOptions): Promise<void> {
//if (ClusterManager.instance instanceof ClusterManager) {
// ClusterManager.ref++;
// return ClusterManager.instance;
//}
//assert(type != null, "type not given");
//ClusterManager.instance = this;
//ClusterManager.ref = 1;

this.logger = Logger("cluster-service");
this._key = opts?.key || '';
this._nodes = {};
Expand All @@ -87,13 +78,6 @@ class ClusterManager {
this._heartbeatInterval = opts?.heartbeatInterval || 9500;

this._listeners = [];
const onMessage = (payload: string) => {
this.receive(payload)
};

const getLearntPeers = () => {
return this.getLearntPeers();
};

try {
await new Promise((resolve, reject) => {
Expand All @@ -106,63 +90,66 @@ class ClusterManager {
case ClusterManagerType.REDIS:
this.multiNode = true;
this._bus = new RedisEventBus({
...opts?.redis,
...<RedisEventBusOptions>opts?.redis,
callback: ready,
handler: onMessage,
})
break;
case ClusterManagerType.UDP:
this.multiNode = true;
this._bus = new UdpEventBus({
...opts?.udp,
...<UdpEventBusOptions>opts?.udp,
callback: ready,
handler: onMessage,
getLearntPeers,
});
break;
case ClusterManagerType.SINGLE_NODE:
case ClusterManagerType.MEM:
this.multiNode = false;
this._bus = new MemoryEventBus({
callback: ready,
handler: onMessage,
});
break;
default:
reject(new Error(`no_such_cluster_type`));
}
});
}
catch (e: any) {
} catch (e: any) {
throw e;
}

this.initialized = true;
this.ready = false;
this.logger.info(`Clustering mode ${type} initialized`);
}

public static async close(): Promise<void> {
if (!this.initialized) {
return;
}
this.stop();
await this._bus.destroy();
this._bus = undefined;
this._bus = <any>undefined;
this.initialized = false;
}

public static async setReady(ready: boolean = true): Promise<boolean> {
if (!ready) {
clearInterval(this._heartbeat);
return this.multiNode;
public static isMultinode(): boolean {
return this.multiNode;
}

public static async start(): Promise<void> {
if (this.ready) {
return;
}

this.ready = true;

const heartbeat = () => {
this.publish("cluster:heartbeat");
};
heartbeat();
this._heartbeat = setInterval(heartbeat, this._heartbeatInterval);

if (!this.multiNode) {
return this.multiNode;
return;
}

const rapidHeartbeat = setInterval(heartbeat, 2000);
Expand All @@ -175,15 +162,20 @@ class ClusterManager {
setTimeout(resolve, waitTime);
});
clearInterval(rapidHeartbeat);
return this.multiNode;
}

public static attach(bus: EventBus): void {
this._listeners.push(bus);
public static stop(): void {
this.ready = false;
clearInterval(this._heartbeat);
this._heartbeat = undefined;
}

public static attach(callback: EmitCallback): void {
this._listeners.push(callback);
}

public static detach(bus: EventBus): void {
this._listeners = this._listeners.filter((x) => x != bus);
public static detach(callback: EmitCallback): void {
this._listeners = this._listeners.filter((x) => x != callback);
}

public static getLearntPeers(): Array<string> {
Expand Down Expand Up @@ -354,7 +346,7 @@ class ClusterManager {
}
csnode.seq_win |= (1 << rel_seq);

this._listeners.forEach((l) => l._emit(event, message, {
this._listeners.forEach((cb) => cb(event, message, {
node: {
id: cnode.id,
ip: cnode.ip,
Expand Down
13 changes: 13 additions & 0 deletions src/cluster/discovery-method.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import dgram from 'dgram';

export default abstract class DiscoveryMethod {

public abstract readonly name: string;

public abstract eligible(): number;

public abstract init(socket: dgram.Socket | undefined, socket6: dgram.Socket | undefined): void;

public abstract getPeers(): Promise<Array<string>>;

}
35 changes: 35 additions & 0 deletions src/cluster/eventbus-interface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import ClusterManager from './cluster-manager.js';

export type EventBusInterfaceOptions = {
callback: (error?: Error) => void
}

export default abstract class EventBusInterface {
private destroyed: boolean = false;

constructor(opts: EventBusInterfaceOptions) {
}

public async destroy(): Promise<void> {
if (this.destroyed) {
return;
}
await this._destroy();
this.destroyed = true;
}

public async publish(message: string): Promise<void> {
return this._publish(message);
}

protected abstract _publish(message: string): Promise<void>;

protected abstract _destroy(): Promise<void>;

protected receive(message: string): void {
const res: Boolean | Error = ClusterManager.receive(message);
if (res instanceof Error) {
throw res;
}
}
}
31 changes: 15 additions & 16 deletions src/cluster/eventbus.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events';
import { Logger } from '../logger.js';
import ClusterManager from './cluster-manager.js';
import ClusterManager, { EmitCallback } from './cluster-manager.js';

export type EmitMeta = {
node: {
Expand All @@ -13,7 +13,7 @@ export type EmitMeta = {

class EventBus extends EventEmitter {
private logger: any;
private clusterService: ClusterManager;
private emitCallback: EmitCallback;

constructor() {
super();
Expand All @@ -27,24 +27,23 @@ class EventBus extends EventEmitter {
this.setMaxListeners(this.getMaxListeners() - 1);
});

const clusterService = this.clusterService = new ClusterManager();
ClusterManager.attach(this);
const emitCallback: EmitCallback = this.emitCallback = (event, message, meta) => {
super.emit(event, message, meta);
this.logger.isTraceEnabled() &&
this.logger.trace({
operation: 'emit',
event,
message,
meta
});
};

ClusterManager.attach(emitCallback);
}

public async destroy(): Promise<void> {
this.removeAllListeners();
ClusterManager.detach(this);
}

public _emit(event: string, message: any, meta: EmitMeta) {
super.emit(event, message, meta);
this.logger.isTraceEnabled() &&
this.logger.trace({
operation: 'emit',
event,
message,
meta
});
ClusterManager.detach(this.emitCallback);
}

async publish(event: string, message: any) {
Expand Down
Loading

0 comments on commit 6e2d391

Please sign in to comment.