Skip to content

Commit

Permalink
Support req abort event (#4)
Browse files Browse the repository at this point in the history
* Support req abort event

Signed-off-by: Marcos Candeia <[email protected]>

* Bump version

Signed-off-by: Marcos Candeia <[email protected]>

---------

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Aug 22, 2024
1 parent c30514a commit a6023d0
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![JSR](https://jsr.io/badges/@deco/warp)](https://jsr.io/@deco/warp)
[![JSR](https://jsr.io/badges/@deco/warp)](https://jsr.io/@deco/warp)
[![JSR Score](https://jsr.io/badges/@deco/warp/score)](https://jsr.io/@deco/warp)

# Warp
Expand Down
5 changes: 2 additions & 3 deletions client.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type Channel, makeWebSocket } from "./channel.ts";
import { makeWebSocket } from "./channel.ts";
import denoJSON from "./deno.json" with { type: "json" };
import { handleServerMessage } from "./handlers.client.ts";
import type { ClientMessage, ClientState, ServerMessage } from "./messages.ts";
Expand Down Expand Up @@ -62,7 +62,6 @@ export const connectMainThread = async (
apiKey: opts.apiKey,
domain: opts.domain,
});
const requestBody: Record<string, Channel<Uint8Array>> = {};
const wsSockets: Record<string, WebSocket> = {};

(async () => {
Expand All @@ -71,7 +70,7 @@ export const connectMainThread = async (
client,
localAddr: opts.localAddr,
live: false,
requestBody,
requests: {},
wsSockets,
ch,
};
Expand Down
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@deco/warp",
"version": "0.3.4",
"version": "0.3.5",
"exports": "./mod.ts",
"tasks": {
"check": "deno fmt && deno lint && deno check mod.ts"
Expand Down
33 changes: 27 additions & 6 deletions handlers.client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
type Channel,
ignoreIfClosed,
link,
makeChan,
makeChanStream,
makeReadableStream,
Expand All @@ -11,6 +12,7 @@ import type {
ClientState,
ErrorMessage,
RegisteredMessage,
RequestAbortedMessage,
RequestDataEndMessage,
RequestDataMessage,
RequestStartMessage,
Expand Down Expand Up @@ -49,17 +51,22 @@ const onRequestStart: ServerMessageHandler<RequestStartMessage> = async (
await handleWebSocket(message, state);
return;
}
const abortCtrl = new AbortController();
state.requests[message.id] = { abortCtrl };
if (!message.hasBody) {
doFetch(message, state, state.ch.out).catch(ignoreIfClosed);
doFetch(message, state, state.ch.out, abortCtrl.signal).catch(
ignoreIfClosed,
);
} else {
const bodyData = makeChan<Uint8Array>();
state.requestBody[message.id] = bodyData;
state.requests[message.id]!.body = bodyData;
doFetch(
{ ...message, body: makeReadableStream(bodyData) },
state,
state.ch.out,
abortCtrl.signal,
).catch(ignoreIfClosed).finally(() => {
delete state.requestBody[message.id];
delete state.requests[message.id];
});
}
};
Expand All @@ -73,14 +80,26 @@ const onRequestData: ServerMessageHandler<RequestDataMessage> = async (
state,
message,
) => {
const reqBody = state.requestBody[message.id];
const reqBody = state.requests[message.id]?.body;
if (!reqBody) {
console.info("[req-data] req not found", message.id);
return;
}
await reqBody.send?.(message.chunk);
};

/**
* Handler for the 'request-aborted' server message.
* @param {ClientState} state - The client state.
* @param {RequestAbortedMessage} message - The message data.
*/
const onRequestAborted: ServerMessageHandler<RequestAbortedMessage> = (
state,
message,
) => {
state.requests[message.id]?.abortCtrl?.abort();
};

/**
* Handler for the 'request-data-end' server message.
* @param {ClientState} state - The client state.
Expand All @@ -90,7 +109,7 @@ const onRequestDataEnd: ServerMessageHandler<RequestDataEndMessage> = (
state,
message,
) => {
const reqBody = state.requestBody[message.id];
const reqBody = state.requests[message.id]?.body;
if (!reqBody) {
return;
}
Expand Down Expand Up @@ -129,6 +148,7 @@ const handlersByType: Record<ServerMessage["type"], ServerMessageHandler<any>> =
{
registered,
error,
"request-aborted": onRequestAborted,
"request-start": onRequestStart,
"request-data": onRequestData,
"request-end": onRequestDataEnd,
Expand Down Expand Up @@ -204,9 +224,10 @@ async function doFetch(
request: RequestStartMessage & { body?: ReadableStream },
state: ClientState,
clientCh: Channel<ClientMessage>,
reqSignal: AbortSignal,
) {
// Read from the stream
const signal = clientCh.signal;
const signal = link(clientCh.signal, reqSignal);
try {
const response = await fetch(
new URL(request.url, state.localAddr),
Expand Down
12 changes: 11 additions & 1 deletion messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ export interface RequestDataMessage {
id: string;
chunk: Uint8Array;
}

export interface RequestAbortedMessage {
type: "request-aborted";
id: string;
}
export interface RegisteredMessage {
type: "registered";
id: string;
Expand All @@ -82,6 +87,7 @@ export interface ErrorMessage {
message: string;
}
export type ServerMessage =
| RequestAbortedMessage
| WSMessage
| WSConnectionClosed
| RequestStartMessage
Expand All @@ -90,9 +96,13 @@ export type ServerMessage =
| RegisteredMessage
| ErrorMessage;

export interface RequestState {
body?: Channel<ArrayBuffer>;
abortCtrl: AbortController;
}
export interface ClientState {
ch: DuplexChannel<ClientMessage, ServerMessage>;
requestBody: Record<string, Channel<ArrayBuffer>>;
requests: Record<string, RequestState | undefined>;
wsSockets: Record<string, WebSocket>;
live: boolean;
localAddr: string;
Expand Down
8 changes: 8 additions & 0 deletions server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ export const serveHandler = (
await ch.out.send(requestForward);
const dataChan = req.body ? makeChanStream(req.body) : undefined;
const linked = link(ch.out.signal, req.signal);
req.signal.addEventListener("abort", () => {
if (!ch.out.signal.aborted) {
ch.out.send({
type: "request-aborted",
id: messageId,
}).catch(() => {});
}
});
(async () => {
try {
for await (const chunk of dataChan?.recv(linked) ?? []) {
Expand Down

0 comments on commit a6023d0

Please sign in to comment.