diff --git a/projects/observable-webworker/src/lib/from-worker-pool.spec.ts b/projects/observable-webworker/src/lib/from-worker-pool.spec.ts index 3bf840e..6903e19 100644 --- a/projects/observable-webworker/src/lib/from-worker-pool.spec.ts +++ b/projects/observable-webworker/src/lib/from-worker-pool.spec.ts @@ -60,6 +60,17 @@ describe('fromWorkerPool', () => { sub.unsubscribe(); }); + it('does not send input close notification to ensure the workers are kept alive', () => { + const subscriptionSpy = jasmine.createSpy('subscriptionSpy'); + const sub = stubbedWorkerStream.subscribe(subscriptionSpy); + + input$.next(1); + + expect(stubbedWorkers[0].postMessage).not.toHaveBeenCalledWith(jasmine.objectContaining({ kind: 'C' })); + + sub.unsubscribe(); + }); + it('shuts down workers when subscriber unsubscribes', () => { const subscriptionSpy = jasmine.createSpy('subscriptionSpy'); const sub = stubbedWorkerStream.subscribe(subscriptionSpy); diff --git a/projects/observable-webworker/src/lib/from-worker-pool.ts b/projects/observable-webworker/src/lib/from-worker-pool.ts index 93afcf3..cda116f 100644 --- a/projects/observable-webworker/src/lib/from-worker-pool.ts +++ b/projects/observable-webworker/src/lib/from-worker-pool.ts @@ -1,4 +1,4 @@ -import { Observable, ObservableInput, of, Subject, zip } from 'rxjs'; +import { concat, NEVER, Observable, ObservableInput, of, Subject, zip } from 'rxjs'; import { finalize, map, mergeAll, tap } from 'rxjs/operators'; import { fromWorker } from './from-worker'; @@ -69,7 +69,11 @@ export function fromWorkerPool( }), map( ([worker, unitWork]): Observable => { - return fromWorker(() => worker.factory(), of(unitWork), selectTransferables, { + // input should not complete to ensure the worker doesn't send back completion notifications when work unit is + // processed, otherwise these would cause the fromWorker to unsubscribe from the result. + const input$ = concat(of(unitWork), NEVER); + // const input$ = of(unitWork); + return fromWorker(() => worker.factory(), input$, selectTransferables, { terminateOnComplete: false, }).pipe( finalize(() => { diff --git a/projects/observable-webworker/src/lib/run-worker.spec.ts b/projects/observable-webworker/src/lib/run-worker.spec.ts index 13096a8..b17536b 100644 --- a/projects/observable-webworker/src/lib/run-worker.spec.ts +++ b/projects/observable-webworker/src/lib/run-worker.spec.ts @@ -1,5 +1,6 @@ import { fakeAsync, tick } from '@angular/core/testing'; import { BehaviorSubject, Notification, Observable, of } from 'rxjs'; +import { map } from 'rxjs/operators'; import { DoTransferableWork, DoTransferableWorkUnit, @@ -76,12 +77,6 @@ describe('runWorker', () => { }), ); - expect(postMessageSpy).toHaveBeenCalledWith( - jasmine.objectContaining({ - kind: 'C', - }), - ); - sub.unsubscribe(); }); @@ -128,13 +123,51 @@ describe('runWorker', () => { [expected.buffer] as any, ); + sub.unsubscribe(); + }); + + // https://github.com/cloudnc/observable-webworker/issues/116 + it('should complete the notification stream when the worker completes', () => { + const postMessageSpy = spyOn(window, 'postMessage'); + postMessageSpy.calls.reset(); + + class TestWorker implements DoWork { + public work(input$: Observable): Observable { + // here nothing should keep the subscription alive when input$ completes + return input$.pipe(map(input => input * 2)); + } + } + + const sub = runWorker(TestWorker); + + const notificationEvent: WorkerMessageNotification = new MessageEvent('message', { + data: new Notification('N', 10), + }); + + self.dispatchEvent(notificationEvent); + + expect(postMessageSpy).toHaveBeenCalledWith( + jasmine.objectContaining({ + kind: 'N', + value: 20, + }), + ); + + const completeEvent: WorkerMessageNotification = new MessageEvent('message', { + data: new Notification('C'), + }); + + self.dispatchEvent(completeEvent); + expect(postMessageSpy).toHaveBeenCalledWith( jasmine.objectContaining({ kind: 'C', }), ); - sub.unsubscribe(); + // do note here that instead of manually closing the subscription + // we check it's already closed as expected + expect(sub.closed).toBeTrue(); }); it('should not complete the notification stream if the worker does not complete', () => { @@ -197,12 +230,6 @@ describe('runWorker', () => { }), ); - expect(postMessageSpy).toHaveBeenCalledWith( - jasmine.objectContaining({ - kind: 'C', - }), - ); - sub.unsubscribe(); })); }); diff --git a/projects/observable-webworker/src/lib/run-worker.ts b/projects/observable-webworker/src/lib/run-worker.ts index 04e2352..ca89c3f 100644 --- a/projects/observable-webworker/src/lib/run-worker.ts +++ b/projects/observable-webworker/src/lib/run-worker.ts @@ -1,5 +1,5 @@ import { from, fromEvent, Notification, Observable, Subscription } from 'rxjs'; -import { concatMap, dematerialize, filter, map, materialize } from 'rxjs/operators'; +import { concatMap, dematerialize, map, materialize } from 'rxjs/operators'; import { DoTransferableWork, DoWork, DoWorkUnit, WorkerMessageNotification } from './observable-worker.types'; export type ObservableWorkerConstructor = new (...args: any[]) => DoWork | DoWorkUnit; @@ -25,15 +25,15 @@ export function getWorkerResult( incomingMessages$: Observable>, ): Observable> { const input$ = incomingMessages$.pipe( - map((e: WorkerMessageNotification): Notification => e.data), - map((n: Notification) => new Notification(n.kind, n.value, n.error)), - // ignore complete, the calling thread will manage termination of the stream - filter(n => n.kind !== 'C'), + map( + (e: WorkerMessageNotification): Notification => new Notification(e.data.kind, e.data.value, e.data.error), + ), dematerialize(), ); return workerIsUnitType(worker) - ? input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize()))) + ? // note we intentionally materialize the inner observable so the main thread can reassemble the multiple stream values per input observable + input$.pipe(concatMap(input => from(worker.workUnit(input)).pipe(materialize()))) : worker.work(input$).pipe(materialize()); } diff --git a/projects/observable-webworker/tsconfig.lib.json b/projects/observable-webworker/tsconfig.lib.json index 9c54d83..de12690 100644 --- a/projects/observable-webworker/tsconfig.lib.json +++ b/projects/observable-webworker/tsconfig.lib.json @@ -7,7 +7,8 @@ "inlineSources": true, "types": [], "lib": ["dom", "es2018"], - "strict": true + "strict": true, + "removeComments": true }, "angularCompilerOptions": { "skipTemplateCodegen": true,