Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(std): watch function for logging #35

Merged
merged 5 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export type ClientConfig = NodeConfig;
export type ClientOptions = Partial<ClientConfig>;

export interface ClientEventTypeRecord {
message: ClientToRelayMessage;
receive: ClientToRelayMessage;
}

/**
Expand Down Expand Up @@ -38,7 +38,7 @@ export class Client extends Node<
this.ws.addEventListener("message", (ev: MessageEvent<string>) => {
const message = JSON.parse(ev.data) as ClientToRelayMessage;
// TODO: Validate the message.
this.dispatch("message", message);
this.dispatch("receive", message);
});
}
}
1 change: 1 addition & 0 deletions core/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"exports": {
".": "./mod.ts",
"./clients": "./clients.ts",
"./nodes": "./nodes.ts",
"./protocol": "./protocol.ts",
"./relays": "./relays.ts"
},
Expand Down
8 changes: 4 additions & 4 deletions core/nodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,9 @@ export class Node<
// ------------------------------

// deno-lint-ignore no-explicit-any
type AnyEventTypeRecord = any;
export type AnyEventTypeRecord = any;

type EventType<R = AnyEventTypeRecord> = keyof R & string;
export type EventType<R = AnyEventTypeRecord> = keyof R & string;

export class NodeEvent<
R = AnyEventTypeRecord,
Expand All @@ -115,13 +115,13 @@ type NodeEventListener<
R = AnyEventTypeRecord,
T extends EventType<R> = EventType<R>,
> // deno-lint-ignore no-explicit-any
= (this: Node<W, R>, ev: MessageEvent<R[T]>) => any;
= (this: Node<W, R>, ev: NodeEvent<R, T>) => any;

type NodeEventListenerObject<
W extends InterNodeMessage,
R = AnyEventTypeRecord,
T extends EventType<R> = EventType<R>,
> = {
// deno-lint-ignore no-explicit-any
handleEvent(this: Node<W, R>, ev: MessageEvent<R[T]>): any;
handleEvent(this: Node<W, R>, ev: NodeEvent<R, T>): any;
};
8 changes: 4 additions & 4 deletions core/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ type PublicationMessage = {
// Events
//---------

export interface RelayEventTypeRecord {
message: RelayToClientMessage;
export interface RelayEventMap {
receive: RelayToClientMessage;
subscribe: SubscriptionContext & {
controller: ReadableStreamDefaultController<NostrEvent>;
};
Expand All @@ -87,7 +87,7 @@ export interface RelayEventTypeRecord {
*/
export class Relay extends Node<
ClientToRelayMessage,
RelayEventTypeRecord
RelayEventMap
> {
declare ws: LazyWebSocket;
declare config: RelayConfig;
Expand All @@ -108,7 +108,7 @@ export class Relay extends Node<
this.ws.addEventListener("message", (ev: MessageEvent<string>) => {
const message = JSON.parse(ev.data) as RelayToClientMessage;
// TODO: Validate the message.
this.dispatch("message", message);
this.dispatch("receive", message);
});
}

Expand Down
1 change: 1 addition & 0 deletions deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@lophus/std/pools": "./std/pools.ts",
"@lophus/std/signs": "./std/signs.ts",
"@lophus/std/times": "./std/times.ts",
"@lophus/std/watch": "./std/watch.ts",
"@std/assert": "jsr:@std/assert@^0.219.1",
"@std/streams": "jsr:@std/streams@^0.219.1",
"@std/testing": "jsr:@std/testing@^0.219.1"
Expand Down
2 changes: 1 addition & 1 deletion nips/01/clients.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ declare module "@lophus/core/clients" {
}

const M: NIPModule<typeof Client> = (client) => {
client.on("message", (message) => {
client.on("receive", (message) => {
switch (message[0]) {
case "EVENT": {
const event = message[1];
Expand Down
4 changes: 2 additions & 2 deletions nips/01/clients_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe("Client (NIP-01)", () => {
it("should receive an event and send a OK message", async () => {
const event = { id: "test-ok", kind: 0 };
const received = new Promise<ClientToRelayMessage<"EVENT">>((resolve) =>
client.on("message", (msg) => {
client.on("receive", (msg) => {
if (msg[0] === "EVENT") resolve(msg);
})
);
Expand All @@ -49,7 +49,7 @@ describe("Client (NIP-01)", () => {
subid = "test-req" as SubscriptionId;
const request: ClientToRelayMessage<"REQ"> = ["REQ", subid, { kinds: [1] }];
const received = new Promise<ClientToRelayMessage<"REQ">>((resolve) => {
client.on("message", (msg) => {
client.on("receive", (msg) => {
if (msg[0] === "REQ" && msg[1] === subid) resolve(msg);
});
});
Expand Down
2 changes: 1 addition & 1 deletion nips/01/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { EventRejected, Relay, SubscriptionClosed } from "@lophus/core/relays";
import { NIPModule } from "../nodes.ts";

export const M: NIPModule<typeof Relay> = (relay) => {
relay.on("message", (message) => {
relay.on("receive", (message) => {
switch (message[0]) {
case "EVENT":
case "OK":
Expand Down
2 changes: 1 addition & 1 deletion nips/42/relays.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ declare module "@lophus/core/relays" {
}

const M: NIPModule<typeof Relay> = (relay) => {
relay.on("message", (message) => {
relay.on("receive", (message) => {
if (message[0] !== "AUTH") {
// This NIP only handles AUTH messages
return;
Expand Down
3 changes: 2 additions & 1 deletion std/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
"./pools": "./pools.ts",
"./relays": "./relays.ts",
"./signs": "./signs.ts",
"./times": "./times.ts"
"./times": "./times.ts",
"./watch": "./watch.ts"
},
"imports": {
"@std/streams": "jsr:@std/streams@^0.219.1",
Expand Down
4 changes: 2 additions & 2 deletions std/relays_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,12 @@ describe("RelayGroup", () => {
const messages = Array.fromAsync(sub);
relays.filter((r) => r.config.read).forEach((relay, i) => {
relay.dispatch(
"message",
"receive",
// deno-lint-ignore no-explicit-any
["EVENT", "test-group", { kind: 1, id: i }] as any,
);
relay.dispatch(
"message",
"receive",
["EOSE", "test-group" as SubscriptionId],
);
});
Expand Down
52 changes: 52 additions & 0 deletions std/watch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { WebSocketEventType, WebSocketLike } from "@lophus/lib/websockets";
import { AnyEventTypeRecord } from "@lophus/core/nodes";
import { EventType, Node, NodeEvent } from "@lophus/core/nodes";
import type { InterNodeMessage } from "@lophus/core/protocol";

interface WatchNodeChainable<
R extends AnyEventTypeRecord,
> {
<T extends EventType<R>>(...events: T[]): ReadableStream<NodeEvent<R, T>>;
}

interface WatchWebSocketChainable {
<T extends WebSocketEventType>(
...events: T[]
): ReadableStream<WebSocketEventMap[T]>;
}

export function watch<R extends AnyEventTypeRecord>(
...nodes: Node<InterNodeMessage, R>[]
): WatchNodeChainable<R>;

export function watch(
...wss: WebSocketLike[]
): WatchWebSocketChainable;

export function watch<R extends AnyEventTypeRecord>(
...targets: Node<InterNodeMessage, R>[] | WebSocketLike[]
): WatchNodeChainable<R> | WatchWebSocketChainable {
const aborter = new AbortController();
return <T extends EventType<R> | WebSocketEventType>(
...events: T[]
) => {
return new ReadableStream(
{
start(controller) {
targets.forEach((target) =>
events.forEach((type) =>
// @ts-ignore we do not type this strictly for readability
target.addEventListener(type, (event) => {
// de-prioritize to regular listeners
queueMicrotask(() => controller.enqueue(event));
}, { signal: aborter.signal })
)
);
},
cancel() {
aborter.abort();
},
},
);
};
}
106 changes: 106 additions & 0 deletions std/watch_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import { assertEquals, assertExists, assertInstanceOf } from "@std/assert";
import { beforeAll, describe, it } from "@std/testing/bdd";
import { MockWebSocket } from "@lophus/lib/testing";
import { Relay } from "@lophus/nips/relays";
import { watch } from "./watch.ts";

describe("watch - websockets", () => {
const ws = new MockWebSocket();

it("should create a chainable from a websocket", () => {
const chainable = watch(ws);
assertExists(chainable.call);
});

it("should create a stream of events from multiple websockets", () => {
const chainable = watch(ws, ws);
assertExists(chainable.call);
});

it("should create a stream of events from a websocket", () => {
const stream = watch(ws)("message");
assertInstanceOf(stream, ReadableStream);
});

it("should create a stream of events of multiple types from a websocket", () => {
const stream = watch(ws)("message", "open");
assertInstanceOf(stream, ReadableStream);
});

it("should create a stream of events of multiple types from multiple websockets", () => {
const stream = watch(ws, ws)("message", "open");
assertInstanceOf(stream, ReadableStream);
});

it("should receive an event from a websocket", async () => {
const stream = watch(ws)("message");
const reader = stream.getReader();
ws.dispatchEvent(new MessageEvent("message", { data: "test" }));
const { value } = await reader.read();
assertExists(value);
assertEquals(value.type, "message");
assertEquals(value.data, "test");
});

it("should remove the event listener when the stream is canceled", async () => {
const stream = watch(ws)("message");
const reader = stream.getReader();
reader.cancel();
ws.dispatchEvent(new MessageEvent("message", { data: "test" }));
const { done } = await reader.read();
assertEquals(done, true);
});
});

describe("watch - relays", () => {
let relay: Relay;

beforeAll(() => {
relay = new Relay("wss://localhost:8080");
globalThis.WebSocket = MockWebSocket;
});

it("should create a chainable from a relay", () => {
const chainable = watch(relay);
assertExists(chainable.call);
});

it("should create a chainable from multiple relays", () => {
const chainable = watch(relay, relay);
assertExists(chainable.call);
});

it("should create a stream of events from a relay", () => {
const stream = watch(relay)("receive");
assertInstanceOf(stream, ReadableStream);
});

it("should create a stream of events of multiple types from a relay", () => {
const stream = watch(relay)("receive", "subscribe");
assertInstanceOf(stream, ReadableStream);
});

it("should create a stream of events of multiple types from multiple relays", () => {
const stream = watch(relay, relay)("receive", "subscribe");
assertInstanceOf(stream, ReadableStream);
});

it("should receive an event from a relay", async () => {
const stream = watch(relay)("receive");
const reader = stream.getReader();
relay.dispatch("receive", ["NOTICE", "test"]);
const { value } = await reader.read();
assertExists(value);
assertEquals(value.type, "receive");
assertEquals(value.data, ["NOTICE", "test"]);
});

it("should remove the event listener when the stream is canceled", async () => {
const stream = watch(relay)("receive");
const reader = stream.getReader();
reader.cancel();
relay.dispatch("receive", ["NOTICE", "test"]);
const { done } = await reader.read();
assertEquals(done, true);
});
});