From c0a8614323f1c783f4cb3cb695c5f02ed4bf7fa3 Mon Sep 17 00:00:00 2001 From: Harry Chen Date: Mon, 8 Jul 2024 15:03:23 +0800 Subject: [PATCH] feat: add abort controller for subscribe --- src/base.ts | 14 +++++++++++--- src/interface.ts | 11 +++++++++-- test/cp.test.ts | 32 ++++++++++++++++++++++++++++++++ test/local.test.ts | 35 ++++++++++++++++++++++++++++++++++- test/local/base_abort.ts | 24 ++++++++++++++++++++++++ test/thread.test.ts | 34 ++++++++++++++++++++++++++++++++++ 6 files changed, 144 insertions(+), 6 deletions(-) create mode 100644 test/local/base_abort.ts diff --git a/src/base.ts b/src/base.ts index c69c895..e55d490 100644 --- a/src/base.ts +++ b/src/base.ts @@ -10,6 +10,7 @@ import { MessageCategory, MessageType, PublishOptions, + SubscribeAbortController, SubscribeOptions, SubscribeTopicListener, WaitCheckOptions, @@ -493,7 +494,7 @@ export abstract class AbstractEventBus implements IEventBus { public subscribe( listener: SubscribeTopicListener, options: SubscribeOptions = {} - ) { + ): SubscribeAbortController { const topic = options.topic || DEFAULT_LISTENER_KEY; if (!this.topicListener.has(topic)) { this.topicListener.set(topic, new Set()); @@ -502,6 +503,11 @@ export abstract class AbstractEventBus implements IEventBus { listener['_subscribeOnce'] = true; } this.topicListener.get(topic).add(listener); + listener['_abortController'] = { + abort: () => { + this.topicListener.get(topic).delete(listener); + }, + }; // if topic has cache if ( @@ -517,14 +523,16 @@ export abstract class AbstractEventBus implements IEventBus { } this.topicMessageCache.get(topic).clear(); } + + return listener['_abortController']; } public subscribeOnce( listener: SubscribeTopicListener, options: SubscribeOptions = {} - ) { + ): SubscribeAbortController { options.subscribeOnce = true; - this.subscribe(listener, options); + return this.subscribe(listener, options); } public publish( diff --git a/src/interface.ts b/src/interface.ts index ca81f5d..f83d42c 100644 --- a/src/interface.ts +++ b/src/interface.ts @@ -122,14 +122,21 @@ export type SubscribeTopicListener = ( responder?: IResponder ) => void | Promise; +export type SubscribeAbortController = { + abort: () => void; +}; + export interface IEventBus { addWorker(worker: T); start(err?: Error): Promise; - subscribe(callback: SubscribeTopicListener, options?: SubscribeOptions): void; + subscribe( + callback: SubscribeTopicListener, + options?: SubscribeOptions + ): SubscribeAbortController; subscribeOnce( callback: SubscribeTopicListener, options?: SubscribeOptions - ): void; + ): SubscribeAbortController; publishAsync( data: unknown, publishOptions?: PublishOptions diff --git a/test/cp.test.ts b/test/cp.test.ts index 6968bbd..08909fd 100644 --- a/test/cp.test.ts +++ b/test/cp.test.ts @@ -51,6 +51,38 @@ describe('/test/cp.test.ts', function () { await bus.stop(); }); + it('test base subscribe with abort', async () => { + const bus = new ChildProcessEventBus(); + const worker = createChildProcessWorker(join(__dirname, 'cp/base.ts')); + bus.addWorker(worker); + await bus.start(); + + const result = await new Promise(resolve => { + const abortController = bus.subscribe(message => { + resolve(message.body); + }); + + abortController.abort(); + + bus.publish({ + data: { + name: 'test', + } + }, { + topic: 'target' + }); + + setTimeout(() => { + resolve(null); + }, 1000); + }); + + expect(result).toEqual(null); + + worker.kill(); + await bus.stop(); + }); + it('test publish with async', async () => { const bus = new ChildProcessEventBus(); const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async.ts')); diff --git a/test/local.test.ts b/test/local.test.ts index 1c46045..ca58cd3 100644 --- a/test/local.test.ts +++ b/test/local.test.ts @@ -22,7 +22,7 @@ describe('/test/local.test.ts', function () { await bus.stop(); }); - it('test base publish and subscribe', async () => { + it.only('test base publish and subscribe', async () => { const bus = new LocalEventBus({ isWorker: false, }); @@ -49,6 +49,39 @@ describe('/test/local.test.ts', function () { await bus.stop(); }); + it.only('test base subscribe and abort', async () => { + const bus = new LocalEventBus({ + isWorker: false, + }); + createLocalWorker(join(__dirname, 'local/base_abort.ts')); + await bus.start(); + + const result = await new Promise(resolve => { + const abortController = bus.subscribe(message => { + resolve(message.body); + }); + + abortController.abort(); + + bus.publish({ + data: { + name: 'test', + } + }, + { + topic: 'target', + }); + + setTimeout(() => { + resolve(null); + }, 1000); + }); + + expect(result).toEqual(null); + + await bus.stop(); + }); + it('test publish with async', async () => { const bus = new LocalEventBus({ isWorker: false, diff --git a/test/local/base_abort.ts b/test/local/base_abort.ts new file mode 100644 index 0000000..badf096 --- /dev/null +++ b/test/local/base_abort.ts @@ -0,0 +1,24 @@ +import { LocalEventBus } from '../../src/index'; + +export async function createWorker() { + const bus = new LocalEventBus({ + isWorker: true, + }); + + bus.subscribe(message=>{ + console.log(message); + + bus.publish({ + data: 'hello world' + }); + }, + { + topic: 'target', + }); + + await bus.start(); +} + +createWorker().then(() => { + console.log('ready'); +}); diff --git a/test/thread.test.ts b/test/thread.test.ts index 85cc4eb..776d5ba 100644 --- a/test/thread.test.ts +++ b/test/thread.test.ts @@ -52,6 +52,40 @@ describe('/test/thread.test.ts', function () { await bus.stop(); }); + it('test base subscribe and abort', async () => { + const bus = new ThreadEventBus(); + const worker = createThreadWorker(join(__dirname, 'worker/base.ts')); + bus.addWorker(worker); + await bus.start(); + + const result = await new Promise(resolve => { + const abortController = bus.subscribe(message => { + resolve(message.body); + }); + + abortController.abort(); + + bus.publish({ + data: { + name: 'test', + } + }, + { + topic: 'target', + }); + + setTimeout(() => { + resolve(null); + }, 1000); + + }); + + expect(result).toEqual(null); + + await worker.terminate(); + await bus.stop(); + }); + it('test publish with async', async () => { const bus = new ThreadEventBus(); const worker = createThreadWorker(join(__dirname, 'worker/publish_async.ts'));