diff --git a/package.json b/package.json index 4e4b77d..569770f 100644 --- a/package.json +++ b/package.json @@ -9,17 +9,17 @@ "cleanup": "find ./src -type f -name '*.js' -delete && find ./src -type f -name '*.map' -delete", "coverage": "nyc npm test && nyc report --reporter=text-lcov | coveralls", "hydra": "node --trace-hydrogen --trace-phase=Z --trace-deopt --code-comments --hydrogen-track-positions --redirect-code-traces --redirect-code-traces-to=code.asm .dist/benchmarks/run", - "prepublish": "tsc -d && npm run build && npm run build-min", + "prepublish": "tsc -d && npm run build && npm run build:min", "rfc": "node chore/rfc", "semantic-release": "semantic-release pre && npm publish && semantic-release post", "test": "tsc -d && ava .dist/test", "test:watch": "ava .dist/test --watch --source .dist", - "build": "npm run build-es && npm run build-cjs", - "build-min": "npm run build-es-min && npm run build-cjs-min", - "build-es": "rollup -c", - "build-es-min": "babel .dist/main-es.js > .dist/main-es.min.js", - "build-cjs": "browserify .dist/src/main.js > .dist/main-cjs.js", - "build-cjs-min": "babel .dist/main-cjs.js > .dist/main-cjs.min.js" + "build": "npm run build:es && npm run build:cjs", + "build:min": "npm run build:es:min && npm run build:cjs:min", + "build:es": "rollup -c", + "build:es:min": "babel .dist/main-es.js > .dist/main-es.min.js", + "build:cjs": "browserify .dist/src/main.js > .dist/main-cjs.js", + "build:cjs:min": "babel .dist/main-cjs.js > .dist/main-cjs.min.js" }, "author": "", "license": "ISC", diff --git a/src/lib/CounterSubscription.ts b/src/lib/CounterSubscription.ts deleted file mode 100644 index 0aa0fdf..0000000 --- a/src/lib/CounterSubscription.ts +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Created by tushar on 29/01/17. - */ -import {Subscription} from '../types/core/Subscription' -import {Observer} from '../types/core/Observer' - -export abstract class CounterSubscription implements Subscription { - private count = 0 - abstract observer: Observer - abstract subscription: Subscription - - protected onFrame = () => { - this.observer.next(this.count++) - } - - unsubscribe (): void { - this.subscription.unsubscribe() - } - - get closed () { - return this.subscription.closed - } -} diff --git a/src/lib/DefaultScheduler.ts b/src/lib/DefaultScheduler.ts index 8cf8deb..41d1755 100644 --- a/src/lib/DefaultScheduler.ts +++ b/src/lib/DefaultScheduler.ts @@ -6,11 +6,13 @@ import {Subscription} from '../types/core/Subscription' import {ITask} from '../types/ITask' import {IScheduledTask} from '../types/IScheduledTask' +type RicOptions = {timeout: number} +declare function requestIdleCallback (fn: () => void, options?: RicOptions): number +declare function cancelIdleCallback (id: number): void + function run (task: IScheduledTask) { return task.run() } - - class AnimationFrame implements IScheduledTask { closed = false private id: number @@ -18,35 +20,13 @@ class AnimationFrame implements IScheduledTask { constructor (private task: ITask) { } - run () { - this.id = requestAnimationFrame(() => { - this.closed = true - this.task() - }) - return this - } - - unsubscribe (): void { - if (this.closed) return - cancelAnimationFrame(this.id) + onFrame () { this.closed = true - } -} - -class AnimationFrames implements IScheduledTask { - closed = false - private id: number - - constructor (private task: ITask) { - } - - onFrame = () => { this.task() - if (!this.closed) this.run() } run () { - this.id = requestAnimationFrame(this.onFrame) + this.id = requestAnimationFrame(this.onFrame.bind(this)) return this } @@ -56,7 +36,6 @@ class AnimationFrames implements IScheduledTask { this.closed = true } } - class Interval implements IScheduledTask { closed = false private id: any @@ -66,7 +45,7 @@ class Interval implements IScheduledTask { } run () { - this.id = setInterval(() => this.task(), this.interval) + this.id = setInterval(this.task, this.interval) return this } @@ -75,7 +54,6 @@ class Interval implements IScheduledTask { this.closed = true } } - class Timeout implements IScheduledTask { closed = false private timer: any @@ -83,22 +61,71 @@ class Timeout implements IScheduledTask { constructor (private task: ITask, private timeout: number) { } + private onTimeout () { + this.task() + this.closed = true + } + run () { - this.timer = setTimeout(() => { + this.timer = setTimeout(this.onTimeout.bind(this), this.timeout) + return this + } + + unsubscribe (): void { + if (this.closed === false) { + clearTimeout(this.timer) this.closed = true - this.task() - }, this.timeout) + } + } +} +class RequestIdleCallback implements IScheduledTask { + closed: boolean + private id: number + + constructor (private task: ITask, private options?: RicOptions) { + } + + run (): IScheduledTask { + this.id = requestIdleCallback(this.task, this.options) return this } unsubscribe (): void { if (this.closed) return - clearTimeout(this.timer) - this.closed = true + this.closed = false + cancelIdleCallback(this.id) } } +class NextTick implements IScheduledTask { + closed: boolean + + constructor (private task: ITask) { + } + + onTick (i: NextTick) { + if (i.closed) return + i.task() + } + run (): IScheduledTask { + process.nextTick(this.onTick, this) + return this + } + + unsubscribe (): void { + if (this.closed) return + this.closed = false + } +} class DefaultScheduler implements Scheduler { + requestIdleCallback (task: ITask, options?: RicOptions): Subscription { + return run(new RequestIdleCallback(task, options)) + } + + nextTick (task: ITask): Subscription { + return run(new NextTick(task)) + } + setInterval (task: ITask, interval: number): Subscription { return run(new Interval(task, interval)) } @@ -111,13 +138,8 @@ class DefaultScheduler implements Scheduler { return run(new AnimationFrame(task)) } - requestAnimationFrames (task: ITask): Subscription { - return run(new AnimationFrames(task)) - } - now (): number { return Date.now() } } - export const createScheduler = (): Scheduler => new DefaultScheduler() diff --git a/src/lib/ScheduleAsap.ts b/src/lib/ScheduleAsap.ts new file mode 100644 index 0000000..a63c396 --- /dev/null +++ b/src/lib/ScheduleAsap.ts @@ -0,0 +1,23 @@ +import {Scheduler} from '../types/Scheduler' +import {ITask} from '../types/ITask' +import {Subscription} from '../types/core/Subscription' +/** + * Created by tushar on 05/02/17. + */ + +interface Global { + requestIdleCallback?: Function + process?: {nextTick: Function} + setTimeout: Function +} + +function getGlobal (): Global { + return typeof window === 'object' ? window : global +} + +export function asap (sh: Scheduler, task: ITask): Subscription { + const global = getGlobal() + if (global.requestIdleCallback) return sh.requestIdleCallback(task) + if (global.process) return sh.nextTick(task) + return sh.setTimeout(task, 1) +} \ No newline at end of file diff --git a/src/operators/Delay.ts b/src/operators/Delay.ts index c65155b..dfc19ab 100644 --- a/src/operators/Delay.ts +++ b/src/operators/Delay.ts @@ -7,6 +7,7 @@ import {Scheduler} from '../types/Scheduler' import {Subscription} from '../types/core/Subscription' import {safeObserver} from '../lib/SafeObserver' import {CompositeSubscription} from '../lib/CompositeSubscription' +import {Curry} from '../lib/Curry' class DelayObserver implements Observer { constructor (private timeout: number, @@ -15,10 +16,6 @@ class DelayObserver implements Observer { private cSub: CompositeSubscription) { } - private completeDelayed = () => { - this.sink.complete() - } - next (val: T): void { const node = this.cSub.add(this.scheduler.setTimeout(() => { this.sink.next(val) @@ -31,7 +28,7 @@ class DelayObserver implements Observer { } complete (): void { - this.cSub.add(this.scheduler.setTimeout(this.completeDelayed, this.timeout)) + this.cSub.add(this.scheduler.setTimeout(this.sink.complete.bind(this.sink), this.timeout)) } } @@ -47,4 +44,6 @@ class DelayObservable implements Observable { } } -export const delay = (timeout: number, source: Observable): Observable => new DelayObservable(timeout, source) \ No newline at end of file +export const delay = Curry( (timeout: number, source: Observable): Observable => new DelayObservable(timeout, source)) as Function & + { (timeout: number, source: Observable): Observable} & + { (timeout: number): {(source: Observable): Observable}} diff --git a/src/operators/Reduce.ts b/src/operators/Reduce.ts index 84627b0..1969eac 100644 --- a/src/operators/Reduce.ts +++ b/src/operators/Reduce.ts @@ -10,7 +10,7 @@ import {Scheduler} from '../types/Scheduler' import {Curry} from '../lib/Curry' -export type TReducer = {(previousValue: R, currentValue: T): R} +export type TReducer = {(memory: R, current: T): R} export type TSeed = R export type TSource = Observable export type TResult = Observable diff --git a/src/operators/Scan.ts b/src/operators/Scan.ts index 26a1c55..708fc13 100644 --- a/src/operators/Scan.ts +++ b/src/operators/Scan.ts @@ -1,15 +1,13 @@ /** * Created by tushar.mathur on 09/10/16. */ - - import {Observable} from '../types/core/Observable' import {Observer} from '../types/core/Observer' import {Scheduler} from '../types/Scheduler' import {Subscription} from '../types/core/Subscription' import {Curry} from '../lib/Curry' -export type TReducer = (current: T, memory: R) => R +export type TReducer = (memory: R, current: T) => R export type TSeed = R export type TSource = Observable export type TResult = Observable @@ -22,7 +20,7 @@ class ScanObserver implements Observer { } next (val: T): void { - this.value = this.reducer(val, this.value) + this.value = this.reducer(this.value, val) this.sink.next(this.value) } @@ -52,4 +50,4 @@ export const scan = Curry(function (reducer: TReducer, value: V, so {(reducer: TReducer, seed: TSeed, source: TSource): TResult} & {(reducer: TReducer): {(seed: TSeed, source: TSource): TResult}} & {(reducer: TReducer, seed: TSeed): {(source: TSource): TResult}} & - {(reducer: TReducer): { (seed: TSeed): { (source: TSource): TResult } } } + {(reducer: TReducer): {(seed: TSeed): {(source: TSource): TResult}}} diff --git a/src/operators/Switch.ts b/src/operators/Switch.ts index e03160e..87a5919 100644 --- a/src/operators/Switch.ts +++ b/src/operators/Switch.ts @@ -27,10 +27,12 @@ class SwitchValueObserver implements Observer { class SwitchObserver implements Observer> { private currentSub: LinkedListNode | undefined = void 0 + private sink: SwitchValueObserver - constructor (private sink: Observer, + constructor (private mainSink: Observer, private cSub: CompositeSubscription, private scheduler: Scheduler) { + this.sink = new SwitchValueObserver(mainSink) } private removeCurrentSub () { @@ -43,7 +45,7 @@ class SwitchObserver implements Observer> { } next (val: Observable): void { - this.setCurrentSub(val.subscribe(new SwitchValueObserver(this.sink), this.scheduler)) + this.setCurrentSub(val.subscribe(this.sink, this.scheduler)) } error (err: Error): void { @@ -52,7 +54,7 @@ class SwitchObserver implements Observer> { complete (): void { this.removeCurrentSub() - this.sink.complete() + this.mainSink.complete() } } diff --git a/src/sources/Frames.ts b/src/sources/Frames.ts index cbb6ebd..b78b26e 100644 --- a/src/sources/Frames.ts +++ b/src/sources/Frames.ts @@ -6,24 +6,36 @@ import {Observer} from '../types/core/Observer' import {Scheduler} from '../types/Scheduler' import {Subscription} from '../types/core/Subscription' import {safeObserver} from '../lib/SafeObserver' -import {CounterSubscription} from '../lib/CounterSubscription' -class RAFSubscription extends CounterSubscription { +class RAFSubscription implements Subscription { observer: Observer subscription: Subscription + closed = false - constructor (sink: Observer, scheduler: Scheduler) { - super() - this.subscription = scheduler.requestAnimationFrames(this.onFrame) - this.observer = safeObserver(sink) + constructor (private sink: Observer, private scheduler: Scheduler) { + this.schedule() + } + + private schedule () { + this.subscription = this.scheduler.requestAnimationFrame(this.onFrame) + } + + onFrame = () => { + if (this.closed) return + this.sink.next(undefined) + this.schedule() + } + + unsubscribe (): void { + this.closed = true } } -class FrameObservable implements Observable { - subscribe (observer: Observer, scheduler: Scheduler): Subscription { - return new RAFSubscription(observer, scheduler) +class FrameObservable implements Observable { + subscribe (observer: Observer, scheduler: Scheduler): Subscription { + return new RAFSubscription(safeObserver(observer), scheduler) } } -export function frames (): Observable { +export function frames (): Observable { return new FrameObservable() } \ No newline at end of file diff --git a/src/sources/FromArray.ts b/src/sources/FromArray.ts index 18450b1..43ce9bf 100644 --- a/src/sources/FromArray.ts +++ b/src/sources/FromArray.ts @@ -6,13 +6,14 @@ import {Subscription} from '../types/core/Subscription' import {Observer} from '../types/core/Observer' import {Scheduler} from '../types/Scheduler' import {toSafeFunction} from '../lib/ToSafeFunction' +import {asap} from '../lib/ScheduleAsap' class FromArraySubscription implements Subscription { private subscription: Subscription closed = false constructor (private array: Array, private sink: Observer, scheduler: Scheduler) { - this.subscription = scheduler.setTimeout(this.executeSafely.bind(this), 0) + this.subscription = asap(scheduler, this.executeSafely.bind(this)) } diff --git a/src/sources/FromDOM.ts b/src/sources/FromDOM.ts index a5d61cc..ff6ea5f 100644 --- a/src/sources/FromDOM.ts +++ b/src/sources/FromDOM.ts @@ -3,7 +3,6 @@ */ import {Observable} from '../types/core/Observable' import {Observer} from '../types/core/Observer' -import {Scheduler} from '../types/Scheduler' import {Subscription} from '../types/core/Subscription' import {IListener} from '../types/IListener' import {Curry} from '../lib/Curry' @@ -25,8 +24,8 @@ class DOMObservable implements TResult { constructor (private name: string, private element: HTMLElement) { } - subscribe (observer: Observer, scheduler: Scheduler): Subscription { - const listener = (e: Event) => observer.next(e) + subscribe (observer: Observer): Subscription { + const listener = observer.next.bind(observer) this.element.addEventListener(this.name, listener) return new DOMSubscription(this.element, listener, this.name) } diff --git a/src/sources/Interval.ts b/src/sources/Interval.ts index d7e580a..40ee4c4 100644 --- a/src/sources/Interval.ts +++ b/src/sources/Interval.ts @@ -6,28 +6,35 @@ import {Subscription} from '../types/core/Subscription' import {Observer} from '../types/core/Observer' import {Scheduler} from '../types/Scheduler' import {safeObserver} from '../lib/SafeObserver' -import {CounterSubscription} from '../lib/CounterSubscription' -class TimerSubscription extends CounterSubscription { - observer: Observer +class TimerSubscription implements Subscription { + closed = false subscription: Subscription - constructor (sink: Observer, scheduler: Scheduler, interval: number) { - super() - this.subscription = scheduler.setInterval(this.onFrame, interval) - this.observer = safeObserver(sink) + + constructor (private sink: Observer, scheduler: Scheduler, interval: number) { + this.subscription = scheduler.setInterval(this.onFrame.bind(this), interval) + } + + onFrame () { + this.sink.next(undefined) + } + + unsubscribe (): void { + this.closed = true + this.subscription.unsubscribe() } } -class IntervalObservable implements Observable { +class IntervalObservable implements Observable { constructor (private interval: number) { } - subscribe (observer: Observer, scheduler: Scheduler): Subscription { - return new TimerSubscription(observer, scheduler, this.interval) + subscribe (observer: Observer, scheduler: Scheduler): Subscription { + return new TimerSubscription(safeObserver(observer), scheduler, this.interval) } } -export function interval (interval: number): Observable { +export function interval (interval: number): Observable { return new IntervalObservable(interval) } diff --git a/src/testing/TestScheduler.ts b/src/testing/TestScheduler.ts index 177e5e2..d28568b 100644 --- a/src/testing/TestScheduler.ts +++ b/src/testing/TestScheduler.ts @@ -70,22 +70,6 @@ export class TestScheduler implements Scheduler { return this.setTimeout(task, this.now() + this.rafTimeout, 0) } - requestAnimationFrames (task: ITask): Subscription { - var closed = false - const repeatedTask = () => { - if (closed) return - task() - this.requestAnimationFrame(repeatedTask) - } - this.requestAnimationFrame(repeatedTask) - return { - closed, - unsubscribe () { - closed = true - } - } - } - setInterval (task: ITask, interval: number): Subscription { var closed = false const repeatedTask = () => { @@ -102,6 +86,14 @@ export class TestScheduler implements Scheduler { } } + requestIdleCallback (task: ITask, options: {timeout: number}): Subscription { + return this.setTimeout(task, options.timeout > 0 ? options.timeout : 50) + } + + nextTick (task: ITask): Subscription { + return this.setTimeout(task, 1) + } + private run () { this.queue.forEach(node => { const qItem = node.value diff --git a/src/types/Scheduler.ts b/src/types/Scheduler.ts index 6a80076..0942b51 100644 --- a/src/types/Scheduler.ts +++ b/src/types/Scheduler.ts @@ -9,6 +9,7 @@ export interface Scheduler { setTimeout(task: ITask, relativeTime: number): Subscription setInterval(task: ITask, interval: number): Subscription requestAnimationFrame(task: ITask): Subscription - requestAnimationFrames(task: ITask): Subscription + requestIdleCallback(task: ITask, options?: {timeout: number}): Subscription + nextTick(task: ITask): Subscription now(): number } diff --git a/test/test.Frames.ts b/test/test.Frames.ts index e764967..2bebfb8 100644 --- a/test/test.Frames.ts +++ b/test/test.Frames.ts @@ -5,9 +5,10 @@ import {test} from 'ava' import {frames} from '../src/main' import {TestScheduler} from '../src/testing/TestScheduler' import {toMarble} from '../src/testing/Marble' +import {scan} from '../src/operators/Scan' test(t => { const sh = TestScheduler.of(10) - const {results} = sh.start(() => frames(), 10, 50) - t.is(toMarble(results, 10), '-012') + const {results} = sh.start(() => scan((i) => i + 1, -1, frames()), 200, 250) + t.is(toMarble(results), '-0123') }) diff --git a/test/test.FromArray.ts b/test/test.FromArray.ts index 34af379..c906f57 100644 --- a/test/test.FromArray.ts +++ b/test/test.FromArray.ts @@ -17,7 +17,7 @@ test(t => { const testFunction = (x: any) => x === 2 ? throwError(ERROR_MESSAGE) : x * 100 const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3]))) t.deepEqual(results, [ - next(200, 100), - error(200, new Error(ERROR_MESSAGE)) + next(201, 100), + error(201, new Error(ERROR_MESSAGE)) ]) }) diff --git a/test/test.IntervalObservable.ts b/test/test.IntervalObservable.ts index e9f7169..523f260 100644 --- a/test/test.IntervalObservable.ts +++ b/test/test.IntervalObservable.ts @@ -10,12 +10,13 @@ import {ReactiveEvents, EventError} from '../src/testing/ReactiveEvents' import {interval} from '../src/sources/Interval' import {toMarble} from '../src/testing/Marble' import {thrower, ERROR_MESSAGE} from '../src/testing/Thrower' +import {scan} from '../src/operators/Scan' const {error} = ReactiveEvents test('subscribe()', t => { const sh = TestScheduler.of() - const {results} = sh.start(() => interval(10), 20, 70) - t.is(toMarble(results, 20), '-0123') + const {results} = sh.start(() => scan(i => i + 1, -1, interval(10)), 200, 250) + t.is(toMarble(results), '-0123') }) test('ERROR!', t => {