Skip to content

Commit

Permalink
Fixes linter
Browse files Browse the repository at this point in the history
Signed-off-by: Marcos Candeia <[email protected]>
  • Loading branch information
mcandeia committed May 30, 2024
1 parent ba17ff5 commit 09ac127
Show file tree
Hide file tree
Showing 12 changed files with 810 additions and 643 deletions.
40 changes: 28 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
# Warp

**Warp** is a simple tool that allows your locally running HTTP(s) servers to have a public URL, serving as an easy-to-self-host alternative to services like `ngrok`. Warp is implemented in Deno with the goal of providing flexibility and minimal dependencies.
**Warp** is a simple tool that allows your locally running HTTP(s) servers to
have a public URL, serving as an easy-to-self-host alternative to services like
`ngrok`. Warp is implemented in Deno with the goal of providing flexibility and
minimal dependencies.

The project has two main components:

- **Server**: Deployable on a server, it connects to the outside world and is accessible from any domain.
- **Client**: Runs locally to connect a given HTTP endpoint running on a local or non-public network.
- **Server**: Deployable on a server, it connects to the outside world and is
accessible from any domain.
- **Client**: Runs locally to connect a given HTTP endpoint running on a local
or non-public network.

<img width="1390" alt="image" src="https://github.com/deco-cx/warp/assets/5839364/914ab723-02cf-4a1a-9799-72671ffa5974">

## Server

The Warp server opens a single HTTP port to which the Warp client connects and upgrades to a WebSocket connection. Each request to this HTTP port is forwarded (based on the client's HOST header) to the corresponding connected Warp client connection, which then serves the request.
The Warp server opens a single HTTP port to which the Warp client connects and
upgrades to a WebSocket connection. Each request to this HTTP port is forwarded
(based on the client's HOST header) to the corresponding connected Warp client
connection, which then serves the request.

### Usage

To start the Warp server, import the `serve` function from the Warp package and call it with the appropriate configuration.
To start the Warp server, import the `serve` function from the Warp package and
call it with the appropriate configuration.

#### Example

Expand All @@ -35,11 +44,13 @@ serve({ port, apiKeys });

## Client

The Warp client connects to the Warp server. Upon connection, the client shares the given API key and the domain it wants to receive requests for.
The Warp client connects to the Warp server. Upon connection, the client shares
the given API key and the domain it wants to receive requests for.

### Usage

To connect a client to the Warp server, import the `connect` function from the Warp package and call it with the appropriate configuration.
To connect a client to the Warp server, import the `connect` function from the
Warp package and call it with the appropriate configuration.

#### Example

Expand Down Expand Up @@ -69,13 +80,15 @@ closed.then(() => {
#### Parameters

- `domain`: The domain name that will be used to access your localhost service.
- `localAddr`: The local address of the service you want to expose (e.g., `http://localhost:3000`).
- `localAddr`: The local address of the service you want to expose (e.g.,
`http://localhost:3000`).
- `server`: The WebSocket URL of your Warp server (e.g., `wss://YOUR_SERVER`).
- `apiKey`: The apiKey for connecting to the Warp server.

#### Return Values

- `registered`: A promise that resolves when the client has successfully registered with the server.
- `registered`: A promise that resolves when the client has successfully
registered with the server.
- `closed`: A promise that resolves when the connection to the server is closed.

## Example Workflow
Expand Down Expand Up @@ -124,6 +137,9 @@ const apiKey = "API_KEY";

### Common Issues

- **Invalid API Key**: Ensure that the API key you are using is listed in the `apiKeys` array on the server.
- **Connection Refused**: Check that the server is running and accessible at the specified WebSocket URL.
- **Domain Not Accessible**: Ensure that the domain name is correctly configured and pointing to the Warp server.
- **Invalid API Key**: Ensure that the API key you are using is listed in the
`apiKeys` array on the server.
- **Connection Refused**: Check that the server is running and accessible at the
specified WebSocket URL.
- **Domain Not Accessible**: Ensure that the domain name is correctly configured
and pointing to the Warp server.
252 changes: 133 additions & 119 deletions channel.ts
Original file line number Diff line number Diff line change
@@ -1,149 +1,163 @@
import { Queue } from "./queue.ts";

export interface Channel<T> {
closed: Promise<void>;
signal: AbortSignal;
close(): void;
send(value: T): Promise<void>;
recv(signal?: AbortSignal): AsyncIterableIterator<T>;
closed: Promise<void>;
signal: AbortSignal;
close(): void;
send(value: T): Promise<void>;
recv(signal?: AbortSignal): AsyncIterableIterator<T>;
}

export const link = (...signals: AbortSignal[]): AbortSignal => {
const ctrl = new AbortController();
for (const signal of signals) {
signal.onabort = (evt) => {
if (!ctrl.signal.aborted) {
ctrl.abort(evt);
}
}
}
return ctrl.signal;
}
const ctrl = new AbortController();
for (const signal of signals) {
signal.onabort = (evt) => {
if (!ctrl.signal.aborted) {
ctrl.abort(evt);
}
};
}
return ctrl.signal;
};

export class ClosedChannelError extends Error {
constructor() {
super("Channel is closed");
}
constructor() {
super("Channel is closed");
}
}
export const ifClosedChannel = (cb: () => Promise<void> | void) => (err: unknown) => {
export const ifClosedChannel =
(cb: () => Promise<void> | void) => (err: unknown) => {
if (err instanceof ClosedChannelError) {
return cb();
return cb();
}
throw err;
}
};

export const ignoreIfClosed = ifClosedChannel(() => { })
export const ignoreIfClosed = ifClosedChannel(() => {});
export const makeChan = <T>(): Channel<T> => {
const queue: Queue<{ value: T, resolve: () => void }> = new Queue();
const ctrl = new AbortController();
const abortPromise = Promise.withResolvers<void>();
ctrl.signal.onabort = () => {
abortPromise.resolve();
}
const queue: Queue<{ value: T; resolve: () => void }> = new Queue();
const ctrl = new AbortController();
const abortPromise = Promise.withResolvers<void>();
ctrl.signal.onabort = () => {
abortPromise.resolve();
};

const send = (value: T): Promise<void> => {
return new Promise((resolve, reject) => {
if (ctrl.signal.aborted) reject(new ClosedChannelError());
queue.push({ value, resolve });
});
};
const send = (value: T): Promise<void> => {
return new Promise((resolve, reject) => {
if (ctrl.signal.aborted) reject(new ClosedChannelError());
queue.push({ value, resolve });
});
};

const close = () => {
ctrl.abort();
};
const close = () => {
ctrl.abort();
};

const recv = async function* (signal?: AbortSignal): AsyncIterableIterator<T> {
const linked = signal ? link(ctrl.signal, signal) : ctrl.signal;
while (true) {
if (linked.aborted) {
return;
}
try {
const next = await queue.pop({ signal: linked });
next.resolve();
yield next.value;
} catch (_err) {
if (linked.aborted) {
return;
}
throw _err;
}
const recv = async function* (
signal?: AbortSignal,
): AsyncIterableIterator<T> {
const linked = signal ? link(ctrl.signal, signal) : ctrl.signal;
while (true) {
if (linked.aborted) {
return;
}
try {
const next = await queue.pop({ signal: linked });
next.resolve();
yield next.value;
} catch (_err) {
if (linked.aborted) {
return;
}
};
throw _err;
}
}
};

return { send, recv, close, signal: ctrl.signal, closed: abortPromise.promise };
return {
send,
recv,
close,
signal: ctrl.signal,
closed: abortPromise.promise,
};
};

export interface DuplexChannel<TSend, TReceive> {
in: Channel<TReceive>
out: Channel<TSend>
in: Channel<TReceive>;
out: Channel<TSend>;
}

export const makeWebSocket = <TSend, TReceive>(socket: WebSocket, parse: boolean = true): Promise<DuplexChannel<TSend, TReceive>> => {
const sendChan = makeChan<TSend>();
const recvChan = makeChan<TReceive>();
const ch = Promise.withResolvers<DuplexChannel<TSend, TReceive>>();
socket.onclose = () => {
sendChan.close();
recvChan.close();
export const makeWebSocket = <TSend, TReceive>(
socket: WebSocket,
parse: boolean = true,
): Promise<DuplexChannel<TSend, TReceive>> => {
const sendChan = makeChan<TSend>();
const recvChan = makeChan<TReceive>();
const ch = Promise.withResolvers<DuplexChannel<TSend, TReceive>>();
socket.onclose = () => {
sendChan.close();
recvChan.close();
};
socket.onerror = (err) => {
socket.close();
ch.reject(err);
};
socket.onmessage = async (msg) => {
let eventData = msg.data;
const target = msg?.target;
if (
target && "binaryType" in target &&
target.binaryType === "blob" && typeof eventData === "object" &&
"text" in eventData
) {
eventData = await eventData.text();
}
socket.onerror = (err) => {
socket.close();
ch.reject(err);
const message = parse ? JSON.parse(eventData) : eventData;
await recvChan.send(message);
};
socket.onopen = async () => {
ch.resolve({ in: recvChan, out: sendChan });
for await (const message of sendChan.recv()) {
try {
socket.send(parse ? JSON.stringify(message) : message as ArrayBuffer);
} catch (_err) {
console.error("error sending message through socket", message);
}
}
socket.onmessage = async (msg) => {
let eventData = msg.data;
const target = msg?.target;
if (
target && "binaryType" in target &&
target.binaryType === "blob" && typeof eventData === "object" &&
"text" in eventData
) {
eventData = await eventData.text();
}
const message = parse ? JSON.parse(eventData) : eventData;
await recvChan.send(message);
}
socket.onopen = async () => {
ch.resolve({ in: recvChan, out: sendChan });
for await (const message of sendChan.recv()) {
try {
socket.send(parse ? JSON.stringify(message) : message as ArrayBuffer);
} catch (_err) {
console.error("error sending message through socket", message);
}
}
socket.close();
}
return ch.promise;
}
socket.close();
};
return ch.promise;
};

export const makeReadableStream = (ch: Channel<Uint8Array>): ReadableStream<Uint8Array> => {
return new ReadableStream({
async start(controller) {
for await (const content of ch.recv()) {
controller.enqueue(content);
}
controller.close();
},
cancel() {
ch.close();
},
})
}
export const makeReadableStream = (
ch: Channel<Uint8Array>,
): ReadableStream<Uint8Array> => {
return new ReadableStream({
async start(controller) {
for await (const content of ch.recv()) {
controller.enqueue(content);
}
controller.close();
},
cancel() {
ch.close();
},
});
};
export const makeChanStream = (stream: ReadableStream): Channel<Uint8Array> => {
const chan = makeChan<Uint8Array>();
const chan = makeChan<Uint8Array>();

// Consume the transformed stream to trigger the pipeline
const reader = stream.getReader();
const processStream = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) break;
await chan.send(value);
}
chan.close();
};
processStream().catch(console.error);
return chan;
// Consume the transformed stream to trigger the pipeline
const reader = stream.getReader();
const processStream = async () => {
while (true) {
const { done, value } = await reader.read();
if (done) break;
await chan.send(value);
}
chan.close();
};
processStream().catch(console.error);
return chan;
};
Loading

0 comments on commit 09ac127

Please sign in to comment.