From 76989872ef613193ce3cf2330456d8eacef3c0bf Mon Sep 17 00:00:00 2001 From: hasundue Date: Fri, 22 Mar 2024 13:28:14 +0900 Subject: [PATCH 1/5] feat(std): `watch` for logging --- core/deno.json | 1 + core/nodes.ts | 8 ++++---- std/watch.ts | 35 +++++++++++++++++++++++++++++++++ std/watch_test.ts | 49 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 std/watch.ts create mode 100644 std/watch_test.ts diff --git a/core/deno.json b/core/deno.json index ffe0eb2..6cfa0ea 100644 --- a/core/deno.json +++ b/core/deno.json @@ -4,6 +4,7 @@ "exports": { ".": "./mod.ts", "./clients": "./clients.ts", + "./nodes": "./nodes.ts", "./protocol": "./protocol.ts", "./relays": "./relays.ts" }, diff --git a/core/nodes.ts b/core/nodes.ts index c01b2f6..b03b64f 100644 --- a/core/nodes.ts +++ b/core/nodes.ts @@ -88,9 +88,9 @@ export class Node< // ------------------------------ // deno-lint-ignore no-explicit-any -type AnyEventTypeRecord = any; +export type AnyEventTypeRecord = any; -type EventType = keyof R & string; +export type EventType = keyof R & string; export class NodeEvent< R = AnyEventTypeRecord, @@ -115,7 +115,7 @@ type NodeEventListener< R = AnyEventTypeRecord, T extends EventType = EventType, > // deno-lint-ignore no-explicit-any - = (this: Node, ev: MessageEvent) => any; + = (this: Node, ev: NodeEvent) => any; type NodeEventListenerObject< W extends InterNodeMessage, @@ -123,5 +123,5 @@ type NodeEventListenerObject< T extends EventType = EventType, > = { // deno-lint-ignore no-explicit-any - handleEvent(this: Node, ev: MessageEvent): any; + handleEvent(this: Node, ev: NodeEvent): any; }; diff --git a/std/watch.ts b/std/watch.ts new file mode 100644 index 0000000..4dba82a --- /dev/null +++ b/std/watch.ts @@ -0,0 +1,35 @@ +import { + AnyEventTypeRecord, + EventType, + Node, + NodeEvent, +} from "@lophus/core/nodes"; +import type { InterNodeMessage } from "@lophus/core/protocol"; + +interface WatchChainable< + R extends AnyEventTypeRecord, +> { + >(...events: T[]): ReadableStream>; +} + +export function watch( + ...nodes: Node[] +): WatchChainable { + const aborter = new AbortController(); + return >(...events: T[]) => + new ReadableStream>({ + start(controller) { + nodes.forEach((node) => + events.forEach((type) => + node.addEventListener(type, (event) => { + // De-prioritize to regular listeners + queueMicrotask(() => controller.enqueue(event)); + }, { signal: aborter.signal }) + ) + ); + }, + cancel() { + aborter.abort(); + }, + }); +} diff --git a/std/watch_test.ts b/std/watch_test.ts new file mode 100644 index 0000000..0e5ddf9 --- /dev/null +++ b/std/watch_test.ts @@ -0,0 +1,49 @@ +import { assertEquals, assertExists, assertInstanceOf } from "@std/assert"; +import { beforeAll, describe, it } from "@std/testing/bdd"; +import { MockWebSocket } from "@lophus/lib/testing"; +import { Relay } from "@lophus/nips/relays"; +import { watch } from "./watch.ts"; + +describe("watch - Relay", () => { + let relay: Relay; + + beforeAll(() => { + relay = new Relay("wss://localhost:8080"); + globalThis.WebSocket = MockWebSocket; + }); + + it("should create a chainable from a relay", () => { + const chainable = watch(relay); + assertExists(chainable.call); + }); + + it("should create a chainable from multiple relays", () => { + const chainable = watch(relay, relay); + assertExists(chainable.call); + }); + + it("should create a stream of events from a relay", () => { + const stream = watch(relay)("message"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should create a stream of events of multiple types from a relay", () => { + const stream = watch(relay)("message", "subscribe"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should create a stream of events of multiple types from multiple relays", () => { + const stream = watch(relay, relay)("message", "subscribe"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should receive an event from a relay", async () => { + const stream = watch(relay)("message"); + const reader = stream.getReader(); + relay.dispatch("message", ["NOTICE", "test"]); + const { value } = await reader.read(); + assertExists(value); + assertEquals(value.type, "message"); + assertEquals(value.data, ["NOTICE", "test"]); + }); +}); From b539414c1dba2d2491cbd3723cfa3a9c1b7d1311 Mon Sep 17 00:00:00 2001 From: hasundue Date: Fri, 22 Mar 2024 14:59:38 +0900 Subject: [PATCH 2/5] refactor(core): rename `message` event as `receive` --- core/clients.ts | 4 ++-- core/relays.ts | 8 ++++---- nips/01/clients.ts | 2 +- nips/01/clients_test.ts | 4 ++-- nips/01/relays.ts | 2 +- nips/42/relays.ts | 2 +- std/relays_test.ts | 4 ++-- std/watch.ts | 8 ++------ std/watch_test.ts | 12 ++++++------ 9 files changed, 21 insertions(+), 25 deletions(-) diff --git a/core/clients.ts b/core/clients.ts index e84fe0d..b19863b 100644 --- a/core/clients.ts +++ b/core/clients.ts @@ -10,7 +10,7 @@ export type ClientConfig = NodeConfig; export type ClientOptions = Partial; export interface ClientEventTypeRecord { - message: ClientToRelayMessage; + receive: ClientToRelayMessage; } /** @@ -38,7 +38,7 @@ export class Client extends Node< this.ws.addEventListener("message", (ev: MessageEvent) => { const message = JSON.parse(ev.data) as ClientToRelayMessage; // TODO: Validate the message. - this.dispatch("message", message); + this.dispatch("receive", message); }); } } diff --git a/core/relays.ts b/core/relays.ts index 2556f70..504b53b 100644 --- a/core/relays.ts +++ b/core/relays.ts @@ -70,8 +70,8 @@ type PublicationMessage = { // Events //--------- -export interface RelayEventTypeRecord { - message: RelayToClientMessage; +export interface RelayEventMap { + receive: RelayToClientMessage; subscribe: SubscriptionContext & { controller: ReadableStreamDefaultController; }; @@ -87,7 +87,7 @@ export interface RelayEventTypeRecord { */ export class Relay extends Node< ClientToRelayMessage, - RelayEventTypeRecord + RelayEventMap > { declare ws: LazyWebSocket; declare config: RelayConfig; @@ -108,7 +108,7 @@ export class Relay extends Node< this.ws.addEventListener("message", (ev: MessageEvent) => { const message = JSON.parse(ev.data) as RelayToClientMessage; // TODO: Validate the message. - this.dispatch("message", message); + this.dispatch("receive", message); }); } diff --git a/nips/01/clients.ts b/nips/01/clients.ts index a4684a2..76a71a7 100644 --- a/nips/01/clients.ts +++ b/nips/01/clients.ts @@ -15,7 +15,7 @@ declare module "@lophus/core/clients" { } const M: NIPModule = (client) => { - client.on("message", (message) => { + client.on("receive", (message) => { switch (message[0]) { case "EVENT": { const event = message[1]; diff --git a/nips/01/clients_test.ts b/nips/01/clients_test.ts index f57de7a..8425d7e 100644 --- a/nips/01/clients_test.ts +++ b/nips/01/clients_test.ts @@ -31,7 +31,7 @@ describe("Client (NIP-01)", () => { it("should receive an event and send a OK message", async () => { const event = { id: "test-ok", kind: 0 }; const received = new Promise>((resolve) => - client.on("message", (msg) => { + client.on("receive", (msg) => { if (msg[0] === "EVENT") resolve(msg); }) ); @@ -49,7 +49,7 @@ describe("Client (NIP-01)", () => { subid = "test-req" as SubscriptionId; const request: ClientToRelayMessage<"REQ"> = ["REQ", subid, { kinds: [1] }]; const received = new Promise>((resolve) => { - client.on("message", (msg) => { + client.on("receive", (msg) => { if (msg[0] === "REQ" && msg[1] === subid) resolve(msg); }); }); diff --git a/nips/01/relays.ts b/nips/01/relays.ts index 16eddbb..730077e 100644 --- a/nips/01/relays.ts +++ b/nips/01/relays.ts @@ -2,7 +2,7 @@ import { EventRejected, Relay, SubscriptionClosed } from "@lophus/core/relays"; import { NIPModule } from "../nodes.ts"; export const M: NIPModule = (relay) => { - relay.on("message", (message) => { + relay.on("receive", (message) => { switch (message[0]) { case "EVENT": case "OK": diff --git a/nips/42/relays.ts b/nips/42/relays.ts index 9f4da5f..54c0ce2 100644 --- a/nips/42/relays.ts +++ b/nips/42/relays.ts @@ -11,7 +11,7 @@ declare module "@lophus/core/relays" { } const M: NIPModule = (relay) => { - relay.on("message", (message) => { + relay.on("receive", (message) => { if (message[0] !== "AUTH") { // This NIP only handles AUTH messages return; diff --git a/std/relays_test.ts b/std/relays_test.ts index a67914e..0c072be 100644 --- a/std/relays_test.ts +++ b/std/relays_test.ts @@ -89,12 +89,12 @@ describe("RelayGroup", () => { const messages = Array.fromAsync(sub); relays.filter((r) => r.config.read).forEach((relay, i) => { relay.dispatch( - "message", + "receive", // deno-lint-ignore no-explicit-any ["EVENT", "test-group", { kind: 1, id: i }] as any, ); relay.dispatch( - "message", + "receive", ["EOSE", "test-group" as SubscriptionId], ); }); diff --git a/std/watch.ts b/std/watch.ts index 4dba82a..a074546 100644 --- a/std/watch.ts +++ b/std/watch.ts @@ -1,9 +1,5 @@ -import { - AnyEventTypeRecord, - EventType, - Node, - NodeEvent, -} from "@lophus/core/nodes"; +import { AnyEventTypeRecord } from "@lophus/core/nodes"; +import { EventType, Node, NodeEvent } from "@lophus/core/nodes"; import type { InterNodeMessage } from "@lophus/core/protocol"; interface WatchChainable< diff --git a/std/watch_test.ts b/std/watch_test.ts index 0e5ddf9..2ccf31c 100644 --- a/std/watch_test.ts +++ b/std/watch_test.ts @@ -23,27 +23,27 @@ describe("watch - Relay", () => { }); it("should create a stream of events from a relay", () => { - const stream = watch(relay)("message"); + const stream = watch(relay)("receive"); assertInstanceOf(stream, ReadableStream); }); it("should create a stream of events of multiple types from a relay", () => { - const stream = watch(relay)("message", "subscribe"); + const stream = watch(relay)("receive", "subscribe"); assertInstanceOf(stream, ReadableStream); }); it("should create a stream of events of multiple types from multiple relays", () => { - const stream = watch(relay, relay)("message", "subscribe"); + const stream = watch(relay, relay)("receive", "subscribe"); assertInstanceOf(stream, ReadableStream); }); it("should receive an event from a relay", async () => { - const stream = watch(relay)("message"); + const stream = watch(relay)("receive"); const reader = stream.getReader(); - relay.dispatch("message", ["NOTICE", "test"]); + relay.dispatch("receive", ["NOTICE", "test"]); const { value } = await reader.read(); assertExists(value); - assertEquals(value.type, "message"); + assertEquals(value.type, "receive"); assertEquals(value.data, ["NOTICE", "test"]); }); }); From 5ac39ecd50a7b2746dc221df910f5194dcdd6836 Mon Sep 17 00:00:00 2001 From: hasundue Date: Fri, 22 Mar 2024 15:33:38 +0900 Subject: [PATCH 3/5] feat(std): `watch` for websockets --- std/watch.ts | 55 ++++++++++++++++++++++++++++++++--------------- std/watch_test.ts | 41 ++++++++++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 18 deletions(-) diff --git a/std/watch.ts b/std/watch.ts index a074546..0237838 100644 --- a/std/watch.ts +++ b/std/watch.ts @@ -1,31 +1,52 @@ +import { WebSocketEventType, WebSocketLike } from "@lophus/lib/websockets"; import { AnyEventTypeRecord } from "@lophus/core/nodes"; import { EventType, Node, NodeEvent } from "@lophus/core/nodes"; import type { InterNodeMessage } from "@lophus/core/protocol"; -interface WatchChainable< +interface WatchNodeChainable< R extends AnyEventTypeRecord, > { >(...events: T[]): ReadableStream>; } +interface WatchWebSocketChainable { + ( + ...events: T[] + ): ReadableStream; +} + export function watch( ...nodes: Node[] -): WatchChainable { +): WatchNodeChainable; + +export function watch( + ...wss: WebSocketLike[] +): WatchWebSocketChainable; + +export function watch( + ...targets: Node[] | WebSocketLike[] +): WatchNodeChainable | WatchWebSocketChainable { const aborter = new AbortController(); - return >(...events: T[]) => - new ReadableStream>({ - start(controller) { - nodes.forEach((node) => - events.forEach((type) => - node.addEventListener(type, (event) => { - // De-prioritize to regular listeners - queueMicrotask(() => controller.enqueue(event)); - }, { signal: aborter.signal }) - ) - ); - }, - cancel() { - aborter.abort(); + return | WebSocketEventType>( + ...events: T[] + ) => { + return new ReadableStream( + { + start(controller) { + targets.forEach((target) => + events.forEach((type) => + // @ts-ignore we do not type this strictly for readability + target.addEventListener(type, (event) => { + // de-prioritize to regular listeners + queueMicrotask(() => controller.enqueue(event)); + }, { signal: aborter.signal }) + ) + ); + }, + cancel() { + aborter.abort(); + }, }, - }); + ); + }; } diff --git a/std/watch_test.ts b/std/watch_test.ts index 2ccf31c..2b242f8 100644 --- a/std/watch_test.ts +++ b/std/watch_test.ts @@ -4,7 +4,46 @@ import { MockWebSocket } from "@lophus/lib/testing"; import { Relay } from "@lophus/nips/relays"; import { watch } from "./watch.ts"; -describe("watch - Relay", () => { +describe("watch - websockets", () => { + const ws = new MockWebSocket(); + + it("should create a chainable from a websocket", () => { + const chainable = watch(ws); + assertExists(chainable.call); + }); + + it("should create a stream of events from multiple websockets", () => { + const chainable = watch(ws, ws); + assertExists(chainable.call); + }); + + it("should create a stream of events from a websocket", () => { + const stream = watch(ws)("message"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should create a stream of events of multiple types from a websocket", () => { + const stream = watch(ws)("message", "open"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should create a stream of events of multiple types from multiple websockets", () => { + const stream = watch(ws, ws)("message", "open"); + assertInstanceOf(stream, ReadableStream); + }); + + it("should receive an event from a websocket", async () => { + const stream = watch(ws)("message"); + const reader = stream.getReader(); + ws.dispatchEvent(new MessageEvent("message", { data: "test" })); + const { value } = await reader.read(); + assertExists(value); + assertEquals(value.type, "message"); + assertEquals(value.data, "test"); + }); +}); + +describe("watch - relays", () => { let relay: Relay; beforeAll(() => { From 0419a3f929a57e756af26488499dbbc0794fc6a5 Mon Sep 17 00:00:00 2001 From: hasundue Date: Fri, 22 Mar 2024 15:37:42 +0900 Subject: [PATCH 4/5] chore: update deno.json --- deno.json | 1 + std/deno.json | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/deno.json b/deno.json index 8b07dcb..523d2e8 100644 --- a/deno.json +++ b/deno.json @@ -28,6 +28,7 @@ "@lophus/std/pools": "./std/pools.ts", "@lophus/std/signs": "./std/signs.ts", "@lophus/std/times": "./std/times.ts", + "@lophus/std/watch": "./std/watch.ts", "@std/assert": "jsr:@std/assert@^0.219.1", "@std/streams": "jsr:@std/streams@^0.219.1", "@std/testing": "jsr:@std/testing@^0.219.1" diff --git a/std/deno.json b/std/deno.json index afa6240..072c3a6 100644 --- a/std/deno.json +++ b/std/deno.json @@ -7,7 +7,8 @@ "./pools": "./pools.ts", "./relays": "./relays.ts", "./signs": "./signs.ts", - "./times": "./times.ts" + "./times": "./times.ts", + "./watch": "./watch.ts" }, "imports": { "@std/streams": "jsr:@std/streams@^0.219.1", From cff54bd8aec526398b3b0c8928ed988273af5f1f Mon Sep 17 00:00:00 2001 From: hasundue Date: Fri, 22 Mar 2024 15:42:09 +0900 Subject: [PATCH 5/5] test(std/watch): add tests for closing streams --- std/watch_test.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/std/watch_test.ts b/std/watch_test.ts index 2b242f8..bf01542 100644 --- a/std/watch_test.ts +++ b/std/watch_test.ts @@ -41,6 +41,15 @@ describe("watch - websockets", () => { assertEquals(value.type, "message"); assertEquals(value.data, "test"); }); + + it("should remove the event listener when the stream is canceled", async () => { + const stream = watch(ws)("message"); + const reader = stream.getReader(); + reader.cancel(); + ws.dispatchEvent(new MessageEvent("message", { data: "test" })); + const { done } = await reader.read(); + assertEquals(done, true); + }); }); describe("watch - relays", () => { @@ -85,4 +94,13 @@ describe("watch - relays", () => { assertEquals(value.type, "receive"); assertEquals(value.data, ["NOTICE", "test"]); }); + + it("should remove the event listener when the stream is canceled", async () => { + const stream = watch(relay)("receive"); + const reader = stream.getReader(); + reader.cancel(); + relay.dispatch("receive", ["NOTICE", "test"]); + const { done } = await reader.read(); + assertEquals(done, true); + }); });