Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimistically create streams for light protocols #1514

Closed
wants to merge 12 commits into from
75 changes: 73 additions & 2 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Libp2p } from "@libp2p/interface";
import type { Libp2p, PeerUpdate } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
Expand All @@ -16,17 +16,23 @@ import {
export class BaseProtocol implements IBaseProtocol {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
streamsPool: Map<string, Stream[]> = new Map();

constructor(
public multicodec: string,
private components: Libp2pComponents
private components: Libp2pComponents,
private log: debug.Debugger
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
this.removeLibp2pEventListener = components.events.removeEventListener.bind(
components.events
);

this.addLibp2pEventListener("peer:update", this.updateStreamPool);
// TODO: might be better to check with `connection:close` event
this.addLibp2pEventListener("peer:disconnect", this.removeStreamPool);
}

public get peerStore(): PeerStore {
Expand All @@ -50,6 +56,7 @@ export class BaseProtocol implements IBaseProtocol {
);
return peer;
}

protected async newStream(peer: Peer): Promise<Stream> {
const connections = this.components.connectionManager.getConnections(
peer.id
Expand All @@ -61,4 +68,68 @@ export class BaseProtocol implements IBaseProtocol {

return connection.newStream(this.multicodec);
}

protected async getStream(peer: Peer): Promise<Stream | Promise<Stream>> {
const peerStreams = this.streamsPool.get(peer.id.toString());
if (peerStreams && peerStreams.length > 0) {
//TODO: either reuse the same stream, or pop and replenish the pool
const stream = peerStreams[0];
if (!stream) {
throw new Error("Failed to get a stream from the pool");
}
return stream;
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
}
this.log(
`No stream available for peer ${peer.id.toString()}. Opening a new one.`
);
return this.createStream(peer);
}

private async createStream(peer: Peer): Promise<Stream> {
const peerStreams = this.streamsPool.get(peer.id.toString());
const stream = await this.newStream(peer);
if (peerStreams) {
peerStreams.push(stream);
} else {
this.streamsPool.set(peer.id.toString(), [stream]);
}
return stream;
}

private updateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const peer = evt.detail.peer;
if (peer.protocols.includes(this.multicodec)) {
this.streamsPool.set(peer.id.toString(), []);
this.openAndSaveStream(peer).catch((err) => {
this.log(`error: optimistic stream opening failed: ${err}`);
});
}
};

private async openAndSaveStream(peer: Peer): Promise<void> {
this.log(
`Optimistically opening a stream to peer ${peer.id.toString()} for protocol ${
this.multicodec
}`
);
try {
await this.createStream(peer);
this.log(
`Optimistically opened a stream to peer ${peer.id.toString()} for protocol ${
this.multicodec
}`
);
} catch (error) {
this.log(
`Failed to open a stream to peer ${peer.id.toString()} for protocol ${
this.multicodec
}: ${error}`
);
}
}

private removeStreamPool = (evt: CustomEvent<PeerId>): void => {
const peerId = evt.detail;
this.streamsPool.delete(peerId.toString());
};
}
2 changes: 1 addition & 1 deletion packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterCodecs.PUSH, e);
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
super(LightPushCodec, libp2p.components, log);
this.options = options || {};
}

Expand Down Expand Up @@ -99,7 +99,7 @@ class LightPush extends BaseProtocol implements ILightPush {
}

const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
const stream = await this.getStream(peer);

try {
const res = await pipe(
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Store extends BaseProtocol implements IStore {
options: ProtocolCreateOptions;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
super(StoreCodec, libp2p.components, log);
this.options = options ?? {};
}

Expand Down
2 changes: 1 addition & 1 deletion packages/peer-exchange/src/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
* @param components - libp2p components
*/
constructor(components: Libp2pComponents) {
super(PeerExchangeCodec, components);
super(PeerExchangeCodec, components, log);
this.multicodec = PeerExchangeCodec;
}

Expand Down
Loading