Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use the lowest latency peer for protocols #1540

Merged
merged 15 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
private options: ConnectionManagerOptions;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Check warning on line 34 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 34 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

private currentActiveDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
Expand Down Expand Up @@ -197,7 +197,7 @@
// Handle generic error
log(
`Error dialing peer ${peerId.toString()} - ${
(error as any).message

Check warning on line 200 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type

Check warning on line 200 in packages/core/src/lib/connection_manager.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type
}`
);
}
Expand Down Expand Up @@ -340,7 +340,11 @@
void (async () => {
const peerId = evt.detail;

this.keepAliveManager.start(peerId, this.libp2p.services.ping);
this.keepAliveManager.start(
peerId,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can simplify and pass ‘libp2p’ directly

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest passing ping and peerStore individually to KeepAliveManager instead of the entire libp2p object because:

  • by passing only what's needed, it's clear what KeepAliveManager depends on. This makes our code more readable and maintainable.
  • this approach reduces the dependency on the entire libp2p structure. If libp2p evolves, it minimizes the ripple effect on KeepAliveManager.
  • it follows best practices like the Single Responsibility and Interface Segregation Principles

Let me know your thoughts!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SOLID is not applied here because it is more about how entity/object should be designed and not what it should be provided with. You might need thousand dependencies to do single thing and it is up to devs how easier/cleaner they should be provided.

My rule of thumb for dependencies - it's easier to grow the number of entities than methods.

And again, it is a nit, so feel free to keep the way you think reasonable.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understanding it's a nit, glad we're discussing to be on similar pages around these philosophies if we're maintaining big codebases :)

I understand that SOLID primarily focuses on the design and structure of classes. However, the Dependency Inversion Principle directly addresses how dependencies should be managed -- I interpret it as our classes are decoupled and that we're injecting only the necessary dependencies for this use case
Perhaps you have a different interpretation?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's exactly about necessary dependencies, and the necessary dependency here is libp2p and not its methods.

this.libp2p.services.ping,
this.libp2p.peerStore
);

const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
Expand Down
32 changes: 26 additions & 6 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { IRelay } from "@waku/interfaces";
import type { KeepAliveOptions } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug";
import type { PingService } from "libp2p/ping";

import { createEncoder } from "../index.js";
import { createEncoder } from "./message/version_0.js";

export const RelayPingContentTopic = "/relay-ping/1/ping/null";
const log = debug("waku:keep-alive");
Expand All @@ -22,8 +24,12 @@ export class KeepAliveManager {
this.relay = relay;
}

public start(peerId: PeerId, libp2pPing: PingService): void {
// Just in case a timer already exist for this peer
public start(
peerId: PeerId,
libp2pPing: PingService,
peerStore: PeerStore
): void {
// Just in case a timer already exists for this peer
this.stop(peerId);

const { pingKeepAlive: pingPeriodSecs, relayKeepAlive: relayPeriodSecs } =
Expand All @@ -33,9 +39,23 @@ export class KeepAliveManager {

if (pingPeriodSecs !== 0) {
const interval = setInterval(() => {
libp2pPing.ping(peerId).catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
// ping the peer for keep alive
// also update the peer store with the latency
libp2pPing
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
.ping(peerId)
.then((ping) => {
log(`Ping succeeded (${peerIdStr})`, ping);
peerStore
.patch(peerId, {
metadata: {
ping: utf8ToBytes(ping.toString())
}
})
.catch((e) => log("Failed to update ping", e));
})
.catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
}, pingPeriodSecs * 1000);
this.pingKeepAliveTimers.set(peerIdStr, interval);
}
Expand Down
7 changes: 4 additions & 3 deletions packages/tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"@waku/interfaces": "*",
"@waku/utils": "*",
"app-root-path": "^3.1.0",
"chai-as-promised": "^7.1.1",
"debug": "^4.3.4",
"dockerode": "^3.3.5",
"p-timeout": "^6.1.0",
Expand All @@ -66,20 +67,20 @@
},
"devDependencies": {
"@libp2p/bootstrap": "^9.0.2",
"@types/sinon": "^10.0.16",
"@types/chai": "^4.3.5",
"@types/dockerode": "^3.3.19",
"@types/mocha": "^10.0.1",
"@types/sinon": "^10.0.16",
"@types/tail": "^2.2.1",
"@typescript-eslint/eslint-plugin": "^5.57.0",
"@typescript-eslint/parser": "^6.6.0",
"@waku/sdk": "*",
"@waku/dns-discovery": "*",
"@waku/message-encryption": "*",
"@waku/peer-exchange": "*",
"@waku/sdk": "*",
"chai": "^4.3.7",
"datastore-core": "^9.2.2",
"cspell": "^7.3.2",
"datastore-core": "^9.2.2",
"debug": "^4.3.4",
"interface-datastore": "^8.2.3",
"libp2p": "^0.46.8",
Expand Down
157 changes: 155 additions & 2 deletions packages/tests/tests/utils.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import type { PeerStore } from "@libp2p/interface/peer-store";
import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import {
createDecoder,
createEncoder,
Expand All @@ -9,11 +12,16 @@ import { Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { toAsyncIterator } from "@waku/utils";
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
import { expect } from "chai";
import { selectPeerForProtocol } from "@waku/utils/libp2p";
import chai, { expect } from "chai";
import chaiAsPromised from "chai-as-promised";
import sinon from "sinon";

import { makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
import { NimGoNode } from "../src/node/node.js";

chai.use(chaiAsPromised);

const TestContentTopic = "/test/1/waku-filter";
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
const TestDecoder = createDecoder(TestContentTopic);
Expand Down Expand Up @@ -106,3 +114,148 @@ describe("Util: toAsyncIterator: Filter", () => {
expect(result.done).to.eq(true);
});
});

const TestCodec = "test/1";

describe("selectPeerForProtocol", () => {
let peerStore: PeerStore;
const protocols = [TestCodec];

let lowPingPeer: Peer,
midPingPeer: Peer,
highPingPeer: Peer,
differentCodecPeer: Peer,
anotherDifferentCodecPeer: Peer;

beforeEach(async function () {
this.timeout(10000);
const waku = await createLightNode();
await waku.start();
await delay(3000);
peerStore = waku.libp2p.peerStore;

const [
lowPingPeerId,
midPingPeerId,
highPingPeerId,
differentCodecPeerId,
anotherDifferentCodecPeerId
] = await Promise.all([
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId(),
createSecp256k1PeerId()
]);

lowPingPeer = {
id: lowPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("50"))
} as Peer;

midPingPeer = {
id: midPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("100"))
} as Peer;

highPingPeer = {
id: highPingPeerId,
protocols: [TestCodec],
metadata: new Map().set("ping", utf8ToBytes("500"))
} as Peer;

differentCodecPeer = {
id: differentCodecPeerId,
protocols: ["DifferentCodec"]
} as Peer;

anotherDifferentCodecPeer = {
id: anotherDifferentCodecPeerId,
protocols: ["AnotherDifferentCodec"]
} as Peer;
});

afterEach(() => {
sinon.restore();
});

it("should return the peer with the lowest ping", async function () {
const mockPeers = [highPingPeer, lowPingPeer, midPingPeer];

sinon.stub(peerStore, "get").callsFake(async (peerId) => {
return mockPeers.find((peer) => peer.id.equals(peerId))!;
});

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

const result = await selectPeerForProtocol(peerStore, protocols);

expect(result.peer).to.deep.equal(lowPingPeer);
expect(result.protocol).to.equal(TestCodec);
});

it("should return the peer with the provided peerId", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

const result = await selectPeerForProtocol(
peerStore,
protocols,
targetPeer
);
expect(result.peer).to.deep.equal(mockPeer);
});

it("should return a random peer when all peers have the same latency", async function () {
const mockPeers = [highPingPeer, highPingPeer, highPingPeer];

sinon.stub(peerStore, "get").callsFake(async (peerId) => {
return mockPeers.find((peer) => peer.id.equals(peerId))!;
});

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

const result = await selectPeerForProtocol(peerStore, protocols);

expect(mockPeers).to.deep.include(result.peer);
});

it("should throw an error when no peer matches the given protocols", async function () {
const mockPeers = [differentCodecPeer, anotherDifferentCodecPeer];

sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
for (const peer of mockPeers) {
callback(peer);
}
});

await expect(
selectPeerForProtocol(peerStore, protocols)
).to.be.rejectedWith(
`Failed to find known peer that registers protocols: ${protocols}`
);
});

it("should throw an error when the selected peer does not register the required protocols", async function () {
const targetPeer = await createSecp256k1PeerId();
const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer;
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);

await expect(
selectPeerForProtocol(peerStore, protocols, targetPeer)
).to.be.rejectedWith(
`Peer does not register required protocols (${targetPeer.toString()}): ${protocols}`
);
});
});
49 changes: 44 additions & 5 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import type { PeerId } from "@libp2p/interface/peer-id";
import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import debug from "debug";

import { bytesToUtf8 } from "../bytes/index.js";

const log = debug("waku:libp2p-utils");

/**
Expand All @@ -16,6 +18,35 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined {
return peers[index];
}

/**
* Returns the peer with the lowest latency.
* @param peerStore - The Libp2p PeerStore
* @param peers - The list of peers to choose from
* @returns The peer with the lowest latency, or undefined if no peer could be reached
*/
export async function selectLowestLatencyPeer(
peerStore: PeerStore,
peers: Peer[]
): Promise<Peer | undefined> {
if (peers.length === 0) return;

const results = await Promise.all(
weboko marked this conversation as resolved.
Show resolved Hide resolved
peers.map(async (peer) => {
const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping");
if (!pingBytes) return { peer, ping: Infinity };

const ping = Number(bytesToUtf8(pingBytes)) ?? Infinity;
return { peer, ping };
})
);

const lowestLatencyResult = results.sort((a, b) => a.ping - b.ping)[0];

return lowestLatencyResult.ping !== Infinity
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
? lowestLatencyResult.peer
: undefined;
}

/**
* Returns the list of peers that supports the given protocol.
*/
Expand All @@ -35,12 +66,18 @@ export async function getPeersForProtocol(
return peers;
}

/**
* Returns a peer that supports the given protocol.
* If peerId is provided, the peer with that id is returned.
* Otherwise, the peer with the lowest latency is returned.
* If no peer is found from the above criteria, a random peer is returned.
*/
export async function selectPeerForProtocol(
peerStore: PeerStore,
protocols: string[],
peerId?: PeerId
): Promise<{ peer: Peer; protocol: string }> {
let peer;
let peer: Peer | undefined;
if (peerId) {
peer = await peerStore.get(peerId);
if (!peer) {
Expand All @@ -50,11 +87,13 @@ export async function selectPeerForProtocol(
}
} else {
const peers = await getPeersForProtocol(peerStore, protocols);
peer = selectRandomPeer(peers);
peer = await selectLowestLatencyPeer(peerStore, peers);
if (!peer) {
throw new Error(
`Failed to find known peer that registers protocols: ${protocols}`
);
peer = selectRandomPeer(peers);
if (!peer)
throw new Error(
`Failed to find known peer that registers protocols: ${protocols}`
);
}
}

Expand Down
Loading