diff --git a/.size-limit.cjs b/.size-limit.cjs index 588546904b..3fe0b6f7f7 100644 --- a/.size-limit.cjs +++ b/.size-limit.cjs @@ -33,9 +33,14 @@ module.exports = [ import: "{ wakuRelay }", }, { - name: "Light protocols", + name: "Waku Filter", path: "packages/core/bundle/index.js", - import: "{ wakuLightPush, wakuFilter }", + import: "{ wakuFilter }", + }, + { + name: "Waku LightPush", + path: "packages/sdk/bundle/index.js", + import: "{ wakuLightPush }", }, { name: "History retrieval protocols", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index a0b8371168..716b06c100 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -10,7 +10,7 @@ export * as waku_filter from "./lib/filter/index.js"; export { wakuFilter, FilterCodecs } from "./lib/filter/index.js"; export * as waku_light_push from "./lib/light_push/index.js"; -export { LightPushCodec, wakuLightPush } from "./lib/light_push/index.js"; +export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index aea1ab1847..6ede084500 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -1,7 +1,7 @@ import type { Libp2p } from "@libp2p/interface"; import type { Peer, PeerStore, Stream } from "@libp2p/interface"; import type { - IBaseProtocol, + IBaseProtocolCore, Libp2pComponents, ProtocolCreateOptions, PubsubTopic @@ -16,27 +16,22 @@ import { import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; -const DEFAULT_NUM_PEERS_TO_USE = 3; - /** * A class with predefined helpers, to be used as a base to implement Waku * Protocols. */ -export class BaseProtocol implements IBaseProtocol { +export class BaseProtocol implements IBaseProtocolCore { public readonly addLibp2pEventListener: Libp2p["addEventListener"]; public readonly removeLibp2pEventListener: Libp2p["removeEventListener"]; - readonly numPeersToUse: number; protected streamManager: StreamManager; constructor( public multicodec: string, private components: Libp2pComponents, private log: Logger, - protected pubsubTopics: PubsubTopic[], + public readonly pubsubTopics: PubsubTopic[], private options?: ProtocolCreateOptions ) { - this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; - this.addLibp2pEventListener = components.events.addEventListener.bind( components.events ); @@ -86,7 +81,7 @@ export class BaseProtocol implements IBaseProtocol { * @returns A list of peers that support the protocol sorted by latency. */ - protected async getPeers( + async getPeers( { numPeers, maxBootstrapPeers diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index cadf1720c2..8936ef27bb 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -340,6 +340,8 @@ class Subscription { } } +const DEFAULT_NUM_PEERS = 3; + class Filter extends BaseProtocol implements IReceiver { private activeSubscriptions = new Map(); @@ -357,6 +359,9 @@ class Filter extends BaseProtocol implements IReceiver { return subscription; } + //TODO: Remove when FilterCore and FilterSDK are introduced + private readonly numPeersToUse: number; + constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super( FilterCodecs.SUBSCRIBE, @@ -366,6 +371,8 @@ class Filter extends BaseProtocol implements IReceiver { options ); + this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS; + libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log.error("Failed to register ", FilterCodecs.PUSH, e); }); diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index d983ea5233..5c34160889 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -1,18 +1,15 @@ -import type { PeerId, Stream } from "@libp2p/interface"; +import type { Peer, PeerId, Stream } from "@libp2p/interface"; import { + Failure, + IBaseProtocolCore, IEncoder, - ILightPush, IMessage, Libp2p, ProtocolCreateOptions, - SendError, - SendResult + SendError } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { - ensurePubsubTopicIsConfigured, - isMessageSizeUnderCap -} from "@waku/utils"; +import { isMessageSizeUnderCap } from "@waku/utils"; import { Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -38,10 +35,20 @@ type PreparePushMessageResult = error: SendError; }; +type CoreSendResult = + | { + success: null; + failure: Failure; + } + | { + success: PeerId; + failure: null; + }; + /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ -class LightPush extends BaseProtocol implements ILightPush { +export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super( LightPushCodec, @@ -54,8 +61,7 @@ class LightPush extends BaseProtocol implements ILightPush { private async preparePushMessage( encoder: IEncoder, - message: IMessage, - pubsubTopic: string + message: IMessage ): Promise { try { if (!message.payload || message.payload.length === 0) { @@ -77,7 +83,7 @@ class LightPush extends BaseProtocol implements ILightPush { }; } - const query = PushRpc.createRequest(protoMessage, pubsubTopic); + const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic); return { query, error: null }; } catch (error) { log.error("Failed to prepare push message", error); @@ -89,116 +95,104 @@ class LightPush extends BaseProtocol implements ILightPush { } } - async send(encoder: IEncoder, message: IMessage): Promise { - const { pubsubTopic } = encoder; - ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); - - const recipients: PeerId[] = []; - + async send( + encoder: IEncoder, + message: IMessage, + peer: Peer + ): Promise { const { query, error: preparationError } = await this.preparePushMessage( encoder, - message, - pubsubTopic + message ); if (preparationError || !query) { return { - recipients, - errors: [preparationError] + success: null, + failure: { + error: preparationError, + peerId: peer.id + } }; } - const peers = await this.getPeers({ - maxBootstrapPeers: 1, - numPeers: this.numPeersToUse - }); - - if (!peers.length) { + let stream: Stream | undefined; + try { + stream = await this.getStream(peer); + } catch (err) { + log.error( + `Failed to get a stream for remote peer${peer.id.toString()}`, + err + ); return { - recipients, - errors: [SendError.NO_PEER_AVAILABLE] + success: null, + failure: { + error: SendError.REMOTE_PEER_FAULT, + peerId: peer.id + } }; } - const promises = peers.map(async (peer) => { - let stream: Stream | undefined; - try { - stream = await this.getStream(peer); - } catch (err) { - log.error( - `Failed to get a stream for remote peer${peer.id.toString()}`, - err - ); - return { recipients, error: SendError.REMOTE_PEER_FAULT }; - } - - let res: Uint8ArrayList[] | undefined; - try { - res = await pipe( - [query.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); - } catch (err) { - log.error("Failed to send waku light push request", err); - return { recipients, error: SendError.GENERIC_FAIL }; - } - - const bytes = new Uint8ArrayList(); - res.forEach((chunk) => { - bytes.append(chunk); - }); - - let response: PushResponse | undefined; - try { - response = PushRpc.decode(bytes).response; - } catch (err) { - log.error("Failed to decode push reply", err); - return { recipients, error: SendError.DECODE_FAILED }; - } + let res: Uint8ArrayList[] | undefined; + try { + res = await pipe( + [query.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + } catch (err) { + log.error("Failed to send waku light push request", err); + return { + success: null, + failure: { + error: SendError.GENERIC_FAIL, + peerId: peer.id + } + }; + } - if (!response) { - log.error("Remote peer fault: No response in PushRPC"); - return { recipients, error: SendError.REMOTE_PEER_FAULT }; - } + const bytes = new Uint8ArrayList(); + res.forEach((chunk) => { + bytes.append(chunk); + }); - if (!response.isSuccess) { - log.error("Remote peer rejected the message: ", response.info); - return { recipients, error: SendError.REMOTE_PEER_REJECTED }; - } + let response: PushResponse | undefined; + try { + response = PushRpc.decode(bytes).response; + } catch (err) { + log.error("Failed to decode push reply", err); + return { + success: null, + failure: { + error: SendError.DECODE_FAILED, + peerId: peer.id + } + }; + } - recipients.some((recipient) => recipient.equals(peer.id)) || - recipients.push(peer.id); + if (!response) { + log.error("Remote peer fault: No response in PushRPC"); + return { + success: null, + failure: { + error: SendError.REMOTE_PEER_FAULT, + peerId: peer.id + } + }; + } - return { recipients }; - }); + if (!response.isSuccess) { + log.error("Remote peer rejected the message: ", response.info); + return { + success: null, + failure: { + error: SendError.REMOTE_PEER_REJECTED, + peerId: peer.id + } + }; + } - const results = await Promise.allSettled(promises); - - // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) - const errors = results - .filter( - ( - result - ): result is PromiseFulfilledResult<{ - recipients: PeerId[]; - error: SendError | undefined; - }> => result.status === "fulfilled" - ) - .map((result) => result.value.error) - .filter((error) => error !== undefined) as SendError[]; - - return { - recipients, - errors - }; + return { success: peer.id, failure: null }; } } - -export function wakuLightPush( - init: Partial = {} -): (libp2p: Libp2p) => ILightPush { - return (libp2p: Libp2p) => new LightPush(libp2p, init); -} diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index ae5d5cf849..487df7c243 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -1,10 +1,16 @@ import type { IdentifyResult } from "@libp2p/interface"; -import type { IBaseProtocol, IMetadata, IRelay, Waku } from "@waku/interfaces"; +import type { + IBaseProtocolCore, + IMetadata, + IRelay, + Waku +} from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { pEvent } from "p-event"; const log = new Logger("wait-for-remote-peer"); +//TODO: move this function within the Waku class: https://github.com/waku-org/js-waku/issues/1761 /** * Wait for a remote peer to be ready given the passed protocols. * Must be used after attempting to connect to nodes, using @@ -53,7 +59,10 @@ export async function waitForRemotePeer( if (!waku.lightPush) throw new Error("Cannot wait for LightPush peer: protocol not mounted"); promises.push( - waitForConnectedPeer(waku.lightPush, waku.libp2p.services.metadata) + waitForConnectedPeer( + waku.lightPush.protocol, + waku.libp2p.services.metadata + ) ); } @@ -76,12 +85,13 @@ export async function waitForRemotePeer( } } +//TODO: move this function within protocol SDK class: https://github.com/waku-org/js-waku/issues/1761 /** * Wait for a peer with the given protocol to be connected. * If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service. */ async function waitForConnectedPeer( - protocol: IBaseProtocol, + protocol: IBaseProtocolCore, metadataService?: IMetadata ): Promise { const codec = protocol.multicodec; diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 52f817580b..76139170be 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -2,7 +2,7 @@ import type { PeerId } from "@libp2p/interface"; import type { IDecodedMessage, IDecoder, SingleShardInfo } from "./message.js"; import type { ContentTopic, PubsubTopic } from "./misc.js"; -import type { Callback, IBaseProtocol } from "./protocols.js"; +import type { Callback, IBaseProtocolCore } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; export type ContentFilter = { @@ -23,7 +23,7 @@ export interface IFilterSubscription { } export type IFilter = IReceiver & - IBaseProtocol & { + IBaseProtocolCore & { createSubscription( pubsubTopicShardInfo?: SingleShardInfo | PubsubTopic, peerId?: PeerId diff --git a/packages/interfaces/src/light_push.ts b/packages/interfaces/src/light_push.ts index 9c6107a2b9..26bc8bacdc 100644 --- a/packages/interfaces/src/light_push.ts +++ b/packages/interfaces/src/light_push.ts @@ -1,4 +1,5 @@ -import type { IBaseProtocol } from "./protocols.js"; +import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js"; import type { ISender } from "./sender.js"; -export type ILightPush = ISender & IBaseProtocol; +export type ILightPushSDK = ISender & + IBaseProtocolSDK & { protocol: IBaseProtocolCore }; diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index 120e755fb6..a847fce52d 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,10 +1,10 @@ import type { PeerId } from "@libp2p/interface"; import type { ShardInfo } from "./enr.js"; -import type { IBaseProtocol, ShardingParams } from "./protocols.js"; +import type { IBaseProtocolCore, ShardingParams } from "./protocols.js"; // IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol -export interface IMetadata extends Omit { +export interface IMetadata extends Omit { shardInfo: ShardingParams; confirmOrAttemptHandshake(peerId: PeerId): Promise; query(peerId: PeerId): Promise; diff --git a/packages/interfaces/src/peer_exchange.ts b/packages/interfaces/src/peer_exchange.ts index 9e713fcdb6..fc64965cbe 100644 --- a/packages/interfaces/src/peer_exchange.ts +++ b/packages/interfaces/src/peer_exchange.ts @@ -3,9 +3,9 @@ import type { PeerStore } from "@libp2p/interface"; import type { ConnectionManager } from "@libp2p/interface-internal"; import { IEnr } from "./enr.js"; -import { IBaseProtocol } from "./protocols.js"; +import { IBaseProtocolCore } from "./protocols.js"; -export interface IPeerExchange extends IBaseProtocol { +export interface IPeerExchange extends IBaseProtocolCore { query(params: PeerExchangeQueryParams): Promise; } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0d89834bf8..8d64e83b46 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -14,7 +14,7 @@ export enum Protocols { Filter = "filter" } -export interface IBaseProtocol { +export type IBaseProtocolCore = { shardInfo?: ShardInfo; multicodec: string; peerStore: PeerStore; @@ -22,7 +22,11 @@ export interface IBaseProtocol { connectedPeers: () => Promise; addLibp2pEventListener: Libp2p["addEventListener"]; removeLibp2pEventListener: Libp2p["removeEventListener"]; -} +}; + +export type IBaseProtocolSDK = { + numPeers: number; +}; export type ContentTopicInfo = { clusterId: number; @@ -145,7 +149,12 @@ export enum SendError { REMOTE_PEER_REJECTED = "Remote peer rejected" } +export interface Failure { + error: SendError; + peerId?: PeerId; +} + export interface SendResult { - errors?: SendError[]; - recipients: PeerId[]; + failures?: Failure[]; + successes: PeerId[]; } diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index 9127d35d4d..7780801301 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -1,5 +1,5 @@ import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { IBaseProtocol } from "./protocols.js"; +import type { IBaseProtocolCore } from "./protocols.js"; export enum PageDirection { BACKWARD = "backward", @@ -45,7 +45,7 @@ export type StoreQueryOptions = { cursor?: Cursor; }; -export interface IStore extends IBaseProtocol { +export interface IStore extends IBaseProtocolCore { queryWithOrderedCallback: ( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index ffb6118ca3..0948d05369 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -4,7 +4,7 @@ import type { Multiaddr } from "@multiformats/multiaddr"; import { IConnectionManager } from "./connection_manager.js"; import type { IFilter } from "./filter.js"; import type { Libp2p } from "./libp2p.js"; -import type { ILightPush } from "./light_push.js"; +import type { ILightPushSDK } from "./light_push.js"; import { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { IStore } from "./store.js"; @@ -14,7 +14,7 @@ export interface Waku { relay?: IRelay; store?: IStore; filter?: IFilter; - lightPush?: ILightPush; + lightPush?: ILightPushSDK; connectionManager: IConnectionManager; @@ -33,7 +33,7 @@ export interface LightNode extends Waku { relay: undefined; store: IStore; filter: IFilter; - lightPush: ILightPush; + lightPush: ILightPushSDK; } export interface RelayNode extends Waku { @@ -47,5 +47,5 @@ export interface FullNode extends Waku { relay: IRelay; store: IStore; filter: IFilter; - lightPush: ILightPush; + lightPush: ILightPushSDK; } diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index a5f6f241f7..e2e524fd3e 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -100,14 +100,18 @@ class Relay implements IRelay { * Send Waku message. */ public async send(encoder: IEncoder, message: IMessage): Promise { - const recipients: PeerId[] = []; + const successes: PeerId[] = []; const { pubsubTopic } = encoder; if (!this.pubsubTopics.has(pubsubTopic)) { log.error("Failed to send waku relay: topic not configured"); return { - recipients, - errors: [SendError.TOPIC_NOT_CONFIGURED] + successes, + failures: [ + { + error: SendError.TOPIC_NOT_CONFIGURED + } + ] }; } @@ -115,20 +119,31 @@ class Relay implements IRelay { if (!msg) { log.error("Failed to encode message, aborting publish"); return { - recipients, - errors: [SendError.ENCODE_FAILED] + successes, + failures: [ + { + error: SendError.ENCODE_FAILED + } + ] }; } if (!isWireSizeUnderCap(msg)) { log.error("Failed to send waku relay: message is bigger that 1MB"); return { - recipients, - errors: [SendError.SIZE_TOO_BIG] + successes, + failures: [ + { + error: SendError.SIZE_TOO_BIG + } + ] }; } - return this.gossipSub.publish(pubsubTopic, msg); + const { recipients } = await this.gossipSub.publish(pubsubTopic, msg); + return { + successes: recipients + }; } public subscribe( diff --git a/packages/sdk/package.json b/packages/sdk/package.json index bd4b87b6be..0387c2bc12 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -10,8 +10,8 @@ "import": "./dist/index.js" }, "./relay": { - "types": "./dist/relay/index.d.ts", - "import": "./dist/relay/index.js" + "types": "./dist/relay-node/index.d.ts", + "import": "./dist/relay-node/index.js" } }, "typesVersions": { diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 0cdf46482b..4ef5d65353 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -10,7 +10,10 @@ export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes"; export { defaultLibp2p } from "./utils/libp2p.js"; export * from "./utils/content_topic.js"; export * from "./waku.js"; -export * from "./create.js"; + +export { createLightNode, createNode } from "./light-node/index.js"; +export { wakuLightPush } from "./protocols/light_push.js"; + export * as waku from "@waku/core"; export * as utils from "@waku/utils"; export * from "@waku/interfaces"; diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/light-node/index.ts similarity index 83% rename from packages/sdk/src/create.ts rename to packages/sdk/src/light-node/index.ts index ba3e6d9ab9..073b485f62 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/light-node/index.ts @@ -1,8 +1,9 @@ -import { wakuFilter, wakuLightPush, wakuStore } from "@waku/core"; +import { wakuFilter, wakuStore } from "@waku/core"; import { type Libp2pComponents, type LightNode } from "@waku/interfaces"; -import { createLibp2pAndUpdateOptions } from "./utils/libp2p.js"; -import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "./waku.js"; +import { wakuLightPush } from "../protocols/light_push.js"; +import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; +import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; export { Libp2pComponents }; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts new file mode 100644 index 0000000000..42dfa8a3b2 --- /dev/null +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -0,0 +1,15 @@ +import { IBaseProtocolSDK } from ".."; + +interface Options { + numPeersToUse?: number; +} + +const DEFAULT_NUM_PEERS_TO_USE = 3; + +export class BaseProtocolSDK implements IBaseProtocolSDK { + public readonly numPeers: number; + + constructor(options: Options) { + this.numPeers = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; + } +} diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts new file mode 100644 index 0000000000..aca7d78b8a --- /dev/null +++ b/packages/sdk/src/protocols/light_push.ts @@ -0,0 +1,88 @@ +import type { PeerId } from "@libp2p/interface"; +import { LightPushCore } from "@waku/core"; +import { + Failure, + type IEncoder, + ILightPushSDK, + type IMessage, + type Libp2p, + type ProtocolCreateOptions, + SendError, + type SendResult +} from "@waku/interfaces"; +import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; + +import { BaseProtocolSDK } from "./base_protocol.js"; + +const DEFAULT_NUM_PEERS = 3; +const log = new Logger("sdk:light-push"); + +export class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { + public readonly protocol: LightPushCore; + + constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { + super({ numPeersToUse: options?.numPeersToUse ?? DEFAULT_NUM_PEERS }); + this.protocol = new LightPushCore(libp2p, options); + } + + async send(encoder: IEncoder, message: IMessage): Promise { + const successes: PeerId[] = []; + const failures: Failure[] = []; + + const { pubsubTopic } = encoder; + try { + ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); + } catch (error) { + log.error("Failed to send waku light push: pubsub topic not configured"); + return { + failures: [ + { + error: SendError.TOPIC_NOT_CONFIGURED + } + ], + successes: [] + }; + } + + const peers = await this.protocol.getPeers(); + if (!peers.length) { + return { + successes, + failures: [{ error: SendError.NO_PEER_AVAILABLE }] + }; + } + + const sendPromises = peers.map((peer) => + this.protocol.send(encoder, message, peer) + ); + + const results = await Promise.allSettled(sendPromises); + + for (const result of results) { + if (result.status === "fulfilled") { + const { failure, success } = result.value; + if (success) { + successes.push(success); + } + if (failure) { + failures.push(failure); + } + } else { + log.error("Failed to send message to peer", result.reason); + failures.push({ error: SendError.GENERIC_FAIL }); + // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) + } + } + + return { + successes, + failures + }; + } +} + +export function wakuLightPush( + init: Partial = {} +): (libp2p: Libp2p) => ILightPushSDK { + return (libp2p: Libp2p) => new LightPushSDK(libp2p, init); +} diff --git a/packages/sdk/src/relay/index.ts b/packages/sdk/src/relay-node/index.ts similarity index 95% rename from packages/sdk/src/relay/index.ts rename to packages/sdk/src/relay-node/index.ts index 6fdc39a32d..136713919a 100644 --- a/packages/sdk/src/relay/index.ts +++ b/packages/sdk/src/relay-node/index.ts @@ -1,7 +1,8 @@ -import { wakuFilter, wakuLightPush, wakuStore } from "@waku/core"; +import { wakuFilter, wakuStore } from "@waku/core"; import { type FullNode, type RelayNode } from "@waku/interfaces"; import { RelayCreateOptions, wakuRelay } from "@waku/relay"; +import { wakuLightPush } from "../protocols/light_push.js"; import { createLibp2pAndUpdateOptions } from "../utils/libp2p.js"; import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js"; diff --git a/packages/sdk/src/utils/content_topic.ts b/packages/sdk/src/utils/content_topic.ts index dc716eb190..62a3e7fb2a 100644 --- a/packages/sdk/src/utils/content_topic.ts +++ b/packages/sdk/src/utils/content_topic.ts @@ -12,7 +12,7 @@ import { shardInfoToPubsubTopics } from "@waku/utils"; -import { createLightNode } from "../create.js"; +import { createLightNode } from "../light-node/index.js"; interface CreateTopicOptions { waku?: LightNode; diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index 7455a8221c..8fa9aba555 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -6,7 +6,7 @@ import type { Callback, IFilter, IFilterSubscription, - ILightPush, + ILightPushSDK, IRelay, IStore, Libp2p, @@ -57,7 +57,7 @@ export class WakuNode implements Waku { public relay?: IRelay; public store?: IStore; public filter?: IFilter; - public lightPush?: ILightPush; + public lightPush?: ILightPushSDK; public connectionManager: ConnectionManager; public readonly pubsubTopics: PubsubTopic[]; @@ -65,7 +65,7 @@ export class WakuNode implements Waku { options: WakuOptions, libp2p: Libp2p, store?: (libp2p: Libp2p) => IStore, - lightPush?: (libp2p: Libp2p) => ILightPush, + lightPush?: (libp2p: Libp2p) => ILightPushSDK, filter?: (libp2p: Libp2p) => IFilter, relay?: (libp2p: Libp2p) => IRelay ) { @@ -157,7 +157,7 @@ export class WakuNode implements Waku { } if (_protocols.includes(Protocols.LightPush)) { if (this.lightPush) { - codecs.push(this.lightPush.multicodec); + codecs.push(this.lightPush.protocol.multicodec); } else { log.error( "Light Push codec not included in dial codec: protocol not mounted locally" diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 8035046900..f8ba7a69fa 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -463,37 +463,37 @@ describe("getPeers", function () { lowPingBootstrapPeer = { id: lowPingBootstrapPeerId, - protocols: [waku.lightPush.multicodec], + protocols: [waku.lightPush.protocol.multicodec], metadata: new Map().set("ping", lowPingBytes), tags: new Map().set(Tags.BOOTSTRAP, {}) } as Peer; lowPingNonBootstrapPeer = { id: lowPingNonBootstrapPeerId, - protocols: [waku.lightPush.multicodec], + protocols: [waku.lightPush.protocol.multicodec], metadata: new Map().set("ping", lowPingBytes), tags: new Map().set(Tags.PEER_EXCHANGE, {}) } as Peer; midPingBootstrapPeer = { id: midPingBootstrapPeerId, - protocols: [waku.lightPush.multicodec], + protocols: [waku.lightPush.protocol.multicodec], metadata: new Map().set("ping", midPingBytes), tags: new Map().set(Tags.BOOTSTRAP, {}) } as Peer; midPingNonBootstrapPeer = { id: midPingNonBootstrapPeerId, - protocols: [waku.lightPush.multicodec], + protocols: [waku.lightPush.protocol.multicodec], metadata: new Map().set("ping", midPingBytes), tags: new Map().set(Tags.PEER_EXCHANGE, {}) } as Peer; highPingBootstrapPeer = { id: highPingBootstrapPeerId, - protocols: [waku.lightPush.multicodec], + protocols: [waku.lightPush.protocol.multicodec], metadata: new Map().set("ping", highPingBytes), tags: new Map().set(Tags.BOOTSTRAP, {}) } as Peer; highPingNonBootstrapPeer = { id: highPingNonBootstrapPeerId, - protocols: [waku.lightPush.multicodec], + protocols: [waku.lightPush.protocol.multicodec], metadata: new Map().set("ping", highPingBytes), tags: new Map().set(Tags.PEER_EXCHANGE, {}) } as Peer; @@ -562,7 +562,7 @@ describe("getPeers", function () { maxBootstrapPeersValues.forEach((maxBootstrapPeers) => { describe(`maxBootstrapPeers=${maxBootstrapPeers}`, function () { it(`numPeers=1 -- returns one bootstrap peer `, async function () { - const result = (await (waku.lightPush as any).getPeers({ + const result = (await (waku.lightPush.protocol as any).getPeers({ numPeers: 1, maxBootstrapPeers })) as Peer[]; @@ -574,15 +574,16 @@ describe("getPeers", function () { expect(result[0].tags.has(Tags.BOOTSTRAP)).to.be.true; // Peer should be of the same protocol - expect(result[0].protocols.includes(waku.lightPush.multicodec)).to.be - .true; + expect( + result[0].protocols.includes(waku.lightPush.protocol.multicodec) + ).to.be.true; // Peer should have the lowest ping expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); }); it(`numPeers=2 -- returns total 2 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { - const result = (await (waku.lightPush as any).getPeers({ + const result = (await (waku.lightPush.protocol as any).getPeers({ numPeers: 2, maxBootstrapPeers })) as Peer[]; @@ -598,7 +599,7 @@ describe("getPeers", function () { // Should return peers with the same protocol expect( result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.multicodec) + peer.protocols.includes(waku.lightPush.protocol.multicodec) ) ).to.be.true; @@ -608,7 +609,7 @@ describe("getPeers", function () { }); it(`numPeers=3 -- returns total 3 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { - const result = (await (waku.lightPush as any).getPeers({ + const result = (await (waku.lightPush.protocol as any).getPeers({ numPeers: 3, maxBootstrapPeers })) as Peer[]; @@ -624,7 +625,7 @@ describe("getPeers", function () { // Should return peers with the same protocol expect( result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.multicodec) + peer.protocols.includes(waku.lightPush.protocol.multicodec) ) ).to.be.true; @@ -634,7 +635,7 @@ describe("getPeers", function () { }); it(`numPeers=4 -- returns total 4 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { - const result = (await (waku.lightPush as any).getPeers({ + const result = (await (waku.lightPush.protocol as any).getPeers({ numPeers: 4, maxBootstrapPeers })) as Peer[]; @@ -650,7 +651,7 @@ describe("getPeers", function () { // Should return peers with the same protocol expect( result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.multicodec) + peer.protocols.includes(waku.lightPush.protocol.multicodec) ) ).to.be.true; @@ -660,7 +661,7 @@ describe("getPeers", function () { }); it(`numPeers=0 -- returns all peers including all non-bootstrap with maxBootstrapPeers: ${maxBootstrapPeers}`, async function () { - const result = (await (waku.lightPush as any).getPeers({ + const result = (await (waku.lightPush.protocol as any).getPeers({ numPeers: 0, maxBootstrapPeers })) as Peer[]; @@ -686,7 +687,7 @@ describe("getPeers", function () { // Peers should be of the same protocol expect( result.every((peer: Peer) => - peer.protocols.includes(waku.lightPush.multicodec) + peer.protocols.includes(waku.lightPush.protocol.multicodec) ) ).to.be.true; @@ -707,7 +708,7 @@ describe("getPeers", function () { //numPeers fc.integer({ min: 0, max: 100 }), async (maxBootstrapPeers, numPeers) => { - const result = (await (waku.lightPush as any).getPeers({ + const result = (await (waku.lightPush.protocol as any).getPeers({ numPeers, maxBootstrapPeers })) as Peer[]; diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index 2934c263b8..a4055d10cc 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -55,7 +55,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); - expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -74,7 +74,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); - expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); } expect(await serviceNodes.messageCollector.waitForMessages(30)).to.eq( @@ -94,9 +94,11 @@ const runTests = (strictNodeCheck: boolean): void => { payload: new Uint8Array() }); - expect(pushResponse.recipients.length).to.eq(0); + expect(pushResponse.successes.length).to.eq(0); console.log("validated 1"); - expect(pushResponse.errors).to.include(SendError.EMPTY_PAYLOAD); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + SendError.EMPTY_PAYLOAD + ); console.log("validated 2"); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( false @@ -113,7 +115,7 @@ const runTests = (strictNodeCheck: boolean): void => { customEncoder, messagePayload ); - expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -146,7 +148,7 @@ const runTests = (strictNodeCheck: boolean): void => { customTestEncoder, messagePayload ); - expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -176,7 +178,7 @@ const runTests = (strictNodeCheck: boolean): void => { ); if (serviceNodes.type == "go-waku") { - expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true ); @@ -185,8 +187,10 @@ const runTests = (strictNodeCheck: boolean): void => { expectedContentTopic: TestContentTopic }); } else { - expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.REMOTE_PEER_REJECTED); + expect(pushResponse.successes.length).to.eq(0); + expect( + pushResponse.failures?.map((failure) => failure.error) + ).to.include(SendError.REMOTE_PEER_REJECTED); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( false ); @@ -208,7 +212,7 @@ const runTests = (strictNodeCheck: boolean): void => { payload: utf8ToBytes(messageText), rateLimitProof: rateLimitProof }); - expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -229,7 +233,7 @@ const runTests = (strictNodeCheck: boolean): void => { payload: utf8ToBytes(messageText), timestamp: new Date(testItem) }); - expect(pushResponse.recipients.length).to.eq(numServiceNodes); + expect(pushResponse.successes.length).to.eq(numServiceNodes); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( true @@ -247,7 +251,7 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: bigPayload }); - expect(pushResponse.recipients.length).to.greaterThan(0); + expect(pushResponse.successes.length).to.greaterThan(0); }); it("Fails to push message bigger that 1MB", async function () { @@ -256,8 +260,10 @@ const runTests = (strictNodeCheck: boolean): void => { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: generateRandomUint8Array(MB + 65536) }); - expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + SendError.SIZE_TOO_BIG + ); expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( false ); diff --git a/packages/tests/tests/light-push/single_node/index.node.spec.ts b/packages/tests/tests/light-push/single_node/index.node.spec.ts index e794aa4a53..fb1779ae91 100644 --- a/packages/tests/tests/light-push/single_node/index.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/index.node.spec.ts @@ -48,7 +48,7 @@ describe("Waku Light Push: Single Node", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -65,7 +65,7 @@ describe("Waku Light Push: Single Node", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(generateMessageText(i)) }); - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); } expect(await messageCollector.waitForMessages(30)).to.eq(true); @@ -83,8 +83,10 @@ describe("Waku Light Push: Single Node", function () { payload: new Uint8Array() }); - expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.EMPTY_PAYLOAD); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + SendError.EMPTY_PAYLOAD + ); expect(await messageCollector.waitForMessages(1)).to.eq(false); }); @@ -97,7 +99,7 @@ describe("Waku Light Push: Single Node", function () { customEncoder, messagePayload ); - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -128,7 +130,7 @@ describe("Waku Light Push: Single Node", function () { customTestEncoder, messagePayload ); - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -156,15 +158,17 @@ describe("Waku Light Push: Single Node", function () { ); if (nwaku.type == "go-waku") { - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: TestContentTopic }); } else { - expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.REMOTE_PEER_REJECTED); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + SendError.REMOTE_PEER_REJECTED + ); expect(await messageCollector.waitForMessages(1)).to.eq(false); } }); @@ -184,7 +188,7 @@ describe("Waku Light Push: Single Node", function () { payload: utf8ToBytes(messageText), rateLimitProof: rateLimitProof }); - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -203,7 +207,7 @@ describe("Waku Light Push: Single Node", function () { payload: utf8ToBytes(messageText), timestamp: new Date(testItem) }); - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -219,7 +223,7 @@ describe("Waku Light Push: Single Node", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: bigPayload }); - expect(pushResponse.recipients.length).to.greaterThan(0); + expect(pushResponse.successes.length).to.greaterThan(0); }); it("Fails to push message bigger that 1MB", async function () { @@ -228,8 +232,10 @@ describe("Waku Light Push: Single Node", function () { const pushResponse = await waku.lightPush.send(TestEncoder, { payload: generateRandomUint8Array(MB + 65536) }); - expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + SendError.SIZE_TOO_BIG + ); expect(await messageCollector.waitForMessages(1)).to.eq(false); }); }); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index d16893f3e2..1144e5095c 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -4,7 +4,6 @@ import { ContentTopicInfo, LightNode, Protocols, - SendResult, ShardInfo, SingleShardInfo } from "@waku/interfaces"; @@ -76,7 +75,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); - expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); expect( await messageCollector.waitForMessages(1, { @@ -96,8 +95,8 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { const pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); - expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); const messageCollector2 = new MessageCollector(nwaku); @@ -143,27 +142,20 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { const messageCollector2 = new MessageCollector(nwaku2); - let pushResponse1: SendResult; - let pushResponse2: SendResult; - // Making sure that we send messages to both nwaku nodes - // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 - while ( - !(await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic1 - })) || - !(await messageCollector2.waitForMessages(1, { - pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) - })) || - pushResponse1!.recipients[0].toString() === - pushResponse2!.recipients[0].toString() - ) { - pushResponse1 = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - pushResponse2 = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - } + await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + + await messageCollector.waitForMessages(1, { + pubsubTopic: customPubsubTopic1 + }); + + await messageCollector2.waitForMessages(1, { + pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) + }); messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", @@ -230,8 +222,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); - expect(pushResponse.errors).to.be.empty; - expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse.failures).to.be.empty; + expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); expect( await messageCollector.waitForMessagesAutosharding(1, { @@ -251,8 +243,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { const pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); - expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); const messageCollector2 = new MessageCollector(nwaku); @@ -296,27 +288,19 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { const messageCollector2 = new MessageCollector(nwaku2); - let pushResponse1: SendResult; - let pushResponse2: SendResult; - // Making sure that we send messages to both nwaku nodes - // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 - while ( - !(await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 - })) || - !(await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 - })) || - pushResponse1!.recipients[0].toString() === - pushResponse2!.recipients[0].toString() - ) { - pushResponse1 = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - pushResponse2 = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - } + await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 + }); + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 + }); messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", @@ -384,7 +368,7 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () payload: utf8ToBytes(messageText) }); - expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse.successes[0].toString()).to.eq(nimPeerId.toString()); expect( await messageCollector.waitForMessages(1, { @@ -404,8 +388,8 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () const pushResponse2 = await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); - expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse1.successes[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse2.successes[0].toString()).to.eq(nimPeerId.toString()); const messageCollector2 = new MessageCollector(nwaku); @@ -448,27 +432,19 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () const messageCollector2 = new MessageCollector(nwaku2); - let pushResponse1: SendResult; - let pushResponse2: SendResult; - // Making sure that we send messages to both nwaku nodes - // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 - while ( - !(await messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 - })) || - !(await messageCollector2.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic2 - })) || - pushResponse1!.recipients[0].toString() === - pushResponse2!.recipients[0].toString() - ) { - pushResponse1 = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - pushResponse2 = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - } + await waku.lightPush.send(customEncoder1, { + payload: utf8ToBytes("M1") + }); + await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + + await messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 + }); + await messageCollector2.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic2 + }); messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index c2e086e22f..b5261e92a1 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -139,16 +139,16 @@ describe("Waku Relay, multiple pubsub topics", function () { payload: utf8ToBytes("M3") }); - expect(relayResponse1.recipients[0].toString()).to.eq( + expect(relayResponse1.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); - expect(relayResponse3.recipients[0].toString()).to.eq( + expect(relayResponse3.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + expect(relayResponse2.successes.map((r) => r.toString())).to.include( waku1.libp2p.peerId.toString() ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + expect(relayResponse2.successes.map((r) => r.toString())).to.include( waku3.libp2p.peerId.toString() ); @@ -431,16 +431,16 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { payload: utf8ToBytes("M3") }); - expect(relayResponse1.recipients[0].toString()).to.eq( + expect(relayResponse1.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); - expect(relayResponse3.recipients[0].toString()).to.eq( + expect(relayResponse3.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + expect(relayResponse2.successes.map((r) => r.toString())).to.include( waku1.libp2p.peerId.toString() ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + expect(relayResponse2.successes.map((r) => r.toString())).to.include( waku3.libp2p.peerId.toString() ); @@ -739,16 +739,16 @@ describe("Waku Relay (named sharding), multiple pubsub topics", function () { payload: utf8ToBytes("M3") }); - expect(relayResponse1.recipients[0].toString()).to.eq( + expect(relayResponse1.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); - expect(relayResponse3.recipients[0].toString()).to.eq( + expect(relayResponse3.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + expect(relayResponse2.successes.map((r) => r.toString())).to.include( waku1.libp2p.peerId.toString() ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + expect(relayResponse2.successes.map((r) => r.toString())).to.include( waku3.libp2p.peerId.toString() ); diff --git a/packages/tests/tests/relay/publish.node.spec.ts b/packages/tests/tests/relay/publish.node.spec.ts index 892553655e..50b6fb7340 100644 --- a/packages/tests/tests/relay/publish.node.spec.ts +++ b/packages/tests/tests/relay/publish.node.spec.ts @@ -62,8 +62,8 @@ describe("Waku Relay, Publish", function () { const pushResponse = await waku1.relay.send(TestEncoder, { payload: utf8ToBytes(testItem.value) }); - expect(pushResponse.recipients.length).to.eq(1); - expect(pushResponse.recipients[0].toString()).to.eq( + expect(pushResponse.successes.length).to.eq(1); + expect(pushResponse.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); expect(await messageCollector.waitForMessages(1)).to.eq(true); @@ -86,8 +86,8 @@ describe("Waku Relay, Publish", function () { timestamp: testItem }); - expect(pushResponse.recipients.length).to.eq(1); - expect(pushResponse.recipients[0].toString()).to.eq( + expect(pushResponse.successes.length).to.eq(1); + expect(pushResponse.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); @@ -134,7 +134,9 @@ describe("Waku Relay, Publish", function () { const pushResponse = await waku1.relay.send(wrong_encoder, { payload: utf8ToBytes("") }); - expect(pushResponse.errors?.[0]).to.eq("Topic not configured"); + expect(pushResponse.failures?.[0].error).to.eq( + SendError.TOPIC_NOT_CONFIGURED + ); await delay(400); expect(await messageCollector.waitForMessages(1)).to.eq(false); }); @@ -144,8 +146,10 @@ describe("Waku Relay, Publish", function () { const pushResponse = await waku1.relay.send(TestEncoder, { payload: generateRandomUint8Array(testItem) }); - expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); + expect(pushResponse.successes.length).to.eq(0); + expect(pushResponse.failures?.map((failure) => failure.error)).to.include( + SendError.SIZE_TOO_BIG + ); await delay(400); expect(await messageCollector.waitForMessages(1)).to.eq(false); }); @@ -169,8 +173,8 @@ describe("Waku Relay, Publish", function () { const pushResponse = await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m2") }); - expect(pushResponse.recipients.length).to.eq(1); - expect(pushResponse.recipients[0].toString()).to.eq( + expect(pushResponse.successes.length).to.eq(1); + expect(pushResponse.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); expect(await messageCollector.waitForMessages(2)).to.eq(true); @@ -194,8 +198,8 @@ describe("Waku Relay, Publish", function () { const pushResponse = await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m2") }); - expect(pushResponse.recipients.length).to.eq(1); - expect(pushResponse.recipients[0].toString()).to.eq( + expect(pushResponse.successes.length).to.eq(1); + expect(pushResponse.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); expect(await messageCollector.waitForMessages(2)).to.eq(true); @@ -210,8 +214,8 @@ describe("Waku Relay, Publish", function () { const pushResponse = await waku1.relay.send(customTestEncoder, { payload: utf8ToBytes(messageText) }); - expect(pushResponse.recipients.length).to.eq(1); - expect(pushResponse.recipients[0].toString()).to.eq( + expect(pushResponse.successes.length).to.eq(1); + expect(pushResponse.successes[0].toString()).to.eq( waku2.libp2p.peerId.toString() ); expect(await messageCollector.waitForMessages(1)).to.eq(true); @@ -232,7 +236,7 @@ describe("Waku Relay, Publish", function () { payload: utf8ToBytes(messageText), rateLimitProof: rateLimitProof }); - expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.successes.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { diff --git a/packages/tests/tests/sharding/running_nodes.spec.ts b/packages/tests/tests/sharding/running_nodes.spec.ts index f122f33d52..24beb3f519 100644 --- a/packages/tests/tests/sharding/running_nodes.spec.ts +++ b/packages/tests/tests/sharding/running_nodes.spec.ts @@ -1,6 +1,7 @@ import { LightNode, Protocols, + SendError, ShardInfo, SingleShardInfo } from "@waku/interfaces"; @@ -10,10 +11,7 @@ import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk"; -import { - contentTopicToShardIndex, - singleShardInfoToPubsubTopic -} from "@waku/utils"; +import { contentTopicToShardIndex } from "@waku/utils"; import { expect } from "chai"; import { @@ -24,14 +22,6 @@ import { tearDownNodes } from "../../src/index.js"; -const PubsubTopic1 = singleShardInfoToPubsubTopic({ - clusterId: 0, - shard: 2 -}); -const PubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: 0, - shard: 3 -}); const shardInfoFirstShard: ShardInfo = { clusterId: 0, shards: [2] }; const shardInfoBothShards: ShardInfo = { clusterId: 0, shards: [2, 3] }; const singleShardInfo1: SingleShardInfo = { clusterId: 0, shard: 2 }; @@ -78,8 +68,8 @@ describe("Static Sharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request1.recipients.length).to.eq(1); - expect(request2.recipients.length).to.eq(1); + expect(request1.successes.length).to.eq(1); + expect(request2.successes.length).to.eq(1); }); it("using a protocol with unconfigured pubsub topic should fail", async function () { @@ -94,21 +84,16 @@ describe("Static Sharding: Running Nodes", function () { pubsubTopicShardInfo: singleShardInfo2 }); - try { - await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); + const { successes, failures } = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + if (successes.length > 0 || failures?.length === 0) { throw new Error("The request should've thrown an error"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - `Pubsub topic ${PubsubTopic2} has not been configured on this instance. Configured topics are: ${PubsubTopic1}` - ) - ) { - throw err; - } } + + const errors = failures?.map((failure) => failure.error); + expect(errors).to.include(SendError.TOPIC_NOT_CONFIGURED); }); }); @@ -161,7 +146,7 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request1.recipients.length).to.eq(1); - expect(request2.recipients.length).to.eq(1); + expect(request1.successes.length).to.eq(1); + expect(request2.successes.length).to.eq(1); }); }); diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index b6b590b368..d50c5e29eb 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -172,8 +172,8 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waitForRemotePeer(waku2, [Protocols.LightPush]); - const peers = (await waku2.lightPush.connectedPeers()).map((peer) => - peer.id.toString() + const peers = (await waku2.lightPush.protocol.connectedPeers()).map( + (peer) => peer.id.toString() ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -238,9 +238,9 @@ describe("Wait for remote peer", function () { const storePeers = (await waku2.store.connectedPeers()).map((peer) => peer.id.toString() ); - const lightPushPeers = (await waku2.lightPush.connectedPeers()).map( - (peer) => peer.id.toString() - ); + const lightPushPeers = ( + await waku2.lightPush.protocol.connectedPeers() + ).map((peer) => peer.id.toString()); const nimPeerId = multiAddrWithId.getPeerId(); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index 236ebc0f1f..c2911234cd 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -99,6 +99,8 @@ export const pubsubTopicToSingleShardInfo = ( }; }; +//TODO: move part of BaseProtocol instead of utils +// return `SendError.TOPIC_NOT_CONFIGURED` instead of throwing export function ensurePubsubTopicIsConfigured( pubsubTopic: PubsubTopic, configuredTopics: PubsubTopic[]