diff --git a/.cspell.json b/.cspell.json index 5eb847a036..fea16763c9 100644 --- a/.cspell.json +++ b/.cspell.json @@ -39,6 +39,7 @@ "exponentiate", "extip", "fanout", + "sharded", "floodsub", "fontsource", "globby", diff --git a/package-lock.json b/package-lock.json index 9c02c23ace..f12d4f53dd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6506,7 +6506,8 @@ }, "node_modules/chai": { "version": "4.3.8", - "license": "MIT", + "resolved": "https://registry.npmjs.org/chai/-/chai-4.3.8.tgz", + "integrity": "sha512-vX4YvVVtxlfSZ2VecZgFUTU5qPCYsobVI2O9FmwEXBhDigYGQA6jRXCycIs1yJnnWbZ6/+a2zNIF5DfVCcJBFQ==", "dependencies": { "assertion-error": "^1.1.0", "check-error": "^1.0.2", @@ -26180,7 +26181,7 @@ "version": "0.0.11", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/interfaces": "0.0.18", + "chai": "^4.3.8", "debug": "^4.3.4", "uint8arrays": "^4.0.4" }, @@ -26189,7 +26190,7 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.17", + "@waku/interfaces": "0.0.18", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^3.29.2" @@ -26197,14 +26198,6 @@ "engines": { "node": ">=18" } - }, - "packages/utils/node_modules/@waku/interfaces": { - "version": "0.0.17", - "dev": true, - "license": "MIT OR Apache-2.0", - "engines": { - "node": ">=16" - } } }, "dependencies": { @@ -29401,18 +29394,13 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.17", + "@waku/interfaces": "0.0.18", + "chai": "^4.3.8", "cspell": "^7.3.2", "debug": "^4.3.4", "npm-run-all": "^4.1.5", "rollup": "^3.29.2", "uint8arrays": "^4.0.4" - }, - "dependencies": { - "@waku/interfaces": { - "version": "0.0.17", - "dev": true - } } }, "@webassemblyjs/ast": { @@ -30521,6 +30509,8 @@ }, "chai": { "version": "4.3.8", + "resolved": "https://registry.npmjs.org/chai/-/chai-4.3.8.tgz", + "integrity": "sha512-vX4YvVVtxlfSZ2VecZgFUTU5qPCYsobVI2O9FmwEXBhDigYGQA6jRXCycIs1yJnnWbZ6/+a2zNIF5DfVCcJBFQ==", "requires": { "assertion-error": "^1.1.0", "check-error": "^1.0.2", diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index bd102e3787..d0b6d68c58 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -17,7 +17,11 @@ import type { Unsubscribe } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; -import { groupByContentTopic, toAsyncIterator } from "@waku/utils"; +import { + ensurePubsubTopicIsConfigured, + groupByContentTopic, + toAsyncIterator +} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -230,7 +234,7 @@ class Subscription { } class Filter extends BaseProtocol implements IReceiver { - private readonly options: ProtocolCreateOptions; + private readonly pubSubTopics: PubSubTopic[] = []; private activeSubscriptions = new Map(); private readonly NUM_PEERS_PROTOCOL = 1; @@ -253,19 +257,22 @@ class Filter extends BaseProtocol implements IReceiver { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(FilterCodecs.SUBSCRIBE, libp2p.components); + this.pubSubTopics = options?.pubSubTopics || [DefaultPubSubTopic]; + libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log("Failed to register ", FilterCodecs.PUSH, e); }); this.activeSubscriptions = new Map(); - - this.options = options ?? {}; } - async createSubscription(pubSubTopic?: string): Promise { - const _pubSubTopic = - pubSubTopic ?? this.options.pubSubTopic ?? DefaultPubSubTopic; + async createSubscription( + pubSubTopic: string = DefaultPubSubTopic + ): Promise { + ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics); + //TODO: get a relevant peer for the topic/shard + // https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230 const peer = ( await this.getPeers({ maxBootstrapPeers: 1, @@ -274,11 +281,11 @@ class Filter extends BaseProtocol implements IReceiver { )[0]; const subscription = - this.getActiveSubscription(_pubSubTopic, peer.id.toString()) ?? + this.getActiveSubscription(pubSubTopic, peer.id.toString()) ?? this.setActiveSubscription( - _pubSubTopic, + pubSubTopic, peer.id.toString(), - new Subscription(_pubSubTopic, peer, this.getStream.bind(this, peer)) + new Subscription(pubSubTopic, peer, this.getStream.bind(this, peer)) ); return subscription; diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 6227c08257..26f7c296d9 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -1,6 +1,6 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerStore } from "@libp2p/interface/peer-store"; -import type { IRelay } from "@waku/interfaces"; +import type { IRelay, PeerIdStr } from "@waku/interfaces"; import type { KeepAliveOptions } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; @@ -13,7 +13,7 @@ const log = debug("waku:keep-alive"); export class KeepAliveManager { private pingKeepAliveTimers: Map>; - private relayKeepAliveTimers: Map>; + private relayKeepAliveTimers: Map[]>; private options: KeepAliveOptions; private relay?: IRelay; @@ -66,17 +66,12 @@ export class KeepAliveManager { const relay = this.relay; if (relay && relayPeriodSecs !== 0) { - const encoder = createEncoder({ - contentTopic: RelayPingContentTopic, - ephemeral: true - }); - const interval = setInterval(() => { - log("Sending Waku Relay ping message"); - relay - .send(encoder, { payload: new Uint8Array([1]) }) - .catch((e) => log("Failed to send relay ping", e)); - }, relayPeriodSecs * 1000); - this.relayKeepAliveTimers.set(peerId, interval); + const intervals = this.scheduleRelayPings( + relay, + relayPeriodSecs, + peerId.toString() + ); + this.relayKeepAliveTimers.set(peerId, intervals); } } @@ -89,7 +84,7 @@ export class KeepAliveManager { } if (this.relayKeepAliveTimers.has(peerId)) { - clearInterval(this.relayKeepAliveTimers.get(peerId)); + this.relayKeepAliveTimers.get(peerId)?.map(clearInterval); this.relayKeepAliveTimers.delete(peerId); } } @@ -105,4 +100,32 @@ export class KeepAliveManager { this.pingKeepAliveTimers.clear(); this.relayKeepAliveTimers.clear(); } + + private scheduleRelayPings( + relay: IRelay, + relayPeriodSecs: number, + peerIdStr: PeerIdStr + ): NodeJS.Timeout[] { + // send a ping message to each PubSubTopic the peer is part of + const intervals: NodeJS.Timeout[] = []; + for (const topic of relay.pubSubTopics) { + const meshPeers = relay.getMeshPeers(topic); + if (!meshPeers.includes(peerIdStr)) continue; + + const encoder = createEncoder({ + pubSubTopic: topic, + contentTopic: RelayPingContentTopic, + ephemeral: true + }); + const interval = setInterval(() => { + log("Sending Waku Relay ping message"); + relay + .send(encoder, { payload: new Uint8Array([1]) }) + .catch((e) => log("Failed to send relay ping", e)); + }, relayPeriodSecs * 1000); + intervals.push(interval); + } + + return intervals; + } } diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index f4ffcb7fc2..547d1372ff 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -6,11 +6,12 @@ import { IMessage, Libp2p, ProtocolCreateOptions, + PubSubTopic, SendError, SendResult } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { isSizeValid } from "@waku/utils"; +import { ensurePubsubTopicIsConfigured, isSizeValid } from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -41,12 +42,12 @@ type PreparePushMessageResult = * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ class LightPush extends BaseProtocol implements ILightPush { - options: ProtocolCreateOptions; + private readonly pubSubTopics: PubSubTopic[]; private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); - this.options = options || {}; + this.pubSubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic]; } private async preparePushMessage( @@ -82,7 +83,9 @@ class LightPush extends BaseProtocol implements ILightPush { } async send(encoder: IEncoder, message: IMessage): Promise { - const { pubSubTopic = DefaultPubSubTopic } = this.options; + const { pubSubTopic } = encoder; + ensurePubsubTopicIsConfigured(pubSubTopic, this.pubSubTopics); + const recipients: PeerId[] = []; const { query, error: preparationError } = await this.preparePushMessage( @@ -98,6 +101,7 @@ class LightPush extends BaseProtocol implements ILightPush { }; } + //TODO: get a relevant peer for the topic/shard const peers = await this.getPeers({ maxBootstrapPeers: 1, numPeers: this.NUM_PEERS_PROTOCOL diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index 5af8ea9d28..0577d27cac 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -6,11 +6,14 @@ import type { IMessage, IMetaSetter, IProtoMessage, - IRateLimitProof + IRateLimitProof, + PubSubTopic } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; import debug from "debug"; +import { DefaultPubSubTopic } from "../constants.js"; + const log = debug("waku:message:version-0"); const OneMillion = BigInt(1_000_000); @@ -73,6 +76,7 @@ export class Encoder implements IEncoder { constructor( public contentTopic: string, public ephemeral: boolean = false, + public pubSubTopic: PubSubTopic, public metaSetter?: IMetaSetter ) { if (!contentTopic || contentTopic === "") { @@ -115,15 +119,19 @@ export class Encoder implements IEncoder { * messages. */ export function createEncoder({ + pubSubTopic = DefaultPubSubTopic, contentTopic, ephemeral, metaSetter }: EncoderOptions): Encoder { - return new Encoder(contentTopic, ephemeral, metaSetter); + return new Encoder(contentTopic, ephemeral, pubSubTopic, metaSetter); } export class Decoder implements IDecoder { - constructor(public contentTopic: string) { + constructor( + public pubSubTopic: PubSubTopic, + public contentTopic: string + ) { if (!contentTopic || contentTopic === "") { throw new Error("Content topic must be specified"); } @@ -173,6 +181,9 @@ export class Decoder implements IDecoder { * * @param contentTopic The resulting decoder will only decode messages with this content topic. */ -export function createDecoder(contentTopic: string): Decoder { - return new Decoder(contentTopic); +export function createDecoder( + contentTopic: string, + pubsubTopic: PubSubTopic = DefaultPubSubTopic +): Decoder { + return new Decoder(pubsubTopic, contentTopic); } diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 67bd856e5d..9ed938ff3c 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -6,10 +6,11 @@ import { IDecoder, IStore, Libp2p, - ProtocolCreateOptions + ProtocolCreateOptions, + PubSubTopic } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; -import { isDefined } from "@waku/utils"; +import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils"; import { concat, utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; import all from "it-all"; @@ -74,12 +75,12 @@ export interface QueryOptions { * The Waku Store protocol can be used to retrieved historical messages. */ class Store extends BaseProtocol implements IStore { - options: ProtocolCreateOptions; + private readonly pubSubTopics: PubSubTopic[]; private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(StoreCodec, libp2p.components); - this.options = options ?? {}; + this.pubSubTopics = options?.pubSubTopics ?? [DefaultPubSubTopic]; } /** @@ -206,12 +207,20 @@ class Store extends BaseProtocol implements IStore { * @throws If not able to reach a Waku Store peer to query, * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. + * + * This API only supports querying a single pubsub topic at a time. + * If multiple decoders are provided, they must all have the same pubsub topic. + * @throws If multiple decoders with different pubsub topics are provided. + * @throws If no decoders are provided. + * @throws If no decoders are found for the provided pubsub topic. */ async *queryGenerator( decoders: IDecoder[], options?: QueryOptions ): AsyncGenerator[]> { - const { pubSubTopic = DefaultPubSubTopic } = this.options; + if (decoders.length === 0) { + throw new Error("No decoders provided"); + } let startTime, endTime; @@ -220,6 +229,23 @@ class Store extends BaseProtocol implements IStore { endTime = options.timeFilter.endTime; } + // convert array to set to remove duplicates + const uniquePubSubTopicsInQuery = Array.from( + new Set(decoders.map((decoder) => decoder.pubSubTopic)) + ); + + // If multiple pubsub topics are provided, throw an error + if (uniquePubSubTopicsInQuery.length > 1) { + throw new Error( + "API does not support querying multiple pubsub topics at once" + ); + } + + // we can be certain that there is only one pubsub topic in the query + const pubSubTopicForQuery = uniquePubSubTopicsInQuery[0]; + + ensurePubsubTopicIsConfigured(pubSubTopicForQuery, this.pubSubTopics); + const decodersAsMap = new Map(); decoders.forEach((dec) => { if (decodersAsMap.has(dec.contentTopic)) { @@ -230,11 +256,17 @@ class Store extends BaseProtocol implements IStore { decodersAsMap.set(dec.contentTopic, dec); }); - const contentTopics = decoders.map((dec) => dec.contentTopic); + const contentTopics = decoders + .filter((decoder) => decoder.pubSubTopic === pubSubTopicForQuery) + .map((dec) => dec.contentTopic); + + if (contentTopics.length === 0) { + throw new Error("No decoders found for topic " + pubSubTopicForQuery); + } const queryOpts = Object.assign( { - pubSubTopic: pubSubTopic, + pubSubTopic: pubSubTopicForQuery, pageDirection: PageDirection.BACKWARD, pageSize: DefaultPageSize }, diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index c1411ef38d..84901e801f 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -96,15 +96,18 @@ async function waitForConnectedPeer(protocol: IBaseProtocol): Promise { } /** - * Wait for a peer with the given protocol to be connected and in the gossipsub - * mesh. + * Wait for at least one peer with the given protocol to be connected and in the gossipsub + * mesh for all pubSubTopics. */ async function waitForGossipSubPeerInMesh(waku: IRelay): Promise { let peers = waku.getMeshPeers(); + const pubSubTopics = waku.pubSubTopics; - while (peers.length == 0) { - await pEvent(waku.gossipSub, "gossipsub:heartbeat"); - peers = waku.getMeshPeers(); + for (const topic of pubSubTopics) { + while (peers.length == 0) { + await pEvent(waku.gossipSub, "gossipsub:heartbeat"); + peers = waku.getMeshPeers(topic); + } } } diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index eb71a867a1..c9383ffb2b 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -1,3 +1,5 @@ +import type { PubSubTopic } from "./misc.js"; + export interface IRateLimitProof { proof: Uint8Array; merkleRoot: Uint8Array; @@ -36,6 +38,7 @@ export interface IMetaSetter { } export interface EncoderOptions { + pubSubTopic?: PubSubTopic; /** The content topic to set on outgoing messages. */ contentTopic: string; /** @@ -52,6 +55,7 @@ export interface EncoderOptions { } export interface IEncoder { + pubSubTopic: PubSubTopic; contentTopic: string; ephemeral: boolean; toWire: (message: IMessage) => Promise; @@ -61,7 +65,7 @@ export interface IEncoder { export interface IDecodedMessage { payload: Uint8Array; contentTopic: string; - pubSubTopic: string; + pubSubTopic: PubSubTopic; timestamp: Date | undefined; rateLimitProof: IRateLimitProof | undefined; ephemeral: boolean | undefined; @@ -69,6 +73,7 @@ export interface IDecodedMessage { } export interface IDecoder { + pubSubTopic: PubSubTopic; contentTopic: string; fromWireToProtoObj: (bytes: Uint8Array) => Promise; fromProtoObj: ( diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 44f2fd56cd..059672f7d8 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -4,6 +4,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { Libp2pOptions } from "libp2p"; import type { IDecodedMessage } from "./message.js"; +import type { PubSubTopic } from "./misc.js"; export enum Protocols { Relay = "relay", @@ -22,18 +23,23 @@ export interface IBaseProtocol { export type ProtocolCreateOptions = { /** + * Waku supports usage of multiple pubsub topics, but this is still in early stages. + * Waku implements sharding to achieve scalability + * The format of the sharded topic is `/waku/2/rs//` + * To learn more about the sharding specifications implemented, see [Relay Sharding](https://rfc.vac.dev/spec/51/). * The PubSub Topic to use. Defaults to {@link @waku/core!DefaultPubSubTopic }. * - * One and only one pubsub topic is used by Waku. This is used by: + * If no pubsub topic is specified, the default pubsub topic is used. + * The set of pubsub topics that are used to initialize the Waku node, will need to be used by the protocols as well + * You cannot currently add or remove pubsub topics after initialization. + * This is used by: * - WakuRelay to receive, route and send messages, * - WakuLightPush to send messages, * - WakuStore to retrieve messages. - * - * The usage of the default pubsub topic is recommended. * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. * */ - pubSubTopic?: string; + pubSubTopics?: PubSubTopic[]; /** * You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property. * This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) @@ -71,6 +77,11 @@ export enum SendError { * Compressing the message or using an alternative strategy for large messages is recommended. */ SIZE_TOO_BIG = "Size is too big", + /** + * The PubSubTopic passed to the send function is not configured on the Waku node. + * Please ensure that the PubSubTopic is used when initializing the Waku node. + */ + TOPIC_NOT_CONFIGURED = "Topic not configured", /** * Failure to find a peer with suitable protocols. This may due to a connection issue. * Mitigation can be: retrying after a given time period, display connectivity issue diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index 522ab19576..f98f219ba4 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -1,6 +1,7 @@ import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import type { PeerIdStr, TopicStr } from "@chainsafe/libp2p-gossipsub/types"; +import { PubSubTopic } from "./misc.js"; import { IReceiver } from "./receiver.js"; import type { ISender } from "./sender.js"; @@ -12,6 +13,7 @@ import type { ISender } from "./sender.js"; * @property getMeshPeers - Function to retrieve the mesh peers for a given topic or all topics if none is specified. Returns an array of peer IDs as strings. */ export interface IRelayAPI { + readonly pubSubTopics: Set; readonly gossipSub: GossipSub; start: () => Promise; getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 5dc873e84c..4037cc3740 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -1,5 +1,6 @@ +import { DefaultPubSubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; -import { IMetaSetter } from "@waku/interfaces"; +import { IMetaSetter, PubSubTopic } from "@waku/interfaces"; import type { EncoderOptions as BaseEncoderOptions, IDecoder, @@ -32,6 +33,7 @@ const log = debug("waku:message-encryption:ecies"); class Encoder implements IEncoder { constructor( + public pubSubTopic: PubSubTopic, public contentTopic: string, private publicKey: Uint8Array, private sigPrivKey?: Uint8Array, @@ -95,6 +97,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubSubTopic = DefaultPubSubTopic, contentTopic, publicKey, sigPrivKey, @@ -102,6 +105,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( + pubSubTopic, contentTopic, publicKey, sigPrivKey, @@ -112,10 +116,11 @@ export function createEncoder({ class Decoder extends DecoderV0 implements IDecoder { constructor( + pubSubTopic: PubSubTopic, contentTopic: string, private privateKey: Uint8Array ) { - super(contentTopic); + super(pubSubTopic, contentTopic); } async fromProtoObj( @@ -183,7 +188,8 @@ class Decoder extends DecoderV0 implements IDecoder { */ export function createDecoder( contentTopic: string, - privateKey: Uint8Array + privateKey: Uint8Array, + pubSubTopic: PubSubTopic = DefaultPubSubTopic ): Decoder { - return new Decoder(contentTopic, privateKey); + return new Decoder(pubSubTopic, contentTopic, privateKey); } diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index f34f31af98..eb1c2511d2 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -1,3 +1,4 @@ +import { DefaultPubSubTopic } from "@waku/core"; import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; import type { EncoderOptions as BaseEncoderOptions, @@ -5,7 +6,8 @@ import type { IEncoder, IMessage, IMetaSetter, - IProtoMessage + IProtoMessage, + PubSubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import debug from "debug"; @@ -27,6 +29,7 @@ const log = debug("waku:message-encryption:symmetric"); class Encoder implements IEncoder { constructor( + public pubSubTopic: PubSubTopic, public contentTopic: string, private symKey: Uint8Array, private sigPrivKey?: Uint8Array, @@ -90,21 +93,30 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubSubTopic = DefaultPubSubTopic, contentTopic, symKey, sigPrivKey, ephemeral = false, metaSetter }: EncoderOptions): Encoder { - return new Encoder(contentTopic, symKey, sigPrivKey, ephemeral, metaSetter); + return new Encoder( + pubSubTopic, + contentTopic, + symKey, + sigPrivKey, + ephemeral, + metaSetter + ); } class Decoder extends DecoderV0 implements IDecoder { constructor( + pubSubTopic: PubSubTopic, contentTopic: string, private symKey: Uint8Array ) { - super(contentTopic); + super(pubSubTopic, contentTopic); } async fromProtoObj( @@ -172,7 +184,8 @@ class Decoder extends DecoderV0 implements IDecoder { */ export function createDecoder( contentTopic: string, - symKey: Uint8Array + symKey: Uint8Array, + pubSubTopic: PubSubTopic = DefaultPubSubTopic ): Decoder { - return new Decoder(contentTopic, symKey); + return new Decoder(pubSubTopic, contentTopic, symKey); } diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index 20dc7d2f50..bf258c0f6f 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -21,10 +21,12 @@ import { IRelay, Libp2p, ProtocolCreateOptions, + PubSubTopic, SendError, SendResult } from "@waku/interfaces"; -import { groupByContentTopic, isSizeValid, toAsyncIterator } from "@waku/utils"; +import { isSizeValid, toAsyncIterator } from "@waku/utils"; +import { pushOrInitMapSet } from "@waku/utils"; import debug from "debug"; import { RelayCodecs } from "./constants.js"; @@ -46,7 +48,7 @@ export type ContentTopic = string; * Throws if libp2p.pubsub does not support Waku Relay */ class Relay implements IRelay { - private readonly pubSubTopic: string; + public readonly pubSubTopics: Set; private defaultDecoder: IDecoder; public static multicodec: string = RelayCodecs[0]; @@ -56,7 +58,7 @@ class Relay implements IRelay { * observers called when receiving new message. * Observers under key `""` are always called. */ - private observers: Map>; + private observers: Map>>; constructor(libp2p: Libp2p, options?: Partial) { if (!this.isRelayPubSub(libp2p.services.pubsub)) { @@ -66,21 +68,22 @@ class Relay implements IRelay { } this.gossipSub = libp2p.services.pubsub as GossipSub; - this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; + this.pubSubTopics = new Set(options?.pubSubTopics ?? [DefaultPubSubTopic]); if (this.gossipSub.isStarted()) { - this.gossipSubSubscribe(this.pubSubTopic); + this.subscribeToAllTopics(); } this.observers = new Map(); + // Default PubSubTopic decoder // TODO: User might want to decide what decoder should be used (e.g. for RLN) this.defaultDecoder = new TopicOnlyDecoder(); } /** * Mounts the gossipsub protocol onto the libp2p node - * and subscribes to the default topic. + * and subscribes to all the topics. * * @override * @returns {void} @@ -91,7 +94,7 @@ class Relay implements IRelay { } await this.gossipSub.start(); - this.gossipSubSubscribe(this.pubSubTopic); + this.subscribeToAllTopics(); } /** @@ -99,6 +102,16 @@ class Relay implements IRelay { */ public async send(encoder: IEncoder, message: IMessage): Promise { const recipients: PeerId[] = []; + + const { pubSubTopic } = encoder; + if (!this.pubSubTopics.has(pubSubTopic)) { + log("Failed to send waku relay: topic not configured"); + return { + recipients, + errors: [SendError.TOPIC_NOT_CONFIGURED] + }; + } + if (!isSizeValid(message.payload)) { log("Failed to send waku relay: message is bigger that 1MB"); return { @@ -116,50 +129,49 @@ class Relay implements IRelay { }; } - return this.gossipSub.publish(this.pubSubTopic, msg); + return this.gossipSub.publish(pubSubTopic, msg); } - /** - * Add an observer and associated Decoder to process incoming messages on a given content topic. - * - * @returns Function to delete the observer - */ public subscribe( decoders: IDecoder | IDecoder[], callback: Callback ): () => void { - const contentTopicToObservers = Array.isArray(decoders) - ? toObservers(decoders, callback) - : toObservers([decoders], callback); - - for (const contentTopic of contentTopicToObservers.keys()) { - const currObservers = this.observers.get(contentTopic) || new Set(); - const newObservers = - contentTopicToObservers.get(contentTopic) || new Set(); - - this.observers.set(contentTopic, union(currObservers, newObservers)); + const observers: Array<[PubSubTopic, Observer]> = []; + + for (const decoder of Array.isArray(decoders) ? decoders : [decoders]) { + const { pubSubTopic } = decoder; + const ctObs: Map>> = this.observers.get( + pubSubTopic + ) ?? new Map(); + const observer = { pubSubTopic, decoder, callback }; + pushOrInitMapSet(ctObs, decoder.contentTopic, observer); + + this.observers.set(pubSubTopic, ctObs); + observers.push([pubSubTopic, observer]); } return () => { - for (const contentTopic of contentTopicToObservers.keys()) { - const currentObservers = this.observers.get(contentTopic) || new Set(); - const observersToRemove = - contentTopicToObservers.get(contentTopic) || new Set(); - - const nextObservers = leftMinusJoin( - currentObservers, - observersToRemove - ); - - if (nextObservers.size) { - this.observers.set(contentTopic, nextObservers); - } else { - this.observers.delete(contentTopic); - } - } + this.removeObservers(observers); }; } + private removeObservers( + observers: Array<[PubSubTopic, Observer]> + ): void { + for (const [pubSubTopic, observer] of observers) { + const ctObs = this.observers.get(pubSubTopic); + if (!ctObs) continue; + + const contentTopic = observer.decoder.contentTopic; + const _obs = ctObs.get(contentTopic); + if (!_obs) continue; + + _obs.delete(observer); + ctObs.set(contentTopic, _obs); + this.observers.set(pubSubTopic, ctObs); + } + } + public toSubscriptionIterator( decoders: IDecoder | IDecoder[] ): Promise> { @@ -168,12 +180,20 @@ class Relay implements IRelay { public getActiveSubscriptions(): ActiveSubscriptions { const map = new Map(); - map.set(this.pubSubTopic, this.observers.keys()); + for (const pubSubTopic of this.pubSubTopics) { + map.set(pubSubTopic, Array.from(this.observers.keys())); + } return map; } - public getMeshPeers(topic?: TopicStr): PeerIdStr[] { - return this.gossipSub.getMeshPeers(topic ?? this.pubSubTopic); + public getMeshPeers(topic: TopicStr = DefaultPubSubTopic): PeerIdStr[] { + return this.gossipSub.getMeshPeers(topic); + } + + private subscribeToAllTopics(): void { + for (const pubSubTopic of this.pubSubTopics) { + this.gossipSubSubscribe(pubSubTopic); + } } private async processIncomingMessage( @@ -186,12 +206,20 @@ class Relay implements IRelay { return; } - const observers = this.observers.get(topicOnlyMsg.contentTopic) as Set< + // Retrieve the map of content topics for the given pubSubTopic + const contentTopicMap = this.observers.get(pubSubTopic); + if (!contentTopicMap) { + return; + } + + // Retrieve the set of observers for the given contentTopic + const observers = contentTopicMap.get(topicOnlyMsg.contentTopic) as Set< Observer >; if (!observers) { return; } + await Promise.all( Array.from(observers).map(({ decoder, callback }) => { return (async () => { @@ -241,7 +269,7 @@ class Relay implements IRelay { } private isRelayPubSub(pubsub: PubSub | undefined): boolean { - return pubsub?.multicodecs?.includes(Relay.multicodec) || false; + return pubsub?.multicodecs?.includes(Relay.multicodec) ?? false; } } @@ -267,46 +295,3 @@ export function wakuGossipSub( return pubsub; }; } - -function toObservers( - decoders: IDecoder[], - callback: Callback -): Map>> { - const contentTopicToDecoders = Array.from( - groupByContentTopic(decoders).entries() - ); - - const contentTopicToObserversEntries = contentTopicToDecoders.map( - ([contentTopic, decoders]) => - [ - contentTopic, - new Set( - decoders.map( - (decoder) => - ({ - decoder, - callback - }) as Observer - ) - ) - ] as [ContentTopic, Set>] - ); - - return new Map(contentTopicToObserversEntries); -} - -function union(left: Set, right: Set): Set { - for (const val of right.values()) { - left.add(val); - } - return left; -} - -function leftMinusJoin(left: Set, right: Set): Set { - for (const val of right.values()) { - if (left.has(val)) { - left.delete(val); - } - } - return left; -} diff --git a/packages/relay/src/topic_only_message.ts b/packages/relay/src/topic_only_message.ts index 0a81f7a6cd..845280b860 100644 --- a/packages/relay/src/topic_only_message.ts +++ b/packages/relay/src/topic_only_message.ts @@ -1,3 +1,4 @@ +import { DefaultPubSubTopic } from "@waku/core"; import type { IDecodedMessage, IDecoder, @@ -26,6 +27,7 @@ export class TopicOnlyMessage implements IDecodedMessage { } export class TopicOnlyDecoder implements IDecoder { + pubSubTopic = DefaultPubSubTopic; public contentTopic = ""; fromWireToProtoObj(bytes: Uint8Array): Promise { diff --git a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts index 23ee24240c..fa8247a786 100644 --- a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts @@ -1,15 +1,11 @@ +import { createEncoder } from "@waku/core"; import { LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; -import { - messageText, - runNodes, - TestContentTopic, - TestEncoder -} from "./utils.js"; +import { messageText, runNodes, TestContentTopic } from "./utils.js"; describe("Waku Light Push [node only] - custom pubsub topic", function () { this.timeout(15000); @@ -36,7 +32,12 @@ describe("Waku Light Push [node only] - custom pubsub topic", function () { it("Push message", async function () { const nimPeerId = await nwaku.getPeerId(); - const pushResponse = await waku.lightPush.send(TestEncoder, { + const testEncoder = createEncoder({ + contentTopic: TestContentTopic, + pubSubTopic: customPubSubTopic + }); + + const pushResponse = await waku.lightPush.send(testEncoder, { payload: utf8ToBytes(messageText) }); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index cbe4cc32cb..9e2d4b03b7 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -30,7 +30,7 @@ export async function runNodes( let waku: LightNode | undefined; try { waku = await createLightNode({ - pubSubTopic, + pubSubTopics: pubSubTopic ? [pubSubTopic] : undefined, staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts index 4d48f477f6..ecbae84ce0 100644 --- a/packages/tests/tests/relay.node.spec.ts +++ b/packages/tests/tests/relay.node.spec.ts @@ -259,6 +259,15 @@ describe("Waku Relay [node only]", () => { let waku1: RelayNode; let waku2: RelayNode; let waku3: RelayNode; + + const pubSubTopic = "/some/pubsub/topic"; + + const CustomTopicEncoder = createEncoder({ + contentTopic: TestContentTopic, + pubSubTopic: pubSubTopic + }); + const CustomTopicDecoder = createDecoder(TestContentTopic, pubSubTopic); + afterEach(async function () { !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); @@ -271,17 +280,15 @@ describe("Waku Relay [node only]", () => { it("Publish", async function () { this.timeout(10000); - const pubSubTopic = "/some/pubsub/topic"; - // 1 and 2 uses a custom pubsub // 3 uses the default pubsub [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), @@ -310,7 +317,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([TestDecoder], resolve); + void waku2.relay.subscribe([CustomTopicDecoder], resolve); } ); @@ -323,7 +330,7 @@ describe("Waku Relay [node only]", () => { } ); - await waku1.relay.send(TestEncoder, { + await waku1.relay.send(CustomTopicEncoder, { payload: utf8ToBytes(messageText) }); @@ -338,16 +345,14 @@ describe("Waku Relay [node only]", () => { this.timeout(10000); const MB = 1024 ** 2; - const pubSubTopic = "/some/pubsub/topic"; - // 1 and 2 uses a custom pubsub [waku1, waku2] = await Promise.all([ createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubSubTopic: pubSubTopic, + pubSubTopics: [pubSubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) @@ -365,7 +370,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([TestDecoder], () => + void waku2.relay.subscribe([CustomTopicDecoder], () => resolve({ payload: new Uint8Array([]) } as DecodedMessage) @@ -373,18 +378,18 @@ describe("Waku Relay [node only]", () => { } ); - let sendResult = await waku1.relay.send(TestEncoder, { + let sendResult = await waku1.relay.send(CustomTopicEncoder, { payload: generateRandomUint8Array(1 * MB) }); expect(sendResult.recipients.length).to.eq(1); - sendResult = await waku1.relay.send(TestEncoder, { + sendResult = await waku1.relay.send(CustomTopicEncoder, { payload: generateRandomUint8Array(1 * MB + 65536) }); expect(sendResult.recipients.length).to.eq(0); expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG); - sendResult = await waku1.relay.send(TestEncoder, { + sendResult = await waku1.relay.send(CustomTopicEncoder, { payload: generateRandomUint8Array(2 * MB) }); expect(sendResult.recipients.length).to.eq(0); diff --git a/packages/tests/tests/sharding.spec.ts b/packages/tests/tests/sharding.spec.ts new file mode 100644 index 0000000000..3e830e3e7f --- /dev/null +++ b/packages/tests/tests/sharding.spec.ts @@ -0,0 +1,79 @@ +import { createLightNode, LightNode, utf8ToBytes } from "@waku/sdk"; +import { createEncoder } from "@waku/sdk"; +import chai, { expect } from "chai"; +import chaiAsPromised from "chai-as-promised"; + +import { makeLogFileName } from "../src/log_file.js"; +import { NimGoNode } from "../src/node/node.js"; + +const PubSubTopic1 = "/waku/2/rs/0/2"; +const PubSubTopic2 = "/waku/2/rs/0/3"; + +const ContentTopic = "/waku/2/content/test"; + +chai.use(chaiAsPromised); + +describe("Static Sharding", () => { + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15_000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ store: true, lightpush: true, relay: true }); + }); + + afterEach(async function () { + !!nwaku && + nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e)); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + it("configure the node with multiple pubsub topics", async function () { + this.timeout(15_000); + waku = await createLightNode({ + pubSubTopics: [PubSubTopic1, PubSubTopic2] + }); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubSubTopic: PubSubTopic1 + }); + + const encoder2 = createEncoder({ + contentTopic: ContentTopic, + pubSubTopic: PubSubTopic2 + }); + + const request1 = waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + + const request2 = waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + + await expect(request1).to.be.fulfilled; + await expect(request2).to.be.fulfilled; + }); + + it("using a protocol with unconfigured pubsub topic should fail", async function () { + this.timeout(15_000); + waku = await createLightNode({ + pubSubTopics: [PubSubTopic1] + }); + + // use a pubsub topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubSubTopic: PubSubTopic2 + }); + + // the following request should throw an error + const request = waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + await expect(request).to.be.rejectedWith(Error); + }); +}); diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 53736af4bd..09f0ddd324 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -566,6 +566,11 @@ describe("Waku Store, custom pubsub topic", () => { let waku: LightNode; let nwaku: NimGoNode; + const CustomPubSubTestDecoder = createDecoder( + TestContentTopic, + customPubSubTopic + ); + beforeEach(async function () { this.timeout(15_000); nwaku = new NimGoNode(makeLogFileName(this)); @@ -600,7 +605,7 @@ describe("Waku Store, custom pubsub topic", () => { waku = await createLightNode({ staticNoiseKey: NOISE_KEY_1, - pubSubTopic: customPubSubTopic + pubSubTopics: [customPubSubTopic] }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); @@ -608,7 +613,9 @@ describe("Waku Store, custom pubsub topic", () => { const messages: IMessage[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + for await (const msgPromises of waku.store.queryGenerator([ + CustomPubSubTestDecoder + ])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 575ada689d..3034fdbcb1 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -1,4 +1,4 @@ -import { waitForRemotePeer } from "@waku/core"; +import { DefaultPubSubTopic, waitForRemotePeer } from "@waku/core"; import type { LightNode, RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createLightNode, createRelayNode } from "@waku/sdk"; @@ -39,7 +39,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await delay(1000); await waitForRemotePeer(waku1, [Protocols.Relay]); - const peers = waku1.relay.getMeshPeers(); + const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -262,7 +262,7 @@ describe("Wait for remote peer", function () { await waku1.dial(multiAddrWithId); await waitForRemotePeer(waku1); - const peers = waku1.relay.getMeshPeers(); + const peers = waku1.relay.getMeshPeers(DefaultPubSubTopic); const nimPeerId = multiAddrWithId.getPeerId(); diff --git a/packages/utils/package.json b/packages/utils/package.json index 28cca196dd..d4b3f25a42 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -65,16 +65,16 @@ "node": ">=18" }, "dependencies": { + "chai": "^4.3.8", "debug": "^4.3.4", - "uint8arrays": "^4.0.4", - "@waku/interfaces": "0.0.18" + "uint8arrays": "^4.0.4" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.4", "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.1.0", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.17", + "@waku/interfaces": "0.0.18", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^3.29.2" diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index f73240fbeb..f834bc22c4 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -3,6 +3,8 @@ export * from "./random_subset.js"; export * from "./group_by.js"; export * from "./to_async_iterator.js"; export * from "./is_size_valid.js"; +export * from "./sharding.js"; +export * from "./push_or_init_map.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value); diff --git a/packages/core/src/lib/push_or_init_map.spec.ts b/packages/utils/src/common/push_or_init_map.spec.ts similarity index 100% rename from packages/core/src/lib/push_or_init_map.spec.ts rename to packages/utils/src/common/push_or_init_map.spec.ts diff --git a/packages/core/src/lib/push_or_init_map.ts b/packages/utils/src/common/push_or_init_map.ts similarity index 100% rename from packages/core/src/lib/push_or_init_map.ts rename to packages/utils/src/common/push_or_init_map.ts diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts new file mode 100644 index 0000000000..f14ba20d69 --- /dev/null +++ b/packages/utils/src/common/sharding.ts @@ -0,0 +1,12 @@ +import type { PubSubTopic } from "@waku/interfaces"; + +export function ensurePubsubTopicIsConfigured( + pubsubTopic: PubSubTopic, + configuredTopics: PubSubTopic[] +): void { + if (!configuredTopics.includes(pubsubTopic)) { + throw new Error( + `PubSub topic ${pubsubTopic} has not been configured on this instance. Configured topics are: ${configuredTopics}. Please update your configuration by passing in the topic during Waku node instantiation.` + ); + } +}