diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..9e26dfe --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1 @@ +{} \ No newline at end of file diff --git a/channel.ts b/channel.ts index 7defdb3..d2f5708 100644 --- a/channel.ts +++ b/channel.ts @@ -1,4 +1,5 @@ import { Queue } from "./queue.ts"; +import { jsonSerializer } from "./serializers.ts"; export interface Channel { closed: Promise; @@ -99,71 +100,41 @@ export interface DuplexChannel { // deno-lint-ignore no-explicit-any export type Message = TMessageProperties & { - payload?: Uint8Array; + chunk?: Uint8Array; }; -// Function to combine metadata and binary data -function createMessage( - metadata: unknown, - uint8Array?: Uint8Array, -): ArrayBuffer { - const metadataString = JSON.stringify(metadata); - const metadataUint8Array = new TextEncoder().encode(metadataString); - - // Create a buffer to hold the metadata length, metadata, and binary data - const buffer = new ArrayBuffer( - 4 + metadataUint8Array.length + (uint8Array?.length ?? 0), - ); - const view = new DataView(buffer); - - // Write the metadata length (4 bytes) - view.setUint32(0, metadataUint8Array.length, true); - - // Write the metadata - new Uint8Array(buffer, 4, metadataUint8Array.length).set(metadataUint8Array); - - // Write the binary data - uint8Array && - new Uint8Array(buffer, 4 + metadataUint8Array.length).set(uint8Array); - - return buffer; -} - -function parseMessage( - buffer: ArrayBuffer, - // deno-lint-ignore no-explicit-any -): { metadata: any; binaryData: Uint8Array } { - const view = new DataView(buffer); - - // Read the metadata length (4 bytes) - const metadataLength = view.getUint32(0, true); - - // Read the metadata - const metadataUint8Array = new Uint8Array(buffer, 4, metadataLength); - const metadataString = new TextDecoder().decode(metadataUint8Array); - const metadata = JSON.parse(metadataString); - - // Read the binary data - const binaryData = new Uint8Array(buffer, 4 + metadataLength); - - return { metadata, binaryData }; +export interface MessageSerializer< + TSend, + TReceive, + TRawFormat extends string | ArrayBufferLike | ArrayBufferView | Blob, +> { + binaryType?: BinaryType; + serialize: ( + msg: Message, + ) => TRawFormat; + deserialize: (str: TRawFormat) => Message; } export const makeWebSocket = < TSend, TReceive, - TMessageSend = Message, - TMessageRecieve = Message, + TMessageFormat extends string | ArrayBufferLike | ArrayBufferView | Blob = + | string + | ArrayBufferLike + | ArrayBufferView + | Blob, >( socket: WebSocket, - parse: boolean = true, -): Promise> => { - const sendChan = makeChan(); - const recvChan = makeChan(); + _serializer?: MessageSerializer, +): Promise, Message>> => { + const serializer = _serializer ?? + jsonSerializer, Message>(); + const sendChan = makeChan>(); + const recvChan = makeChan>(); const ch = Promise.withResolvers< - DuplexChannel + DuplexChannel, Message> >(); - socket.binaryType = "arraybuffer"; + socket.binaryType = serializer.binaryType ?? "blob"; socket.onclose = () => { sendChan.close(); recvChan.close(); @@ -176,25 +147,14 @@ export const makeWebSocket = < if (recvChan.signal.aborted) { return; } - if (!parse) { - await recvChan.send(msg.data); - return; - } - const { binaryData, metadata } = parseMessage(msg.data); - await recvChan.send({ ...metadata, payload: binaryData }); + await recvChan.send(serializer.deserialize(msg.data)); }; socket.onopen = async () => { ch.resolve({ in: recvChan, out: sendChan }); for await (const message of sendChan.recv()) { try { - if (!parse) { - socket.send(message as unknown as ArrayBuffer); - continue; - } - const { payload, ...rest } = message as { payload?: Uint8Array }; - const msg = createMessage(rest, payload); socket.send( - msg, + serializer.serialize(message), ); } catch (_err) { console.error("error sending message through socket", message); diff --git a/client.ts b/client.ts index 6ffe91e..3492f27 100644 --- a/client.ts +++ b/client.ts @@ -1,7 +1,10 @@ import { type Channel, makeWebSocket } from "./channel.ts"; +import denoJSON from "./deno.json" with { type: "json" }; import { handleServerMessage } from "./handlers.client.ts"; import type { ClientMessage, ClientState, ServerMessage } from "./messages.ts"; +import { dataViewerSerializer } from "./serializers.ts"; +export const CLIENT_VERSION_QUERY_STRING = "v"; /** * Options for establishing a connection. * @typedef {Object} ConnectOptions @@ -49,8 +52,13 @@ export const connectMainThread = async ( }) : undefined; - const socket = new WebSocket(`${opts.server}/_connect`); - const ch = await makeWebSocket(socket); + const socket = new WebSocket( + `${opts.server}/_connect?${CLIENT_VERSION_QUERY_STRING}=${denoJSON.version}`, + ); + const ch = await makeWebSocket( + socket, + dataViewerSerializer(), + ); await ch.out.send({ id: crypto.randomUUID(), type: "register", diff --git a/deno.lock b/deno.lock new file mode 100644 index 0000000..7c78c20 --- /dev/null +++ b/deno.lock @@ -0,0 +1,14 @@ +{ + "version": "3", + "packages": { + "specifiers": { + "jsr:@deco/warp@0.2.4": "jsr:@deco/warp@0.2.4" + }, + "jsr": { + "@deco/warp@0.2.4": { + "integrity": "a4da88f96f98fc77b0c4a1565530d5d66ebe6eb013e2dcfc05121327a7c67dee" + } + } + }, + "remote": {} +} diff --git a/handlers.client.ts b/handlers.client.ts index 8cf4900..0199335 100644 --- a/handlers.client.ts +++ b/handlers.client.ts @@ -18,6 +18,7 @@ import type { ServerMessageHandler, WSMessage, } from "./messages.ts"; +import { arrayBufferSerializer } from "./serializers.ts"; /** * Handler for the 'registered' server message. @@ -77,7 +78,7 @@ const onRequestData: ServerMessageHandler = async ( console.info("[req-data] req not found", message.id); return; } - await reqBody.send?.(message.payload); + await reqBody.send?.(message.chunk); }; /** @@ -146,7 +147,10 @@ async function handleWebSocket( ) { const ws = new WebSocket(new URL(message.url, state.localAddr)); try { - const wsCh = await makeWebSocket(ws, false); + const wsCh = await makeWebSocket( + ws, + arrayBufferSerializer(), + ); await state.ch.out.send({ type: "ws-opened", id: message.id, @@ -224,7 +228,7 @@ async function doFetch( await clientCh.send({ type: "data", id: request.id, - payload: chunk, + chunk, }); } if (signal.aborted) { diff --git a/handlers.server.ts b/handlers.server.ts index f2ddaf3..8687873 100644 --- a/handlers.server.ts +++ b/handlers.server.ts @@ -9,6 +9,7 @@ import type { WSConnectionClosed, WSMessage, } from "./messages.ts"; +import { arrayBufferSerializer } from "./serializers.ts"; /** * List of status codes that represent null bodies in responses. @@ -70,7 +71,7 @@ const data: ClientMessageHandler = async (state, message) => { } try { await request.responseBodyChan?.send( - message.payload, + message.chunk, ); } catch (_err) { console.log("Request was aborted", _err); @@ -137,9 +138,13 @@ const onWsOpened: ClientMessageHandler = async ( try { const { socket, response } = Deno.upgradeWebSocket(request.requestObject); request.responseObject.resolve(response); - const socketChan = await makeWebSocket( + const socketChan = await makeWebSocket< + ArrayBuffer, + ArrayBuffer, + ArrayBuffer + >( socket, - false, + arrayBufferSerializer(), ); request.webSocketChan = socketChan.out; (async () => { diff --git a/messages.ts b/messages.ts index bc90aac..e8dd48d 100644 --- a/messages.ts +++ b/messages.ts @@ -24,7 +24,7 @@ export interface ResponseStartMessage { export interface DataMessage { type: "data"; id: string; - payload: Uint8Array; + chunk: Uint8Array; } export interface DataEndMessage { @@ -70,7 +70,7 @@ export interface RequestDataEndMessage { export interface RequestDataMessage { type: "request-data"; id: string; - payload: Uint8Array; + chunk: Uint8Array; } export interface RegisteredMessage { type: "registered"; diff --git a/serializers.ts b/serializers.ts new file mode 100644 index 0000000..4a81896 --- /dev/null +++ b/serializers.ts @@ -0,0 +1,105 @@ +import type { Message, MessageSerializer } from "./channel.ts"; +import { ensureChunked } from "./server.ts"; + +export const jsonSerializer = (): MessageSerializer< + TSend, + TReceive, + string +> => { + return { + deserialize: (msg) => { + const parsed = JSON.parse(msg); + if (!("chunk" in parsed)) { + return parsed; + } + const { chunk, ...rest } = parsed; + return { + chunk: ensureChunked(chunk), + ...rest, + }; + }, + serialize: JSON.stringify, + }; +}; + +// Function to combine metadata and binary data +function createMessage( + metadata: unknown, + uint8Array?: Uint8Array, +): ArrayBuffer { + const metadataString = JSON.stringify(metadata); + const metadataUint8Array = new TextEncoder().encode(metadataString); + + // Create a buffer to hold the metadata length, metadata, and binary data + const buffer = new ArrayBuffer( + 4 + metadataUint8Array.length + (uint8Array?.length ?? 0), + ); + const view = new DataView(buffer); + + // Write the metadata length (4 bytes) + view.setUint32(0, metadataUint8Array.length, true); + + // Write the metadata + new Uint8Array(buffer, 4, metadataUint8Array.length).set(metadataUint8Array); + + // Write the binary data + uint8Array && + new Uint8Array(buffer, 4 + metadataUint8Array.length).set(uint8Array); + + return buffer; +} + +function parseMessage( + buffer: ArrayBuffer, + // deno-lint-ignore no-explicit-any +): { metadata: any; binaryData: Uint8Array } { + const view = new DataView(buffer); + + // Read the metadata length (4 bytes) + const metadataLength = view.getUint32(0, true); + + // Read the metadata + const metadataUint8Array = new Uint8Array(buffer, 4, metadataLength); + const metadataString = new TextDecoder().decode(metadataUint8Array); + const metadata = JSON.parse(metadataString); + + // Read the binary data + const binaryData = new Uint8Array(buffer, 4 + metadataLength); + + return { metadata, binaryData }; +} + +export const arrayBufferSerializer = (): MessageSerializer< + ArrayBuffer, + ArrayBuffer, + ArrayBuffer +> => { + return { + binaryType: "arraybuffer", + serialize: (msg) => msg, + deserialize: (msg) => msg, + }; +}; + +export const dataViewerSerializer = < + TSend, + TReceive, +>(): MessageSerializer< + TSend, + TReceive, + ArrayBuffer +> => { + return { + binaryType: "arraybuffer", + serialize: ({ chunk, ...rest }: Message) => { + return createMessage(rest, chunk); + }, + deserialize: (buffer: ArrayBuffer) => { + const parsed = parseMessage(buffer); + return { + ...parsed.metadata, + chunk: parsed.binaryData, + }; + }, + }; +}; diff --git a/server.ts b/server.ts index 1673122..a5b71f1 100644 --- a/server.ts +++ b/server.ts @@ -1,10 +1,12 @@ import { link, makeChan, makeChanStream, makeWebSocket } from "./channel.ts"; +import { CLIENT_VERSION_QUERY_STRING } from "./client.ts"; import { handleClientMessage } from "./handlers.server.ts"; import type { ClientMessage, ServerConnectionState, ServerMessage, } from "./messages.ts"; +import { dataViewerSerializer, jsonSerializer } from "./serializers.ts"; /** * Ensures that the given chunk is in the form of a Uint8Array. @@ -81,7 +83,16 @@ export const serveHandler = ( if (url.pathname === connectPath) { const { socket, response } = Deno.upgradeWebSocket(req); (async () => { - const ch = await makeWebSocket(socket); + const clientVersion = url.searchParams.get(CLIENT_VERSION_QUERY_STRING); + const ch = clientVersion === null + ? await makeWebSocket( + socket, + jsonSerializer(), + ) + : await makeWebSocket( + socket, + dataViewerSerializer(), + ); const clientId = crypto.randomUUID(); const hosts: string[] = []; const state: ServerConnectionState = { @@ -159,7 +170,7 @@ export const serveHandler = ( await ch.out.send({ type: "request-data", id: messageId, - payload: chunk, + chunk, }); } if (linked.aborted) {