Skip to content

Commit

Permalink
re-implement peerManager, remove ProtocolUseOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko committed Oct 23, 2024
1 parent 7ae3b91 commit d0c6905
Show file tree
Hide file tree
Showing 12 changed files with 104 additions and 289 deletions.
1 change: 1 addition & 0 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
}
}

// TODO(weboko): use peer.id as parameter instead
public async send(
encoder: IEncoder,
message: IMessage,
Expand Down
2 changes: 0 additions & 2 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import type {
Callback,
IBaseProtocolCore,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";
Expand Down Expand Up @@ -40,7 +39,6 @@ export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult>;
};
Expand Down
15 changes: 0 additions & 15 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,6 @@ export type IBaseProtocolCore = {

export type NetworkConfig = StaticSharding | AutoSharding;

//TODO: merge this with ProtocolCreateOptions or establish distinction: https://github.com/waku-org/js-waku/issues/2048
/**
* Options for using LightPush and Filter
*/
export type ProtocolUseOptions = {
/**
* Optional flag to force using all available peers
*/
forceUseAllPeers?: boolean;
/**
* Optional maximum number of attempts for exponential backoff
*/
maxAttempts?: number;
};

export type ProtocolCreateOptions = {
/**
* Configuration for determining the network in use.
Expand Down
1 change: 0 additions & 1 deletion packages/sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
"@waku/proto": "^0.0.8",
"@waku/utils": "0.0.21",
"@waku/message-hash": "0.1.17",
"async-mutex": "^0.5.0",
"libp2p": "^1.8.1"
},
"devDependencies": {
Expand Down
27 changes: 7 additions & 20 deletions packages/sdk/src/protocols/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import {
type Libp2p,
NetworkConfig,
ProtocolError,
type ProtocolUseOptions,
type PubsubTopic,
type SubscribeOptions,
SubscribeResult,
Expand Down Expand Up @@ -67,7 +66,6 @@ class Filter implements IFilter {
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
Expand Down Expand Up @@ -103,7 +101,6 @@ class Filter implements IFilter {
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
Expand All @@ -118,10 +115,7 @@ class Filter implements IFilter {

const pubsubTopic = uniquePubsubTopics[0];

const { subscription, error } = await this.createSubscription(
pubsubTopic,
protocolUseOptions
);
const { subscription, error } = await this.createSubscription(pubsubTopic);

if (error) {
return {
Expand Down Expand Up @@ -153,32 +147,26 @@ class Filter implements IFilter {
* @returns The subscription object.
*/
private async createSubscription(
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
options?: ProtocolUseOptions
pubsubTopicShardInfo: NetworkConfig | PubsubTopic
): Promise<CreateSubscriptionResult> {
options = {
autoRetry: true,
...options
} as ProtocolUseOptions;

const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];

ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);

const hasPeers = await this.peerManager.hasPeersWithMaintain(options);
if (!hasPeers) {
const peers = await this.peerManager.getPeers();
if (peers.length === 0) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
}

log.info(
`Creating filter subscription with ${this.peerManager.connectedPeers.length} peers: `,
this.peerManager.connectedPeers.map((peer) => peer.id.toString())
`Creating filter subscription with ${peers.length} peers: `,
peers.map((peer) => peer.id.toString())
);

const subscription =
Expand All @@ -189,8 +177,7 @@ class Filter implements IFilter {
pubsubTopic,
this.protocol,
this.connectionManager,
() => this.peerManager.connectedPeers,
this.peerManager.renewPeer.bind(this),
this.peerManager,
this.libp2p,
this.lightPush
)
Expand Down
27 changes: 15 additions & 12 deletions packages/sdk/src/protocols/filter/subscription_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { groupByContentTopic, Logger } from "@waku/utils";

import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js";
import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js";
import { PeerManager } from "../peer_manager.js";

import {
DEFAULT_KEEP_ALIVE,
Expand Down Expand Up @@ -57,10 +58,7 @@ export class SubscriptionManager implements ISubscription {
private readonly pubsubTopic: PubsubTopic,
private readonly protocol: FilterCore,
private readonly connectionManager: ConnectionManager,
private readonly getPeers: () => Peer[],
private readonly renewPeer: (
peerToDisconnect: PeerId
) => Promise<Peer | undefined>,
private readonly peerManager: PeerManager,
private readonly libp2p: Libp2p,
private readonly lightPush?: ILightPush
) {
Expand All @@ -69,11 +67,9 @@ export class SubscriptionManager implements ISubscription {

this.reliabilityMonitor = ReliabilityMonitorManager.createReceiverMonitor(
this.pubsubTopic,
this.getPeers.bind(this),
this.renewPeer.bind(this),
this.peerManager,
() => Array.from(this.subscriptionCallbacks.keys()),
this.protocol.subscribe.bind(this.protocol),
this.protocol.addLibp2pEventListener.bind(this.protocol),
this.sendLightPushCheckMessage.bind(this)
);
}
Expand Down Expand Up @@ -116,7 +112,8 @@ export class SubscriptionManager implements ISubscription {
const decodersGroupedByCT = groupByContentTopic(decodersArray);
const contentTopics = Array.from(decodersGroupedByCT.keys());

const promises = this.getPeers().map(async (peer) =>
const peers = await this.peerManager.getPeers();
const promises = peers.map(async (peer) =>
this.subscribeWithPeerVerification(peer, contentTopics)
);

Expand Down Expand Up @@ -153,7 +150,8 @@ export class SubscriptionManager implements ISubscription {
public async unsubscribe(
contentTopics: ContentTopic[]
): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) => {
const peers = await this.peerManager.getPeers();
const promises = peers.map(async (peer) => {
const response = await this.protocol.unsubscribe(
this.pubsubTopic,
peer,
Expand All @@ -179,7 +177,9 @@ export class SubscriptionManager implements ISubscription {

public async ping(peerId?: PeerId): Promise<SDKProtocolResult> {
log.info("Sending keep-alive ping");
const peers = peerId ? [peerId] : this.getPeers().map((peer) => peer.id);
const peers = peerId
? [peerId]
: (await this.peerManager.getPeers()).map((peer) => peer.id);

const promises = peers.map((peerId) => this.pingSpecificPeer(peerId));
const results = await Promise.allSettled(promises);
Expand All @@ -188,7 +188,8 @@ export class SubscriptionManager implements ISubscription {
}

public async unsubscribeAll(): Promise<SDKProtocolResult> {
const promises = this.getPeers().map(async (peer) =>
const peers = await this.peerManager.getPeers();
const promises = peers.map(async (peer) =>
this.protocol.unsubscribeAll(this.pubsubTopic, peer)
);

Expand Down Expand Up @@ -241,6 +242,7 @@ export class SubscriptionManager implements ISubscription {
peer,
contentTopics
);

await this.sendLightPushCheckMessage(peer);
return result;
}
Expand Down Expand Up @@ -271,7 +273,8 @@ export class SubscriptionManager implements ISubscription {
}

private async pingSpecificPeer(peerId: PeerId): Promise<CoreProtocolResult> {
const peer = this.getPeers().find((p) => p.id.equals(peerId));
const peers = await this.peerManager.getPeers();
const peer = peers.find((p) => p.id.equals(peerId));
if (!peer) {
return {
success: null,
Expand Down
4 changes: 2 additions & 2 deletions packages/sdk/src/protocols/light_push/light_push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class LightPush implements ILightPush {
};
}

const peers = this.peerManager.getPeers();
const peers = await this.peerManager.getPeers();
if (peers.length === 0) {
return {
successes,
Expand Down Expand Up @@ -117,7 +117,7 @@ export class LightPush implements ILightPush {
maxAttempts?: number
): Promise<void> {
maxAttempts = maxAttempts || DEFAULT_MAX_ATTEMPTS;
const connectedPeers = this.peerManager.getPeers();
const connectedPeers = await this.peerManager.getPeers();

if (connectedPeers.length === 0) {
log.warn("Cannot retry with no connected peers.");
Expand Down
Loading

0 comments on commit d0c6905

Please sign in to comment.