Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimistically create streams for light protocols #1514

Closed
wants to merge 12 commits into from
44 changes: 25 additions & 19 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,49 @@
import type { Libp2p } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol
} from "@waku/utils/libp2p";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
import debug from "debug";

import { StreamManager } from "./stream_manager.js";

/**
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
*/
export class BaseProtocol implements IBaseProtocol {
export class BaseProtocol extends StreamManager implements IBaseProtocol {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];

constructor(
public multicodec: string,
private components: Libp2pComponents
private components: Libp2pComponents,
log: debug.Debugger
) {
super(
multicodec,
components.connectionManager.getConnections.bind(
components.connectionManager
),
log
);

this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
this.removeLibp2pEventListener = components.events.removeEventListener.bind(
components.events
);

this.addLibp2pEventListener(
"peer:update",
super.handlePeerUpdateStreamPool.bind(this)
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
);
// TODO: might be better to check with `connection:close` event
this.addLibp2pEventListener(
"peer:disconnect",
super.handlePeerDisconnectStreamPool.bind(this)
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
);
}

public get peerStore(): PeerStore {
Expand All @@ -50,15 +67,4 @@ export class BaseProtocol implements IBaseProtocol {
);
return peer;
}
protected async newStream(peer: Peer): Promise<Stream> {
const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}

return connection.newStream(this.multicodec);
}
}
4 changes: 2 additions & 2 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -271,7 +271,7 @@ class Filter extends BaseProtocol implements IReceiver {
this.setActiveSubscription(
_pubSubTopic,
peer.id.toString(),
new Subscription(_pubSubTopic, peer, this.newStream.bind(this, peer))
new Subscription(_pubSubTopic, peer, this.getStream.bind(this, peer))
);

return subscription;
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
super(LightPushCodec, libp2p.components, log);
this.options = options || {};
}

Expand Down Expand Up @@ -99,7 +99,7 @@ class LightPush extends BaseProtocol implements ILightPush {
}

const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
const stream = await this.getStream(peer);

try {
const res = await pipe(
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Store extends BaseProtocol implements IStore {
options: ProtocolCreateOptions;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
super(StoreCodec, libp2p.components, log);
this.options = options ?? {};
}

Expand Down Expand Up @@ -254,7 +254,7 @@ class Store extends BaseProtocol implements IStore {
const peer = await this.getPeer(options?.peerId);

for await (const messages of paginate<T>(
this.newStream.bind(this, peer),
this.getStream.bind(this, peer),
queryOpts,
decodersAsMap,
options?.cursor
Expand Down
126 changes: 126 additions & 0 deletions packages/core/src/lib/stream_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import type { PeerUpdate } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer } from "@libp2p/interface/peer-store";
import { Libp2p } from "@waku/interfaces";
import { selectConnection } from "@waku/utils/libp2p";
import debug from "debug";

export class StreamManager {
private streamsPool: Map<string, Stream[]> = new Map();
private ongoingStreamCreations: Map<string, Promise<Stream>> = new Map();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make it simpler by not having a "pool" of stream, but just one stream.

Also, once you created a stream, I'd just store the Promise<Stream> and that's it. and just return this promise

private readonly MAX_STREAMS_PER_PEER = 3;
private readonly MAX_RETRIES = 3;

constructor(
public multicodec: string,
public getConnections: Libp2p["getConnections"],
private log: debug.Debugger
) {}

private async newStream(
peer: Peer,
retries = this.MAX_RETRIES
): Promise<Stream> {
try {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);
} catch (error) {
if (retries > 0) {
this.log(`Retrying stream creation. Retries left: ${retries}`);
return this.newStream(peer, retries - 1);
}
throw error;
}
}

protected async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();

const ongoingCreation = this.ongoingStreamCreations.get(peerIdStr);
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
if (ongoingCreation) {
// Wait for the ongoing stream creation to complete
await ongoingCreation;
}

const peerStreams = this.streamsPool.get(peerIdStr) || [];
if (peerStreams.length > 0) {
const stream = peerStreams.pop();
if (!stream) {
throw new Error("Failed to get a stream from the pool");
}
this.replenishStreamPool(peer);
return stream;
}

this.log(`No stream available for peer ${peerIdStr}. Opening a new one.`);
return this.createAndSaveStream(peer);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you dont need to "create and save", you just need to return a fresh new stream

}

private async createAndSaveStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();

const streamCreationPromise = (async () => {
const stream = await this.newStream(peer);
const peerStreams = this.streamsPool.get(peerIdStr) || [];
peerStreams.push(stream);
Comment on lines +69 to +70
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am pretty sure this could be your race condition. Not 100% sure how it work with the JavaScript event loop.
Maybe @weboko can advise.

In any case, I would not have a array of stream, just have 1

this.streamsPool.set(peerIdStr, peerStreams);
return stream;
})();

this.ongoingStreamCreations.set(peerIdStr, streamCreationPromise);

try {
return await streamCreationPromise;
} finally {
this.ongoingStreamCreations.delete(peerIdStr);
}
}

private replenishStreamPool(peer: Peer): void {
const peerIdStr = peer.id.toString();
const ongoingCreationsCount = this.ongoingStreamCreations.has(peerIdStr)
? 1
: 0;
const availableStreamsCount = (this.streamsPool.get(peerIdStr) || [])
.length;

if (
ongoingCreationsCount + availableStreamsCount <
this.MAX_STREAMS_PER_PEER
) {
this.createAndSaveStream(peer)
.then(() => {
this.log(`Replenished stream pool for peer ${peer.id.toString()}`);
})
.catch((err) => {
this.log(
`Error replenishing stream pool for peer ${peer.id.toString()}: ${err}`
);
});
}
}

protected handlePeerUpdateStreamPool(evt: CustomEvent<PeerUpdate>): void {
const peer = evt.detail.peer;
if (peer.protocols.includes(this.multicodec)) {
const peerIdStr = peer.id.toString();
if (!this.streamsPool.has(peerIdStr)) {
this.streamsPool.set(peerIdStr, []);
}
this.log(`Optimistically opening a stream to ${peer.id.toString()}`);
this.replenishStreamPool(peer);
}
}

protected handlePeerDisconnectStreamPool(evt: CustomEvent<PeerId>): void {
const peerId = evt.detail;
this.streamsPool.delete(peerId.toString());
// Cancel ongoing stream creation if any
this.ongoingStreamCreations.delete(peerId.toString());
}
}
4 changes: 2 additions & 2 deletions packages/peer-exchange/src/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
* @param components - libp2p components
*/
constructor(components: Libp2pComponents) {
super(PeerExchangeCodec, components);
super(PeerExchangeCodec, components, log);
this.multicodec = PeerExchangeCodec;
}

Expand All @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {

const peer = await this.getPeer(params.peerId);

const stream = await this.newStream(peer);
const stream = await this.getStream(peer);

const res = await pipe(
[rpcQuery.encode()],
Expand Down
Loading