From bad7efb78704ae5d03eed3b140a214fdc4515ce5 Mon Sep 17 00:00:00 2001 From: Marcos Candeia Date: Wed, 29 May 2024 09:12:39 -0300 Subject: [PATCH] Use signals to sinalize request aborts Signed-off-by: Marcos Candeia --- channel.ts | 27 ++++++++++++---- deno.json | 4 +-- handlers.client.ts | 76 ++++++++++++++++++++++++++++------------------ handlers.server.ts | 13 ++++++-- server.ts | 26 +++++++++++----- 5 files changed, 98 insertions(+), 48 deletions(-) diff --git a/channel.ts b/channel.ts index c2741e2..17a2a63 100644 --- a/channel.ts +++ b/channel.ts @@ -1,9 +1,23 @@ import { Queue } from "./queue.ts"; export interface Channel { + closed: Promise; + signal: AbortSignal; close(): void; send(value: T): Promise; - recv(): AsyncIterableIterator; + recv(signal?: AbortSignal): AsyncIterableIterator; +} + +export const link = (...signals: AbortSignal[]): AbortSignal => { + const ctrl = new AbortController(); + for (const signal of signals) { + signal.onabort = (evt) => { + if (!ctrl.signal.aborted) { + ctrl.abort(evt); + } + } + } + return ctrl.signal; } export class ClosedChannelError extends Error { @@ -38,17 +52,18 @@ export const makeChan = (): Channel => { ctrl.abort(); }; - const recv = async function* (): AsyncIterableIterator { + const recv = async function* (signal?: AbortSignal): AsyncIterableIterator { + const linked = signal ? link(ctrl.signal, signal) : ctrl.signal; while (true) { - if (ctrl.signal.aborted) { + if (linked.aborted) { return; } try { - const next = await queue.pop({ signal: ctrl.signal }); + const next = await queue.pop({ signal: linked }); next.resolve(); yield next.value; } catch (_err) { - if (ctrl.signal.aborted) { + if (linked.aborted) { return; } throw _err; @@ -56,7 +71,7 @@ export const makeChan = (): Channel => { } }; - return { send, recv, close }; + return { send, recv, close, signal: ctrl.signal, closed: abortPromise.promise }; }; export interface DuplexChannel { diff --git a/deno.json b/deno.json index 9e1b251..f019a3a 100644 --- a/deno.json +++ b/deno.json @@ -1,8 +1,8 @@ { "name": "@deco/warp", - "version": "0.1.2", + "version": "0.1.3", "exports": "./mod.ts", "tasks": { - "start": "deno run -A main.ts" + "check": "deno fmt && deno lint && deno check mod.ts" } } diff --git a/handlers.client.ts b/handlers.client.ts index ffcb46b..a46beba 100644 --- a/handlers.client.ts +++ b/handlers.client.ts @@ -117,24 +117,30 @@ async function handleWebSocket(message: RequestStartMessage, state: ClientState) state.wsMessages[message.id] = wsCh.out; (async () => { try { - for await (const data of wsCh.in.recv()) { + for await (const data of wsCh.in.recv(state.ch.out.signal)) { await state.ch.out.send({ type: "ws-message", data, id: message.id, }); } + if (state.ch.out.signal.aborted) { + return; + } await state.ch.out.send({ type: "ws-closed", id: message.id, }); - } catch (_err) { - // ignore - } finally { + } catch (error) { + if (state.ch.out.signal.aborted) { + return; + } await state.ch.out.send({ type: "ws-closed", id: message.id, - }).catch(_err => { }); + }).catch(ignoreIfClosed); + throw error; + } finally { delete state.wsMessages[message.id]; } })(); @@ -143,7 +149,7 @@ async function handleWebSocket(message: RequestStartMessage, state: ClientState) type: "data-end", error: err, id: message.id - }).catch(console.error); + }).catch(ignoreIfClosed); } } @@ -155,34 +161,44 @@ async function handleWebSocket(message: RequestStartMessage, state: ClientState) */ async function doFetch(request: RequestStartMessage & { body?: ReadableStream; }, state: ClientState, clientCh: Channel) { // Read from the stream - const response = await fetch(new URL(request.url, state.localAddr), { - ...state.client ? { client: state.client } : {}, - method: request.method, - headers: request.headers, - body: request.body, - }); - await clientCh.send({ - type: "response-start", - id: request.id, - statusCode: response.status, - statusMessage: response.statusText, - headers: Object.fromEntries(response.headers.entries()), - }) - const body = response?.body; - const stream = body ? makeChanStream(body) : undefined; - for await (const chunk of stream?.recv() ?? []) { + const signal = clientCh.signal; + try { + const response = await fetch(new URL(request.url, state.localAddr), { + ...state.client ? { client: state.client } : {}, + method: request.method, + headers: request.headers, + body: request.body, + signal, + }); await clientCh.send({ - type: "data", + type: "response-start", + id: request.id, + statusCode: response.status, + statusMessage: response.statusText, + headers: Object.fromEntries(response.headers.entries()), + }) + const body = response?.body; + const stream = body ? makeChanStream(body) : undefined; + for await (const chunk of stream?.recv(signal) ?? []) { + await clientCh.send({ + type: "data", + id: request.id, + chunk, + }); + } + if (signal.aborted) { + return; + } + await clientCh.send({ + type: "data-end", id: request.id, - chunk, }); + } catch (err) { + if (signal.aborted) { + return; + } + throw err; } - await clientCh.send({ - type: "data-end", - id: request.id, - }); - - return response; } /** diff --git a/handlers.server.ts b/handlers.server.ts index 0c387a0..b882b51 100644 --- a/handlers.server.ts +++ b/handlers.server.ts @@ -126,14 +126,21 @@ const onWsOpened: ClientMessageHandler = async (state, message) const socketChan = await makeWebSocket(socket, false); request.socketChan = socketChan.out; (async () => { + const signal = state.ch.out.signal; try { - for await (const msg of socketChan.in.recv()) { + for await (const msg of socketChan.in.recv(signal)) { await state.ch.out.send({ type: "ws-message", id: message.id, data: msg }); } + if (signal.aborted) { + return; + } await state.ch.out.send({ type: "ws-closed", id: message.id }) - socket.close(); } catch (error) { - console.log("sending through a closed channel error", error, message); + if (signal.aborted) { + console.log("sending through a closed channel error", error, message); + } else { + console.error(`unexpected error when handling websocket message`, error, message); + } } finally { try { socket.close(); diff --git a/server.ts b/server.ts index faeda57..85cae07 100644 --- a/server.ts +++ b/server.ts @@ -88,20 +88,32 @@ export const start = (options?: ServerOptions): Deno.HttpServer => dataChan: makeChan(), } try { + const signal = ch.out.signal; await ch.out.send(requestForward); const dataChan = req.body ? makeChanStream(req.body) : undefined; (async () => { - for await (const chunk of dataChan?.recv() ?? []) { + try { + for await (const chunk of dataChan?.recv(signal) ?? []) { + await ch.out.send({ + type: "request-data", + id: messageId, + chunk, + }); + } + if (signal.aborted) { + return; + } await ch.out.send({ - type: "request-data", + type: "request-end", id: messageId, - chunk, }); + } catch (err) { + responseObject.resolve(new Response("Error sending request to remote client", { status: 503 })); + if (signal.aborted) { + return; + } + console.log(`unexpected error when sending request`, err, req, messageId); } - await ch.out.send({ - type: "request-end", - id: messageId, - }); })() return responseObject.promise; } catch (err) {