Skip to content

Commit

Permalink
Added support for different transports
Browse files Browse the repository at this point in the history
Added experimental webworker transport
  • Loading branch information
davidwdan committed Oct 11, 2017
1 parent c667667 commit e318b24
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 54 deletions.
6 changes: 4 additions & 2 deletions index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
import {Promise} from "es6-shim";
export * from './src/Client';
import {Promise} from 'es6-shim';
export {WebWorkerTransport} from './src/Transport/WebWorkerTransport';
export {WebSocketTransport} from './src/Transport/WebSocketTransport';
export * from './src/Client';
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "thruway.js",
"version": "1.1.24",
"version": "1.2.3",
"description": "WAMP RxJS Client",
"main": "index.js",
"repository": {
Expand Down
39 changes: 26 additions & 13 deletions spec/client/client-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected);
Expand All @@ -45,8 +46,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected);
Expand All @@ -65,8 +67,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected, {d: resultMessage});
Expand All @@ -86,8 +89,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected, {d: resultMessage});
Expand All @@ -107,8 +111,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected, {d: resultMessage});
Expand All @@ -129,8 +134,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected, {d: resultMessage});
Expand All @@ -151,8 +157,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected, {d: resultMessage});
Expand All @@ -172,8 +179,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected, {d: resultMessage});
Expand All @@ -195,8 +203,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected);
Expand All @@ -221,8 +230,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri2');

expectObservable(call).toBe(expected);
Expand All @@ -249,8 +259,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call).toBe(expected, null, new WampErrorException('some.server.error'));
Expand All @@ -275,8 +286,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri1');

expectObservable(call, unsubscribe).toBe(expected);
Expand All @@ -302,8 +314,9 @@ describe('Client', () => {
});

const ws = Subject.create(observer, messages);
ws.onOpen = new Subject();

const client = new Client('ws://test', 'realm1', {}, ws);
const client = new Client(ws, 'realm1', {});
const call = client.call('testing.uri');

expectObservable(call, unsubscribe).toBe(expected, {d: resultMessage});
Expand Down
38 changes: 20 additions & 18 deletions src/Client.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {TransportInterface} from './Transport/TransportInterface';
import {WampChallengeException} from './Common/WampChallengeException';
import {WebSocketSubject} from './Subject/WebSocketSubject';
import {WebSocketTransport} from './Transport/WebSocketTransport';
import {RegisterObservable} from './Observable/RegisterObservable';
import {AuthenticateMessage} from './Messages/AuthenticateMessage';
import {WampErrorException} from './Common/WampErrorException';
Expand Down Expand Up @@ -39,13 +40,13 @@ import 'rxjs/add/operator/finally';
import 'rxjs/add/operator/exhaust';
import 'rxjs/add/operator/defaultIfEmpty';
import 'rxjs/add/operator/shareReplay';

import 'rxjs/add/observable/empty';
import 'rxjs/add/observable/from';
import 'rxjs/add/observable/timer';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/merge';


export class Client {
private messages: Observable<Message>;
private subscription: Subscription;
Expand Down Expand Up @@ -89,20 +90,20 @@ export class Client {
};
}

constructor(url: string, realm: string);
constructor(url: string, realm: string, options?: WampOptions, webSocket?: Subject<Message>);
constructor(private url: string,
constructor(private urlOrTransport: string | TransportInterface,
private realm: string,
private options: WampOptions = {},
private webSocket?: Subject<Message>) {
private transport?: TransportInterface) {

const open = new Subject();
const close = new Subject();
this.transport = typeof urlOrTransport === 'string'
? new WebSocketTransport(urlOrTransport)
: <TransportInterface>this.urlOrTransport;

this.subscription = new Subscription();
this.webSocket = webSocket || new WebSocketSubject(url, ['wamp.2.json'], open, close);

this.messages = this.webSocket
const open = this.transport.onOpen;

this.messages = this.transport
.retryWhen((attempts: Observable<Error>) => {
const maxRetryDelay = 300000;
const initialRetryDelay = 1500;
Expand Down Expand Up @@ -134,7 +135,7 @@ export class Client {
.map(_ => {
this.options.roles = Client.roles();
return new HelloMessage(this.realm, this.options);
}).subscribe(m => this.webSocket.next(m));
}).subscribe(m => this.transport.next(m));

const challengeMsg = this.messages
.filter((msg: Message) => msg instanceof ChallengeMessage)
Expand All @@ -152,18 +153,18 @@ export class Client {
}
return Observable.throw(error);
})
.do(m => this.webSocket.next(m));
.do(m => this.transport.next(m));

this.session = this.messages
.merge(challengeMsg)
.filter((msg: Message) => msg instanceof WelcomeMessage)
.shareReplay(1);

this.subscription.add(this.webSocket);
this.subscription.add(this.transport);
}

public topic(uri: string, options?: Object): Observable<any> {
return this.session.switchMapTo(new TopicObservable(uri, options, this.messages, this.webSocket));
return this.session.switchMapTo(new TopicObservable(uri, options, this.messages, this.transport));
}

public publish(uri: string, value: Observable<any> | any, options?: Object): Subscription {
Expand All @@ -177,24 +178,24 @@ export class Client {
}))
.exhaust()
.map(v => new PublishMessage(Utils.uniqueId(), options, uri, [v]))
.subscribe(this.webSocket);
.subscribe(this.transport);
}

public call(uri: string, args?: Array<any>, argskw?: Object, options?: {}): Observable<any> {
return this.session
.take(1)
.switchMapTo(new CallObservable(uri, this.messages, this.webSocket, args, argskw, options));
.switchMapTo(new CallObservable(uri, this.messages, this.transport, args, argskw, options));
}

public register(uri: string, callback: Function, options?: {}): Observable<any> {
return this.session.switchMapTo(new RegisterObservable(uri, callback, this.messages, this.webSocket, options));
return this.session.switchMapTo(new RegisterObservable(uri, callback, this.messages, this.transport, options));
}

public progressiveCall(uri: string, args?: Array<any>, argskw?: Object, options: { receive_progress? } = {}): Observable<any> {

options.receive_progress = true;
const completed = new Subject();
const callObs = new CallObservable(uri, this.messages, this.webSocket, args, argskw, options);
const callObs = new CallObservable(uri, this.messages, this.transport, args, argskw, options);
let retry = false;
return this.session
.takeUntil(completed)
Expand Down Expand Up @@ -238,5 +239,6 @@ export class Client {
export interface WampOptions {
authmethods?: Array<string>;
roles?: any;

[propName: string]: any;
}
8 changes: 8 additions & 0 deletions src/Transport/TransportInterface.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import {Subject} from 'rxjs/Subject';
import {Observable} from 'rxjs/Observable';
import {Message} from '../Messages/Message';

export interface TransportInterface extends Subject<Message> {
onOpen: Observable<Event>
onClose: Observable<CloseEvent>
}
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import {Subscription} from 'rxjs/Subscription';
import {Observable} from 'rxjs/Observable';
import {Subscriber} from 'rxjs/Subscriber';
import {Subject} from 'rxjs/Subject';
import {NextObserver} from 'rxjs/Observer';
import {CreateMessage} from '../Messages/CreateMessage';
import {TransportInterface} from './TransportInterface';

// This is used for WebSockets in node - removed by webpack for bundling
declare var require;
const WebSocket2 = require('ws');

export class WebSocketSubject<Message> extends Subject<any> {
export class WebSocketTransport<Message> extends Subject<any> implements TransportInterface {

private output: Subject<any> = new Subject();
private socket: WebSocket = null;
private open = new Subject();
private close = new Subject();

constructor(private url: string = 'ws://127.0.0.1:9090/',
private protocols?: string | string[],
private openObserver?: NextObserver<any>,
private closeObserver?: NextObserver<any>) {
constructor(private url: string = 'ws://127.0.0.1:9090/', private protocols: string | string[] = ['wamp.2.json']) {
super();
}

Expand All @@ -33,7 +33,9 @@ export class WebSocketSubject<Message> extends Subject<any> {
subscription.add(this.output.subscribe(subscriber));

subscription.add(() => {

if (this.socket) {
console.log('closing socket');
this.socket.close();
this.socket = null;
}
Expand All @@ -55,27 +57,19 @@ export class WebSocketSubject<Message> extends Subject<any> {
this.socket = null;
this.output.error(err);
};

ws.onclose = (e: CloseEvent) => {
this.socket = null;

if (this.closeObserver) {
this.closeObserver.next(e);
}
this.close.next(e);

// Handle all closes as errors
// if (e.wasClean) {
// this.output.complete();
// return;
// }

this.output.error(e);
};

ws.onopen = () => {
ws.onopen = (e: Event) => {
console.log('socket opened');
this.socket = ws;
if (this.openObserver) {
this.openObserver.next(this);
}
this.open.next(e);
};

ws.onmessage = (e: MessageEvent) => {
Expand All @@ -87,7 +81,6 @@ export class WebSocketSubject<Message> extends Subject<any> {
}
}

// @todo figure out why the Message abstract doesn't work here
public next(msg: any): void {
if (!this.socket) {
return;
Expand All @@ -104,4 +97,12 @@ export class WebSocketSubject<Message> extends Subject<any> {
this.socket = null;
}
}

get onOpen(): Observable<any> {
return this.open.asObservable();
}

get onClose(): Observable<any> {
return this.close.asObservable();
}
}
Loading

0 comments on commit e318b24

Please sign in to comment.