Skip to content

Commit

Permalink
feat(studio connections): Support for binary serialization of messages (
Browse files Browse the repository at this point in the history
  • Loading branch information
jespertheend authored Jan 6, 2024
1 parent 13446fe commit 0fd75b8
Show file tree
Hide file tree
Showing 25 changed files with 852 additions and 137 deletions.
5 changes: 4 additions & 1 deletion src/inspector/InspectorManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ export class InspectorManager {
* @private
*/
getResponseHandlers() {
return {
/** @satisfies {import("../network/studioConnections/DiscoveryManager.js").ConnectionRequestAcceptOptions<any>} */
const handlers = {
reliableResponseHandlers: {},
};
return handlers;
}
}
16 changes: 13 additions & 3 deletions src/network/studioConnections/DiscoveryManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,23 @@ import {StudioConnection} from "./StudioConnection.js";
* When a correct token is provided, the connection is accepted regardless of any origin allow lists or preferences.
*/

/**
* @template {import("../../mod.js").TypedMessengerSignatures} TReliableResponseHandlers
* @typedef ConnectionRequestAcceptOptions
* @property {TReliableResponseHandlers} [reliableResponseHandlers]
* @property {Object<string, (...args: any[]) => ArrayBuffer | Promise<ArrayBuffer>>} [requestSerializers]
* @property {Object<string, (buffer: ArrayBuffer) => unknown[] | Promise<unknown[]>>} [requestDeserializers]
* @property {Object<string, (...args: any[]) => ArrayBuffer | Promise<ArrayBuffer>>} [responseSerializers]
* @property {Object<string, (buffer: ArrayBuffer) => unknown | Promise<unknown>>} [responseDeserializers]
*/

/**
* @typedef OnConnectionCreatedRequest
* @property {import("../../mod.js").UuidString} otherClientUuid
* @property {boolean} initiatedByMe
* @property {ConnectionRequestData} connectionRequestData
* @property {ClientType} clientType
* @property {<T extends import("../../mod.js").TypedMessengerSignatures>(reliableResponseHandlers: T) => StudioConnection<T, any>} accept Accepts the connection and
* @property {<T extends import("../../mod.js").TypedMessengerSignatures>(options: ConnectionRequestAcceptOptions<T>) => StudioConnection<T, any>} accept Accepts the connection and
* returns a StudioConnection with the provided response handlers.
* If none of the registered callbacks call `accept()` (synchronously), the connection will be closed immediately.
* @property {() => void} reject Closes the connection and notifies the other end that the connection was not accepted.
Expand Down Expand Up @@ -145,11 +155,11 @@ export class DiscoveryManager {
clientType: messageHandler.clientType,
initiatedByMe: messageHandler.initiatedByMe,
connectionRequestData: messageHandler.connectionRequestData,
accept: reliableResponseHandlers => {
accept: options => {
assertFirstCall();
accepted = true;
messageHandler.requestAccepted();
return new StudioConnection(messageHandler, reliableResponseHandlers);
return new StudioConnection(messageHandler, options);
},
reject() {
assertFirstCall();
Expand Down
137 changes: 118 additions & 19 deletions src/network/studioConnections/StudioConnection.js
Original file line number Diff line number Diff line change
@@ -1,65 +1,164 @@
import {StorageType, binaryToObject, createObjectToBinaryOptions, objectToBinary} from "../../util/binarySerialization.js";
import {deserializeErrorHook, serializeErrorHook} from "../../util/TypedMessenger/errorSerialization.js";
import {TypedMessenger} from "../../util/TypedMessenger/TypedMessenger.js";

const typedMessengerSendDataBinaryOpts = createObjectToBinaryOptions({
structure: [
StorageType.UNION_ARRAY,
{
id: StorageType.UINT16,
type: StorageType.STRING,
args: StorageType.ARRAY_BUFFER,
},
{
id: StorageType.UINT16,
type: StorageType.STRING,
returnValue: StorageType.ARRAY_BUFFER,
didThrow: StorageType.BOOL,
},
{
json: StorageType.STRING,
},
],
nameIds: {
id: 1,
type: 2,
args: 3,
returnValue: 4,
didThrow: 5,
json: 6,
},
});

/**
* @template {import("../../mod.js").TypedMessengerSignatures} TReliableRespondHandlers
* @template {import("../../mod.js").TypedMessengerSignatures} TReliableRequestHandlers
*/
export class StudioConnection {
#messageHandler;

/**
* @param {import("./messageHandlers/MessageHandler.js").MessageHandler} messageHandler
* @param {TReliableRespondHandlers} reliableResponseHandlers
* @param {import("./DiscoveryManager.js").ConnectionRequestAcceptOptions<TReliableRespondHandlers>} options
*/
constructor(messageHandler, reliableResponseHandlers) {
/** @private */
this.messageHandler = messageHandler;
constructor(messageHandler, options) {
this.#messageHandler = messageHandler;

/** @type {TypedMessenger<TReliableRespondHandlers, TReliableRequestHandlers>} */
this.messenger = new TypedMessenger();
this.messenger.setResponseHandlers(reliableResponseHandlers);
this.messenger.setSendHandler(data => {
messageHandler.send(data.sendData, {transfer: data.transfer});
this.messenger = new TypedMessenger({
deserializeErrorHook,
serializeErrorHook,
});
this.messenger.setResponseHandlers(options.reliableResponseHandlers || /** @type {TReliableRespondHandlers} */ ({}));
this.messenger.setSendHandler(async data => {
if (messageHandler.supportsSerialization) {
await messageHandler.send(data.sendData, {transfer: data.transfer});
} else {
const castType = /** @type {string} */ (data.sendData.type);
/** @type {ArrayBuffer} */
let buffer;
if (data.sendData.direction == "request" && options.requestSerializers && options.requestSerializers[castType]) {
const serializedArguments = await options.requestSerializers[castType](...data.sendData.args);
buffer = objectToBinary({
id: data.sendData.id,
type: castType,
args: serializedArguments,
}, typedMessengerSendDataBinaryOpts);
} else if (data.sendData.direction == "response" && options.responseSerializers && options.responseSerializers[castType] && !data.sendData.didThrow) {
const serializedReturnType = await options.responseSerializers[castType](data.sendData.returnValue);
buffer = objectToBinary({
id: data.sendData.id,
type: castType,
didThrow: data.sendData.didThrow,
returnValue: serializedReturnType,
}, typedMessengerSendDataBinaryOpts);
} else {
buffer = objectToBinary({
json: JSON.stringify(data.sendData),
}, typedMessengerSendDataBinaryOpts);
}
await messageHandler.send(buffer);
}
});
messageHandler.onMessage(data => {
const castData = /** @type {import("../../mod.js").TypedMessengerMessageSendData<TReliableRespondHandlers, TReliableRequestHandlers>} */ (data);
this.messenger.handleReceivedMessage(castData);
messageHandler.onMessage(async data => {
/** @type {import("../../mod.js").TypedMessengerMessageSendData<any, any>} */
let decodedData;
if (messageHandler.supportsSerialization) {
decodedData = /** @type {import("../../mod.js").TypedMessengerMessageSendData<any, any>} */ (data);
} else {
if (!(data instanceof ArrayBuffer)) {
throw new Error("This message handler is expected to only receive ArrayBuffer messages.");
}
const decoded = binaryToObject(data, typedMessengerSendDataBinaryOpts);
if ("args" in decoded) {
if (!options.requestDeserializers || !options.requestDeserializers[decoded.type]) {
throw new Error(`Unexpected serialized request message was received for "${decoded.type}". The message was serialized by the sender in the 'requestSerializers' object, but no deserializer was defined in the 'requestDeserializers' object.`);
}
const decodedArgs = await options.requestDeserializers[decoded.type](decoded.args);
decodedData = {
direction: "request",
type: decoded.type,
id: decoded.id,
args: /** @type {any} */ (decodedArgs),
};
} else if ("returnValue" in decoded) {
if (!options.responseDeserializers || !options.responseDeserializers[decoded.type]) {
throw new Error(`Unexpected serialized response message was received for "${decoded.type}". The message was serialized by the sender in the 'responseSerializers' object, but no deserializer was defined in the 'responseDeserializers' object.`);
}
const decodedReturnValue = await options.responseDeserializers[decoded.type](decoded.returnValue);
decodedData = {
direction: "response",
type: decoded.type,
id: decoded.id,
didThrow: decoded.didThrow,
returnValue: decodedReturnValue,
};
} else if ("json" in decoded) {
decodedData = JSON.parse(decoded.json);
} else {
throw new Error("An error occurred while deserializing the message.");
}
}
const castData = /** @type {import("../../mod.js").TypedMessengerMessageSendData<TReliableRespondHandlers, TReliableRequestHandlers>} */ (decodedData);
await this.messenger.handleReceivedMessage(castData);
});
}

get otherClientUuid() {
return this.messageHandler.otherClientUuid;
return this.#messageHandler.otherClientUuid;
}

get clientType() {
return this.messageHandler.clientType;
return this.#messageHandler.clientType;
}

get connectionType() {
return this.messageHandler.connectionType;
return this.#messageHandler.connectionType;
}

/**
* True when the connection was initiated by our client (i.e. the client that holds the instance of this class in memory).
*/
get initiatedByMe() {
return this.messageHandler.initiatedByMe;
return this.#messageHandler.initiatedByMe;
}

get projectMetadata() {
return this.messageHandler.projectMetadata;
return this.#messageHandler.projectMetadata;
}

close() {
this.messageHandler.close();
this.#messageHandler.close();
}

/**
* @param {import("./messageHandlers/MessageHandler.js").OnStatusChangeCallback} cb
*/
onStatusChange(cb) {
this.messageHandler.onStatusChange(cb);
this.#messageHandler.onStatusChange(cb);
}

get status() {
return this.messageHandler.status;
return this.#messageHandler.status;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ export class InternalMessageHandler extends MessageHandler {
*/
constructor(options, messagePort, onPermissionResult) {
super(options);

this.supportsSerialization = true;

/** @private */
this.messagePort = messagePort;
/** @private */
Expand Down
16 changes: 13 additions & 3 deletions src/network/studioConnections/messageHandlers/MessageHandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*/

export class MessageHandler {
/** @typedef {(data: unknown) => void} OnMessageCallback */
/** @typedef {(data: unknown) => Promise<void>} OnMessageCallback */

/**
* @param {MessageHandlerOptions} options
Expand All @@ -33,6 +33,13 @@ export class MessageHandler {
this.onMessageCbs = new Set();
/** @type {MessageHandlerStatus} */
this.status = "disconnected";
/**
* Set this to true when the message handler supports serializing arbitrary data.
* This is generally only supported with messaging mechanisms that use `postMessage` like functions.
* When this is false, {@linkcode send} will only receive `ArrayBuffer`s which will be serialized
* and deserialized by the `StudioConnection` class.
*/
this.supportsSerialization = false;

/** @private @type {Set<OnStatusChangeCallback>} */
this.onStatusChangeCbs = new Set();
Expand Down Expand Up @@ -76,6 +83,7 @@ export class MessageHandler {
* @param {unknown} data
* @param {object} [sendOptions]
* @param {Transferable[]} [sendOptions.transfer]
* @returns {void | Promise<void>}
*/
send(data, sendOptions) {}

Expand All @@ -102,8 +110,10 @@ export class MessageHandler {
* @protected
* @param {unknown} data
*/
handleMessageReceived(data) {
this.onMessageCbs.forEach(cb => cb(data));
async handleMessageReceived(data) {
for (const cb of this.onMessageCbs) {
await cb(data);
}
}

/**
Expand Down
8 changes: 6 additions & 2 deletions src/util/TypedMessenger/TypedMessenger.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ import {TimeoutError} from "../TimeoutError.js";
*/

/**
* @template {any} [TReturn = any]
* @typedef TypedMessengerRespondOptions
* @property {Transferable[]} [transfer] An array of objects that should be transferred.
* For this to work, the `TypedMessenger.setSendHandler()` callback should pass the `transfer` data to the correct `postMessage()` argument.
* For more info see https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Transferable_objects.
* @property {any} [returnValue] The value that should be sent to the requester.
* @property {TReturn} [returnValue] The value that should be sent to the requester.
* @property {boolean} [respond] Defaults to true, set to false to not send any response at all.
*
* **Warning:** Make sure to also set `expectResponse` to `false` on the sending end to avoid memory leaks.
Expand All @@ -129,7 +130,10 @@ import {TimeoutError} from "../TimeoutError.js";
* Alternatively you could set a `timeout` or `globalTimeout`, causing the promise to reject once the timeout is reached.
*/

/** @typedef {{"$respondOptions"?: TypedMessengerRespondOptions}} TypedMessengerRequestHandlerReturn */
/**
* @template {any} [TReturn = any]
* @typedef {{"$respondOptions"?: TypedMessengerRespondOptions<TReturn>}} TypedMessengerRequestHandlerReturn
*/

/**
* @template {TypedMessengerSignatures} TReq
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {DiscoveryManager} from "../../../../src/network/studioConnections/DiscoveryManager.js";
import {InternalDiscoveryMethod} from "../../../../src/network/studioConnections/discoveryMethods/InternalDiscoveryMethod.js";
import {WebRtcDiscoveryMethod} from "../../../../src/network/studioConnections/discoveryMethods/WebRtcDiscoveryMethod.js";
import {createStudioHostHandlers, createStudioInspectorHandlers} from "./handlers.js";
import {createStudioClientHandlers, createStudioHostHandlers, createStudioInspectorHandlers} from "./handlers.js";

/**
* @typedef {import("../../../../src/network/studioConnections/DiscoveryManager.js").AvailableConnectionWithType & {connectionState: import("../../../../src/network/studioConnections/messageHandlers/MessageHandler.js").MessageHandlerStatus}} StudioConnectionData
Expand Down Expand Up @@ -106,7 +106,7 @@ export class StudioConnectionsManager {
}
acceptHandler = () => {
/** @type {import("./handlers.js").StudioClientHostConnection} */
const connection = connectionRequest.accept({});
const connection = connectionRequest.accept(createStudioClientHandlers());
this.#projectManager.assignRemoteConnection(connection);
this.#addActiveConnection(connection);
};
Expand Down
Loading

0 comments on commit 0fd75b8

Please sign in to comment.