Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pre-emptive stream creation for protocols #1516

Merged
merged 21 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8b36e45
pass log as an arg to baseprotocol
danisharora099 Aug 30, 2023
2655998
optimistically create and use streams for light protocols
danisharora099 Aug 30, 2023
2bf0c97
refactor BaseProtocol for readability
danisharora099 Aug 30, 2023
f2dc0a4
use optimistic stream selection in protocols
danisharora099 Aug 30, 2023
d4d9eee
use a new stream for every request instead of reusing
danisharora099 Aug 30, 2023
c269e6d
replenish streams correctly
danisharora099 Aug 30, 2023
d644d18
create StreamManager
danisharora099 Aug 30, 2023
925b4d4
refactor for a single stream
danisharora099 Aug 30, 2023
9896fc0
fix: listener binds
danisharora099 Aug 30, 2023
1b46c0a
declare streamManager as a class var isntead of extending
danisharora099 Aug 31, 2023
6497428
remove stream destruction as it happens by default
danisharora099 Aug 31, 2023
f833caa
simplify logic & address comments
danisharora099 Aug 31, 2023
15555f9
fix: bind typo
danisharora099 Aug 31, 2023
f9689ff
refactor for improvements
danisharora099 Aug 31, 2023
b321596
fix typedoc
danisharora099 Aug 31, 2023
8ca3d7a
rm: lock
danisharora099 Aug 31, 2023
ff5b078
restructure StreamManager for readbility
danisharora099 Aug 31, 2023
0c77582
Merge branch 'master' into feat/optimistic-streams-single
danisharora099 Aug 31, 2023
a92b71e
remove log as an arg
danisharora099 Aug 31, 2023
b921cfb
use newStream as a facade in BaseProtoocl
danisharora099 Sep 1, 2023
379dcda
Merge branch 'master' into feat/optimistic-streams-single
danisharora099 Sep 1, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { ConnectionManager } from "./lib/connection_manager.js";

export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager.js";
34 changes: 16 additions & 18 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import type { Libp2p } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol
} from "@waku/utils/libp2p";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
import debug from "debug";

import { StreamManager } from "./stream_manager.js";

/**
* A class with predefined helpers, to be used as a base to implement Waku
Expand All @@ -16,17 +14,28 @@ import {
export class BaseProtocol implements IBaseProtocol {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
protected streamManager: StreamManager;

constructor(
public multicodec: string,
private components: Libp2pComponents
private components: Libp2pComponents,
log: debug.Debugger
) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
this.removeLibp2pEventListener = components.events.removeEventListener.bind(
components.events
);

this.streamManager = new StreamManager(
multicodec,
components.connectionManager.getConnections.bind(
components.connectionManager
),
this.addLibp2pEventListener,
log
);
}

public get peerStore(): PeerStore {
Expand All @@ -50,15 +59,4 @@ export class BaseProtocol implements IBaseProtocol {
);
return peer;
}
protected async newStream(peer: Peer): Promise<Stream> {
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}

return connection.newStream(this.multicodec);
}
}
8 changes: 6 additions & 2 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class Filter extends BaseProtocol implements IReceiver {
}

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);
super(FilterCodecs.SUBSCRIBE, libp2p.components, log);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -271,7 +271,11 @@ class Filter extends BaseProtocol implements IReceiver {
this.setActiveSubscription(
_pubSubTopic,
peer.id.toString(),
new Subscription(_pubSubTopic, peer, this.newStream.bind(this, peer))
new Subscription(
_pubSubTopic,
peer,
this.streamManager.getStream.bind(this, peer)
)
);

return subscription;
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
super(LightPushCodec, libp2p.components, log);
this.options = options || {};
}

Expand Down Expand Up @@ -103,7 +103,7 @@ class LightPush extends BaseProtocol implements ILightPush {

let error: undefined | SendError = undefined;
const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
const stream = await this.streamManager.getStream(peer);

try {
const res = await pipe(
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class Store extends BaseProtocol implements IStore {
options: ProtocolCreateOptions;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
super(StoreCodec, libp2p.components, log);
this.options = options ?? {};
}

Expand Down Expand Up @@ -254,7 +254,7 @@ class Store extends BaseProtocol implements IStore {
const peer = await this.getPeer(options?.peerId);

for await (const messages of paginate<T>(
this.newStream.bind(this, peer),
this.streamManager.getStream.bind(this, peer),
queryOpts,
decodersAsMap,
options?.cursor
Expand Down
68 changes: 68 additions & 0 deletions packages/core/src/lib/stream_manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type { PeerUpdate } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import { Peer } from "@libp2p/interface/peer-store";
import { Libp2p } from "@waku/interfaces";
import { selectConnection } from "@waku/utils/libp2p";
import debug from "debug";

export class StreamManager {
private streamPool: Map<string, Promise<Stream>>;

constructor(
public multicodec: string,
public getConnections: Libp2p["getConnections"],
public addEventListener: Libp2p["addEventListener"],
private log: debug.Debugger
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
) {
this.addEventListener(
"peer:update",
this.handlePeerUpdateStreamPool.bind(this)
);
this.getStream = this.getStream.bind(this);
this.streamPool = new Map();
}

public async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();
const streamPromise = this.streamPool.get(peerIdStr);
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved

if (!streamPromise) {
return this.newStream(peer); // fallback by creating a new stream on the spot
}

// We have the stream, let's remove it from the map
this.streamPool.delete(peerIdStr);

this.prepareNewStream(peer);

const stream = await streamPromise;

if (stream.status === "closed") {
return this.newStream(peer); // fallback by creating a new stream on the spot
}

return stream;
}

private async newStream(peer: Peer): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);
}

private prepareNewStream(peer: Peer): void {
const streamPromise = this.newStream(peer);
this.streamPool.set(peer.id.toString(), streamPromise);
}

private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const peer = evt.detail.peer;
if (peer.protocols.includes(this.multicodec)) {
this.log(`Optimistically opening a stream to ${peer.id.toString()}`);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pre-emptively

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.prepareNewStream(peer);
}
};
}
4 changes: 2 additions & 2 deletions packages/peer-exchange/src/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
* @param components - libp2p components
*/
constructor(components: Libp2pComponents) {
super(PeerExchangeCodec, components);
super(PeerExchangeCodec, components, log);
this.multicodec = PeerExchangeCodec;
}

Expand All @@ -47,7 +47,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {

const peer = await this.getPeer(params.peerId);

const stream = await this.newStream(peer);
const stream = await this.streamManager.getStream(peer);

const res = await pipe(
[rpcQuery.encode()],
Expand Down
Loading