-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Using rxjs/webSocket on a server #5385
Comments
I'm working on making a transparent WebSocket layer for connecting a browser to a remote server (Raspberry Pi) to be able to subscribe directly to sensors and set outputs etc. from the browser or front-end application. So the idea is to have a WebSocket server that handles incoming connections and subscribes in a seamless manner to the GPIO ports. This would be a welcomed feature! |
@kevmo314 Seems there is a way to do this already: const WebSocket = require('ws')
const { webSocket } = require('rxjs/webSocket')
const webSocketConnection$ = (
webSocket({
protocol: 'v1',
url: 'http://example.com:3000',
WebSocketCtor: WebSocket,
})
) Ref: https://gist.github.com/Sawtaytoes/fe3d16b1a15aa20eef5d2a41d0b39934 |
@abarke This doesn't work for existing web sockets. I mentioned this in the first line of the FR. |
@kevmo314 apologies I see your point. You are right. An already established WebSocket would allow a new I created a demo to simulate a shared resource on a server just to wrap my head around it, as I need something similar for a project: https://stackblitz.com/edit/rxjs-shared-source-example?file=index.ts It might be simple enough to modify the |
Hey @kevmo314 I managed to create a PoC and published the library on NPM for consumption. I rewrote the WebSocketSubject class as extending it proved difficult due to not being able to set private members of the |
Any feedback on this @kevmo314 ? Im looking at opening a PR for this, however I'm not sure if I should merge the WebSocketServerSubject into the WebSocketSubject (could get messy) or leave it as its own class (some duplicate code, but has separation of concerns and is more maintainable IMHO) @benlesh any ideas or feedback of how to proceed? |
Unfortunately my use case has come and gone, the code that needed this is now happily running and stable. I took a look at the package though, it looks pretty good and I'd love to see this PR'd into the main |
@kevmo314 regarding two subjects on a single socket... do you mean something like this?
I tried this but it seems only |
what about using const wss = new Server({ server });
const handleConnection = (websocket: WebSocket): Observable<WebSocket.MessageEvent | WebSocket.CloseEvent> => {
const onmessage$ = fromEvent<WebSocket.MessageEvent>(websocket, 'message')
.pipe(
tap(m => console.log(m.type))
);
const onclose$ = fromEvent<WebSocket.CloseEvent>(websocket, 'close')
.pipe(
tap(m => console.log(m.reason)));
// const onerror$ = ....
// etc
return merge(onmessage$, onclose$)
}
fromEvent(wss, 'connection')
.pipe(
map(a => a[0] as WebSocket),
mergeMap(handleConnection)
).subscribe();
} |
This is very nice idea, did you try it ? |
sure. in the moment one message type is looking like that: const onConvertMessage$ = fromEvent(websocket, 'message')
.pipe(
map(m => WsExchange.fromMessage(JSON.parse((m as WsMessage).data))),
filter(m => m.action === "convert"),
map(m => m as ConvertText),
switchMap(handleConvertion),
mergeMap(uploadToAws),
map(bucket => GeneratedAudio.from(bucket)),
tap(message => websocket.send(JSON.stringify(message))),
catchError((err, caught) => {
console.log("error in convert pipeline: ", err)
return caught;
}),
); |
I don't think this makes sense as part of If it is included, it should not be part of the |
This is not true, as mentioned in the original issue the request is to follow the w3c specification for which the ws websocket is compliant. The change is to support passing in a spec-compliant websocket which also exists in browsers, not a specifically nodejs runtime websocket. |
The client is spec-compliant in that it can talk to other spec-compliant sockets, absolutely, but the API cannot be used in the browser. For example, the node websocket allows the user to implement ping/pong, whereas browser websockets handle ping/pong automatically. Does RxJS need to expose additional streams on the subject? IMO yes, if the goal is to fully support native sockets. If the goal is only to pass in a web socket, why not create a dynamic constructor as-needed? Someone suggested this earlier, but without creating a constructor on-the-fly. We could leverage variable scope, for example: const nativeSocket = /* native `ws` created socket reference */;
const WebsocketCtor = function LocalCtor(url: string, protocol: string[]) {
this = nativeSocket;
};
const rxjsSocket = new WebSocketSubject({
WebSocketCtor,
}); In this situation, RxJS won't attach listeners until the developer subscribes to the new subject. It would be possible for the socket to emit events that are lost and de-sync RxJS state. For example, the |
I'm writing an additional comment instead of more edits 😂 |
Hi, I'm trying to access message's data in import express from 'express'
import { createServer } from 'http'
import { fromEvent, merge } from 'rxjs'
import { map, mergeMap, tap } from 'rxjs/operators'
import { WebSocketServer } from 'ws'
const app = express()
const server = createServer(app)
const wss = new WebSocketServer({ server })
const handleConnection = socket => {
const onClose$ = fromEvent(socket, 'close').pipe(tap(value => console.log('close', value)))
const onMessage$ = fromEvent(socket, 'message').pipe(tap(value => console.log('message', value)))
return merge(onClose$, onMessage$)
}
fromEvent(wss, 'connection')
.pipe(
map(value => value[0]),
mergeMap(handleConnection)
)
.subscribe() ...but instead I'm getting ws's event target formatted messages, like so:
How can I fix this? |
Feature Request
Right now WebSocketSubject only accepts a constructor. Can this be changed to also accept an existing socket in the config?
Specifically, my use case is that I'd like to use rxjs with websocket/ws on my server, and their API would allow me to do something like:
wrt api compatibility, I think it's reasonable to assume the web socket remains w3c compatible instead of worrying about the NodeJS.WebSocket API. There are various compatibility shims to adapt the assorted NodeJS variants to w3c sockets. I'm curious if this was an intentional choice to not allow wrapping an existing socket or if this can be done?
The text was updated successfully, but these errors were encountered: