Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(filter)!: new simpler filter API #2092

Merged
merged 7 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
import type { PeerId } from "@libp2p/interface";

import type { IDecodedMessage, IDecoder } from "./message.js";
import type { ContentTopic, PubsubTopic, ThisOrThat } from "./misc.js";
import type { ContentTopic, ThisOrThat } from "./misc.js";
import type {
Callback,
IBaseProtocolCore,
IBaseProtocolSDK,
ProtocolError,
ProtocolUseOptions,
SDKProtocolResult,
ShardingParams
SDKProtocolResult
} from "./protocols.js";
import type { IReceiver } from "./receiver.js";

Expand Down Expand Up @@ -37,12 +36,28 @@ export interface ISubscriptionSDK {

export type IFilterSDK = IReceiver &
IBaseProtocolSDK & { protocol: IBaseProtocolCore } & {
createSubscription(
pubsubTopicShardInfo?: ShardingParams | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult>;
subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult>;
};

export type SubscribeResult = SubscriptionSuccess | SubscriptionError;

type SubscriptionSuccess = {
subscription: ISubscriptionSDK;
error: null;
results: SDKProtocolResult;
};

type SubscriptionError = {
subscription: null;
error: ProtocolError;
results: null;
};

export type CreateSubscriptionResult = ThisOrThat<
"subscription",
ISubscriptionSDK,
Expand Down
5 changes: 5 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@ export enum ProtocolError {
* Ensure that the pubsub topic used for decoder creation is the same as the one used for protocol.
*/
TOPIC_DECODER_MISMATCH = "Topic decoder mismatch",
/**
* The topics passed in the decoders do not match each other, or don't exist at all.
* Ensure that all the pubsub topics used in the decoders are valid and match each other.
*/
INVALID_DECODER_TOPICS = "Invalid decoder topics",
/**
* Failure to find a peer with suitable protocols. This may due to a connection issue.
* Mitigation can be: retrying after a given time period, display connectivity issue
Expand Down
10 changes: 6 additions & 4 deletions packages/interfaces/src/receiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ export interface IReceiver {
toSubscriptionIterator: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
) => Promise<IAsyncIterator<T>>;
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Unsubscribe | Promise<Unsubscribe>;
subscribeWithUnsubscribe: SubscribeWithUnsubscribe;
}

type SubscribeWithUnsubscribe = <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Unsubscribe | Promise<Unsubscribe>;
4 changes: 3 additions & 1 deletion packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class Relay implements IRelay {
};
}

public subscribe<T extends IDecodedMessage>(
public subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
): () => void {
Expand All @@ -171,6 +171,8 @@ class Relay implements IRelay {
};
}

public subscribe = this.subscribeWithUnsubscribe;

private removeObservers<T extends IDecodedMessage>(
observers: Array<[PubsubTopic, Observer<T>]>
): void {
Expand Down
115 changes: 100 additions & 15 deletions packages/sdk/src/protocols/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
type SDKProtocolResult,
type ShardingParams,
type SubscribeOptions,
SubscribeResult,
type Unsubscribe
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
Expand Down Expand Up @@ -448,19 +449,89 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
this.activeSubscriptions = new Map();
}

//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
* - error: A ProtocolError if the subscription creation failed, or null if successful.
* - results: An object containing arrays of failures and successes from the subscription process.
* Only present if the subscription was created successfully.
*
* @throws {Error} If there's an unexpected error during the subscription process.
*
* @remarks
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
* - Even if the subscription was created successfully, there might be some failures in the results.
*
* @example
* ```typescript
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
* if (!subscription || error) {
* console.error("Failed to create subscription:", error);
* }
* console.log("Subscription created successfully");
* if (results.failures.length > 0) {
* console.warn("Some errors occurred during subscription:", results.failures);
* }
* console.log("Successful subscriptions:", results.successes);
*
* ```
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);

private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
if (uniquePubsubTopics.length !== 1) {
return {
subscription: null,
error: ProtocolError.INVALID_DECODER_TOPICS,
results: null
};
}

const pubsubTopic = uniquePubsubTopics[0];

const { subscription, error } = await this.createSubscription(
pubsubTopic,
protocolUseOptions
);

if (error) {
return {
subscription: null,
error: error,
results: null
};
}

const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
);
return {
subscription,
error: null,
results: {
failures: failures,
successes: successes
}
};
}

/**
Expand All @@ -469,7 +540,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
public async createSubscription(
private async createSubscription(
pubsubTopicShardInfo: ShardingParams | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
Expand Down Expand Up @@ -516,7 +587,6 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
};
}

//TODO: remove this dependency on IReceiver
/**
* This method is used to satisfy the `IReceiver` interface.
*
Expand All @@ -532,7 +602,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribe<T extends IDecodedMessage>(
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
Expand Down Expand Up @@ -578,6 +648,21 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
return toAsyncIterator(this, decoders);
}

//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}

private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}

private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
Expand Down
15 changes: 4 additions & 11 deletions packages/tests/tests/ephemeral.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
DecodedMessage,
waitForRemotePeer
} from "@waku/core";
import { ISubscriptionSDK, Protocols } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces";
import {
generatePrivateKey,
Expand Down Expand Up @@ -83,8 +83,6 @@ describe("Waku Message Ephemeral field", function () {
let waku: LightNode;
let nwaku: ServiceNode;

let subscription: ISubscriptionSDK;

afterEachCustom(this, async () => {
await tearDownNodes(nwaku, waku);
});
Expand Down Expand Up @@ -122,11 +120,6 @@ describe("Waku Message Ephemeral field", function () {
Protocols.LightPush,
Protocols.Store
]);

const { error, subscription: _subscription } =
await waku.filter.createSubscription(TestEncoder.pubsubTopic);
if (error) throw error;
subscription = _subscription;
});

it("Ephemeral messages are not stored", async function () {
Expand Down Expand Up @@ -218,7 +211,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([TestDecoder], callback);
await waku.filter.subscribe([TestDecoder], callback);

await delay(200);
const normalTxt = "Normal message";
Expand Down Expand Up @@ -265,7 +258,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([decoder], callback);
await waku.filter.subscribe([decoder], callback);

await delay(200);
const normalTxt = "Normal message";
Expand Down Expand Up @@ -316,7 +309,7 @@ describe("Waku Message Ephemeral field", function () {
const callback = (msg: DecodedMessage): void => {
messages.push(msg);
};
await subscription.subscribe([decoder], callback);
await waku.filter.subscribe([decoder], callback);

await delay(200);
const normalTxt = "Normal message";
Expand Down
Loading
Loading