Skip to content

Commit

Permalink
Use signals to sinalize request aborts
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed May 29, 2024
1 parent 196e9f6 commit bad7efb
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 48 deletions.
27 changes: 21 additions & 6 deletions channel.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,23 @@
import { Queue } from "./queue.ts";

export interface Channel<T> {
closed: Promise<void>;
signal: AbortSignal;
close(): void;
send(value: T): Promise<void>;
recv(): AsyncIterableIterator<T>;
recv(signal?: AbortSignal): AsyncIterableIterator<T>;
}

export const link = (...signals: AbortSignal[]): AbortSignal => {
const ctrl = new AbortController();
for (const signal of signals) {
signal.onabort = (evt) => {
if (!ctrl.signal.aborted) {
ctrl.abort(evt);
}
}
}
return ctrl.signal;
}

export class ClosedChannelError extends Error {
Expand Down Expand Up @@ -38,25 +52,26 @@ export const makeChan = <T>(): Channel<T> => {
ctrl.abort();
};

const recv = async function* (): AsyncIterableIterator<T> {
const recv = async function* (signal?: AbortSignal): AsyncIterableIterator<T> {
const linked = signal ? link(ctrl.signal, signal) : ctrl.signal;
while (true) {
if (ctrl.signal.aborted) {
if (linked.aborted) {
return;
}
try {
const next = await queue.pop({ signal: ctrl.signal });
const next = await queue.pop({ signal: linked });
next.resolve();
yield next.value;
} catch (_err) {
if (ctrl.signal.aborted) {
if (linked.aborted) {
return;
}
throw _err;
}
}
};

return { send, recv, close };
return { send, recv, close, signal: ctrl.signal, closed: abortPromise.promise };
};

export interface DuplexChannel<TSend, TReceive> {
Expand Down
4 changes: 2 additions & 2 deletions deno.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "@deco/warp",
"version": "0.1.2",
"version": "0.1.3",
"exports": "./mod.ts",
"tasks": {
"start": "deno run -A main.ts"
"check": "deno fmt && deno lint && deno check mod.ts"
}
}
76 changes: 46 additions & 30 deletions handlers.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,30 @@ async function handleWebSocket(message: RequestStartMessage, state: ClientState)
state.wsMessages[message.id] = wsCh.out;
(async () => {
try {
for await (const data of wsCh.in.recv()) {
for await (const data of wsCh.in.recv(state.ch.out.signal)) {
await state.ch.out.send({
type: "ws-message",
data,
id: message.id,
});
}
if (state.ch.out.signal.aborted) {
return;
}
await state.ch.out.send({
type: "ws-closed",
id: message.id,
});
} catch (_err) {
// ignore
} finally {
} catch (error) {
if (state.ch.out.signal.aborted) {
return;
}
await state.ch.out.send({
type: "ws-closed",
id: message.id,
}).catch(_err => { });
}).catch(ignoreIfClosed);
throw error;
} finally {
delete state.wsMessages[message.id];
}
})();
Expand All @@ -143,7 +149,7 @@ async function handleWebSocket(message: RequestStartMessage, state: ClientState)
type: "data-end",
error: err,
id: message.id
}).catch(console.error);
}).catch(ignoreIfClosed);
}
}

Expand All @@ -155,34 +161,44 @@ async function handleWebSocket(message: RequestStartMessage, state: ClientState)
*/
async function doFetch(request: RequestStartMessage & { body?: ReadableStream; }, state: ClientState, clientCh: Channel<ClientMessage>) {
// Read from the stream
const response = await fetch(new URL(request.url, state.localAddr), {
...state.client ? { client: state.client } : {},
method: request.method,
headers: request.headers,
body: request.body,
});
await clientCh.send({
type: "response-start",
id: request.id,
statusCode: response.status,
statusMessage: response.statusText,
headers: Object.fromEntries(response.headers.entries()),
})
const body = response?.body;
const stream = body ? makeChanStream(body) : undefined;
for await (const chunk of stream?.recv() ?? []) {
const signal = clientCh.signal;
try {
const response = await fetch(new URL(request.url, state.localAddr), {
...state.client ? { client: state.client } : {},
method: request.method,
headers: request.headers,
body: request.body,
signal,
});
await clientCh.send({
type: "data",
type: "response-start",
id: request.id,
statusCode: response.status,
statusMessage: response.statusText,
headers: Object.fromEntries(response.headers.entries()),
})
const body = response?.body;
const stream = body ? makeChanStream(body) : undefined;
for await (const chunk of stream?.recv(signal) ?? []) {
await clientCh.send({
type: "data",
id: request.id,
chunk,
});
}
if (signal.aborted) {
return;
}
await clientCh.send({
type: "data-end",
id: request.id,
chunk,
});
} catch (err) {
if (signal.aborted) {
return;
}
throw err;
}
await clientCh.send({
type: "data-end",
id: request.id,
});

return response;
}

/**
Expand Down
13 changes: 10 additions & 3 deletions handlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,21 @@ const onWsOpened: ClientMessageHandler<DataEndMessage> = async (state, message)
const socketChan = await makeWebSocket<ArrayBuffer, ArrayBuffer>(socket, false);
request.socketChan = socketChan.out;
(async () => {
const signal = state.ch.out.signal;
try {
for await (const msg of socketChan.in.recv()) {
for await (const msg of socketChan.in.recv(signal)) {
await state.ch.out.send({ type: "ws-message", id: message.id, data: msg });
}
if (signal.aborted) {
return;
}
await state.ch.out.send({ type: "ws-closed", id: message.id })
socket.close();
} catch (error) {
console.log("sending through a closed channel error", error, message);
if (signal.aborted) {
console.log("sending through a closed channel error", error, message);
} else {
console.error(`unexpected error when handling websocket message`, error, message);
}
} finally {
try {
socket.close();
Expand Down
26 changes: 19 additions & 7 deletions server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,20 +88,32 @@ export const start = (options?: ServerOptions): Deno.HttpServer<Deno.NetAddr> =>
dataChan: makeChan(),
}
try {
const signal = ch.out.signal;
await ch.out.send(requestForward);
const dataChan = req.body ? makeChanStream(req.body) : undefined;
(async () => {
for await (const chunk of dataChan?.recv() ?? []) {
try {
for await (const chunk of dataChan?.recv(signal) ?? []) {
await ch.out.send({
type: "request-data",
id: messageId,
chunk,
});
}
if (signal.aborted) {
return;
}
await ch.out.send({
type: "request-data",
type: "request-end",
id: messageId,
chunk,
});
} catch (err) {
responseObject.resolve(new Response("Error sending request to remote client", { status: 503 }));
if (signal.aborted) {
return;
}
console.log(`unexpected error when sending request`, err, req, messageId);
}
await ch.out.send({
type: "request-end",
id: messageId,
});
})()
return responseObject.promise;
} catch (err) {
Expand Down

0 comments on commit bad7efb

Please sign in to comment.