Skip to content

Commit

Permalink
Add compat support for old clients (#2)
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Jun 2, 2024
1 parent 24638ee commit f71e099
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 79 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{}
94 changes: 27 additions & 67 deletions channel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Queue } from "./queue.ts";
import { jsonSerializer } from "./serializers.ts";

export interface Channel<T> {
closed: Promise<void>;
Expand Down Expand Up @@ -99,71 +100,41 @@ export interface DuplexChannel<TSend, TReceive> {

// deno-lint-ignore no-explicit-any
export type Message<TMessageProperties = any> = TMessageProperties & {
payload?: Uint8Array;
chunk?: Uint8Array;
};

// Function to combine metadata and binary data
function createMessage(
metadata: unknown,
uint8Array?: Uint8Array,
): ArrayBuffer {
const metadataString = JSON.stringify(metadata);
const metadataUint8Array = new TextEncoder().encode(metadataString);

// Create a buffer to hold the metadata length, metadata, and binary data
const buffer = new ArrayBuffer(
4 + metadataUint8Array.length + (uint8Array?.length ?? 0),
);
const view = new DataView(buffer);

// Write the metadata length (4 bytes)
view.setUint32(0, metadataUint8Array.length, true);

// Write the metadata
new Uint8Array(buffer, 4, metadataUint8Array.length).set(metadataUint8Array);

// Write the binary data
uint8Array &&
new Uint8Array(buffer, 4 + metadataUint8Array.length).set(uint8Array);

return buffer;
}

function parseMessage(
buffer: ArrayBuffer,
// deno-lint-ignore no-explicit-any
): { metadata: any; binaryData: Uint8Array } {
const view = new DataView(buffer);

// Read the metadata length (4 bytes)
const metadataLength = view.getUint32(0, true);

// Read the metadata
const metadataUint8Array = new Uint8Array(buffer, 4, metadataLength);
const metadataString = new TextDecoder().decode(metadataUint8Array);
const metadata = JSON.parse(metadataString);

// Read the binary data
const binaryData = new Uint8Array(buffer, 4 + metadataLength);

return { metadata, binaryData };
export interface MessageSerializer<
TSend,
TReceive,
TRawFormat extends string | ArrayBufferLike | ArrayBufferView | Blob,
> {
binaryType?: BinaryType;
serialize: (
msg: Message<TSend>,
) => TRawFormat;
deserialize: (str: TRawFormat) => Message<TReceive>;
}

export const makeWebSocket = <
TSend,
TReceive,
TMessageSend = Message<TSend>,
TMessageRecieve = Message<TReceive>,
TMessageFormat extends string | ArrayBufferLike | ArrayBufferView | Blob =
| string
| ArrayBufferLike
| ArrayBufferView
| Blob,
>(
socket: WebSocket,
parse: boolean = true,
): Promise<DuplexChannel<TMessageSend, TMessageRecieve>> => {
const sendChan = makeChan<TMessageSend>();
const recvChan = makeChan<TMessageRecieve>();
_serializer?: MessageSerializer<TSend, TReceive, TMessageFormat>,
): Promise<DuplexChannel<Message<TSend>, Message<TReceive>>> => {
const serializer = _serializer ??
jsonSerializer<Message<TSend>, Message<TReceive>>();
const sendChan = makeChan<Message<TSend>>();
const recvChan = makeChan<Message<TReceive>>();
const ch = Promise.withResolvers<
DuplexChannel<TMessageSend, TMessageRecieve>
DuplexChannel<Message<TSend>, Message<TReceive>>
>();
socket.binaryType = "arraybuffer";
socket.binaryType = serializer.binaryType ?? "blob";
socket.onclose = () => {
sendChan.close();
recvChan.close();
Expand All @@ -176,25 +147,14 @@ export const makeWebSocket = <
if (recvChan.signal.aborted) {
return;
}
if (!parse) {
await recvChan.send(msg.data);
return;
}
const { binaryData, metadata } = parseMessage(msg.data);
await recvChan.send({ ...metadata, payload: binaryData });
await recvChan.send(serializer.deserialize(msg.data));
};
socket.onopen = async () => {
ch.resolve({ in: recvChan, out: sendChan });
for await (const message of sendChan.recv()) {
try {
if (!parse) {
socket.send(message as unknown as ArrayBuffer);
continue;
}
const { payload, ...rest } = message as { payload?: Uint8Array };
const msg = createMessage(rest, payload);
socket.send(
msg,
serializer.serialize(message),
);
} catch (_err) {
console.error("error sending message through socket", message);
Expand Down
12 changes: 10 additions & 2 deletions client.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { type Channel, makeWebSocket } from "./channel.ts";
import denoJSON from "./deno.json" with { type: "json" };
import { handleServerMessage } from "./handlers.client.ts";
import type { ClientMessage, ClientState, ServerMessage } from "./messages.ts";
import { dataViewerSerializer } from "./serializers.ts";

export const CLIENT_VERSION_QUERY_STRING = "v";
/**
* Options for establishing a connection.
* @typedef {Object} ConnectOptions
Expand Down Expand Up @@ -49,8 +52,13 @@ export const connectMainThread = async (
})
: undefined;

const socket = new WebSocket(`${opts.server}/_connect`);
const ch = await makeWebSocket<ClientMessage, ServerMessage>(socket);
const socket = new WebSocket(
`${opts.server}/_connect?${CLIENT_VERSION_QUERY_STRING}=${denoJSON.version}`,
);
const ch = await makeWebSocket<ClientMessage, ServerMessage, ArrayBuffer>(
socket,
dataViewerSerializer(),
);
await ch.out.send({
id: crypto.randomUUID(),
type: "register",
Expand Down
14 changes: 14 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions handlers.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type {
ServerMessageHandler,
WSMessage,
} from "./messages.ts";
import { arrayBufferSerializer } from "./serializers.ts";

/**
* Handler for the 'registered' server message.
Expand Down Expand Up @@ -77,7 +78,7 @@ const onRequestData: ServerMessageHandler<RequestDataMessage> = async (
console.info("[req-data] req not found", message.id);
return;
}
await reqBody.send?.(message.payload);
await reqBody.send?.(message.chunk);
};

/**
Expand Down Expand Up @@ -146,7 +147,10 @@ async function handleWebSocket(
) {
const ws = new WebSocket(new URL(message.url, state.localAddr));
try {
const wsCh = await makeWebSocket<ArrayBuffer, ArrayBuffer>(ws, false);
const wsCh = await makeWebSocket<ArrayBuffer, ArrayBuffer, ArrayBuffer>(
ws,
arrayBufferSerializer(),
);
await state.ch.out.send({
type: "ws-opened",
id: message.id,
Expand Down Expand Up @@ -224,7 +228,7 @@ async function doFetch(
await clientCh.send({
type: "data",
id: request.id,
payload: chunk,
chunk,
});
}
if (signal.aborted) {
Expand Down
11 changes: 8 additions & 3 deletions handlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import type {
WSConnectionClosed,
WSMessage,
} from "./messages.ts";
import { arrayBufferSerializer } from "./serializers.ts";

/**
* List of status codes that represent null bodies in responses.
Expand Down Expand Up @@ -70,7 +71,7 @@ const data: ClientMessageHandler<DataMessage> = async (state, message) => {
}
try {
await request.responseBodyChan?.send(
message.payload,
message.chunk,
);
} catch (_err) {
console.log("Request was aborted", _err);
Expand Down Expand Up @@ -137,9 +138,13 @@ const onWsOpened: ClientMessageHandler<DataEndMessage> = async (
try {
const { socket, response } = Deno.upgradeWebSocket(request.requestObject);
request.responseObject.resolve(response);
const socketChan = await makeWebSocket<ArrayBuffer, ArrayBuffer>(
const socketChan = await makeWebSocket<
ArrayBuffer,
ArrayBuffer,
ArrayBuffer
>(
socket,
false,
arrayBufferSerializer(),
);
request.webSocketChan = socketChan.out;
(async () => {
Expand Down
4 changes: 2 additions & 2 deletions messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export interface ResponseStartMessage {
export interface DataMessage {
type: "data";
id: string;
payload: Uint8Array;
chunk: Uint8Array;
}

export interface DataEndMessage {
Expand Down Expand Up @@ -70,7 +70,7 @@ export interface RequestDataEndMessage {
export interface RequestDataMessage {
type: "request-data";
id: string;
payload: Uint8Array;
chunk: Uint8Array;
}
export interface RegisteredMessage {
type: "registered";
Expand Down
105 changes: 105 additions & 0 deletions serializers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import type { Message, MessageSerializer } from "./channel.ts";
import { ensureChunked } from "./server.ts";

export const jsonSerializer = <TSend, TReceive>(): MessageSerializer<
TSend,
TReceive,
string
> => {
return {
deserialize: (msg) => {
const parsed = JSON.parse(msg);
if (!("chunk" in parsed)) {
return parsed;
}
const { chunk, ...rest } = parsed;
return {
chunk: ensureChunked(chunk),
...rest,
};
},
serialize: JSON.stringify,
};
};

// Function to combine metadata and binary data
function createMessage(
metadata: unknown,
uint8Array?: Uint8Array,
): ArrayBuffer {
const metadataString = JSON.stringify(metadata);
const metadataUint8Array = new TextEncoder().encode(metadataString);

// Create a buffer to hold the metadata length, metadata, and binary data
const buffer = new ArrayBuffer(
4 + metadataUint8Array.length + (uint8Array?.length ?? 0),
);
const view = new DataView(buffer);

// Write the metadata length (4 bytes)
view.setUint32(0, metadataUint8Array.length, true);

// Write the metadata
new Uint8Array(buffer, 4, metadataUint8Array.length).set(metadataUint8Array);

// Write the binary data
uint8Array &&
new Uint8Array(buffer, 4 + metadataUint8Array.length).set(uint8Array);

return buffer;
}

function parseMessage(
buffer: ArrayBuffer,
// deno-lint-ignore no-explicit-any
): { metadata: any; binaryData: Uint8Array } {
const view = new DataView(buffer);

// Read the metadata length (4 bytes)
const metadataLength = view.getUint32(0, true);

// Read the metadata
const metadataUint8Array = new Uint8Array(buffer, 4, metadataLength);
const metadataString = new TextDecoder().decode(metadataUint8Array);
const metadata = JSON.parse(metadataString);

// Read the binary data
const binaryData = new Uint8Array(buffer, 4 + metadataLength);

return { metadata, binaryData };
}

export const arrayBufferSerializer = (): MessageSerializer<
ArrayBuffer,
ArrayBuffer,
ArrayBuffer
> => {
return {
binaryType: "arraybuffer",
serialize: (msg) => msg,
deserialize: (msg) => msg,
};
};

export const dataViewerSerializer = <
TSend,
TReceive,
>(): MessageSerializer<
TSend,
TReceive,
ArrayBuffer
> => {
return {
binaryType: "arraybuffer",
serialize: ({ chunk, ...rest }: Message<TSend>) => {
return createMessage(rest, chunk);
},
deserialize: (buffer: ArrayBuffer) => {
const parsed = parseMessage(buffer);
return {
...parsed.metadata,
chunk: parsed.binaryData,
};
},
};
};
Loading

0 comments on commit f71e099

Please sign in to comment.