Skip to content

Commit

Permalink
chore(lightpush)!: move protocol implementation opinions to `@waku/sd…
Browse files Browse the repository at this point in the history
…k` (#1887)

* chore: restructure @waku/sdk

* chore: introduce `BaseProtocolCore` and `BaseProtocolSDK`

* chore: introduce `LightPushCore` and `LightPushSDK`

* chore: update `relay` for new types

* chore(sdk): update structure

* chore(filter): add `numPeersToUse`

* chore: update tests

* update: size-limit

* chore: update more tests

* attach issue link to TODOs
  • Loading branch information
danisharora099 authored Mar 11, 2024
1 parent 49c3968 commit 8deab11
Show file tree
Hide file tree
Showing 31 changed files with 462 additions and 338 deletions.
9 changes: 7 additions & 2 deletions .size-limit.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,14 @@ module.exports = [
import: "{ wakuRelay }",
},
{
name: "Light protocols",
name: "Waku Filter",
path: "packages/core/bundle/index.js",
import: "{ wakuLightPush, wakuFilter }",
import: "{ wakuFilter }",
},
{
name: "Waku LightPush",
path: "packages/sdk/bundle/index.js",
import: "{ wakuLightPush }",
},
{
name: "History retrieval protocols",
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

export * as waku_light_push from "./lib/light_push/index.js";
export { LightPushCodec, wakuLightPush } from "./lib/light_push/index.js";
export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js";

export * as waku_store from "./lib/store/index.js";

Expand Down
13 changes: 4 additions & 9 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Libp2p } from "@libp2p/interface";
import type { Peer, PeerStore, Stream } from "@libp2p/interface";
import type {
IBaseProtocol,
IBaseProtocolCore,
Libp2pComponents,
ProtocolCreateOptions,
PubsubTopic
Expand All @@ -16,27 +16,22 @@ import {
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

const DEFAULT_NUM_PEERS_TO_USE = 3;

/**
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
*/
export class BaseProtocol implements IBaseProtocol {
export class BaseProtocol implements IBaseProtocolCore {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
readonly numPeersToUse: number;
protected streamManager: StreamManager;

constructor(
public multicodec: string,
private components: Libp2pComponents,
private log: Logger,
protected pubsubTopics: PubsubTopic[],
public readonly pubsubTopics: PubsubTopic[],
private options?: ProtocolCreateOptions
) {
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;

this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
Expand Down Expand Up @@ -86,7 +81,7 @@ export class BaseProtocol implements IBaseProtocol {
* @returns A list of peers that support the protocol sorted by latency.
*/
protected async getPeers(
async getPeers(
{
numPeers,
maxBootstrapPeers
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,8 @@ class Subscription {
}
}

const DEFAULT_NUM_PEERS = 3;

class Filter extends BaseProtocol implements IReceiver {
private activeSubscriptions = new Map<string, Subscription>();

Expand All @@ -357,6 +359,9 @@ class Filter extends BaseProtocol implements IReceiver {
return subscription;
}

//TODO: Remove when FilterCore and FilterSDK are introduced
private readonly numPeersToUse: number;

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
FilterCodecs.SUBSCRIBE,
Expand All @@ -366,6 +371,8 @@ class Filter extends BaseProtocol implements IReceiver {
options
);

this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS;

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
});
Expand Down
204 changes: 99 additions & 105 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
import type { PeerId, Stream } from "@libp2p/interface";
import type { Peer, PeerId, Stream } from "@libp2p/interface";
import {
Failure,
IBaseProtocolCore,
IEncoder,
ILightPush,
IMessage,
Libp2p,
ProtocolCreateOptions,
SendError,
SendResult
SendError
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
isMessageSizeUnderCap
} from "@waku/utils";
import { isMessageSizeUnderCap } from "@waku/utils";
import { Logger } from "@waku/utils";
import all from "it-all";
import * as lp from "it-length-prefixed";
Expand All @@ -38,10 +35,20 @@ type PreparePushMessageResult =
error: SendError;
};

type CoreSendResult =
| {
success: null;
failure: Failure;
}
| {
success: PeerId;
failure: null;
};

/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush extends BaseProtocol implements ILightPush {
export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
LightPushCodec,
Expand All @@ -54,8 +61,7 @@ class LightPush extends BaseProtocol implements ILightPush {

private async preparePushMessage(
encoder: IEncoder,
message: IMessage,
pubsubTopic: string
message: IMessage
): Promise<PreparePushMessageResult> {
try {
if (!message.payload || message.payload.length === 0) {
Expand All @@ -77,7 +83,7 @@ class LightPush extends BaseProtocol implements ILightPush {
};
}

const query = PushRpc.createRequest(protoMessage, pubsubTopic);
const query = PushRpc.createRequest(protoMessage, encoder.pubsubTopic);
return { query, error: null };
} catch (error) {
log.error("Failed to prepare push message", error);
Expand All @@ -89,116 +95,104 @@ class LightPush extends BaseProtocol implements ILightPush {
}
}

async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
const { pubsubTopic } = encoder;
ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

const recipients: PeerId[] = [];

async send(
encoder: IEncoder,
message: IMessage,
peer: Peer
): Promise<CoreSendResult> {
const { query, error: preparationError } = await this.preparePushMessage(
encoder,
message,
pubsubTopic
message
);

if (preparationError || !query) {
return {
recipients,
errors: [preparationError]
success: null,
failure: {
error: preparationError,
peerId: peer.id
}
};
}

const peers = await this.getPeers({
maxBootstrapPeers: 1,
numPeers: this.numPeersToUse
});

if (!peers.length) {
let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
err
);
return {
recipients,
errors: [SendError.NO_PEER_AVAILABLE]
success: null,
failure: {
error: SendError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}

const promises = peers.map(async (peer) => {
let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
err
);
return { recipients, error: SendError.REMOTE_PEER_FAULT };
}

let res: Uint8ArrayList[] | undefined;
try {
res = await pipe(
[query.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
} catch (err) {
log.error("Failed to send waku light push request", err);
return { recipients, error: SendError.GENERIC_FAIL };
}

const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});

let response: PushResponse | undefined;
try {
response = PushRpc.decode(bytes).response;
} catch (err) {
log.error("Failed to decode push reply", err);
return { recipients, error: SendError.DECODE_FAILED };
}
let res: Uint8ArrayList[] | undefined;
try {
res = await pipe(
[query.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
} catch (err) {
log.error("Failed to send waku light push request", err);
return {
success: null,
failure: {
error: SendError.GENERIC_FAIL,
peerId: peer.id
}
};
}

if (!response) {
log.error("Remote peer fault: No response in PushRPC");
return { recipients, error: SendError.REMOTE_PEER_FAULT };
}
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
bytes.append(chunk);
});

if (!response.isSuccess) {
log.error("Remote peer rejected the message: ", response.info);
return { recipients, error: SendError.REMOTE_PEER_REJECTED };
}
let response: PushResponse | undefined;
try {
response = PushRpc.decode(bytes).response;
} catch (err) {
log.error("Failed to decode push reply", err);
return {
success: null,
failure: {
error: SendError.DECODE_FAILED,
peerId: peer.id
}
};
}

recipients.some((recipient) => recipient.equals(peer.id)) ||
recipients.push(peer.id);
if (!response) {
log.error("Remote peer fault: No response in PushRPC");
return {
success: null,
failure: {
error: SendError.REMOTE_PEER_FAULT,
peerId: peer.id
}
};
}

return { recipients };
});
if (!response.isSuccess) {
log.error("Remote peer rejected the message: ", response.info);
return {
success: null,
failure: {
error: SendError.REMOTE_PEER_REJECTED,
peerId: peer.id
}
};
}

const results = await Promise.allSettled(promises);

// TODO: handle renewing faulty peers with new peers (https://github.com/waku-org/js-waku/issues/1463)
const errors = results
.filter(
(
result
): result is PromiseFulfilledResult<{
recipients: PeerId[];
error: SendError | undefined;
}> => result.status === "fulfilled"
)
.map((result) => result.value.error)
.filter((error) => error !== undefined) as SendError[];

return {
recipients,
errors
};
return { success: peer.id, failure: null };
}
}

export function wakuLightPush(
init: Partial<ProtocolCreateOptions> = {}
): (libp2p: Libp2p) => ILightPush {
return (libp2p: Libp2p) => new LightPush(libp2p, init);
}
Loading

0 comments on commit 8deab11

Please sign in to comment.