Skip to content

Commit

Permalink
feat: add optional filter to attached states
Browse files Browse the repository at this point in the history
  • Loading branch information
b-ma committed Apr 29, 2024
1 parent 8fbf68b commit 982816b
Show file tree
Hide file tree
Showing 5 changed files with 359 additions and 258 deletions.
65 changes: 59 additions & 6 deletions src/common/BaseSharedState.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import {
// }

class BaseSharedState {
constructor(id, remoteId, schemaName, schema, client, isOwner, manager, initValues = {}) {
constructor(id, remoteId, schemaName, schema, client, isOwner, manager, initValues, filter) {
/** @private */
this._id = id;
/** @private */
Expand All @@ -37,6 +37,9 @@ class BaseSharedState {
this._client = client;
/** @private */
this._manager = manager;
/** @private */
this._filter = filter;

/**
* true is the state has been detached or deleted
* @private
Expand Down Expand Up @@ -332,13 +335,15 @@ ${JSON.stringify(initValues, null, 2)}`);
}

if (!isPlainObject(updates)) {
throw new ReferenceError(`[SharedState] State "${this.schemaName}": state.set(updates[, context]) should receive an object as first parameter`);
throw new TypeError(`[SharedState] State "${this.schemaName}": state.set(updates[, context]) should receive an object as first parameter`);
}

if (context !== null && !isPlainObject(context)) {
throw new ReferenceError(`[SharedState] State "${this.schemaName}": state.set(updates[, context]) should receive an object as second parameter`);
throw new TypeError(`[SharedState] State "${this.schemaName}": state.set(updates[, context]) should receive an object as second parameter`);
}



const newValues = {};
const oldValues = {};
const localParams = {};
Expand All @@ -349,10 +354,18 @@ ${JSON.stringify(initValues, null, 2)}`);
let propagateNow = false;

for (let name in updates) {
// try to coerce value early, so that eventual errors are triggered early
// Try to coerce value early, so that eventual errors are triggered early
// on the node requesting the update, and not only on the server side
// This throws if name does not exists
this._parameters.coerceValue(name, updates[name]);

// Check that name is in filter list, if any
if (this._filter !== null) {
if (!this._filter.includes(name)) {
throw new DOMException(`[SharedState] State "${this.schemaName}": cannot set parameter '${name}', parameter is not in filter list`, 'NotSupportedError');
}
}

// `immediate` option behavior
//
// If immediate=true
Expand Down Expand Up @@ -437,6 +450,16 @@ ${JSON.stringify(initValues, null, 2)}`);
* const value = state.get('paramName');
*/
get(name) {
if (!this._parameters.has(name)) {
throw new ReferenceError(`[SharedState] State "${this.schemaName}": Cannot get value of undefined parameter "${name}"`);
}

if (this._filter !== null) {
if (!this._filter.includes(name)) {
throw new DOMException(`[SharedState] State "${this.schemaName}": cannot get parameter '${name}', parameter is not in filter list`, 'NotSupportedError');
}
}

return this._parameters.get(name);
}

Expand All @@ -455,6 +478,16 @@ ${JSON.stringify(initValues, null, 2)}`);
* const value = state.getUnsafe('paramName');
*/
getUnsafe(name) {
if (!this._parameters.has(name)) {
throw new ReferenceError(`[SharedState] State "${this.schemaName}": Cannot get value of undefined parameter "${name}"`);
}

if (this._filter !== null) {
if (!this._filter.includes(name)) {
throw new DOMException(`[SharedState] State "${this.schemaName}": cannot get parameter '${name}', parameter is not in filter list`, 'NotSupportedError');
}
}

return this._parameters.getUnsafe(name);
}

Expand All @@ -467,7 +500,17 @@ ${JSON.stringify(initValues, null, 2)}`);
* const values = state.getValues();
*/
getValues() {
return this._parameters.getValues();
const values = this._parameters.getValues();

if (this._filter !== null) {
for (let name in values) {
if (!this._filter.includes(name)) {
delete values[name];
}
}
}

return values;
}

/**
Expand All @@ -484,7 +527,17 @@ ${JSON.stringify(initValues, null, 2)}`);
* const values = state.getValues();
*/
getValuesUnsafe() {
return this._parameters.getValuesUnsafe();
const values = this._parameters.getValuesUnsafe();

if (this._filter !== null) {
for (let name in values) {
if (!this._filter.includes(name)) {
delete values[name];
}
}
}

return values;
}

/**
Expand Down
42 changes: 36 additions & 6 deletions src/common/BaseStateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class BaseStateManager {
// CREATE
// ---------------------------------------------

this.client.transport.addListener(CREATE_RESPONSE, (reqId, stateId, remoteId, schemaName, schema, initValues) => {
this.client.transport.addListener(CREATE_RESPONSE, (reqId, stateId, remoteId, schemaName, schema, initValues, filter) => {

// cache schema when first dealing it to save some bandwidth
if (!this._cachedSchemas.has(schemaName)) {
Expand All @@ -59,7 +59,7 @@ class BaseStateManager {

schema = this._cachedSchemas.get(schemaName);

const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, true, this, initValues);
const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, true, this, initValues, null);
this._statesById.set(state.id, state);

this._promiseStore.resolve(reqId, state);
Expand All @@ -72,7 +72,7 @@ class BaseStateManager {
// ---------------------------------------------
// ATTACH (when creator, is attached by default)
// ---------------------------------------------
this.client.transport.addListener(ATTACH_RESPONSE, (reqId, stateId, remoteId, schemaName, schema, currentValues) => {
this.client.transport.addListener(ATTACH_RESPONSE, (reqId, stateId, remoteId, schemaName, schema, currentValues, filter) => {
// cache schema when first dealing with it to save some bandwidth
// note: when we make the schemas dynamic at some point
// the server should be able to invalidate the schema and send
Expand All @@ -83,7 +83,7 @@ class BaseStateManager {

schema = this._cachedSchemas.get(schemaName);

const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, false, this, currentValues);
const state = new SharedState(stateId, remoteId, schemaName, schema, this.client, false, this, currentValues, filter);
this._statesById.set(state.id, state);

this._promiseStore.resolve(reqId, state);
Expand Down Expand Up @@ -243,11 +243,41 @@ class BaseStateManager {
* @example
* const state = await client.stateManager.attach('my-schema');
*/
async attach(schemaName, stateId = null) {
async attach(schemaName, stateIdOrFilter = null, filter = null) {
let stateId = null;

if (!isString(schemaName)) {
throw new TypeError(`Cannot execute 'attach' on 'StateManager': argument 1 should be either a number or an array`);
}

if (arguments.length === 2) {
if (Number.isFinite(stateIdOrFilter)) {
stateId = stateIdOrFilter;
filter = null;
} else if (Array.isArray(stateIdOrFilter)) {
stateId = null;
filter = stateIdOrFilter;
} else {
throw new TypeError(`Cannot execute 'attach' on 'StateManager': argument 2 should be either a number or an array`);
}
}

if (arguments.length === 3) {
stateId = stateIdOrFilter;

if (!Number.isFinite(stateId)) {
throw new TypeError(`Cannot execute 'attach' on 'StateManager': argument 2 should be a number`);
}

if (!Array.isArray(filter)) {
throw new TypeError(`Cannot execute 'attach' on 'StateManager': argument 2 should be a number`);
}
}

return new Promise((resolve, reject) => {
const reqId = this._promiseStore.add(resolve, reject, 'attach-request');
const requireSchema = this._cachedSchemas.has(schemaName) ? false : true;
this.client.transport.emit(ATTACH_REQUEST, reqId, schemaName, stateId, requireSchema);
this.client.transport.emit(ATTACH_REQUEST, reqId, schemaName, stateId, requireSchema, filter);
});
}

Expand Down
64 changes: 48 additions & 16 deletions src/common/SharedStatePrivate.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,25 @@ import {
UPDATE_NOTIFICATION,
} from '../common/constants.js';

/**
* Filter update object according to filter list.
* Return identity is filter is null
* @param {object} updates
* @param {array|null} filter
*/
function filterUpdates(updates, filter) {
if (filter === null) {
return updates;
}

return filter.reduce((acc, key) => {
if (key in updates) {
acc[key] = updates[key];
}
return acc;
}, {});
}

/**
* The "real" state, this instance is kept private by the server.StateManager.
* It can only be accessed through a SharedState proxy.
Expand All @@ -30,8 +49,8 @@ class SharedStatePrivate {
this._creatorId = null;
}

_attachClient(remoteId, client, isOwner = false) {
const clientInfos = { client, isOwner };
_attachClient(remoteId, client, isOwner, filter) {
const clientInfos = { client, isOwner, filter };
this._attachedClients.set(remoteId, clientInfos);

if (isOwner) {
Expand Down Expand Up @@ -61,7 +80,7 @@ class SharedStatePrivate {
}

if (hookAborted === false) {
const filteredUpdates = {};
let acknowledgedUpdates = {};
let hasUpdates = false;

for (let name in updates) {
Expand All @@ -76,7 +95,7 @@ class SharedStatePrivate {
}

if ((filterChange && changed) || !filterChange) {
filteredUpdates[name] = newValue;
acknowledgedUpdates[name] = newValue;
hasUpdates = true;
}
}
Expand All @@ -103,41 +122,54 @@ class SharedStatePrivate {

// propagate RESPONSE to the client that originates the request if not the server
if (client.id !== -1) {
// no need to filter updates on requested, is blocked on client-side
client.transport.emit(
`${UPDATE_RESPONSE}-${this.id}-${remoteId}`,
reqId, filteredUpdates, context,
reqId, acknowledgedUpdates, context,
);
}

// propagate NOTIFICATION to all other attached clients except server
// propagate NOTIFICATION to all attached clients except server
for (let [peerRemoteId, clientInfos] of this._attachedClients) {
const peer = clientInfos.client;

if (remoteId !== peerRemoteId && peer.id !== -1) {
peer.transport.emit(
`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`,
filteredUpdates, context,
);
const filter = clientInfos.filter;
const filteredUpdates = filterUpdates(acknowledgedUpdates, filter);

// propagate only if there something left after applying the filter
if (Object.keys(filteredUpdates).length > 0) {
peer.transport.emit(
`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`,
filteredUpdates, context,
);
}
}
}

// propagate RESPONSE to server if it is the requester
if (client.id === -1) {
// no need to filter updates on requested, is blocked on client-side
client.transport.emit(
`${UPDATE_RESPONSE}-${this.id}-${remoteId}`,
reqId, filteredUpdates, context,
reqId, acknowledgedUpdates, context,
);
}

// propagate NOTIFICATION other attached state that belongs to server
// propagate NOTIFICATION to other state attached on the server
for (let [peerRemoteId, clientInfos] of this._attachedClients) {
const peer = clientInfos.client;

if (remoteId !== peerRemoteId && peer.id === -1) {
peer.transport.emit(
`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`,
filteredUpdates, context,
);
const filter = clientInfos.filter;
const filteredUpdates = filterUpdates(acknowledgedUpdates, filter);

if (Object.keys(filteredUpdates).length > 0) {
peer.transport.emit(
`${UPDATE_NOTIFICATION}-${this.id}-${peerRemoteId}`,
filteredUpdates, context,
);
}
}
}
} else {
Expand Down
25 changes: 20 additions & 5 deletions src/server/StateManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@ class StateManager extends BaseStateManager {

// attach client to the state as owner
const isOwner = true;
state._attachClient(remoteId, client, isOwner);
const filter = null;
state._attachClient(remoteId, client, isOwner, filter);

this._sharedStatePrivateById.set(stateId, state);

Expand Down Expand Up @@ -407,7 +408,7 @@ class StateManager extends BaseStateManager {
// ---------------------------------------------
client.transport.addListener(
ATTACH_REQUEST,
(reqId, schemaName, stateId = null, requireSchema = true) => {
(reqId, schemaName, stateId = null, requireSchema = true, filter = null) => {
if (this._schemas.has(schemaName)) {
let state = null;

Expand All @@ -432,15 +433,29 @@ class StateManager extends BaseStateManager {
// i.e. same state -> several remote attach on the same node
const remoteId = generateRemoteId.next().value;
const isOwner = false;
state._attachClient(remoteId, client, isOwner);

const currentValues = state._parameters.getValues();
const schema = this._schemas.get(schemaName);
const schemaOption = requireSchema ? schema : null;

// if filter given, check that all filter entries are valid schema keys
// @todo - improve error reportin: report invalid filters
if (filter !== null) {
const keys = Object.keys(schema);
const isValid = filter.reduce((acc, key) => acc && keys.includes(key), true);

if (!isValid) {
const msg = `[stateManager] Cannot attach, invalid filter (${filter.join(', ')}) for schema "${schemaName}"`;
console.error(msg);

return client.transport.emit(ATTACH_ERROR, reqId, msg);
}
}

state._attachClient(remoteId, client, isOwner, filter);

client.transport.emit(
ATTACH_RESPONSE,
reqId, state.id, remoteId, schemaName, schemaOption, currentValues,
reqId, state.id, remoteId, schemaName, schemaOption, currentValues, filter,
);

} else {
Expand Down
Loading

0 comments on commit 982816b

Please sign in to comment.