Skip to content

Commit

Permalink
fix: remove disconnection trigger when pong is not received in time, …
Browse files Browse the repository at this point in the history
…just warn instead
  • Loading branch information
b-ma committed May 23, 2024
1 parent a111327 commit f191970
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 31 deletions.
6 changes: 4 additions & 2 deletions src/client/Socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,10 @@ class Socket {
* JSON compatible data types (i.e. string, number, boolean, object, array and null).
*/
send(channel, ...args) {
const msg = packStringMessage(channel, ...args);
this.#socket.send(msg);
if (this.#socket.readyState === 1) {
const msg = packStringMessage(channel, ...args);
this.#socket.send(msg);
}
}

/**
Expand Down
28 changes: 20 additions & 8 deletions src/server/Plugin.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,35 @@ import BasePlugin from '../common/BasePlugin.js';
* @inheritdoc
*/
class Plugin extends BasePlugin {
#server = null;
#clients = new Set();

/**
* @param {server.Server} server - The soundworks server instance.
* @param {string} id - User defined id of the plugin as defined in
* {@link server.PluginManager#register}.
*/
constructor(server, id) {
super(id);
this.#server = server;
}

/**
* Instance of soundworks server.
* @type {server.Server}
* @see {@link server.Server}
*/
this.server = server;
/**
* Instance of soundworks server.
* @type {server.Server}
* @see {@link server.Server}
*/
get server() {
return this.#server;
}

/** @private */
this.clients = new Set();
/**
* Set of the clients registered in the plugin.
* @type {Set<server.Client>}
* @see {@link server.Client}
*/
get clients() {
return this.#clients;
}

/**
Expand Down
6 changes: 4 additions & 2 deletions src/server/Server.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import ContextManager from './ContextManager.js';
import PluginManager from './PluginManager.js';
import StateManager from './StateManager.js';
import {
kSocketClientId,
kSocketTerminate,
} from './Socket.js';
import Sockets from './Sockets.js';
Expand Down Expand Up @@ -839,6 +840,7 @@ Invalid certificate files, please check your:
*/
_onSocketConnection(role, socket, connectionToken) {
const client = new Client(role, socket);
socket[kSocketClientId] = client.id;
const roles = Object.keys(this.config.app.clients);

// this has been validated
Expand Down Expand Up @@ -912,8 +914,8 @@ WARNING
Version discrepancies between server and "${role}" client:
+ server: ${this.version} | client: ${version}
This might lead to unexpected behavior, you should consider to update your
dependancies on both your server and clients.
This might lead to unexpected behavior, you should consider to re-install your
dependencies on both your server and clients.
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!`);
}
Expand Down
11 changes: 8 additions & 3 deletions src/server/Socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
kSocketsRemoveFromAllRooms,
} from './Sockets.js';

export const kSocketClientId = Symbol('soundworks:socket-client-id');
export const kSocketTerminate = Symbol('soundworks:socket-terminate');

/**
Expand Down Expand Up @@ -87,9 +88,13 @@ class Socket {

this.#heartbeatId = setInterval(() => {
if (isAlive === false) {
// emit a 'close' event to go trough all the disconnection pipeline
// Emit a 'close' event to go trough all the disconnection pipeline
//
// @note - this seems to create false positive disconnections when
this.#dispatchEvent('close');
// client is busy, e.g. when loading large sound files so let's just warn
// until we gather more feedback
// this.#dispatchEvent('close');
console.warn(`[Socket] client (id: ${this[kSocketClientId]}) did not respond to ping message in time, interval: ${PING_INTERVAL}`);
return;
}

Expand Down Expand Up @@ -198,7 +203,7 @@ class Socket {
if (this.#socket.readyState === 1) {
this.#socket.send(msg, (err) => {
if (err) {
console.error('error sending msg:', channel, args, err.message);
console.error('[Socket] error sending msg:', channel, args, err.message);
}
});
}
Expand Down
28 changes: 15 additions & 13 deletions src/server/Sockets.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import querystring from 'node:querystring';
import {
Worker,
} from 'node:worker_threads';

import querystring from 'querystring';
import {
default as WebSocket,
WebSocketServer,
Expand All @@ -11,8 +11,9 @@ import {
import Socket, {
kSocketTerminate,
} from './Socket.js';
import networkLatencyWorker from './audit-network-latency.worker.js';

// @note - fs.readFileSync creates some cwd() issues...
import networkLatencyWorker from './audit-network-latency.worker.js';

export const kSocketsRemoveFromAllRooms = Symbol('soundworks:sockets-remove-from-all-rooms');
export const kSocketsLatencyStatsWorker = Symbol('soundworks:sockets-latency-stats-worker');
Expand Down Expand Up @@ -77,8 +78,7 @@ class Sockets {
});

this.#wsServer.on('connection', (ws, req) => {
const queryString = querystring.decode(req.url.split('?')[1]);
const { role, token } = queryString;
const { role, token } = querystring.parse(req.url.split('?')[1]);
const socket = new Socket(ws, this);

socket.addToRoom('*');
Expand All @@ -89,8 +89,7 @@ class Sockets {

// Prevent socket with protected role to connect is token is invalid
server.httpServer.on('upgrade', async (req, socket, head) => {
const queryString = querystring.decode(req.url.split('?')[1]);
const { role, token } = queryString;
const { role, token } = querystring.parse(req.url.split('?')[1]);

if (server.isProtected(role)) {
// we don't have any IP in the upgrade request object,
Expand All @@ -107,7 +106,7 @@ class Sockets {
}

/**
* Terminate all existing sockets
* Terminate all existing sockets.
* @private
*/
terminate() {
Expand All @@ -118,6 +117,15 @@ class Sockets {
sockets.forEach(socket => socket[kSocketTerminate]());
}

/**
* Remove given socket from all rooms.
*/
[kSocketsRemoveFromAllRooms](socket) {
for (let [_, room] of this.#rooms) {
room.delete(socket);
}
}

/**
* Add a socket to a room.
*
Expand Down Expand Up @@ -150,12 +158,6 @@ class Sockets {
}
}

[kSocketsRemoveFromAllRooms](socket) {
for (let [_key, room] of this.#rooms) {
room.delete(socket);
}
}

/**
* Send a message to all clients os given room(s). If no room is specified,
* the message is sent to all clients.
Expand Down
7 changes: 4 additions & 3 deletions src/server/audit-network-latency.worker.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// NOTICE: we use this syntax so that the server can be bundled to cjs
// with esbuild, so we can ship a cjs server bundle into Max.
// @note - We use this common js syntax so that the server can be bundled to cjs
// which allows to build a server that can be accepted by Max `node.script`
//
// Should move back to regular esm module once Max has fixed its loader
export default `
const { parentPort } = require('node:worker_threads');
Expand All @@ -11,6 +12,7 @@ let averageLatencyPeriod = 2;
let intervalId = null;
let meanLatency = 0;
// workaround that sc-utils is pure emascript module
let getTime;
let inited = new Promise(async (resolve) => {
module = await import('@ircam/sc-utils');
Expand All @@ -25,7 +27,6 @@ parentPort.on('message', async msg => {
averageLatencyPeriod = msg.value.averageLatencyPeriod;
await inited;
// launch compute in its own loop so that the number of computation is
// decoupled from the number of connected clients
clearInterval(intervalId);
Expand Down

0 comments on commit f191970

Please sign in to comment.