Skip to content

Commit

Permalink
feat: use the lowest latency peer for protocols (#1540)
Browse files Browse the repository at this point in the history
* maintain pings in a hashmap

* convert `KeepAliveManager` into a singleton

* chore: fix an unrelated cyclic dependency error

* update `selectPeerForProtocol` to return peer with the lowest latency

* use the new KeepAliveManager API

* use the new API for `selectPeerForProtocol`

* add tests

* use PeerData to hold the ping instead of a new variable

* improve tests for readability

* move back KeepAliveManager from singleton

* reenable all tests

* minor improvements

* improve error handling

* convert .then() syntax to async/await
  • Loading branch information
danisharora099 authored Sep 8, 2023
1 parent 1cfe0fc commit 6f09fbf
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 18 deletions.
7 changes: 6 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,11 @@ export class ConnectionManager
void (async () => {
const peerId = evt.detail;

this.keepAliveManager.start(peerId, this.libp2p.services.ping);
this.keepAliveManager.start(
peerId,
this.libp2p.services.ping,
this.libp2p.peerStore
);

const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
Expand Down
36 changes: 30 additions & 6 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { IRelay } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug";
import type { PingService } from "libp2p/ping";

import { createEncoder } from "../index.js";
import { createEncoder } from "./message/version_0.js";

export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = debug("waku:keep-alive");
Expand All @@ -22,8 +24,12 @@ export class KeepAliveManager {
this.relay = relay;
}

public start(peerId: PeerId, libp2pPing: PingService): void {
// Just in case a timer already exist for this peer
public start(
peerId: PeerId,
libp2pPing: PingService,
peerStore: PeerStore
): void {
// Just in case a timer already exists for this peer
this.stop(peerId);

const { pingKeepAlive: pingPeriodSecs, relayKeepAlive: relayPeriodSecs } =
Expand All @@ -33,10 +39,28 @@ export class KeepAliveManager {

if (pingPeriodSecs !== 0) {
const interval = setInterval(() => {
libp2pPing.ping(peerId).catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
void (async () => {
try {
// ping the peer for keep alive
// also update the peer store with the latency
const ping = await libp2pPing.ping(peerId);
log(`Ping succeeded (${peerIdStr})`, ping);

try {
await peerStore.patch(peerId, {
metadata: {
ping: utf8ToBytes(ping.toString())
}
});
} catch (e) {
log("Failed to update ping", e);
}
} catch (e) {
log(`Ping failed (${peerIdStr})`, e);
}
})();
}, pingPeriodSecs * 1000);

this.pingKeepAliveTimers.set(peerIdStr, interval);
}

Expand Down
7 changes: 4 additions & 3 deletions packages/tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@waku/interfaces": "*",
"@waku/utils": "*",
"app-root-path": "^3.1.0",
"chai-as-promised": "^7.1.1",
"debug": "^4.3.4",
"dockerode": "^3.3.5",
"p-timeout": "^6.1.0",
Expand All @@ -66,20 +67,20 @@
},
"devDependencies": {
"@libp2p/bootstrap": "^9.0.2",
"@types/sinon": "^10.0.16",
"@types/chai": "^4.3.5",
"@types/dockerode": "^3.3.19",
"@types/mocha": "^10.0.1",
"@types/sinon": "^10.0.16",
"@types/tail": "^2.2.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^6.6.0",
"@waku/sdk": "*",
"@waku/dns-discovery": "*",
"@waku/message-encryption": "*",
"@waku/peer-exchange": "*",
"@waku/sdk": "*",
"chai": "^4.3.7",
"datastore-core": "^9.2.2",
"cspell": "^7.3.2",
"datastore-core": "^9.2.2",
"debug": "^4.3.4",
"interface-datastore": "^8.2.3",
"libp2p": "^0.46.9",
Expand Down
157 changes: 155 additions & 2 deletions packages/tests/tests/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import {
createDecoder,
createEncoder,
Expand All @@ -9,11 +12,16 @@ import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { selectPeerForProtocol } from "@waku/utils/libp2p";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import sinon from "sinon";

import { makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { NimGoNode } from "../src/node/node.js";

chai.use(chaiAsPromised);

const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
Expand Down Expand Up @@ -106,3 +114,148 @@ describe("Util: toAsyncIterator: Filter", () => {
expect(result.done).to.eq(true);
});
});

const TestCodec = "test/1";

describe("selectPeerForProtocol", () => {
let peerStore: PeerStore;
const protocols = [TestCodec];

let lowPingPeer: Peer,
midPingPeer: Peer,
highPingPeer: Peer,
differentCodecPeer: Peer,
anotherDifferentCodecPeer: Peer;

beforeEach(async function () {
this.timeout(10000);
const waku = await createLightNode();
await waku.start();
await delay(3000);
peerStore = waku.libp2p.peerStore;

const [
lowPingPeerId,
midPingPeerId,
highPingPeerId,
differentCodecPeerId,
anotherDifferentCodecPeerId
] = await Promise.all([
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId()
]);

lowPingPeer = {
id: lowPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("50"))
} as Peer;

midPingPeer = {
id: midPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("100"))
} as Peer;

highPingPeer = {
id: highPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("500"))
} as Peer;

differentCodecPeer = {
id: differentCodecPeerId,
protocols: ["DifferentCodec"]
} as Peer;

anotherDifferentCodecPeer = {
id: anotherDifferentCodecPeerId,
protocols: ["AnotherDifferentCodec"]
} as Peer;
});

afterEach(() => {
sinon.restore();
});

it("should return the peer with the lowest ping", async function () {
const mockPeers = [highPingPeer, lowPingPeer, midPingPeer];

sinon.stub(peerStore, "get").callsFake(async (peerId) => {
return mockPeers.find((peer) => peer.id.equals(peerId))!;
});

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

const result = await selectPeerForProtocol(peerStore, protocols);

expect(result.peer).to.deep.equal(lowPingPeer);
expect(result.protocol).to.equal(TestCodec);
});

it("should return the peer with the provided peerId", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

const result = await selectPeerForProtocol(
peerStore,
protocols,
targetPeer
);
expect(result.peer).to.deep.equal(mockPeer);
});

it("should return a random peer when all peers have the same latency", async function () {
const mockPeers = [highPingPeer, highPingPeer, highPingPeer];

sinon.stub(peerStore, "get").callsFake(async (peerId) => {
return mockPeers.find((peer) => peer.id.equals(peerId))!;
});

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

const result = await selectPeerForProtocol(peerStore, protocols);

expect(mockPeers).to.deep.include(result.peer);
});

it("should throw an error when no peer matches the given protocols", async function () {
const mockPeers = [differentCodecPeer, anotherDifferentCodecPeer];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

await expect(
selectPeerForProtocol(peerStore, protocols)
).to.be.rejectedWith(
`Failed to find known peer that registers protocols: ${protocols}`
);
});

it("should throw an error when the selected peer does not register the required protocols", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

await expect(
selectPeerForProtocol(peerStore, protocols, targetPeer)
).to.be.rejectedWith(
`Peer does not register required protocols (${targetPeer.toString()}): ${protocols}`
);
});
});
Loading

0 comments on commit 6f09fbf

Please sign in to comment.