Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch from most to @most/core #3

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
},
"license": "MIT",
"peerDependencies": {
"most": "*"
"@most/core": "*"
},
"devDependencies": {
"babel-cli": "^6.7.5",
Expand All @@ -36,8 +36,7 @@
"eslint": "^3.19.0",
"mkdirp": "^0.5.1",
"mocha": "^3.3.0",
"most": "*",
"most-test": "^1.3.0",
"@most/core": "*",
"power-assert": "^1.4.1",
"rimraf": "^2.6.1"
}
Expand Down
7 changes: 4 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
If not supplied, `f` defaults to an identity function, i.e. x => x
*/

const {newStream} = require('@most/core');
import DispatchSource from './source';

function dispatch(f, stream) {
Expand All @@ -13,9 +14,9 @@ function dispatch(f, stream) {
return stream;
}
const source = new DispatchSource(stream, f);
const newStream = new stream.constructor(source);
newStream.select = key => source.select(key);
return newStream;
const streamNew = newStream(source.run.bind(source));
streamNew.select = key => source.select(key);
return streamNew;
};
return stream ? dispatcher(stream) : dispatcher;
}
Expand Down
17 changes: 17 additions & 0 deletions src/pipe.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
export class Pipe {
constructor(sink) {
this.sink = sink;
}

event(t, x) {
return this.sink.event(t, x);
}

end(t) {
return this.sink.end(t);
}

error(t, e) {
return this.sink.error(t, e);
}
}
8 changes: 5 additions & 3 deletions src/source.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import Pipe from 'most/lib/sink/Pipe';
const {newStream} = require('@most/core');

import {Pipe} from './pipe';
import {DispatchDisposable, emptyDisposable, dispose} from './dispose';
import {Store} from './store';
import {tryEvent, tryEnd} from './try';
Expand Down Expand Up @@ -41,12 +43,12 @@ export default class DispatchSource {

select(key, initial) {
const source = new TargetSource(this, key);
return new this.stream.constructor(source);
return newStream(source.run.bind(source));
}

add(sink, scheduler, key) {
if(this._store.add(key, sink)) {
this._disposable = this.stream.source.run(this, scheduler);
this._disposable = this.stream.run(this, scheduler);
}
return new DispatchDisposable(this, sink, key);
}
Expand Down
40 changes: 40 additions & 0 deletions test/helpers/reduce.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/** @license MIT License (c) copyright 2010-2016 original author or authors */
/** @author Brian Cavalier */
/** @author John Hann */

const {run} = require('@most/core');
// import { tap } from '../../src/combinator/transform'
import {newDefaultScheduler} from '@most/scheduler';

/**
* Reduce a stream to produce a single result. Note that reducing an infinite
* stream will return a Promise that never fulfills, but that may reject if an error
* occurs.
* @param {function(result:*, x:*):*} f reducer function
* @param {*} initial initial value
* @param {Stream} stream to reduce
* @returns {Promise} promise for the final result of the reduce
*/
export function reduce(f, initial, stream) {
return new Promise((resolve, reject) => {
run(new ReduceSink(f, initial, resolve, reject), newDefaultScheduler(), stream);
});
}

class ReduceSink {
constructor(f, value, resolve, reject) {
this.f = f;
this.value = value;
this.resolve = resolve;
this.reject = reject;
}
event(t, x) {
this.value = this.f(this.value, x);
}
error(t, e) {
this.reject(e);
}
end(t) {
this.resolve(this.value);
}
}
21 changes: 21 additions & 0 deletions test/helpers/stream-helper.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {eq} from '@briancavalier/assert';
import {reduce} from './reduce';

export function assertSame(s1, s2) {
return Promise.all([toArray(s1), toArray(s2)]).then(arrayEquals);
}

export function expectArray(array, s) {
return toArray(s).then(eq(array));
}

function toArray(s) {
return reduce(function(a, x) {
a.push(x);
return a;
}, [], s);
}

function arrayEquals(ss) {
eq(ss[0], ss[1]);
}
30 changes: 21 additions & 9 deletions test/test-dispatch.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
const assert = require('power-assert');
const {dispatch} = require('../src');
const {Stream, empty, from, filter, merge, loop, map, join, reduce} = require('most');
const {Empty, empty, withItems, periodic, filter, merge, loop, map, join} = require('@most/core');
const {reduce} = require('./helpers/reduce');

function from(inputs) {
return withItems(inputs)(periodic(0));
}

describe('[most-dispatch]', () => {
describe('dispatch()', () => {
Expand All @@ -16,16 +21,24 @@ describe('[most-dispatch]', () => {
const a$ = empty();
const b$ = d(a$);

assert(b$ instanceof Stream);
assert("run" in b$);
});

const arraySink = () => {
const result = [];
return {
error: (t, err) => undefined,
event: (t, evt) => console.log(evt) && result.push(evt),
end: (t) => undefined,
result
};
};
it('emits the input values combined as a tuple with a selector function', () => {
const d = dispatch(x => x);
const inputs = [3, 5, 7];
const a$ = from(inputs);
const b$ = d(a$);
return b$
.reduce((arr, x) => (arr.push(x), arr), [])
return reduce((arr, x) => (arr.push(x), arr), [], b$)
.then(values => {
assert(values.length === 3);
values.forEach((v, i) => {
Expand All @@ -50,11 +63,10 @@ describe('[most-dispatch]', () => {
];
const a$ = from(inputs);
const b$ = d(a$);
const c$ = b$.select(3).map(x => ({p: x.b}));
const d$ = b$.select(5).map(x => ({q: x.b}));
const c$ = map(x => ({p: x.b}), b$.select(3));
const d$ = map(x => ({q: x.b}), b$.select(5));
const e$ = merge(c$, d$);
return e$
.reduce((arr, x) => (arr.push(x), arr), [])
return reduce((arr, x) => (arr.push(x), arr), [], e$)
.then(values => {
assert.deepEqual(values, [
{p: 100},
Expand Down Expand Up @@ -105,4 +117,4 @@ describe('[most-dispatch]', () => {
});
});
});
});
});
58 changes: 31 additions & 27 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,41 @@
# yarn lockfile v1


"@most/hold@^2.0.0":
version "2.0.0"
resolved "https://registry.yarnpkg.com/@most/hold/-/hold-2.0.0.tgz#8e2328f01fcf5e08ff93343567e0d3dd50fcc2e2"
"@most/core@*":
version "1.5.0"
resolved "https://registry.yarnpkg.com/@most/core/-/core-1.5.0.tgz#b36b3620724e37ac3552659c1d71948958beb345"
integrity sha512-cI8KTkjaipRVQvf4vrwhqNtY7cTzF9YaI85TU6mHEgjRMeTSS9qgBv6z6wXUePgnbwf11uVgSERZ8PZRDPE2HA==
dependencies:
"@most/multicast" "^1.2.4"
"@most/disposable" "^1.2.2"
"@most/prelude" "^1.7.3"
"@most/scheduler" "^1.2.3"
"@most/types" "^1.0.2"

"@most/multicast@^1.2.4", "@most/multicast@^1.2.5":
version "1.2.5"
resolved "https://registry.yarnpkg.com/@most/multicast/-/multicast-1.2.5.tgz#ba5abc997f9a6511094bec117914f4959720a8fb"
"@most/disposable@^1.2.2":
version "1.2.2"
resolved "https://registry.yarnpkg.com/@most/disposable/-/disposable-1.2.2.tgz#1f9dfcc1b9d73165436eee34ddedc2e70d7be695"
integrity sha512-05u3obo0sDuh1PGw1BS6VEhgJTLkPMyGHOAvSJMSk2tArgBvhMvjayl8fU4DEKO7r+3aTg7F9Stnr1T61578yQ==
dependencies:
"@most/prelude" "^1.4.0"
"@most/prelude" "^1.7.3"
"@most/types" "^1.0.2"

"@most/prelude@^1.4.0":
version "1.6.0"
resolved "https://registry.yarnpkg.com/@most/prelude/-/prelude-1.6.0.tgz#4256e3a902ddf04c1f07afca2267526195072e13"
"@most/prelude@^1.7.3":
version "1.7.3"
resolved "https://registry.yarnpkg.com/@most/prelude/-/prelude-1.7.3.tgz#51db3f3ba3ed65431b6eea89ecb0a31826af640c"
integrity sha512-qWWEnA22UP1lzFfKx75XMut6DUUXGRKe7qv2k+Bgs7ju8lwb5RjsZYyQZ+VcsYvHcIavHKzseLlBMLOe2CvUZw==

"@most/scheduler@^1.2.3":
version "1.2.3"
resolved "https://registry.yarnpkg.com/@most/scheduler/-/scheduler-1.2.3.tgz#7cb97904a23bbfcb2664c60ea4fe047d7b5572f7"
integrity sha512-OpykYNwUIe7/InAs0ftSIkQULJajM6ghmYDeJ0yv9xcLmi5NyAtOOyUdIxlfqi7xHWRiD2iQuuJXHGyqFsKCsw==
dependencies:
"@most/prelude" "^1.7.3"
"@most/types" "^1.0.2"

"@most/types@^1.0.2":
version "1.0.2"
resolved "https://registry.yarnpkg.com/@most/types/-/types-1.0.2.tgz#a272c919a3dafe942bd02d63402f0548593af870"
integrity sha512-ZVkDwaiuGVTXywADeJ3aUBsD41UURldIljlsSTMdiWfhVEOY7dsY8iq+9wFUjRZYjhnpn9tLiOHaqyjbxm1prg==

abbrev@1:
version "1.1.0"
Expand Down Expand Up @@ -1436,18 +1456,6 @@ mocha@^3.3.0:
mkdirp "0.5.1"
supports-color "3.1.2"

most-test@^1.3.0:
version "1.3.0"
resolved "https://registry.yarnpkg.com/most-test/-/most-test-1.3.0.tgz#22b73c68329d487d78e11ad95451b38cf8a77c13"

most@*:
version "1.3.0"
resolved "https://registry.yarnpkg.com/most/-/most-1.3.0.tgz#148f96c311ce26cace63a179d10dd61dacee58f4"
dependencies:
"@most/multicast" "^1.2.5"
"@most/prelude" "^1.4.0"
symbol-observable "^1.0.2"

[email protected]:
version "0.7.1"
resolved "https://registry.yarnpkg.com/ms/-/ms-0.7.1.tgz#9cd13c03adbff25b65effde7ce864ee952017098"
Expand Down Expand Up @@ -2005,10 +2013,6 @@ supports-color@^2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/supports-color/-/supports-color-2.0.0.tgz#535d045ce6b6363fa40117084629995e9df324c7"

symbol-observable@^1.0.2:
version "1.0.4"
resolved "https://registry.yarnpkg.com/symbol-observable/-/symbol-observable-1.0.4.tgz#29bf615d4aa7121bdd898b22d4b3f9bc4e2aa03d"

table@^3.7.8:
version "3.8.3"
resolved "https://registry.yarnpkg.com/table/-/table-3.8.3.tgz#2bbc542f0fda9861a755d3947fefd8b3f513855f"
Expand Down