Skip to content

Commit

Permalink
Support binary chunks to avoid overhead of stringify/parsing chunks, …
Browse files Browse the repository at this point in the history
…also support service workers (#1)

* Add test using different workers

Signed-off-by: Marcos Candeia <[email protected]>

* Support service worker and encode messages to allow binary chunks

Signed-off-by: Marcos Candeia <[email protected]>

* Set default chan capacity to 0

Signed-off-by: Marcos Candeia <[email protected]>

* Remove default binary type

Signed-off-by: Marcos Candeia <[email protected]>

* Regen deno lock

Signed-off-by: Marcos Candeia <[email protected]>

* Remove unused funcs

Signed-off-by: Marcos Candeia <[email protected]>

---------

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Jun 2, 2024
1 parent f7525bf commit 2e25c5d
Show file tree
Hide file tree
Showing 13 changed files with 3,082 additions and 636 deletions.
2,813 changes: 2,813 additions & 0 deletions big.response.html

Large diffs are not rendered by default.

116 changes: 95 additions & 21 deletions channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ export const ifClosedChannel =
};

export const ignoreIfClosed = ifClosedChannel(() => {});
export const makeChan = <T>(): Channel<T> => {
export const makeChan = <T>(capacity = 0): Channel<T> => {
let currentCapacity = capacity;
const queue: Queue<{ value: T; resolve: () => void }> = new Queue();
const ctrl = new AbortController();
const abortPromise = Promise.withResolvers<void>();
Expand All @@ -45,7 +46,15 @@ export const makeChan = <T>(): Channel<T> => {
const send = (value: T): Promise<void> => {
return new Promise((resolve, reject) => {
if (ctrl.signal.aborted) reject(new ClosedChannelError());
queue.push({ value, resolve });
let mResolve = resolve;
if (currentCapacity > 0) {
currentCapacity--;
mResolve = () => {
currentCapacity++;
};
resolve();
}
queue.push({ value, resolve: mResolve });
});
};

Expand Down Expand Up @@ -88,13 +97,73 @@ export interface DuplexChannel<TSend, TReceive> {
out: Channel<TSend>;
}

export const makeWebSocket = <TSend, TReceive>(
// deno-lint-ignore no-explicit-any
export type Message<TMessageProperties = any> = TMessageProperties & {
payload?: Uint8Array;
};

// Function to combine metadata and binary data
function createMessage(
metadata: unknown,
uint8Array?: Uint8Array,
): ArrayBuffer {
const metadataString = JSON.stringify(metadata);
const metadataUint8Array = new TextEncoder().encode(metadataString);

// Create a buffer to hold the metadata length, metadata, and binary data
const buffer = new ArrayBuffer(
4 + metadataUint8Array.length + (uint8Array?.length ?? 0),
);
const view = new DataView(buffer);

// Write the metadata length (4 bytes)
view.setUint32(0, metadataUint8Array.length, true);

// Write the metadata
new Uint8Array(buffer, 4, metadataUint8Array.length).set(metadataUint8Array);

// Write the binary data
uint8Array &&
new Uint8Array(buffer, 4 + metadataUint8Array.length).set(uint8Array);

return buffer;
}

function parseMessage(
buffer: ArrayBuffer,
// deno-lint-ignore no-explicit-any
): { metadata: any; binaryData: Uint8Array } {
const view = new DataView(buffer);

// Read the metadata length (4 bytes)
const metadataLength = view.getUint32(0, true);

// Read the metadata
const metadataUint8Array = new Uint8Array(buffer, 4, metadataLength);
const metadataString = new TextDecoder().decode(metadataUint8Array);
const metadata = JSON.parse(metadataString);

// Read the binary data
const binaryData = new Uint8Array(buffer, 4 + metadataLength);

return { metadata, binaryData };
}

export const makeWebSocket = <
TSend,
TReceive,
TMessageSend = Message<TSend>,
TMessageRecieve = Message<TReceive>,
>(
socket: WebSocket,
parse: boolean = true,
): Promise<DuplexChannel<TSend, TReceive>> => {
const sendChan = makeChan<TSend>();
const recvChan = makeChan<TReceive>();
const ch = Promise.withResolvers<DuplexChannel<TSend, TReceive>>();
): Promise<DuplexChannel<TMessageSend, TMessageRecieve>> => {
const sendChan = makeChan<TMessageSend>();
const recvChan = makeChan<TMessageRecieve>();
const ch = Promise.withResolvers<
DuplexChannel<TMessageSend, TMessageRecieve>
>();
socket.binaryType = "arraybuffer";
socket.onclose = () => {
sendChan.close();
recvChan.close();
Expand All @@ -104,26 +173,29 @@ export const makeWebSocket = <TSend, TReceive>(
ch.reject(err);
};
socket.onmessage = async (msg) => {
let eventData = msg.data;
const target = msg?.target;
if (
target && "binaryType" in target &&
target.binaryType === "blob" && typeof eventData === "object" &&
"text" in eventData
) {
eventData = await eventData.text();
}
const message = parse ? JSON.parse(eventData) : eventData;
if (recvChan.signal.aborted) {
return;
}
await recvChan.send(message);
if (!parse) {
await recvChan.send(msg.data);
return;
}
const { binaryData, metadata } = parseMessage(msg.data);
await recvChan.send({ ...metadata, payload: binaryData });
};
socket.onopen = async () => {
ch.resolve({ in: recvChan, out: sendChan });
for await (const message of sendChan.recv()) {
try {
socket.send(parse ? JSON.stringify(message) : message as ArrayBuffer);
if (!parse) {
socket.send(message as unknown as ArrayBuffer);
continue;
}
const { payload, ...rest } = message as { payload?: Uint8Array };
const msg = createMessage(rest, payload);
socket.send(
msg,
);
} catch (_err) {
console.error("error sending message through socket", message);
}
Expand All @@ -148,8 +220,10 @@ export const makeReadableStream = (
},
});
};
export const makeChanStream = (stream: ReadableStream): Channel<Uint8Array> => {
const chan = makeChan<Uint8Array>();
export const makeChanStream = (
stream: ReadableStream,
): Channel<Uint8Array> => {
const chan = makeChan<Uint8Array>(0); // capacity

// Consume the transformed stream to trigger the pipeline
const reader = stream.getReader();
Expand Down
71 changes: 58 additions & 13 deletions client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import type { ClientMessage, ClientState, ServerMessage } from "./messages.ts";
* @property {string} domain - The domain to register the connection with.
* @property {string} server - The WebSocket server URL.
* @property {string} localAddr - The local address for the WebSocket connection.
* @property {boolean} sw - If it should use a service worker.
*/
export interface ConnectOptions {
apiKey: string;
domain: string;
server: string;
localAddr: string;
sw?: boolean;
}

/**
Expand All @@ -33,7 +35,9 @@ export interface Connected {
* @param {ConnectOptions} opts - Options for establishing the connection.
* @returns {Promise<Connected>} A promise that resolves with the connection status.
*/
export const connect = async (opts: ConnectOptions): Promise<Connected> => {
export const connectMainThread = async (
opts: ConnectOptions,
): Promise<Connected> => {
const closed = Promise.withResolvers<void>();
const registered = Promise.withResolvers<void>();
const client = typeof Deno.createHttpClient === "function"
Expand Down Expand Up @@ -65,23 +69,64 @@ export const connect = async (opts: ConnectOptions): Promise<Connected> => {
wsSockets,
ch,
};
const ctrl = new AbortController();
try {
for await (const message of ch.in.recv(ctrl.signal)) {
Promise.resolve(handleServerMessage(state, message)).then(() => {
if (state.live) {
registered.resolve();
}
}).catch((err) => {
console.error(new Date(), "error handling message", err);
!ctrl.signal.aborted && ctrl.abort();
});
for await (const message of ch.in.recv()) {
await handleServerMessage(state, message);
if (state.live) {
registered.resolve();
}
}
} catch (_err) {
// ignore
} catch (err) {
console.error(new Date(), "error handling message", err);
} finally {
closed.resolve();
}
})();
return { closed: closed.promise, registered: registered.promise };
};

/**
* Establishes a WebSocket connection with the server.
* @param {ConnectOptions} opts - Options for establishing the connection.
* @returns {Promise<Connected>} A promise that resolves with the connection status.
*/
export const connectSW = (opts: ConnectOptions): Promise<Connected> => {
const closed = Promise.withResolvers<void>();
const registered = Promise.withResolvers<void>();
const worker = new Worker(import.meta.url, {
type: "module",
deno: { permissions: "inherit" },
});
worker.addEventListener("message", (message) => {
if (message.data === "closed") {
closed.resolve();
}
if (message.data === "registered") {
registered.resolve();
}
});
worker.postMessage(opts);

return Promise.resolve({
closed: closed.promise,
registered: registered.promise,
});
};

// @ts-ignore: "trust-me"
self.onmessage = async (evt) => {
const { closed, registered } = await connectMainThread(evt.data);
// @ts-ignore: "trust-me"
closed.then(() => self.postMessage("closed"));
// @ts-ignore: "trust-me"
registered.then(() => self.postMessage("registered"));
};

/**
* Establishes a WebSocket connection with the server.
* @param {ConnectOptions} opts - Options for establishing the connection.
* @returns {Promise<Connected>} A promise that resolves with the connection status.
*/
export const connect = async (opts: ConnectOptions): Promise<Connected> => {
return opts.sw ? connectSW(opts) : await connectMainThread(opts);
};
Loading

0 comments on commit 2e25c5d

Please sign in to comment.