diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 790a1953d4..b6dc724a32 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -1,17 +1,20 @@ -import type { Peer } from "@libp2p/interface"; +import type { Peer, Stream } from "@libp2p/interface"; import type { IncomingStreamData } from "@libp2p/interface-internal"; -import type { - ContentTopic, - IBaseProtocolCore, - Libp2p, - ProtocolCreateOptions, - PubsubTopic +import { + type ContentTopic, + type CoreProtocolResult, + type IBaseProtocolCore, + type Libp2p, + type ProtocolCreateOptions, + ProtocolError, + type PubsubTopic } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; +import { Uint8ArrayList } from "uint8arraylist"; import { BaseProtocol } from "../base_protocol.js"; @@ -90,7 +93,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { pubsubTopic: PubsubTopic, peer: Peer, contentTopics: ContentTopic[] - ): Promise { + ): Promise { const stream = await this.getStream(peer); const request = FilterSubscribeRpc.createSubscribeRequest( @@ -98,45 +101,98 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { contentTopics ); - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); - - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` + let res: Uint8ArrayList[] | undefined; + try { + res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) ); + } catch (error) { + log.error("Failed to send subscribe request", error); + return { + success: null, + failure: { + error: ProtocolError.GENERIC_FAIL, + peerId: peer.id + } + }; } const { statusCode, requestId, statusDesc } = FilterSubscribeResponse.decode(res[0].slice()); if (statusCode < 200 || statusCode >= 300) { - throw new Error( + log.error( `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` ); + return { + failure: { + error: ProtocolError.REMOTE_PEER_REJECTED, + peerId: peer.id + }, + success: null + }; } + + return { + failure: null, + success: peer.id + }; } async unsubscribe( pubsubTopic: PubsubTopic, peer: Peer, contentTopics: ContentTopic[] - ): Promise { - const stream = await this.getStream(peer); + ): Promise { + let stream: Stream | undefined; + try { + stream = await this.getStream(peer); + } catch (error) { + log.error( + `Failed to get a stream for remote peer${peer.id.toString()}`, + error + ); + return { + success: null, + failure: { + error: ProtocolError.REMOTE_PEER_FAULT, + peerId: peer.id + } + }; + } + const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( pubsubTopic, contentTopics ); - await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); + try { + await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); + } catch (error) { + log.error("Failed to send unsubscribe request", error); + return { + success: null, + failure: { + error: ProtocolError.GENERIC_FAIL, + peerId: peer.id + } + }; + } + + return { + success: peer.id, + failure: null + }; } - async unsubscribeAll(pubsubTopic: PubsubTopic, peer: Peer): Promise { + async unsubscribeAll( + pubsubTopic: PubsubTopic, + peer: Peer + ): Promise { const stream = await this.getStream(peer); const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic); @@ -150,53 +206,105 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore { ); if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` - ); + return { + failure: { + error: ProtocolError.REMOTE_PEER_FAULT, + peerId: peer.id + }, + success: null + }; } const { statusCode, requestId, statusDesc } = FilterSubscribeResponse.decode(res[0].slice()); if (statusCode < 200 || statusCode >= 300) { - throw new Error( + log.error( `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` ); + return { + failure: { + error: ProtocolError.REMOTE_PEER_REJECTED, + peerId: peer.id + }, + success: null + }; } + + return { + failure: null, + success: peer.id + }; } - async ping(peer: Peer): Promise { - const stream = await this.getStream(peer); + async ping(peer: Peer): Promise { + let stream: Stream | undefined; + try { + stream = await this.getStream(peer); + } catch (error) { + log.error( + `Failed to get a stream for remote peer${peer.id.toString()}`, + error + ); + return { + success: null, + failure: { + error: ProtocolError.REMOTE_PEER_FAULT, + peerId: peer.id + } + }; + } const request = FilterSubscribeRpc.createSubscriberPingRequest(); + let res: Uint8ArrayList[] | undefined; try { - const res = await pipe( + res = await pipe( [request.encode()], lp.encode, stream, lp.decode, async (source) => await all(source) ); - - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` - ); - } - - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); - - if (statusCode < 200 || statusCode >= 300) { - throw new Error( - `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` - ); - } - log.info(`Ping successful for peer ${peer.id.toString()}`); } catch (error) { - log.error("Error pinging: ", error); - throw error; // Rethrow the actual error instead of wrapping it + log.error("Failed to send ping request", error); + return { + success: null, + failure: { + error: ProtocolError.GENERIC_FAIL, + peerId: peer.id + } + }; + } + + if (!res || !res.length) { + return { + success: null, + failure: { + error: ProtocolError.REMOTE_PEER_FAULT, + peerId: peer.id + } + }; + } + + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + log.error( + `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + return { + success: null, + failure: { + error: ProtocolError.REMOTE_PEER_REJECTED, + peerId: peer.id + } + }; } + return { + success: peer.id, + failure: null + }; } } diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 62d922c805..2b8fb3ebd2 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -1,13 +1,13 @@ -import type { Peer, PeerId, Stream } from "@libp2p/interface"; +import type { Peer, Stream } from "@libp2p/interface"; import { - Failure, - IBaseProtocolCore, - IEncoder, - IMessage, - Libp2p, - ProtocolCreateOptions, + type CoreProtocolResult, + type IBaseProtocolCore, + type IEncoder, + type IMessage, + type Libp2p, + type ProtocolCreateOptions, ProtocolError, - ProtocolResult + type ThisOrThat } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; import { isMessageSizeUnderCap } from "@waku/utils"; @@ -26,9 +26,7 @@ const log = new Logger("light-push"); export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"; export { PushResponse }; -type PreparePushMessageResult = ProtocolResult<"query", PushRpc>; - -type CoreSendResult = ProtocolResult<"success", PeerId, "failure", Failure>; +type PreparePushMessageResult = ThisOrThat<"query", PushRpc>; /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). @@ -84,7 +82,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore { encoder: IEncoder, message: IMessage, peer: Peer - ): Promise { + ): Promise { const { query, error: preparationError } = await this.preparePushMessage( encoder, message diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 4fed198168..e402b7d37c 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -3,9 +3,9 @@ import { IncomingStreamData } from "@libp2p/interface"; import { type IMetadata, type Libp2pComponents, + type MetadataQueryResult, type PeerIdStr, ProtocolError, - QueryResult, type ShardInfo } from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; @@ -74,7 +74,7 @@ class Metadata extends BaseProtocol implements IMetadata { /** * Make a metadata query to a peer */ - async query(peerId: PeerId): Promise { + async query(peerId: PeerId): Promise { const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo); const peer = await this.peerStore.get(peerId); @@ -112,7 +112,9 @@ class Metadata extends BaseProtocol implements IMetadata { }; } - public async confirmOrAttemptHandshake(peerId: PeerId): Promise { + public async confirmOrAttemptHandshake( + peerId: PeerId + ): Promise { const shardInfo = this.handshakesConfirmed.get(peerId.toString()); if (shardInfo) { return { @@ -126,7 +128,7 @@ class Metadata extends BaseProtocol implements IMetadata { private decodeMetadataResponse( encodedResponse: Uint8ArrayList[] - ): QueryResult { + ): MetadataQueryResult { const bytes = new Uint8ArrayList(); encodedResponse.forEach((chunk) => { diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts index babb6bf688..078094cf9d 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange.ts @@ -4,7 +4,7 @@ import { IPeerExchange, Libp2pComponents, PeerExchangeQueryParams, - PeerExchangeResult, + PeerExchangeQueryResult, ProtocolError, PubsubTopic } from "@waku/interfaces"; @@ -35,7 +35,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { /** * Make a peer exchange query to a peer */ - async query(params: PeerExchangeQueryParams): Promise { + async query( + params: PeerExchangeQueryParams + ): Promise { const { numPeers } = params; const rpcQuery = PeerExchangeRPC.createRequest({ numPeers: BigInt(numPeers) diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts index bbf3ca6ff6..5b52340d05 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts +++ b/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts @@ -8,8 +8,8 @@ import type { PeerInfo } from "@libp2p/interface"; import { - Libp2pComponents, - PeerExchangeResult, + type Libp2pComponents, + type PeerExchangeQueryResult, PubsubTopic, Tags } from "@waku/interfaces"; @@ -165,7 +165,7 @@ export class PeerExchangeDiscovery }, queryInterval * currentAttempt); }; - private async query(peerId: PeerId): Promise { + private async query(peerId: PeerId): Promise { const { error, peerInfos } = await this.peerExchange.query({ numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES, peerId diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index e953910721..b4071a4d26 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -1,9 +1,11 @@ import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { ContentTopic, PubsubTopic } from "./misc.js"; +import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js"; import type { Callback, IBaseProtocolCore, IBaseProtocolSDK, + ProtocolError, + SDKProtocolResult, ShardingParams } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; @@ -12,18 +14,20 @@ export type SubscribeOptions = { keepAlive?: number; }; -export interface IFilterSubscription { +export type IFilter = IReceiver & IBaseProtocolCore; + +export interface ISubscriptionSDK { subscribe( decoders: IDecoder | IDecoder[], callback: Callback, options?: SubscribeOptions - ): Promise; + ): Promise; - unsubscribe(contentTopics: ContentTopic[]): Promise; + unsubscribe(contentTopics: ContentTopic[]): Promise; - ping(): Promise; + ping(): Promise; - unsubscribeAll(): Promise; + unsubscribeAll(): Promise; } export type IFilterSDK = IReceiver & @@ -31,5 +35,12 @@ export type IFilterSDK = IReceiver & createSubscription( pubsubTopicShardInfo?: ShardingParams | PubsubTopic, options?: SubscribeOptions - ): Promise; + ): Promise; }; + +export type CreateSubscriptionResult = ThisOrThat< + "subscription", + ISubscriptionSDK, + "error", + ProtocolError +>; diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index 47d561b936..31f0ced66e 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,17 +1,14 @@ import type { PeerId } from "@libp2p/interface"; import { type ShardInfo } from "./enr.js"; -import type { - IBaseProtocolCore, - ProtocolResult, - ShardingParams -} from "./protocols.js"; +import { ThisOrThat } from "./misc.js"; +import type { IBaseProtocolCore, ShardingParams } from "./protocols.js"; -export type QueryResult = ProtocolResult<"shardInfo", ShardInfo>; +export type MetadataQueryResult = ThisOrThat<"shardInfo", ShardInfo>; // IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol export interface IMetadata extends Omit { shardInfo: ShardingParams; - confirmOrAttemptHandshake(peerId: PeerId): Promise; - query(peerId: PeerId): Promise; + confirmOrAttemptHandshake(peerId: PeerId): Promise; + query(peerId: PeerId): Promise; } diff --git a/packages/interfaces/src/misc.ts b/packages/interfaces/src/misc.ts index 4382c61566..91170c88fa 100644 --- a/packages/interfaces/src/misc.ts +++ b/packages/interfaces/src/misc.ts @@ -1,4 +1,5 @@ import type { IDecodedMessage } from "./message.js"; +import { ProtocolError } from "./protocols.js"; export interface IAsyncIterator { iterator: AsyncIterator; @@ -11,3 +12,23 @@ export type PubsubTopic = string; export type ContentTopic = string; export type PeerIdStr = string; + +// SK = success key name +// SV = success value type +// EK = error key name (default: "error") +// EV = error value type (default: ProtocolError) +export type ThisOrThat< + SK extends string, + SV, + EK extends string = "error", + EV = ProtocolError +> = + | ({ [key in SK]: SV } & { [key in EK]: null }) + | ({ [key in SK]: null } & { [key in EK]: EV }); + +export type ThisAndThat< + SK extends string, + SV, + EK extends string = "error", + EV = ProtocolError +> = { [key in SK]: SV } & { [key in EK]: EV }; diff --git a/packages/interfaces/src/peer_exchange.ts b/packages/interfaces/src/peer_exchange.ts index 9f9c624ab5..2078899cae 100644 --- a/packages/interfaces/src/peer_exchange.ts +++ b/packages/interfaces/src/peer_exchange.ts @@ -3,13 +3,14 @@ import type { PeerStore } from "@libp2p/interface"; import type { ConnectionManager } from "@libp2p/interface-internal"; import { IEnr } from "./enr.js"; -import { IBaseProtocolCore, ProtocolResult } from "./protocols.js"; +import { ThisOrThat } from "./misc.js"; +import { IBaseProtocolCore } from "./protocols.js"; export interface IPeerExchange extends IBaseProtocolCore { - query(params: PeerExchangeQueryParams): Promise; + query(params: PeerExchangeQueryParams): Promise; } -export type PeerExchangeResult = ProtocolResult<"peerInfos", PeerInfo[]>; +export type PeerExchangeQueryResult = ThisOrThat<"peerInfos", PeerInfo[]>; export interface PeerExchangeQueryParams { numPeers: number; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 889bc0b9bc..0ea4557614 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -5,7 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface"; import type { ShardInfo } from "./enr.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; -import { PubsubTopic } from "./misc.js"; +import { PubsubTopic, ThisAndThat, ThisOrThat } from "./misc.js"; export enum Protocols { Relay = "relay", @@ -107,27 +107,6 @@ export type Callback = ( msg: T ) => void | Promise; -// SK = success key name -// SV = success value type -// EK = error key name (default: "error") -// EV = error value type (default: ProtocolError) -export type ProtocolResult< - SK extends string, - SV, - EK extends string = "error", - EV = ProtocolError -> = - | ({ - [key in SK]: SV; - } & { - [key in EK]: null; - }) - | ({ - [key in SK]: null; - } & { - [key in EK]: EV; - }); - export enum ProtocolError { /** Could not determine the origin of the fault. Best to check connectivity and try again */ GENERIC_FAIL = "Generic error", @@ -156,6 +135,11 @@ export enum ProtocolError { * Please ensure that the PubsubTopic is used when initializing the Waku node. */ TOPIC_NOT_CONFIGURED = "Topic not configured", + /** + * The pubsub topic configured on the decoder does not match the pubsub topic setup on the protocol. + * Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol. + */ + TOPIC_DECODER_MISMATCH = "Topic decoder mismatch", /** * Failure to find a peer with suitable protocols. This may due to a connection issue. * Mitigation can be: retrying after a given time period, display connectivity issue @@ -186,7 +170,16 @@ export interface Failure { peerId?: PeerId; } -export interface SendResult { - failures?: Failure[]; - successes: PeerId[]; -} +export type CoreProtocolResult = ThisOrThat< + "success", + PeerId, + "failure", + Failure +>; + +export type SDKProtocolResult = ThisAndThat< + "successes", + PeerId[], + "failures", + Failure[] +>; diff --git a/packages/interfaces/src/sender.ts b/packages/interfaces/src/sender.ts index ab7e6d1b8c..2dbe72def9 100644 --- a/packages/interfaces/src/sender.ts +++ b/packages/interfaces/src/sender.ts @@ -1,6 +1,6 @@ import type { IEncoder, IMessage } from "./message.js"; -import type { SendResult } from "./protocols.js"; +import { SDKProtocolResult } from "./protocols.js"; export interface ISender { - send: (encoder: IEncoder, message: IMessage) => Promise; + send: (encoder: IEncoder, message: IMessage) => Promise; } diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index bbe10447b9..c1f0addc10 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -22,7 +22,7 @@ import { ProtocolCreateOptions, ProtocolError, PubsubTopic, - SendResult + SDKProtocolResult } from "@waku/interfaces"; import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; import { pushOrInitMapSet } from "@waku/utils"; @@ -99,7 +99,10 @@ class Relay implements IRelay { /** * Send Waku message. */ - public async send(encoder: IEncoder, message: IMessage): Promise { + public async send( + encoder: IEncoder, + message: IMessage + ): Promise { const successes: PeerId[] = []; const { pubsubTopic } = encoder; @@ -142,7 +145,8 @@ class Relay implements IRelay { const { recipients } = await this.gossipSub.publish(pubsubTopic, msg); return { - successes: recipients + successes: recipients, + failures: [] }; } diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index e13af706ff..0d169a09db 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -1,19 +1,24 @@ import type { Peer } from "@libp2p/interface"; import { FilterCore } from "@waku/core"; -import type { - Callback, - ContentTopic, - IAsyncIterator, - IDecodedMessage, - IDecoder, - IFilterSDK, - IProtoMessage, - Libp2p, - ProtocolCreateOptions, - PubsubTopic, - ShardingParams, +import { + type Callback, + type ContentTopic, + CoreProtocolResult, + CreateSubscriptionResult, + type IAsyncIterator, + type IDecodedMessage, + type IDecoder, + type IFilterSDK, + type IProtoMessage, + type ISubscriptionSDK, + type Libp2p, + type ProtocolCreateOptions, + ProtocolError, + type PubsubTopic, + SDKProtocolResult, + type ShardingParams, SubscribeOptions, - Unsubscribe + type Unsubscribe } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; @@ -38,8 +43,7 @@ const MINUTE = 60 * 1000; const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: MINUTE }; - -export class SubscriptionManager { +export class SubscriptionManager implements ISubscriptionSDK { private readonly pubsubTopic: PubsubTopic; readonly peers: Peer[]; readonly receivedMessagesHashStr: string[] = []; @@ -64,28 +68,33 @@ export class SubscriptionManager { decoders: IDecoder | IDecoder[], callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { + ): Promise { const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; // check that all decoders are configured for the same pubsub topic as this subscription - decodersArray.forEach((decoder) => { + for (const decoder of decodersArray) { if (decoder.pubsubTopic !== this.pubsubTopic) { - throw new Error( - `Pubsub topic not configured: decoder is configured for pubsub topic ${decoder.pubsubTopic} but this subscription is for pubsub topic ${this.pubsubTopic}. Please create a new Subscription for the different pubsub topic.` - ); + return { + failures: [ + { + error: ProtocolError.TOPIC_DECODER_MISMATCH + } + ], + successes: [] + }; } - }); + } const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const promises = this.peers.map(async (peer) => { - await this.protocol.subscribe(this.pubsubTopic, peer, contentTopics); - }); + const promises = this.peers.map(async (peer) => + this.protocol.subscribe(this.pubsubTopic, peer, contentTopics) + ); const results = await Promise.allSettled(promises); - this.handleErrors(results, "subscribe"); + const finalResult = this.handleResult(results, "subscribe"); // Save the callback functions by content topics so they // can easily be removed (reciprocally replaced) if `unsubscribe` (reciprocally `subscribe`) @@ -106,50 +115,59 @@ export class SubscriptionManager { if (options?.keepAlive) { this.startKeepAlivePings(options.keepAlive); } + + return finalResult; } - async unsubscribe(contentTopics: ContentTopic[]): Promise { + async unsubscribe(contentTopics: ContentTopic[]): Promise { const promises = this.peers.map(async (peer) => { - await this.protocol.unsubscribe(this.pubsubTopic, peer, contentTopics); + const response = await this.protocol.unsubscribe( + this.pubsubTopic, + peer, + contentTopics + ); contentTopics.forEach((contentTopic: string) => { this.subscriptionCallbacks.delete(contentTopic); }); + + return response; }); const results = await Promise.allSettled(promises); - - this.handleErrors(results, "unsubscribe"); + const finalResult = this.handleResult(results, "unsubscribe"); if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) { this.stopKeepAlivePings(); } + + return finalResult; } - async ping(): Promise { - const promises = this.peers.map(async (peer) => { - await this.protocol.ping(peer); - }); + async ping(): Promise { + const promises = this.peers.map(async (peer) => this.protocol.ping(peer)); const results = await Promise.allSettled(promises); - this.handleErrors(results, "ping"); + return this.handleResult(results, "ping"); } - async unsubscribeAll(): Promise { - const promises = this.peers.map(async (peer) => { - await this.protocol.unsubscribeAll(this.pubsubTopic, peer); - }); + async unsubscribeAll(): Promise { + const promises = this.peers.map(async (peer) => + this.protocol.unsubscribeAll(this.pubsubTopic, peer) + ); const results = await Promise.allSettled(promises); this.subscriptionCallbacks.clear(); - this.handleErrors(results, "unsubscribeAll"); + const finalResult = this.handleResult(results, "unsubscribeAll"); if (this.keepAliveTimer) { this.stopKeepAlivePings(); } + + return finalResult; } async processIncomingMessage(message: WakuMessage): Promise { @@ -178,40 +196,32 @@ export class SubscriptionManager { await pushMessage(subscriptionCallback, this.pubsubTopic, message); } - // Filter out only the rejected promises and extract & handle their reasons - private handleErrors( - results: PromiseSettledResult[], + private handleResult( + results: PromiseSettledResult[], type: "ping" | "subscribe" | "unsubscribe" | "unsubscribeAll" - ): void { - const errors = results - .filter( - (result): result is PromiseRejectedResult => - result.status === "rejected" - ) - .map((rejectedResult) => rejectedResult.reason); - - if (errors.length === this.peers.length) { - const errorCounts = new Map(); - // TODO: streamline error logging with https://github.com/orgs/waku-org/projects/2/views/1?pane=issue&itemId=42849952 - errors.forEach((error) => { - const message = error instanceof Error ? error.message : String(error); - errorCounts.set(message, (errorCounts.get(message) || 0) + 1); - }); - - const uniqueErrorMessages = Array.from( - errorCounts, - ([message, count]) => `${message} (occurred ${count} times)` - ).join(", "); - throw new Error(`Error ${type} all peers: ${uniqueErrorMessages}`); - } else if (errors.length > 0) { - // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) - log.warn( - `Some ${type} failed. These will be refreshed with new peers`, - errors - ); - } else { - log.info(`${type} successful for all peers`); + ): SDKProtocolResult { + const result: SDKProtocolResult = { failures: [], successes: [] }; + + for (const promiseResult of results) { + if (promiseResult.status === "rejected") { + log.error( + `Failed to resolve ${type} promise successfully: `, + promiseResult.reason + ); + result.failures.push({ error: ProtocolError.GENERIC_FAIL }); + } else { + const coreResult = promiseResult.value; + if (coreResult.failure) { + result.failures.push(coreResult.failure); + } else { + result.successes.push(coreResult.success); + } + } } + + // TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463) + + return result; } private startKeepAlivePings(interval: number): void { @@ -297,7 +307,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { */ async createSubscription( pubsubTopicShardInfo: ShardingParams | PubsubTopic - ): Promise { + ): Promise { const pubsubTopic = typeof pubsubTopicShardInfo == "string" ? pubsubTopicShardInfo @@ -305,9 +315,21 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - const peers = await this.protocol.getPeers(); + let peers: Peer[] = []; + try { + peers = await this.protocol.getPeers(); + } catch (error) { + log.error("Error getting peers to initiate subscription: ", error); + return { + error: ProtocolError.GENERIC_FAIL, + subscription: null + }; + } if (peers.length === 0) { - throw new Error("No peer found to initiate subscription."); + return { + error: ProtocolError.NO_PEER_AVAILABLE, + subscription: null + }; } log.info( @@ -322,7 +344,10 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { new SubscriptionManager(pubsubTopic, peers, this.protocol) ); - return subscription; + return { + error: null, + subscription + }; } //TODO: remove this dependency on IReceiver @@ -346,21 +371,27 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { callback: Callback, options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { - const pubsubTopics = this.getPubsubTopics(decoders); + const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); - if (pubsubTopics.length === 0) { + if (uniquePubsubTopics.length === 0) { throw Error( "Failed to subscribe: no pubsubTopic found on decoders provided." ); } - if (pubsubTopics.length > 1) { + if (uniquePubsubTopics.length > 1) { throw Error( "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." ); } - const subscription = await this.createSubscription(pubsubTopics[0]); + const { subscription, error } = await this.createSubscription( + uniquePubsubTopics[0] + ); + + if (error) { + throw Error(`Failed to create subscription: ${error}`); + } await subscription.subscribe(decoders, callback, options); @@ -381,7 +412,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { return toAsyncIterator(this, decoders); } - private getPubsubTopics( + private getUniquePubsubTopics( decoders: IDecoder | IDecoder[] ): string[] { if (!Array.isArray(decoders)) { diff --git a/packages/sdk/src/protocols/light_push.ts b/packages/sdk/src/protocols/light_push.ts index 6f61c71989..67e2117df2 100644 --- a/packages/sdk/src/protocols/light_push.ts +++ b/packages/sdk/src/protocols/light_push.ts @@ -8,7 +8,7 @@ import { type Libp2p, type ProtocolCreateOptions, ProtocolError, - type SendResult + SDKProtocolResult } from "@waku/interfaces"; import { ensurePubsubTopicIsConfigured, Logger } from "@waku/utils"; @@ -24,7 +24,7 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { this.protocol = new LightPushCore(libp2p, options); } - async send(encoder: IEncoder, message: IMessage): Promise { + async send(encoder: IEncoder, message: IMessage): Promise { const successes: PeerId[] = []; const failures: Failure[] = []; diff --git a/packages/sdk/src/utils/content_topic.ts b/packages/sdk/src/utils/content_topic.ts index 62a3e7fb2a..68c5eeb320 100644 --- a/packages/sdk/src/utils/content_topic.ts +++ b/packages/sdk/src/utils/content_topic.ts @@ -3,7 +3,7 @@ import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; import { Callback, IDecoder, - IFilterSubscription, + ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces"; @@ -27,7 +27,7 @@ async function prepareSubscription( peer: Multiaddr ): Promise<{ decoder: IDecoder; - subscription: IFilterSubscription; + subscription: ISubscriptionSDK; }> { // Validate that the Waku node matches assumptions if (!waku.filter) { @@ -52,7 +52,10 @@ async function prepareSubscription( // Create decoder and subscription let decoder = createDecoder(contentTopic, pubsubTopic); if (decoder) decoder = decoder ?? decoder; - const subscription = await waku.filter.createSubscription(pubsubTopic); + const { subscription, error } = + await waku.filter.createSubscription(pubsubTopic); + if (error) + throw new Error("Failed to create subscription for content topic."); return { decoder, subscription }; } @@ -86,10 +89,11 @@ export async function streamContentTopic( controller.enqueue(message); }); }, - cancel() { - return subscription.unsubscribe([contentTopic]); + async cancel() { + await subscription.unsubscribe([contentTopic]); } }); + return [messageStream, opts.waku]; } @@ -105,7 +109,7 @@ export async function subscribeToContentTopic( contentTopic: string, callback: Callback, opts: CreateTopicOptions -): Promise<{ subscription: IFilterSubscription; waku: LightNode }> { +): Promise<{ subscription: ISubscriptionSDK; waku: LightNode }> { opts.waku = opts.waku ?? (await createLightNode({ diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index 959d57ad88..f11dd06ec7 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -5,10 +5,10 @@ import { ConnectionManager, DecodedMessage } from "@waku/core"; import type { Callback, IFilterSDK, - IFilterSubscription, ILightPushSDK, IRelay, IStoreSDK, + ISubscriptionSDK, Libp2p, LightNode, ProtocolCreateOptions, @@ -193,7 +193,7 @@ export class WakuNode implements Waku { contentTopic: string, peer: Multiaddr, callback: Callback - ): Promise { + ): Promise { return ( await subscribeToContentTopic(contentTopic, callback, { waku: this as LightNode, diff --git a/packages/tests/src/utils/log_file.ts b/packages/tests/src/utils/log_file.ts index d3b55748d5..7dd10ee9d5 100644 --- a/packages/tests/src/utils/log_file.ts +++ b/packages/tests/src/utils/log_file.ts @@ -50,8 +50,11 @@ function clean(str: string): string { return str.replace(/ /g, "_").replace(/[':()/]/g, ""); } -export function makeLogFileName(ctx: Context): string { - const unitTest = ctx?.currentTest ? ctx!.currentTest : ctx.test; +export function makeLogFileName(ctx: Context | undefined): string { + if (!ctx) { + return "unknown"; + } + const unitTest = ctx.currentTest ? ctx.currentTest : ctx.test; let name = clean(unitTest!.title); let suite = unitTest?.parent; diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index 7020322bcd..c08a4a49a1 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -4,7 +4,7 @@ import { DecodedMessage, waitForRemotePeer } from "@waku/core"; -import { IFilterSubscription, Protocols } from "@waku/interfaces"; +import { ISubscriptionSDK, Protocols } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces"; import { generatePrivateKey, @@ -83,7 +83,7 @@ describe("Waku Message Ephemeral field", function () { let waku: LightNode; let nwaku: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; afterEachCustom(this, async () => { await tearDownNodes(nwaku, waku); @@ -123,9 +123,10 @@ describe("Waku Message Ephemeral field", function () { Protocols.Store ]); - subscription = await waku.filter.createSubscription( - TestEncoder.pubsubTopic - ); + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestEncoder.pubsubTopic); + if (error) throw error; + subscription = _subscription; }); it("Ephemeral messages are not stored", async function () { diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index 4a7dbcd320..68287b54de 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -1,4 +1,4 @@ -import { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { ISubscriptionSDK, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -24,11 +24,14 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(10000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); - subscription = await waku.filter.createSubscription(TestShardInfo); + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) throw error; + subscription = _subscription; }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index 4653c56e24..4d78c4530c 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -1,5 +1,5 @@ import { waitForRemotePeer } from "@waku/core"; -import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; +import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -29,11 +29,15 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(10000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes(this.ctx, TestShardInfo); - subscription = await waku.filter.createSubscription(TestShardInfo); + + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) throw error; + subscription = _subscription; }); afterEachCustom(this, async () => { @@ -238,7 +242,10 @@ const runTests = (strictCheckNodes: boolean): void => { await waku.dial(await node.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); } - subscription = await waku.filter.createSubscription(TestShardInfo); + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) throw error; + subscription = _subscription; await subscription.subscribe( [TestDecoder], serviceNodes.messageCollector.callback diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index 687d41d8ad..ec8d9515dc 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -1,7 +1,7 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import type { ContentTopicInfo, - IFilterSubscription, + ISubscriptionSDK, LightNode, ShardInfo, SingleShardInfo @@ -32,7 +32,7 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; const customPubsubTopic1 = singleShardInfoToPubsubTopic({ @@ -61,7 +61,12 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, shardInfo); - subscription = await waku.filter.createSubscription(shardInfo); + + const { error, subscription: _subscription } = + await waku.filter.createSubscription(shardInfo); + if (error) throw error; + subscription = _subscription; + messageCollector = new MessageCollector(); }); @@ -84,8 +89,11 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { await subscription.subscribe([customDecoder1], messageCollector.callback); // Subscribe from the same lightnode to the 2nd pubsubtopic - const subscription2 = + const { error, subscription: subscription2 } = await waku.filter.createSubscription(customPubsubTopic2); + if (error) { + throw error; + } const messageCollector2 = new MessageCollector(); @@ -126,8 +134,13 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic - const subscription2 = + const { error, subscription: subscription2 } = await waku.filter.createSubscription(customPubsubTopic2); + + if (error) { + throw error; + } + await nwaku2.ensureSubscriptions([customPubsubTopic2]); const messageCollector2 = new MessageCollector(); @@ -180,7 +193,7 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; const customContentTopic1 = "/waku/2/content/utf8"; @@ -222,9 +235,10 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, contentTopicInfo); - subscription = await waku.filter.createSubscription( - autoshardingPubsubTopic1 - ); + const { error, subscription: _subscription } = + await waku.filter.createSubscription(autoshardingPubsubTopic1); + if (error) throw error; + subscription = _subscription; messageCollector = new MessageCollector(); }); @@ -251,9 +265,12 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { await subscription.subscribe([customDecoder1], messageCollector.callback); // Subscribe from the same lightnode to the 2nd pubsubtopic - const subscription2 = await waku.filter.createSubscription( - autoshardingPubsubTopic2 - ); + const { error, subscription: subscription2 } = + await waku.filter.createSubscription(autoshardingPubsubTopic2); + + if (error) { + throw error; + } const messageCollector2 = new MessageCollector(); @@ -303,9 +320,13 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic - const subscription2 = await waku.filter.createSubscription( - autoshardingPubsubTopic2 - ); + const { error, subscription: subscription2 } = + await waku.filter.createSubscription(autoshardingPubsubTopic2); + + if (error) { + throw error; + } + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); const messageCollector2 = new MessageCollector(); @@ -357,7 +378,7 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { let waku: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; const customPubsubTopic1 = singleShardInfoToPubsubTopic({ @@ -387,7 +408,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, shardInfo); - subscription = await waku.filter.createSubscription(customPubsubTopic1); + const { error, subscription: _subscription } = + await waku.filter.createSubscription(customPubsubTopic1); + if (error) throw error; + subscription = _subscription; + messageCollector = new MessageCollector(); }); @@ -410,8 +435,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { await subscription.subscribe([customDecoder1], messageCollector.callback); // Subscribe from the same lightnode to the 2nd pubsubtopic - const subscription2 = + const { error, subscription: subscription2 } = await waku.filter.createSubscription(customPubsubTopic2); + if (error) { + throw error; + } const messageCollector2 = new MessageCollector(); @@ -452,8 +480,11 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic - const subscription2 = + const { error, subscription: subscription2 } = await waku.filter.createSubscription(customPubsubTopic2); + if (error) { + throw error; + } await nwaku2.ensureSubscriptions([customPubsubTopic2]); const messageCollector2 = new MessageCollector(); diff --git a/packages/tests/tests/filter/single_node/ping.node.spec.ts b/packages/tests/tests/filter/single_node/ping.node.spec.ts index 59ca6c4130..71040dfaee 100644 --- a/packages/tests/tests/filter/single_node/ping.node.spec.ts +++ b/packages/tests/tests/filter/single_node/ping.node.spec.ts @@ -1,4 +1,4 @@ -import { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { ISubscriptionSDK, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -24,12 +24,16 @@ describe("Waku Filter V2: Ping", function () { this.timeout(10000); let waku: LightNode; let nwaku: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - subscription = await waku.filter.createSubscription(TestShardInfo); + + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) throw error; + subscription = _subscription; messageCollector = new MessageCollector(); }); diff --git a/packages/tests/tests/filter/single_node/push.node.spec.ts b/packages/tests/tests/filter/single_node/push.node.spec.ts index 5467402afb..fdb6a75760 100644 --- a/packages/tests/tests/filter/single_node/push.node.spec.ts +++ b/packages/tests/tests/filter/single_node/push.node.spec.ts @@ -1,5 +1,5 @@ import { waitForRemotePeer } from "@waku/core"; -import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; +import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -28,12 +28,17 @@ describe("Waku Filter V2: FilterPush", function () { this.timeout(10000); let waku: LightNode; let nwaku: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - subscription = await waku.filter.createSubscription(TestShardInfo); + + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) throw error; + subscription = _subscription; + messageCollector = new MessageCollector(nwaku); }); @@ -219,7 +224,10 @@ describe("Waku Filter V2: FilterPush", function () { // Redo the connection and create a new subscription await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - subscription = await waku.filter.createSubscription(); + const { error, subscription: _subscription } = + await waku.filter.createSubscription(); + if (error) throw error; + subscription = _subscription; await subscription.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts index 421f7d5755..a521147921 100644 --- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts @@ -1,5 +1,5 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; -import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; +import { ISubscriptionSDK, LightNode, Protocols } from "@waku/interfaces"; import { ecies, generatePrivateKey, @@ -40,14 +40,18 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { let waku2: LightNode; let nwaku: ServiceNode; let nwaku2: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; let ctx: Context; beforeEachCustom(this, async () => { - ctx = this.ctx; [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - subscription = await waku.filter.createSubscription(TestShardInfo); + + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) throw error; + subscription = _subscription; + messageCollector = new MessageCollector(); await nwaku.ensureSubscriptions([TestPubsubTopic]); }); @@ -282,10 +286,15 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); try { - await subscription.subscribe(td.decoders, messageCollector.callback); - throw new Error( - `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` + const { failures, successes } = await subscription.subscribe( + td.decoders, + messageCollector.callback ); + if (failures.length === 0 || successes.length > 0) { + throw new Error( + `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` + ); + } } catch (err) { if ( err instanceof Error && @@ -387,7 +396,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); // Create a second subscription on a different topic - const subscription2 = await waku.filter.createSubscription(TestShardInfo); + const { error, subscription: subscription2 } = + await waku.filter.createSubscription(TestShardInfo); + if (error) { + throw error; + } const newContentTopic = "/test/2/waku-filter/default"; const newEncoder = createEncoder({ contentTopic: newContentTopic, @@ -419,7 +432,11 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () { [nwaku2, waku2] = await runNodes(ctx, TestShardInfo); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - const subscription2 = await waku.filter.createSubscription(TestShardInfo); + const { error, subscription: subscription2 } = + await waku.filter.createSubscription(TestShardInfo); + if (error) { + throw error; + } await nwaku2.ensureSubscriptions([TestPubsubTopic]); // Send a message using the new subscription const newContentTopic = "/test/2/waku-filter/default"; diff --git a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts index 44d3b3bd47..1f7a6258a4 100644 --- a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts @@ -1,5 +1,5 @@ import { createDecoder, createEncoder } from "@waku/core"; -import { IFilterSubscription } from "@waku/interfaces"; +import { ISubscriptionSDK } from "@waku/interfaces"; import { LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -28,12 +28,16 @@ describe("Waku Filter V2: Unsubscribe", function () { this.timeout(10000); let waku: LightNode; let nwaku: ServiceNode; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; let messageCollector: MessageCollector; beforeEachCustom(this, async () => { [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - subscription = await waku.filter.createSubscription(TestShardInfo); + + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) throw error; + subscription = _subscription; messageCollector = new MessageCollector(); await nwaku.ensureSubscriptions([TestPubsubTopic]); }); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 833358baab..ee248c68ec 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -1,5 +1,5 @@ import { createDecoder, createEncoder } from "@waku/core"; -import { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { ISubscriptionSDK, LightNode } from "@waku/interfaces"; import { ecies, generatePrivateKey, @@ -36,7 +36,7 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(100000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes( @@ -44,7 +44,12 @@ const runTests = (strictCheckNodes: boolean): void => { TestShardInfo, strictCheckNodes ); - subscription = await waku.filter.createSubscription(TestShardInfo); + const { error, subscription: _subscription } = + await waku.filter.createSubscription(TestShardInfo); + + if (!error) { + subscription = _subscription; + } }); afterEachCustom(this, async () => { @@ -330,13 +335,15 @@ const runTests = (strictCheckNodes: boolean): void => { const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); try { - await subscription.subscribe( + const { failures, successes } = await subscription.subscribe( td.decoders, serviceNodes.messageCollector.callback ); - throw new Error( - `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` - ); + if (failures.length === 0 || successes.length > 0) { + throw new Error( + `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` + ); + } } catch (err) { if ( err instanceof Error && @@ -461,7 +468,11 @@ const runTests = (strictCheckNodes: boolean): void => { await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); // Create a second subscription on a different topic - const subscription2 = await waku.filter.createSubscription(TestShardInfo); + const { error, subscription: subscription2 } = + await waku.filter.createSubscription(TestShardInfo); + if (error) { + throw error; + } const newContentTopic = "/test/2/waku-filter/default"; const newEncoder = createEncoder({ contentTopic: newContentTopic, diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index ff130190c6..9dc362bb08 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -1,5 +1,5 @@ import { createDecoder, createEncoder } from "@waku/core"; -import { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { ISubscriptionSDK, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; @@ -28,18 +28,22 @@ const runTests = (strictCheckNodes: boolean): void => { this.timeout(10000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; - let subscription: IFilterSubscription; + let subscription: ISubscriptionSDK; beforeEachCustom(this, async () => { [serviceNodes, waku] = await runMultipleNodes(this.ctx, { contentTopics: [TestContentTopic], clusterId: ClusterId }); + const { error, subscription: _subscription } = + await waku.filter.createSubscription({ + contentTopics: [TestContentTopic], + clusterId: ClusterId + }); - subscription = await waku.filter.createSubscription({ - contentTopics: [TestContentTopic], - clusterId: ClusterId - }); + if (!error) { + subscription = _subscription; + } }); afterEachCustom(this, async () => { diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 8341bd6211..86b7ae7ea4 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,6 +1,6 @@ import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { - IFilterSubscription, + ISubscriptionSDK, LightNode, ProtocolCreateOptions, Protocols, @@ -45,13 +45,15 @@ export const messagePayload = { payload: utf8ToBytes(messageText) }; // Utility to validate errors related to pings in the subscription. export async function validatePingError( - subscription: IFilterSubscription + subscription: ISubscriptionSDK ): Promise { try { - await subscription.ping(); - throw new Error( - "Ping was successful but was expected to fail with a specific error." - ); + const { failures, successes } = await subscription.ping(); + if (failures.length === 0 || successes.length > 0) { + throw new Error( + "Ping was successful but was expected to fail with a specific error." + ); + } } catch (err) { if ( err instanceof Error && diff --git a/packages/tests/tests/peer-exchange/query.spec.ts b/packages/tests/tests/peer-exchange/query.spec.ts index 9ebb832af5..8dd70aad28 100644 --- a/packages/tests/tests/peer-exchange/query.spec.ts +++ b/packages/tests/tests/peer-exchange/query.spec.ts @@ -7,7 +7,7 @@ import { WakuPeerExchange, wakuPeerExchangeDiscovery } from "@waku/discovery"; -import type { LightNode, PeerExchangeResult } from "@waku/interfaces"; +import type { LightNode, PeerExchangeQueryResult } from "@waku/interfaces"; import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk"; import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; import { expect } from "chai"; @@ -38,7 +38,7 @@ describe("Peer Exchange Query", function () { let components: Libp2pComponents; let peerExchange: WakuPeerExchange; let numPeersToRequest: number; - let queryResult: PeerExchangeResult; + let queryResult: PeerExchangeQueryResult; beforeEachCustom( this, @@ -99,7 +99,7 @@ describe("Peer Exchange Query", function () { peerId: nwaku3PeerId, numPeers: numPeersToRequest }), - new Promise((resolve) => + new Promise((resolve) => setTimeout( () => resolve({