From 8d13d89d1cecca8fa3581c6eb4363db93edababe Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 13:29:22 +0530 Subject: [PATCH 01/14] maintain pings in a hashmap --- packages/core/src/lib/keep_alive_manager.ts | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index b72c6743a5..335afe59f6 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -14,12 +14,14 @@ export class KeepAliveManager { private relayKeepAliveTimers: Map>; private options: KeepAliveOptions; private relay?: IRelay; + public peerPings: Map; constructor(options: KeepAliveOptions, relay?: IRelay) { this.pingKeepAliveTimers = new Map(); this.relayKeepAliveTimers = new Map(); this.options = options; this.relay = relay; + this.peerPings = new Map(); } public start(peerId: PeerId, libp2pPing: PingService): void { @@ -33,9 +35,15 @@ export class KeepAliveManager { if (pingPeriodSecs !== 0) { const interval = setInterval(() => { - libp2pPing.ping(peerId).catch((e) => { - log(`Ping failed (${peerIdStr})`, e); - }); + libp2pPing + .ping(peerId) + .then((ping) => { + log(`Ping succeeded (${peerIdStr})`, ping); + this.peerPings.set(peerIdStr, ping); + }) + .catch((e) => { + log(`Ping failed (${peerIdStr})`, e); + }); }, pingPeriodSecs * 1000); this.pingKeepAliveTimers.set(peerIdStr, interval); } From 27647183bb23177f4be6eb1fab807f4d7d8c97cc Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 13:30:14 +0530 Subject: [PATCH 02/14] convert `KeepAliveManager` into a singleton --- packages/core/src/lib/keep_alive_manager.ts | 38 +++++++++++++++++++-- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 335afe59f6..b65674e930 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -10,21 +10,53 @@ export const RelayPingContentTopic = "/relay-ping/1/ping/null"; const log = debug("waku:keep-alive"); export class KeepAliveManager { + private static instance: KeepAliveManager; + private pingKeepAliveTimers: Map>; private relayKeepAliveTimers: Map>; private options: KeepAliveOptions; private relay?: IRelay; + private libp2pPing: PingService; public peerPings: Map; - constructor(options: KeepAliveOptions, relay?: IRelay) { + private constructor( + libp2pPing: PingService, + options: KeepAliveOptions, + relay?: IRelay + ) { this.pingKeepAliveTimers = new Map(); this.relayKeepAliveTimers = new Map(); this.options = options; this.relay = relay; this.peerPings = new Map(); + this.libp2pPing = libp2pPing; + } + + public static createInstance( + libp2pPing: PingService, + options: KeepAliveOptions, + relay?: IRelay + ): KeepAliveManager { + if (!KeepAliveManager.instance) { + KeepAliveManager.instance = new KeepAliveManager( + libp2pPing, + options, + relay + ); + } + return KeepAliveManager.instance; + } + + public static getInstance(): KeepAliveManager { + if (!KeepAliveManager.instance) { + throw new Error( + "KeepAliveManager not initialized - please use createInstance() first" + ); + } + return KeepAliveManager.instance; } - public start(peerId: PeerId, libp2pPing: PingService): void { + public start(peerId: PeerId): void { // Just in case a timer already exist for this peer this.stop(peerId); @@ -35,7 +67,7 @@ export class KeepAliveManager { if (pingPeriodSecs !== 0) { const interval = setInterval(() => { - libp2pPing + this.libp2pPing .ping(peerId) .then((ping) => { log(`Ping succeeded (${peerIdStr})`, ping); From f3afd6fcb00fe6ee1b36f768473fbfb2862d9e7d Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 13:30:48 +0530 Subject: [PATCH 03/14] chore: fix an unrelated cyclic dependency error --- packages/core/src/lib/keep_alive_manager.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index b65674e930..6b9da29961 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -4,7 +4,7 @@ import type { KeepAliveOptions } from "@waku/interfaces"; import debug from "debug"; import type { PingService } from "libp2p/ping"; -import { createEncoder } from "../index.js"; +import { createEncoder } from "./message/version_0.js"; export const RelayPingContentTopic = "/relay-ping/1/ping/null"; const log = debug("waku:keep-alive"); From 160c680200955e3b66d179de91332ee2f675b15e Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 13:34:13 +0530 Subject: [PATCH 04/14] update `selectPeerForProtocol` to return peer with the lowest latency --- packages/utils/src/libp2p/index.ts | 45 ++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 5 deletions(-) diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 6697ff10fe..0091c47269 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -16,6 +16,32 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined { return peers[index]; } +/** + * Returns the peer with the lowest latency. + * @param getPing - A function that returns the latency for a given peer + * @param peers - The list of peers to choose from + * @returns The peer with the lowest latency, or undefined if no peer could be reached + */ +export async function selectLowestLatencyPeer( + peerPings: Map, + peers: Peer[] +): Promise { + if (peers.length === 0) return; + + const results = await Promise.all( + peers.map((peer) => { + const ping = peerPings.get(peer.id.toString()) ?? Infinity; + return { peer, ping }; + }) + ); + + const lowestLatencyResult = results.sort((a, b) => a.ping - b.ping)[0]; + + return lowestLatencyResult.ping !== Infinity + ? lowestLatencyResult.peer + : undefined; +} + /** * Returns the list of peers that supports the given protocol. */ @@ -35,12 +61,19 @@ export async function getPeersForProtocol( return peers; } +/** + * Returns a peer that supports the given protocol. + * If peerId is provided, the peer with that id is returned. + * Otherwise, the peer with the lowest latency is returned. + * If no peer is found from the above criteria, a random peer is returned. + */ export async function selectPeerForProtocol( peerStore: PeerStore, + peerPings: Map, protocols: string[], peerId?: PeerId ): Promise<{ peer: Peer; protocol: string }> { - let peer; + let peer: Peer | undefined; if (peerId) { peer = await peerStore.get(peerId); if (!peer) { @@ -50,11 +83,13 @@ export async function selectPeerForProtocol( } } else { const peers = await getPeersForProtocol(peerStore, protocols); - peer = selectRandomPeer(peers); + peer = await selectLowestLatencyPeer(peerPings, peers); if (!peer) { - throw new Error( - `Failed to find known peer that registers protocols: ${protocols}` - ); + peer = selectRandomPeer(peers); + if (!peer) + throw new Error( + `Failed to find known peer that registers protocols: ${protocols}` + ); } } From 6a07f4546c1e4679a992c73b338c15e795bc27a0 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 13:45:31 +0530 Subject: [PATCH 05/14] use the new KeepAliveManager API --- packages/core/src/lib/connection_manager.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 30a051aaf7..5fa527fe5d 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -116,7 +116,11 @@ export class ConnectionManager ...options }; - this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay); + this.keepAliveManager = KeepAliveManager.createInstance( + this.libp2p.services.ping, + keepAliveOptions, + relay + ); this.run() .then(() => log(`Connection Manager is now running`)) @@ -340,7 +344,7 @@ export class ConnectionManager void (async () => { const peerId = evt.detail; - this.keepAliveManager.start(peerId, this.libp2p.services.ping); + this.keepAliveManager.start(peerId); const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( Tags.BOOTSTRAP From 0e46824c8c5c3c0e3c97aa7bf1c5f17bf54f1e6d Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 13:46:50 +0530 Subject: [PATCH 06/14] use the new API for `selectPeerForProtocol` --- packages/core/src/lib/base_protocol.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 1546fda480..113538529f 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -6,6 +6,7 @@ import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; import { filterPeers } from "./filterPeers.js"; +import { KeepAliveManager } from "./keep_alive_manager.js"; import { StreamManager } from "./stream_manager.js"; /** @@ -54,8 +55,10 @@ export class BaseProtocol implements IBaseProtocol { } protected async getPeer(peerId?: PeerId): Promise { + const { peerPings } = KeepAliveManager.getInstance(); const { peer } = await selectPeerForProtocol( this.peerStore, + peerPings, [this.multicodec], peerId ); From 42eb3cb393cc4a377ea4405a23c529be41e7732c Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 13:49:23 +0530 Subject: [PATCH 07/14] add tests --- package-lock.json | 7 +- packages/tests/package.json | 7 +- packages/tests/tests/utils.spec.ts | 134 ++++++++++++++++++++++++++++- 3 files changed, 142 insertions(+), 6 deletions(-) diff --git a/package-lock.json b/package-lock.json index 913acc2b8e..402ffad63c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7370,7 +7370,8 @@ }, "node_modules/chai-as-promised": { "version": "7.1.1", - "license": "WTFPL", + "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-7.1.1.tgz", + "integrity": "sha512-azL6xMoi+uxu6z4rhWQ1jbdUhOMhis2PvscD/xjLqNMkv3BPPp2JyyuTHOrf9BOosGpNQ11v6BKv/g57RXbiaA==", "dependencies": { "check-error": "^1.0.2" }, @@ -27384,6 +27385,7 @@ "@waku/interfaces": "*", "@waku/utils": "*", "app-root-path": "^3.1.0", + "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", "p-timeout": "^6.1.0", @@ -31445,6 +31447,7 @@ "@waku/utils": "*", "app-root-path": "^3.1.0", "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", "cspell": "^7.3.2", "datastore-core": "^9.2.2", "debug": "^4.3.4", @@ -32756,6 +32759,8 @@ }, "chai-as-promised": { "version": "7.1.1", + "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-7.1.1.tgz", + "integrity": "sha512-azL6xMoi+uxu6z4rhWQ1jbdUhOMhis2PvscD/xjLqNMkv3BPPp2JyyuTHOrf9BOosGpNQ11v6BKv/g57RXbiaA==", "requires": { "check-error": "^1.0.2" } diff --git a/packages/tests/package.json b/packages/tests/package.json index 5ea6005a58..30bcc7f820 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -57,6 +57,7 @@ "@waku/interfaces": "*", "@waku/utils": "*", "app-root-path": "^3.1.0", + "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", "p-timeout": "^6.1.0", @@ -66,20 +67,20 @@ }, "devDependencies": { "@libp2p/bootstrap": "^9.0.2", - "@types/sinon": "^10.0.16", "@types/chai": "^4.3.5", "@types/dockerode": "^3.3.19", "@types/mocha": "^10.0.1", + "@types/sinon": "^10.0.16", "@types/tail": "^2.2.1", "@typescript-eslint/eslint-plugin": "^5.57.0", "@typescript-eslint/parser": "^6.6.0", - "@waku/sdk": "*", "@waku/dns-discovery": "*", "@waku/message-encryption": "*", "@waku/peer-exchange": "*", + "@waku/sdk": "*", "chai": "^4.3.7", - "datastore-core": "^9.2.2", "cspell": "^7.3.2", + "datastore-core": "^9.2.2", "debug": "^4.3.4", "interface-datastore": "^8.2.3", "libp2p": "^0.46.8", diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 42935d60ec..37e5c5d015 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -1,3 +1,6 @@ +import type { PeerStore } from "@libp2p/interface/peer-store"; +import type { Peer } from "@libp2p/interface/peer-store"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createDecoder, createEncoder, @@ -9,11 +12,16 @@ import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; +import { selectPeerForProtocol } from "@waku/utils/libp2p"; +import chai, { expect } from "chai"; +import chaiAsPromised from "chai-as-promised"; +import sinon from "sinon"; -import { makeLogFileName, NOISE_KEY_1 } from "../src/index.js"; +import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js"; import { NimGoNode } from "../src/node/node.js"; +chai.use(chaiAsPromised); + const TestContentTopic = "/test/1/waku-filter"; const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); const TestDecoder = createDecoder(TestContentTopic); @@ -106,3 +114,125 @@ describe("Util: toAsyncIterator: Filter", () => { expect(result.done).to.eq(true); }); }); + +const TestCodec = "test/1"; + +describe("selectPeerForProtocol", () => { + let peerStore: PeerStore; + let peerPings: Map; + const protocols = [TestCodec]; + + beforeEach(async function () { + this.timeout(10000); + const waku = await createLightNode(); + await waku.start(); + await delay(3000); + peerStore = waku.libp2p.peerStore; + peerPings = new Map(); + }); + + afterEach(() => { + sinon.restore(); + }); + + it("should return the peer with the lowest ping", async function () { + const peer1 = await createSecp256k1PeerId(); + const peer2 = await createSecp256k1PeerId(); + const peer3 = await createSecp256k1PeerId(); + + const mockPeers = [ + { id: peer1, protocols: [TestCodec] }, + { id: peer2, protocols: [TestCodec] }, + { id: peer3, protocols: [TestCodec] } + ] as Peer[]; + + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { + for (const peer of mockPeers) { + callback(peer); + } + }); + + peerPings.set(peer1.toString(), 500); + peerPings.set(peer2.toString(), 1000); + peerPings.set(peer3.toString(), 100); + + const result = await selectPeerForProtocol(peerStore, peerPings, protocols); + + expect(result.peer).to.deep.equal(mockPeers[2]); + expect(result.protocol).to.equal(TestCodec); + }); + + it("should return the peer with the provided peerId", async function () { + const targetPeer = await createSecp256k1PeerId(); + const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer; + sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer); + + const result = await selectPeerForProtocol( + peerStore, + peerPings, + protocols, + targetPeer + ); + expect(result.peer).to.deep.equal(mockPeer); + }); + + it("should return a random peer when all peers have the same latency", async function () { + const peer1 = await createSecp256k1PeerId(); + const peer2 = await createSecp256k1PeerId(); + const peer3 = await createSecp256k1PeerId(); + + const mockPeers = [ + { id: peer1, protocols: [TestCodec] }, + { id: peer2, protocols: [TestCodec] }, + { id: peer3, protocols: [TestCodec] } + ] as Peer[]; + + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { + for (const peer of mockPeers) { + callback(peer); + } + }); + + peerPings.set(peer1.toString(), 500); + peerPings.set(peer2.toString(), 500); + peerPings.set(peer3.toString(), 500); + + const result = await selectPeerForProtocol(peerStore, peerPings, protocols); + + expect(mockPeers).to.deep.include(result.peer); + }); + + it("should throw an error when no peer matches the given protocols", async function () { + const mockPeers = [ + { id: await createSecp256k1PeerId(), protocols: ["DifferentCodec"] }, + { + id: await createSecp256k1PeerId(), + protocols: ["AnotherDifferentCodec"] + } + ] as Peer[]; + + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { + for (const peer of mockPeers) { + callback(peer); + } + }); + + await expect( + selectPeerForProtocol(peerStore, peerPings, protocols) + ).to.be.rejectedWith( + `Failed to find known peer that registers protocols: ${protocols}` + ); + }); + + it("should throw an error when the selected peer does not register the required protocols", async function () { + const targetPeer = await createSecp256k1PeerId(); + const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer; + sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer); + + await expect( + selectPeerForProtocol(peerStore, peerPings, protocols, targetPeer) + ).to.be.rejectedWith( + `Peer does not register required protocols (${targetPeer.toString()}): ${protocols}` + ); + }); +}); From a37892eab14e38f797181dacb4c1237fbc89e4eb Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 22:52:17 +0530 Subject: [PATCH 08/14] use PeerData to hold the ping instead of a new variable --- packages/core/src/lib/base_protocol.ts | 3 - packages/core/src/lib/connection_manager.ts | 1 + packages/core/src/lib/keep_alive_manager.ts | 17 +++++- packages/tests/tests/utils.spec.ts | 67 ++++++++++++++------- packages/utils/src/libp2p/index.ts | 14 +++-- 5 files changed, 68 insertions(+), 34 deletions(-) diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 113538529f..1546fda480 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -6,7 +6,6 @@ import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces"; import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; import { filterPeers } from "./filterPeers.js"; -import { KeepAliveManager } from "./keep_alive_manager.js"; import { StreamManager } from "./stream_manager.js"; /** @@ -55,10 +54,8 @@ export class BaseProtocol implements IBaseProtocol { } protected async getPeer(peerId?: PeerId): Promise { - const { peerPings } = KeepAliveManager.getInstance(); const { peer } = await selectPeerForProtocol( this.peerStore, - peerPings, [this.multicodec], peerId ); diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 5fa527fe5d..3dc49227ab 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -118,6 +118,7 @@ export class ConnectionManager this.keepAliveManager = KeepAliveManager.createInstance( this.libp2p.services.ping, + this.libp2p.peerStore, keepAliveOptions, relay ); diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 6b9da29961..b05ba4874b 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -1,6 +1,8 @@ import type { PeerId } from "@libp2p/interface/peer-id"; +import type { PeerStore } from "@libp2p/interface/peer-store"; import type { IRelay } from "@waku/interfaces"; import type { KeepAliveOptions } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; import type { PingService } from "libp2p/ping"; @@ -17,10 +19,11 @@ export class KeepAliveManager { private options: KeepAliveOptions; private relay?: IRelay; private libp2pPing: PingService; - public peerPings: Map; + private peerStore: PeerStore; private constructor( libp2pPing: PingService, + peerStore: PeerStore, options: KeepAliveOptions, relay?: IRelay ) { @@ -28,18 +31,20 @@ export class KeepAliveManager { this.relayKeepAliveTimers = new Map(); this.options = options; this.relay = relay; - this.peerPings = new Map(); this.libp2pPing = libp2pPing; + this.peerStore = peerStore; } public static createInstance( libp2pPing: PingService, + peerStore: PeerStore, options: KeepAliveOptions, relay?: IRelay ): KeepAliveManager { if (!KeepAliveManager.instance) { KeepAliveManager.instance = new KeepAliveManager( libp2pPing, + peerStore, options, relay ); @@ -71,7 +76,13 @@ export class KeepAliveManager { .ping(peerId) .then((ping) => { log(`Ping succeeded (${peerIdStr})`, ping); - this.peerPings.set(peerIdStr, ping); + this.peerStore + .patch(peerId, { + metadata: { + ping: utf8ToBytes(ping.toString()) + } + }) + .catch((e) => log("Failed to update ping", e)); }) .catch((e) => { log(`Ping failed (${peerIdStr})`, e); diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 37e5c5d015..0d6c2a155b 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -117,9 +117,8 @@ describe("Util: toAsyncIterator: Filter", () => { const TestCodec = "test/1"; -describe("selectPeerForProtocol", () => { +describe.only("selectPeerForProtocol", () => { let peerStore: PeerStore; - let peerPings: Map; const protocols = [TestCodec]; beforeEach(async function () { @@ -128,7 +127,6 @@ describe("selectPeerForProtocol", () => { await waku.start(); await delay(3000); peerStore = waku.libp2p.peerStore; - peerPings = new Map(); }); afterEach(() => { @@ -141,24 +139,36 @@ describe("selectPeerForProtocol", () => { const peer3 = await createSecp256k1PeerId(); const mockPeers = [ - { id: peer1, protocols: [TestCodec] }, - { id: peer2, protocols: [TestCodec] }, - { id: peer3, protocols: [TestCodec] } + { + id: peer1, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("500")) + }, + { + id: peer2, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("50")) + }, + { + id: peer3, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("100")) + } ] as Peer[]; + sinon.stub(peerStore, "get").callsFake(async (peerId) => { + return mockPeers.find((peer) => peer.id.equals(peerId))!; + }); + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { for (const peer of mockPeers) { callback(peer); } }); - peerPings.set(peer1.toString(), 500); - peerPings.set(peer2.toString(), 1000); - peerPings.set(peer3.toString(), 100); - - const result = await selectPeerForProtocol(peerStore, peerPings, protocols); + const result = await selectPeerForProtocol(peerStore, protocols); - expect(result.peer).to.deep.equal(mockPeers[2]); + expect(result.peer).to.deep.equal(mockPeers[1]); expect(result.protocol).to.equal(TestCodec); }); @@ -169,7 +179,6 @@ describe("selectPeerForProtocol", () => { const result = await selectPeerForProtocol( peerStore, - peerPings, protocols, targetPeer ); @@ -182,22 +191,34 @@ describe("selectPeerForProtocol", () => { const peer3 = await createSecp256k1PeerId(); const mockPeers = [ - { id: peer1, protocols: [TestCodec] }, - { id: peer2, protocols: [TestCodec] }, - { id: peer3, protocols: [TestCodec] } + { + id: peer1, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("500")) + }, + { + id: peer2, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("500")) + }, + { + id: peer3, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("500")) + } ] as Peer[]; + sinon.stub(peerStore, "get").callsFake(async (peerId) => { + return mockPeers.find((peer) => peer.id.equals(peerId))!; + }); + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { for (const peer of mockPeers) { callback(peer); } }); - peerPings.set(peer1.toString(), 500); - peerPings.set(peer2.toString(), 500); - peerPings.set(peer3.toString(), 500); - - const result = await selectPeerForProtocol(peerStore, peerPings, protocols); + const result = await selectPeerForProtocol(peerStore, protocols); expect(mockPeers).to.deep.include(result.peer); }); @@ -218,7 +239,7 @@ describe("selectPeerForProtocol", () => { }); await expect( - selectPeerForProtocol(peerStore, peerPings, protocols) + selectPeerForProtocol(peerStore, protocols) ).to.be.rejectedWith( `Failed to find known peer that registers protocols: ${protocols}` ); @@ -230,7 +251,7 @@ describe("selectPeerForProtocol", () => { sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer); await expect( - selectPeerForProtocol(peerStore, peerPings, protocols, targetPeer) + selectPeerForProtocol(peerStore, protocols, targetPeer) ).to.be.rejectedWith( `Peer does not register required protocols (${targetPeer.toString()}): ${protocols}` ); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 0091c47269..3b3d6d14da 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -3,6 +3,8 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import debug from "debug"; +import { bytesToUtf8 } from "../bytes/index.js"; + const log = debug("waku:libp2p-utils"); /** @@ -23,14 +25,17 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined { * @returns The peer with the lowest latency, or undefined if no peer could be reached */ export async function selectLowestLatencyPeer( - peerPings: Map, + peerStore: PeerStore, peers: Peer[] ): Promise { if (peers.length === 0) return; const results = await Promise.all( - peers.map((peer) => { - const ping = peerPings.get(peer.id.toString()) ?? Infinity; + peers.map(async (peer) => { + const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping"); + if (!pingBytes) return { peer, ping: Infinity }; + + const ping = Number(bytesToUtf8(pingBytes)) ?? Infinity; return { peer, ping }; }) ); @@ -69,7 +74,6 @@ export async function getPeersForProtocol( */ export async function selectPeerForProtocol( peerStore: PeerStore, - peerPings: Map, protocols: string[], peerId?: PeerId ): Promise<{ peer: Peer; protocol: string }> { @@ -83,7 +87,7 @@ export async function selectPeerForProtocol( } } else { const peers = await getPeersForProtocol(peerStore, protocols); - peer = await selectLowestLatencyPeer(peerPings, peers); + peer = await selectLowestLatencyPeer(peerStore, peers); if (!peer) { peer = selectRandomPeer(peers); if (!peer) From 8da588aa054732e4709471043eeb22aad2dd5c16 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 23:02:03 +0530 Subject: [PATCH 09/14] improve tests for readability --- packages/tests/tests/utils.spec.ts | 102 +++++++++++++++-------------- 1 file changed, 52 insertions(+), 50 deletions(-) diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 0d6c2a155b..bba22528fe 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -121,12 +121,60 @@ describe.only("selectPeerForProtocol", () => { let peerStore: PeerStore; const protocols = [TestCodec]; + let lowPingPeer: Peer, + midPingPeer: Peer, + highPingPeer: Peer, + differentCodecPeer: Peer, + anotherDifferentCodecPeer: Peer; + beforeEach(async function () { this.timeout(10000); const waku = await createLightNode(); await waku.start(); await delay(3000); peerStore = waku.libp2p.peerStore; + + const [ + lowPingPeerId, + midPingPeerId, + highPingPeerId, + differentCodecPeerId, + anotherDifferentCodecPeerId + ] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId() + ]); + + lowPingPeer = { + id: lowPingPeerId, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("50")) + } as Peer; + + midPingPeer = { + id: midPingPeerId, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("100")) + } as Peer; + + highPingPeer = { + id: highPingPeerId, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("500")) + } as Peer; + + differentCodecPeer = { + id: differentCodecPeerId, + protocols: ["DifferentCodec"] + } as Peer; + + anotherDifferentCodecPeer = { + id: anotherDifferentCodecPeerId, + protocols: ["AnotherDifferentCodec"] + } as Peer; }); afterEach(() => { @@ -134,27 +182,7 @@ describe.only("selectPeerForProtocol", () => { }); it("should return the peer with the lowest ping", async function () { - const peer1 = await createSecp256k1PeerId(); - const peer2 = await createSecp256k1PeerId(); - const peer3 = await createSecp256k1PeerId(); - - const mockPeers = [ - { - id: peer1, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("500")) - }, - { - id: peer2, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("50")) - }, - { - id: peer3, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("100")) - } - ] as Peer[]; + const mockPeers = [highPingPeer, lowPingPeer, midPingPeer]; sinon.stub(peerStore, "get").callsFake(async (peerId) => { return mockPeers.find((peer) => peer.id.equals(peerId))!; @@ -168,7 +196,7 @@ describe.only("selectPeerForProtocol", () => { const result = await selectPeerForProtocol(peerStore, protocols); - expect(result.peer).to.deep.equal(mockPeers[1]); + expect(result.peer).to.deep.equal(lowPingPeer); expect(result.protocol).to.equal(TestCodec); }); @@ -186,27 +214,7 @@ describe.only("selectPeerForProtocol", () => { }); it("should return a random peer when all peers have the same latency", async function () { - const peer1 = await createSecp256k1PeerId(); - const peer2 = await createSecp256k1PeerId(); - const peer3 = await createSecp256k1PeerId(); - - const mockPeers = [ - { - id: peer1, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("500")) - }, - { - id: peer2, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("500")) - }, - { - id: peer3, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("500")) - } - ] as Peer[]; + const mockPeers = [highPingPeer, highPingPeer, highPingPeer]; sinon.stub(peerStore, "get").callsFake(async (peerId) => { return mockPeers.find((peer) => peer.id.equals(peerId))!; @@ -224,13 +232,7 @@ describe.only("selectPeerForProtocol", () => { }); it("should throw an error when no peer matches the given protocols", async function () { - const mockPeers = [ - { id: await createSecp256k1PeerId(), protocols: ["DifferentCodec"] }, - { - id: await createSecp256k1PeerId(), - protocols: ["AnotherDifferentCodec"] - } - ] as Peer[]; + const mockPeers = [differentCodecPeer, anotherDifferentCodecPeer]; sinon.stub(peerStore, "forEach").callsFake(async (callback) => { for (const peer of mockPeers) { From a860d53cf50807300500219d2c073b83000c8bd4 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 23:11:56 +0530 Subject: [PATCH 10/14] move back KeepAliveManager from singleton --- packages/core/src/lib/connection_manager.ts | 7 ++-- packages/core/src/lib/keep_alive_manager.ts | 43 ++------------------- 2 files changed, 7 insertions(+), 43 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 3dc49227ab..30e8f37dc9 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -116,9 +116,8 @@ export class ConnectionManager ...options }; - this.keepAliveManager = KeepAliveManager.createInstance( - this.libp2p.services.ping, - this.libp2p.peerStore, + this.keepAliveManager = new KeepAliveManager( + libp2p.peerStore, keepAliveOptions, relay ); @@ -345,7 +344,7 @@ export class ConnectionManager void (async () => { const peerId = evt.detail; - this.keepAliveManager.start(peerId); + this.keepAliveManager.start(peerId, this.libp2p.services.ping); const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( Tags.BOOTSTRAP diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index b05ba4874b..5810371c90 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -12,57 +12,22 @@ export const RelayPingContentTopic = "/relay-ping/1/ping/null"; const log = debug("waku:keep-alive"); export class KeepAliveManager { - private static instance: KeepAliveManager; - private pingKeepAliveTimers: Map>; private relayKeepAliveTimers: Map>; private options: KeepAliveOptions; private relay?: IRelay; - private libp2pPing: PingService; private peerStore: PeerStore; - private constructor( - libp2pPing: PingService, - peerStore: PeerStore, - options: KeepAliveOptions, - relay?: IRelay - ) { + constructor(peerStore: PeerStore, options: KeepAliveOptions, relay?: IRelay) { this.pingKeepAliveTimers = new Map(); this.relayKeepAliveTimers = new Map(); this.options = options; this.relay = relay; - this.libp2pPing = libp2pPing; this.peerStore = peerStore; } - public static createInstance( - libp2pPing: PingService, - peerStore: PeerStore, - options: KeepAliveOptions, - relay?: IRelay - ): KeepAliveManager { - if (!KeepAliveManager.instance) { - KeepAliveManager.instance = new KeepAliveManager( - libp2pPing, - peerStore, - options, - relay - ); - } - return KeepAliveManager.instance; - } - - public static getInstance(): KeepAliveManager { - if (!KeepAliveManager.instance) { - throw new Error( - "KeepAliveManager not initialized - please use createInstance() first" - ); - } - return KeepAliveManager.instance; - } - - public start(peerId: PeerId): void { - // Just in case a timer already exist for this peer + public start(peerId: PeerId, libp2pPing: PingService): void { + // Just in case a timer already exists for this peer this.stop(peerId); const { pingKeepAlive: pingPeriodSecs, relayKeepAlive: relayPeriodSecs } = @@ -72,7 +37,7 @@ export class KeepAliveManager { if (pingPeriodSecs !== 0) { const interval = setInterval(() => { - this.libp2pPing + libp2pPing .ping(peerId) .then((ping) => { log(`Ping succeeded (${peerIdStr})`, ping); From 51c91cd6ae7a2444342a9f0e08574766d5ec7e2c Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 23:12:10 +0530 Subject: [PATCH 11/14] reenable all tests --- packages/tests/tests/utils.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index bba22528fe..fc31cbc8a1 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -117,7 +117,7 @@ describe("Util: toAsyncIterator: Filter", () => { const TestCodec = "test/1"; -describe.only("selectPeerForProtocol", () => { +describe("selectPeerForProtocol", () => { let peerStore: PeerStore; const protocols = [TestCodec]; From 08f36fff400f3d5974e04873cae3da9324981eaf Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 7 Sep 2023 23:16:57 +0530 Subject: [PATCH 12/14] minor improvements --- packages/core/src/lib/connection_manager.ts | 12 ++++++------ packages/core/src/lib/keep_alive_manager.ts | 14 +++++++++----- packages/utils/src/libp2p/index.ts | 2 +- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 30e8f37dc9..293c5bb11f 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -116,11 +116,7 @@ export class ConnectionManager ...options }; - this.keepAliveManager = new KeepAliveManager( - libp2p.peerStore, - keepAliveOptions, - relay - ); + this.keepAliveManager = new KeepAliveManager(keepAliveOptions, relay); this.run() .then(() => log(`Connection Manager is now running`)) @@ -344,7 +340,11 @@ export class ConnectionManager void (async () => { const peerId = evt.detail; - this.keepAliveManager.start(peerId, this.libp2p.services.ping); + this.keepAliveManager.start( + peerId, + this.libp2p.services.ping, + this.libp2p.peerStore + ); const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( Tags.BOOTSTRAP diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 5810371c90..1103e46487 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -16,17 +16,19 @@ export class KeepAliveManager { private relayKeepAliveTimers: Map>; private options: KeepAliveOptions; private relay?: IRelay; - private peerStore: PeerStore; - constructor(peerStore: PeerStore, options: KeepAliveOptions, relay?: IRelay) { + constructor(options: KeepAliveOptions, relay?: IRelay) { this.pingKeepAliveTimers = new Map(); this.relayKeepAliveTimers = new Map(); this.options = options; this.relay = relay; - this.peerStore = peerStore; } - public start(peerId: PeerId, libp2pPing: PingService): void { + public start( + peerId: PeerId, + libp2pPing: PingService, + peerStore: PeerStore + ): void { // Just in case a timer already exists for this peer this.stop(peerId); @@ -37,11 +39,13 @@ export class KeepAliveManager { if (pingPeriodSecs !== 0) { const interval = setInterval(() => { + // ping the peer for keep alive + // also update the peer store with the latency libp2pPing .ping(peerId) .then((ping) => { log(`Ping succeeded (${peerIdStr})`, ping); - this.peerStore + peerStore .patch(peerId, { metadata: { ping: utf8ToBytes(ping.toString()) diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 3b3d6d14da..6c1019b8b7 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -20,7 +20,7 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined { /** * Returns the peer with the lowest latency. - * @param getPing - A function that returns the latency for a given peer + * @param peerStore - The Libp2p PeerStore * @param peers - The list of peers to choose from * @returns The peer with the lowest latency, or undefined if no peer could be reached */ From ccbe4f09787767a932da07f933ac7e1bd776e057 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Fri, 8 Sep 2023 00:02:40 +0530 Subject: [PATCH 13/14] improve error handling --- packages/utils/src/libp2p/index.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 6c1019b8b7..4f37f15c0f 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -41,6 +41,9 @@ export async function selectLowestLatencyPeer( ); const lowestLatencyResult = results.sort((a, b) => a.ping - b.ping)[0]; + if (!lowestLatencyResult) { + return undefined; + } return lowestLatencyResult.ping !== Infinity ? lowestLatencyResult.peer From e093cfed0c71544093d1dd7f723e028433a8ec89 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Fri, 8 Sep 2023 21:18:07 +0530 Subject: [PATCH 14/14] convert .then() syntax to async/await --- packages/core/src/lib/keep_alive_manager.ts | 28 ++++++++++++--------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index 1103e46487..6227c08257 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -39,24 +39,28 @@ export class KeepAliveManager { if (pingPeriodSecs !== 0) { const interval = setInterval(() => { - // ping the peer for keep alive - // also update the peer store with the latency - libp2pPing - .ping(peerId) - .then((ping) => { + void (async () => { + try { + // ping the peer for keep alive + // also update the peer store with the latency + const ping = await libp2pPing.ping(peerId); log(`Ping succeeded (${peerIdStr})`, ping); - peerStore - .patch(peerId, { + + try { + await peerStore.patch(peerId, { metadata: { ping: utf8ToBytes(ping.toString()) } - }) - .catch((e) => log("Failed to update ping", e)); - }) - .catch((e) => { + }); + } catch (e) { + log("Failed to update ping", e); + } + } catch (e) { log(`Ping failed (${peerIdStr})`, e); - }); + } + })(); }, pingPeriodSecs * 1000); + this.pingKeepAliveTimers.set(peerIdStr, interval); }