Skip to content

Commit

Permalink
feat: add abort controller for subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
czy88840616 committed Jul 8, 2024
1 parent 2556f10 commit c0a8614
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 6 deletions.
14 changes: 11 additions & 3 deletions src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
MessageCategory,
MessageType,
PublishOptions,
SubscribeAbortController,
SubscribeOptions,
SubscribeTopicListener,
WaitCheckOptions,
Expand Down Expand Up @@ -493,7 +494,7 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
public subscribe(
listener: SubscribeTopicListener,
options: SubscribeOptions = {}
) {
): SubscribeAbortController {
const topic = options.topic || DEFAULT_LISTENER_KEY;
if (!this.topicListener.has(topic)) {
this.topicListener.set(topic, new Set());
Expand All @@ -502,6 +503,11 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
listener['_subscribeOnce'] = true;
}
this.topicListener.get(topic).add(listener);
listener['_abortController'] = {
abort: () => {
this.topicListener.get(topic).delete(listener);
},
};

// if topic has cache
if (
Expand All @@ -517,14 +523,16 @@ export abstract class AbstractEventBus<T> implements IEventBus<T> {
}
this.topicMessageCache.get(topic).clear();
}

return listener['_abortController'];
}

public subscribeOnce(
listener: SubscribeTopicListener,
options: SubscribeOptions = {}
) {
): SubscribeAbortController {
options.subscribeOnce = true;
this.subscribe(listener, options);
return this.subscribe(listener, options);
}

public publish(
Expand Down
11 changes: 9 additions & 2 deletions src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,21 @@ export type SubscribeTopicListener = (
responder?: IResponder
) => void | Promise<void>;

export type SubscribeAbortController = {
abort: () => void;
};

export interface IEventBus<T> {
addWorker(worker: T);
start(err?: Error): Promise<void>;
subscribe(callback: SubscribeTopicListener, options?: SubscribeOptions): void;
subscribe(
callback: SubscribeTopicListener,
options?: SubscribeOptions
): SubscribeAbortController;
subscribeOnce(
callback: SubscribeTopicListener,
options?: SubscribeOptions
): void;
): SubscribeAbortController;
publishAsync<ResData>(
data: unknown,
publishOptions?: PublishOptions
Expand Down
32 changes: 32 additions & 0 deletions test/cp.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ describe('/test/cp.test.ts', function () {
await bus.stop();
});

it('test base subscribe with abort', async () => {
const bus = new ChildProcessEventBus();
const worker = createChildProcessWorker(join(__dirname, 'cp/base.ts'));
bus.addWorker(worker);
await bus.start();

const result = await new Promise(resolve => {
const abortController = bus.subscribe(message => {
resolve(message.body);
});

abortController.abort();

bus.publish({
data: {
name: 'test',
}
}, {
topic: 'target'
});

setTimeout(() => {
resolve(null);
}, 1000);
});

expect(result).toEqual(null);

worker.kill();
await bus.stop();
});

it('test publish with async', async () => {
const bus = new ChildProcessEventBus();
const worker = createChildProcessWorker(join(__dirname, 'cp/publish_async.ts'));
Expand Down
35 changes: 34 additions & 1 deletion test/local.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('/test/local.test.ts', function () {
await bus.stop();
});

it('test base publish and subscribe', async () => {
it.only('test base publish and subscribe', async () => {
const bus = new LocalEventBus({
isWorker: false,
});
Expand All @@ -49,6 +49,39 @@ describe('/test/local.test.ts', function () {
await bus.stop();
});

it.only('test base subscribe and abort', async () => {
const bus = new LocalEventBus({
isWorker: false,
});
createLocalWorker(join(__dirname, 'local/base_abort.ts'));
await bus.start();

const result = await new Promise(resolve => {
const abortController = bus.subscribe(message => {
resolve(message.body);
});

abortController.abort();

bus.publish({
data: {
name: 'test',
}
},
{
topic: 'target',
});

setTimeout(() => {
resolve(null);
}, 1000);
});

expect(result).toEqual(null);

await bus.stop();
});

it('test publish with async', async () => {
const bus = new LocalEventBus({
isWorker: false,
Expand Down
24 changes: 24 additions & 0 deletions test/local/base_abort.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { LocalEventBus } from '../../src/index';

export async function createWorker() {
const bus = new LocalEventBus({
isWorker: true,
});

bus.subscribe(message=>{
console.log(message);

bus.publish({
data: 'hello world'
});
},
{
topic: 'target',
});

await bus.start();
}

createWorker().then(() => {
console.log('ready');
});
34 changes: 34 additions & 0 deletions test/thread.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,40 @@ describe('/test/thread.test.ts', function () {
await bus.stop();
});

it('test base subscribe and abort', async () => {
const bus = new ThreadEventBus();
const worker = createThreadWorker(join(__dirname, 'worker/base.ts'));
bus.addWorker(worker);
await bus.start();

const result = await new Promise(resolve => {
const abortController = bus.subscribe(message => {
resolve(message.body);
});

abortController.abort();

bus.publish({
data: {
name: 'test',
}
},
{
topic: 'target',
});

setTimeout(() => {
resolve(null);
}, 1000);

});

expect(result).toEqual(null);

await worker.terminate();
await bus.stop();
});

it('test publish with async', async () => {
const bus = new ThreadEventBus();
const worker = createThreadWorker(join(__dirname, 'worker/publish_async.ts'));
Expand Down

0 comments on commit c0a8614

Please sign in to comment.