Skip to content

Commit

Permalink
add ExecSubject and generic executor
Browse files Browse the repository at this point in the history
  • Loading branch information
nairinarinyan committed Aug 29, 2019
1 parent 346eec9 commit 297ff8a
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
node_modules
build
notes.md
examples
*.js
*.d.ts
*.tgz
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "reactive-blocs",
"version": "0.0.11",
"version": "0.0.20",
"description": "Reactive Blocs",
"main": "index.js",
"scripts": {
Expand All @@ -22,13 +22,13 @@
},
"homepage": "https://github.com/nairinarinyan/reactive-blocs#readme",
"peerDependencies": {
"rxjs": "^6.5.1",
"rxjs": "^6.5.2",
"react": "^16.8.6"
},
"devDependencies": {
"@types/react": "^16.8.14",
"react": "^16.8.6",
"rxjs": "^6.5.1",
"rxjs": "^6.5.2",
"typescript": "^3.4.2"
}
}
44 changes: 42 additions & 2 deletions src/executors.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Observable, BehaviorSubject, of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';
import { Observable, BehaviorSubject, of, Subject, merge, Subscriber, Subscription, SubscriptionLike, throwError } from 'rxjs';
import { distinctUntilChanged, mergeMap, catchError, share, switchMap, filter, tap } from 'rxjs/operators';

export type Executor<T> = (...execArgs: any[]) => Observable<T>;
export type ExecPerformer = (...execArgs: any[]) => void;
Expand Down Expand Up @@ -43,3 +43,43 @@ export function execControlled<T>(exec: Executor<T>, control: Control, initialVa

return subject;
}

export class ExecSubject<T> extends BehaviorSubject<T> {
private _executionStream = new Subject<any>();

constructor(
private _executor: Executor<T>,
_initialValue?: T,
private _error ?: BehaviorSubject<Error>,
) {
super(_initialValue);
this.init();
}

private init() {
this._executionStream.pipe(
switchMap(args => this.performExec(this._executor, args)),
tap(error => {
if (error instanceof Error && this._error) {
this._error.next(error);
}
}),
filter(result => !(result instanceof Error)),
share(),
).subscribe(val => this.next(val as T));
}

private performExec<T>(exec: Executor<T>, args: any) {
return exec(args).pipe(
catchError(err => of(new Error(err)))
);
};

exec(args: any) {
this._executionStream.next(args);
}
}

export function exec<T>(exec: Executor<T>, initialValue?: T, error?: BehaviorSubject<Error>): ExecSubject<T> {
return new ExecSubject<T>(exec, initialValue, error);
}

0 comments on commit 297ff8a

Please sign in to comment.