Skip to content

Commit

Permalink
Fix/warp handler message (#5)
Browse files Browse the repository at this point in the history
* Fix warp message closed

Signed-off-by: Marcos Candeia <[email protected]>

* Fix server stream cancel

Signed-off-by: Marcos Candeia <[email protected]>

* Add error description

Signed-off-by: Marcos Candeia <[email protected]>

* Ignore abort error

Signed-off-by: Marcos Candeia <[email protected]>

* Close chan

Signed-off-by: Marcos Candeia <[email protected]>

* Check if target is aborted

Signed-off-by: Marcos Candeia <[email protected]>

* Add signal to makeReadableStream

Signed-off-by: Marcos Candeia <[email protected]>

* Controller error when signal is aborted

Signed-off-by: Marcos Candeia <[email protected]>

* Add controller comment to allow error

Signed-off-by: Marcos Candeia <[email protected]>

* bump version

Signed-off-by: Marcos Candeia <[email protected]>

---------

Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia authored Aug 22, 2024
1 parent a6023d0 commit 262b692
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
13 changes: 11 additions & 2 deletions channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,17 @@ export const makeWebSocket = <

export const makeReadableStream = (
ch: Channel<Uint8Array>,
signal?: AbortSignal,
): ReadableStream<Uint8Array> => {
return new ReadableStream({
async start(controller) {
for await (const content of ch.recv()) {
for await (const content of ch.recv(signal)) {
controller.enqueue(content);
}
// Uncomment if necessary. this will send a signal to the controller
// if (signal?.aborted) {
// controller.error(new Error("aborted"));
// }
controller.close();
},
cancel() {
Expand All @@ -195,6 +200,10 @@ export const makeChanStream = (
}
chan.close();
};
processStream().catch(console.error);
processStream().catch((err) => {
if (!err?.target?.aborted) {
console.error("error processing stream", err);
}
});
return chan;
};
2 changes: 1 addition & 1 deletion deno.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@deco/warp",
"version": "0.3.5",
"version": "0.3.6",
"exports": "./mod.ts",
"tasks": {
"check": "deno fmt && deno lint && deno check mod.ts"
Expand Down
4 changes: 3 additions & 1 deletion handlers.client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ const onRequestStart: ServerMessageHandler<RequestStartMessage> = async (
if (!message.hasBody) {
doFetch(message, state, state.ch.out, abortCtrl.signal).catch(
ignoreIfClosed,
);
).finally(() => {
delete state.requests[message.id];
});
} else {
const bodyData = makeChan<Uint8Array>();
state.requests[message.id]!.body = bodyData;
Expand Down
5 changes: 3 additions & 2 deletions handlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const onResponseStart: ClientMessageHandler<ResponseStartMessage> = (
);
const shouldBeNullBody = NULL_BODIES.includes(message.statusCode);
const stream = !shouldBeNullBody && request.responseBodyChan
? makeReadableStream(request.responseBodyChan)
? makeReadableStream(request.responseBodyChan, state.ch.out.signal)
: undefined;
const resp = new Response(stream, {
status: message.statusCode,
Expand All @@ -62,6 +62,7 @@ const onResponseStart: ClientMessageHandler<ResponseStartMessage> = (
request.responseObject.reject(
new DOMException("Connection closed", "AbortError"),
);
request?.responseBodyChan?.close?.();
}
});
request.responseObject.resolve(resp);
Expand Down Expand Up @@ -105,7 +106,7 @@ const onDataEnd: ClientMessageHandler<DataEndMessage> = (state, message) => {
try {
request.responseBodyChan?.close?.();
} catch (_err) {
console.log(_err);
console.log(`error closing body chan`, _err);
}
};

Expand Down

0 comments on commit 262b692

Please sign in to comment.