Skip to content

Commit

Permalink
Merge branch 'master' of github.com:waku-org/js-waku into feat/filter…
Browse files Browse the repository at this point in the history
…-error-codes
  • Loading branch information
danisharora099 committed Apr 30, 2024
2 parents fc1cacf + 5b03709 commit 4e37405
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 40 deletions.
2 changes: 1 addition & 1 deletion packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ export class KeepAliveManager {
}

try {
await peerStore.patch(peerId, {
await peerStore.merge(peerId, {
metadata: {
ping: utf8ToBytes(ping.toString())
}
Expand Down
10 changes: 6 additions & 4 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import type {
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";

export type ContentFilter = {
contentTopic: string;
export type SubscribeOptions = {
keepAlive?: number;
};

export type IFilter = IReceiver & IBaseProtocolCore;

export interface ISubscriptionSDK {
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
callback: Callback<T>,
options?: SubscribeOptions
): Promise<SDKProtocolResult>;

unsubscribe(contentTopics: ContentTopic[]): Promise<SDKProtocolResult>;
Expand All @@ -32,7 +33,8 @@ export interface ISubscriptionSDK {
export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic
pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: SubscribeOptions
): Promise<CreateSubscriptionResult>;
};

Expand Down
1 change: 1 addition & 0 deletions packages/interfaces/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ export type CreateLibp2pOptions = Libp2pOptions & {
* @default false
*/
hideWebSocketInfo?: boolean;
pingMaxInboundStreams?: number;
};
8 changes: 7 additions & 1 deletion packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,16 @@ export type ProtocolCreateOptions = {
* - WakuRelay to receive, route and send messages,
* - WakuLightPush to send messages,
* - WakuStore to retrieve messages.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md) for details.
*
*/
shardInfo?: Partial<ShardingParams>;
/**
* Content topics are used to determine pubsubTopics
* If not provided pubsubTopics will be determined based on shardInfo
* See [Waku v2 Topic Usage Recommendations](https://github.com/vacp2p/rfc-index/blob/main/waku/informational/23/topics.md) for details.
*/
contentTopics?: string[];
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export { defaultLibp2p } from "./utils/libp2p.js";
export * from "./utils/content_topic.js";
export * from "./waku.js";

export { createLightNode, createNode } from "./light-node/index.js";
export { createLightNode } from "./light-node/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuFilter } from "./protocols/filter.js";
export { wakuStore } from "./protocols/store.js";
Expand Down
25 changes: 0 additions & 25 deletions packages/sdk/src/light-node/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,6 @@ import { CreateWakuNodeOptions, WakuNode, WakuOptions } from "../waku.js";

export { Libp2pComponents };

/**
* Create a Waku node configured to use autosharding or static sharding.
*/
export async function createNode(
options: CreateWakuNodeOptions = { pubsubTopics: [] }
): Promise<LightNode> {
if (!options.shardInfo) {
throw new Error("Shard info must be set");
}

const libp2p = await createLibp2pAndUpdateOptions(options);

const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);

return new WakuNode(
options as WakuOptions,
libp2p,
store,
lightPush,
filter
) as LightNode;
}

/**
* Create a Waku node that uses Waku Light Push, Filter and Store to send and
* receive messages, enabling low resource consumption.
Expand Down
65 changes: 60 additions & 5 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
type PubsubTopic,
SDKProtocolResult,
type ShardingParams,
SubscribeOptions,
type Unsubscribe
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
Expand All @@ -38,10 +39,15 @@ type SubscriptionCallback<T extends IDecodedMessage> = {

const log = new Logger("sdk:filter");

const MINUTE = 60 * 1000;
const DEFAULT_SUBSCRIBE_OPTIONS = {
keepAlive: MINUTE
};
export class SubscriptionManager implements ISubscriptionSDK {
private readonly pubsubTopic: PubsubTopic;
readonly peers: Peer[];
readonly receivedMessagesHashStr: string[] = [];
private keepAliveTimer: number | null = null;

private subscriptionCallbacks: Map<
ContentTopic,
Expand All @@ -60,7 +66,8 @@ export class SubscriptionManager implements ISubscriptionSDK {

async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SDKProtocolResult> {
const decodersArray = Array.isArray(decoders) ? decoders : [decoders];

Expand Down Expand Up @@ -109,6 +116,10 @@ export class SubscriptionManager implements ISubscriptionSDK {
this.subscriptionCallbacks.set(contentTopic, subscriptionCallback);
});

if (options?.keepAlive) {
this.startKeepAlivePings(options.keepAlive);
}

return finalResult;
}

Expand All @@ -128,8 +139,13 @@ export class SubscriptionManager implements ISubscriptionSDK {
});

const results = await Promise.allSettled(promises);
const finalResult = this.handleResult(results, "unsubscribe");

return this.handleResult(results, "unsubscribe");
if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) {
this.stopKeepAlivePings();
}

return finalResult;
}

async ping(): Promise<SDKProtocolResult> {
Expand All @@ -151,7 +167,13 @@ export class SubscriptionManager implements ISubscriptionSDK {

this.subscriptionCallbacks.clear();

return this.handleResult(results, "unsubscribeAll");
const finalResult = this.handleResult(results, "unsubscribeAll");

if (this.keepAliveTimer) {
this.stopKeepAlivePings();
}

return finalResult;
}

async processIncomingMessage(message: WakuMessage): Promise<void> {
Expand Down Expand Up @@ -207,6 +229,38 @@ export class SubscriptionManager implements ISubscriptionSDK {

return result;
}

private startKeepAlivePings(interval: number): void {
if (this.keepAliveTimer) {
log.info("Recurring pings already set up.");
return;
}

this.keepAliveTimer = setInterval(() => {
const run = async (): Promise<void> => {
try {
log.info("Recurring ping to peers.");
await this.ping();
} catch (error) {
log.error("Stopping recurring pings due to failure", error);
this.stopKeepAlivePings();
}
};

void run();
}, interval) as unknown as number;
}

private stopKeepAlivePings(): void {
if (!this.keepAliveTimer) {
log.info("Already stopped recurring pings.");
return;
}

log.info("Stopping recurring pings.");
clearInterval(this.keepAliveTimer);
this.keepAliveTimer = null;
}
}

class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
Expand Down Expand Up @@ -320,7 +374,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
*/
async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);

Expand All @@ -344,7 +399,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
throw Error(`Failed to create subscription: ${error}`);
}

await subscription.subscribe(decoders, callback);
await subscription.subscribe(decoders, callback, options);

const contentTopics = Array.from(
groupByContentTopic(
Expand Down
40 changes: 37 additions & 3 deletions packages/sdk/src/utils/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@ import {
type ShardInfo
} from "@waku/interfaces";
import { wakuGossipSub } from "@waku/relay";
import { ensureShardingConfigured } from "@waku/utils";
import { ensureShardingConfigured, Logger } from "@waku/utils";
import { createLibp2p } from "libp2p";

import { CreateWakuNodeOptions, DefaultUserAgent } from "../waku.js";
import {
CreateWakuNodeOptions,
DefaultPingMaxInboundStreams,
DefaultUserAgent
} from "../waku.js";

import { defaultPeerDiscoveries } from "./discovery.js";

Expand All @@ -31,6 +35,8 @@ type MetadataService = {
metadata?: (components: Libp2pComponents) => IMetadata;
};

const logger = new Logger("sdk:create");

export async function defaultLibp2p(
shardInfo?: ShardInfo,
wakuGossipSub?: PubsubService["pubsub"],
Expand Down Expand Up @@ -70,7 +76,10 @@ export async function defaultLibp2p(
identify: identify({
agentVersion: userAgent ?? DefaultUserAgent
}),
ping: ping(),
ping: ping({
maxInboundStreams:
options?.pingMaxInboundStreams ?? DefaultPingMaxInboundStreams
}),
...metadataService,
...pubsubService,
...options?.services
Expand All @@ -81,6 +90,12 @@ export async function defaultLibp2p(
export async function createLibp2pAndUpdateOptions(
options: CreateWakuNodeOptions
): Promise<Libp2p> {
logWhichShardInfoIsUsed(options);

if (options.contentTopics) {
options.shardInfo = { contentTopics: options.contentTopics };
}

const shardInfo = options.shardInfo
? ensureShardingConfigured(options.shardInfo)
: undefined;
Expand Down Expand Up @@ -110,3 +125,22 @@ export async function createLibp2pAndUpdateOptions(

return libp2p;
}

function logWhichShardInfoIsUsed(options: CreateWakuNodeOptions): void {
if (options.pubsubTopics) {
logger.info("Using pubsubTopics array to bootstrap the node.");
return;
}

if (options.contentTopics) {
logger.info(
"Using contentTopics and default cluster ID (1) to bootstrap the node."
);
return;
}

if (options.shardInfo) {
logger.info("Using shardInfo parameters to bootstrap the node.");
return;
}
}
1 change: 1 addition & 0 deletions packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import { subscribeToContentTopic } from "./utils/content_topic.js";
export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
export const DefaultUserAgent = "js-waku";
export const DefaultPingMaxInboundStreams = 10;

const log = new Logger("waku");

Expand Down
31 changes: 31 additions & 0 deletions packages/tests/tests/metadata.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,4 +233,35 @@ describe("Metadata Protocol", function () {
expect(metadataShardInfo!.clusterId).to.eq(shardInfo.clusterId);
expect(metadataShardInfo.shards).to.include.members(shardInfo.shards);
});

it("receiving a ping from a peer does not overwrite shard info", async function () {
const shardInfo: ShardInfo = {
clusterId: 2,
shards: [1]
};

await nwaku1.start({
relay: true,
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo)
});

const nwaku1Ma = await nwaku1.getMultiaddrWithId();
const nwaku1PeerId = await nwaku1.getPeerId();

waku = await createLightNode({ shardInfo, pingKeepAlive: 1 });
await waku.start();
await waku.libp2p.dialProtocol(nwaku1Ma, MetadataCodec);

// delay to ensure the connection is estabilished, shardInfo is updated, and there is a ping
await delay(1500);

const metadata = (await waku.libp2p.peerStore.get(nwaku1PeerId)).metadata;
expect(metadata.get("shardInfo")).to.not.be.undefined;

const pingInfo = metadata.get("ping");
expect(pingInfo).to.not.be.undefined;
});
});

0 comments on commit 4e37405

Please sign in to comment.