Skip to content

Commit

Permalink
Merge pull request #54 from tusharmath/raf-throttle
Browse files Browse the repository at this point in the history
Raf throttle
  • Loading branch information
tusharmath authored Nov 2, 2016
2 parents 2b21ec8 + 2c7bfe9 commit a9a2888
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export {merge} from './operators/Merge'
export {multicast} from './operators/Multicast'
export {Observable} from './lib/Observable'
export {Observer} from './lib/Observer'
export {rafThrottle} from './operators/RafThrottle'
export {reduce} from './operators/Reduce'
export {sample} from './operators/Sample'
export {scan} from './operators/Scan'
Expand Down
60 changes: 60 additions & 0 deletions src/operators/RafThrottle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Created by tushar.mathur on 02/11/16.
*/

import {IObservable} from '../types/core/IObservable'
import {ISubscription} from '../types/core/ISubscription'
import {IScheduler} from '../types/IScheduler'
import {IObserver} from '../types/core/IObserver'
import {CompositeSubscription} from '../lib/CompositeSubscription'
import {LinkedListNode} from '../lib/LinkedList'

export class RafObserver<T> implements IObserver<T> {
private completed = false
private queue: LinkedListNode<ISubscription>
private canFlush = true

constructor (private sink: IObserver<T>,
private scheduler: IScheduler,
private cSub: CompositeSubscription) {
this.flush = this.flush.bind(this)
}

next (val: T): void {
if (this.canFlush) {
this.canFlush = false
this.sink.next(val)
this.queue = this.cSub.add(this.scheduler.requestAnimationFrame(this.flush))
}
}

error (err: Error): void {
this.sink.error(err)
}

complete (): void {
this.completed = true
this.sink.complete()
this.cSub.remove(this.queue)
}

private flush () {
this.canFlush = true
this.cSub.remove(this.queue)
}
}

export class RafThrottle<T> implements IObservable<T> {
constructor (private source: IObservable<T>) {
}

subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
const cSub = new CompositeSubscription()
cSub.add(this.source.subscribe(new RafObserver(observer, scheduler, cSub), scheduler))
return cSub
}
}

export function rafThrottle<T> (source: IObservable<T>) {
return new RafThrottle(source)
}
16 changes: 16 additions & 0 deletions test/test.RafThrottle.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
/**
* Created by tushar.mathur on 02/11/16.
*/

import test from 'ava'
import {rafThrottle} from '../src/operators/RafThrottle'
import {TestScheduler} from '../src/testing/TestScheduler'
import {marble, toMarble} from '../src/testing/Marble'

test(t => {
const sh = TestScheduler.of()
const message = 'ABCDEFGH|'
const source$ = sh.Hot(marble(message))
const {results} = sh.start(() => rafThrottle(source$))
t.is(toMarble(results), '-B-D-F-H|')
})

0 comments on commit a9a2888

Please sign in to comment.