diff --git a/packages/sdk/src/reliability_monitor/receiver.ts b/packages/sdk/src/reliability_monitor/receiver.ts index ea58c1b17c..ab2a84bda4 100644 --- a/packages/sdk/src/reliability_monitor/receiver.ts +++ b/packages/sdk/src/reliability_monitor/receiver.ts @@ -115,7 +115,7 @@ export class ReceiverReliabilityMonitor { return; } - const timeout = window.setTimeout( + const timeout = setTimeout( (async () => { const receivedAnyMessage = this.verifiedPeers.has(peerIdStr); const receivedTestMessage = this.receivedMessagesFormPeer.has( @@ -136,7 +136,7 @@ export class ReceiverReliabilityMonitor { await this.renewAndSubscribePeer(peerId); }) as () => void, MESSAGE_VERIFICATION_DELAY - ); + ) as unknown as number; this.scheduledVerification.set(peerIdStr, timeout); } diff --git a/packages/sdk/src/waku/wait_for_remote_peer.spec.ts b/packages/sdk/src/waku/wait_for_remote_peer.spec.ts index f3d71a1b32..88c9828a11 100644 --- a/packages/sdk/src/waku/wait_for_remote_peer.spec.ts +++ b/packages/sdk/src/waku/wait_for_remote_peer.spec.ts @@ -7,7 +7,7 @@ import sinon from "sinon"; import { waitForRemotePeer } from "./wait_for_remote_peer.js"; import { WakuNode } from "./waku.js"; -describe("waitForRemotePeer", () => { +describe.only("waitForRemotePeer", () => { let eventTarget = new EventTarget(); beforeEach(() => { @@ -121,8 +121,8 @@ describe("waitForRemotePeer", () => { }); it("should check connected peers if present and suitable", async () => { - const addEventListenerSpy = sinon.spy(eventTarget.addEventListener); - eventTarget.addEventListener = addEventListenerSpy; + const removeEventListenerSpy = sinon.spy(eventTarget.removeEventListener); + eventTarget.removeEventListener = removeEventListenerSpy; const wakuNode = mockWakuNode({ isStarted: true, @@ -144,7 +144,7 @@ describe("waitForRemotePeer", () => { } expect(err).to.be.undefined; - expect(addEventListenerSpy.notCalled).to.be.true; + expect(removeEventListenerSpy.notCalled).to.be.true; }); it("should wait for LightPush peer to be connected", async () => { diff --git a/packages/sdk/src/waku/wait_for_remote_peer.ts b/packages/sdk/src/waku/wait_for_remote_peer.ts index 8e2e7a7296..ae10d6be8a 100644 --- a/packages/sdk/src/waku/wait_for_remote_peer.ts +++ b/packages/sdk/src/waku/wait_for_remote_peer.ts @@ -40,57 +40,76 @@ export async function waitForRemotePeer( throw Error("Waku node is not started"); } + for (const protocol of protocols) { + switch (protocol) { + case Protocols.Relay: + if (!waku.relay) + throw Error("Cannot wait for Relay peer: protocol not mounted"); + break; + case Protocols.LightPush: + if (!waku.lightPush) + throw Error("Cannot wait for LightPush peer: protocol not mounted"); + break; + case Protocols.Store: + if (!waku.store) + throw Error("Cannot wait for Store peer: protocol not mounted"); + break; + case Protocols.Filter: + if (!waku.filter) + throw Error("Cannot wait for Filter peer: protocol not mounted"); + break; + } + } + + const promises = [waitForProtocols(waku, protocols)]; + if (connections.length > 0 && !protocols.includes(Protocols.Relay)) { - const success = await waitForMetadata(waku, protocols); + promises.push( + waitForMetadata(waku, protocols) as unknown as Promise + ); + } - if (success) { - return; - } + if (timeoutMs) { + await rejectOnTimeout( + Promise.any(promises), + timeoutMs, + "Timed out waiting for a remote peer." + ); + } else { + await Promise.any(promises); } +} + +type EventListener = (_: CustomEvent) => void; +/** + * Waits for required peers to be connected. + */ +async function waitForProtocols( + waku: IWaku, + protocols: Protocols[] +): Promise { const promises = []; - if (protocols.includes(Protocols.Relay)) { - if (!waku.relay) { - throw Error("Cannot wait for Relay peer: protocol not mounted"); - } + if (waku.relay && protocols.includes(Protocols.Relay)) { promises.push(waku.relay.waitForPeers()); } - if (protocols.includes(Protocols.Store)) { - if (!waku.store) { - throw Error("Cannot wait for Store peer: protocol not mounted"); - } + if (waku.store && protocols.includes(Protocols.Store)) { promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p)); } - if (protocols.includes(Protocols.LightPush)) { - if (!waku.lightPush) { - throw Error("Cannot wait for LightPush peer: protocol not mounted"); - } + if (waku.lightPush && protocols.includes(Protocols.LightPush)) { promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p)); } - if (protocols.includes(Protocols.Filter)) { - if (!waku.filter) { - throw new Error("Cannot wait for Filter peer: protocol not mounted"); - } + if (waku.filter && protocols.includes(Protocols.Filter)) { promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p)); } - if (timeoutMs) { - await rejectOnTimeout( - Promise.all(promises), - timeoutMs, - "Timed out waiting for a remote peer." - ); - } else { - await Promise.all(promises); - } + return Promise.all(promises); } -type EventListener = (_: CustomEvent) => void; - /** * Wait for a peer with the given protocol to be connected. * If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service. @@ -135,12 +154,12 @@ async function waitForConnectedPeer( } /** - * Waits for the metadata from the remote peer. + * Checks existing connections for needed metadata. */ async function waitForMetadata( waku: IWaku, protocols: Protocols[] -): Promise { +): Promise { const connectedPeers = waku.libp2p.getPeers(); const metadataService = waku.libp2p.services.metadata; const enabledCodes = mapProtocolsToCodecs(protocols); @@ -149,7 +168,7 @@ async function waitForMetadata( log.info( `Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}` ); - return false; + return; } for (const peerId of connectedPeers) { @@ -173,7 +192,7 @@ async function waitForMetadata( ); if (confirmedAllCodecs) { - return true; + return; } } } @@ -187,8 +206,6 @@ async function waitForMetadata( continue; } } - - return false; } const awaitTimeout = (ms: number, rejectReason: string): Promise =>