Skip to content

Commit

Permalink
Merge branch 'master' into weboko/network-config
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko authored Jul 19, 2024
2 parents 0439594 + 169a09d commit 0aeed00
Show file tree
Hide file tree
Showing 41 changed files with 338 additions and 305 deletions.
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
}]
}
],
"@typescript-eslint/explicit-member-accessibility": "error",
"prettier/prettier": [
"error",
{
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class BaseProtocol implements IBaseProtocolCore {
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];
protected streamManager: StreamManager;

constructor(
protected constructor(
public multicodec: string,
private components: Libp2pComponents,
private log: Logger,
Expand Down Expand Up @@ -82,7 +82,7 @@ export class BaseProtocol implements IBaseProtocolCore {
* @returns A list of peers that support the protocol sorted by latency.
*/
async getPeers(
public async getPeers(
{
numPeers,
maxBootstrapPeers
Expand Down
40 changes: 21 additions & 19 deletions packages/core/src/lib/filter/filter_rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,32 @@ import { v4 as uuid } from "uuid";
export class FilterPushRpc {
public constructor(public proto: proto.MessagePush) {}

static decode(bytes: Uint8Array): FilterPushRpc {
public static decode(bytes: Uint8Array): FilterPushRpc {
const res = proto.MessagePush.decode(bytes);
return new FilterPushRpc(res);
}

encode(): Uint8Array {
public encode(): Uint8Array {
return proto.MessagePush.encode(this.proto);
}

get wakuMessage(): WakuMessage | undefined {
public get wakuMessage(): WakuMessage | undefined {
return this.proto.wakuMessage;
}

/**
* Get the pubsub topic from the FilterPushRpc object.
* @returns string
*/
get pubsubTopic(): string | undefined {
public get pubsubTopic(): string | undefined {
return this.proto.pubsubTopic;
}
}

export class FilterSubscribeRpc {
public constructor(public proto: proto.FilterSubscribeRequest) {}

static createSubscribeRequest(
public static createSubscribeRequest(
pubsubTopic: string,
contentTopics: string[]
): FilterSubscribeRpc {
Expand All @@ -46,7 +46,7 @@ export class FilterSubscribeRpc {
});
}

static createUnsubscribeRequest(
public static createUnsubscribeRequest(
pubsubTopic: string,
contentTopics: string[]
): FilterSubscribeRpc {
Expand All @@ -59,7 +59,9 @@ export class FilterSubscribeRpc {
});
}

static createUnsubscribeAllRequest(pubsubTopic: string): FilterSubscribeRpc {
public static createUnsubscribeAllRequest(
pubsubTopic: string
): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: uuid(),
filterSubscribeType:
Expand All @@ -69,7 +71,7 @@ export class FilterSubscribeRpc {
});
}

static createSubscriberPingRequest(): FilterSubscribeRpc {
public static createSubscriberPingRequest(): FilterSubscribeRpc {
return new FilterSubscribeRpc({
requestId: uuid(),
filterSubscribeType:
Expand All @@ -79,53 +81,53 @@ export class FilterSubscribeRpc {
});
}

static decode(bytes: Uint8Array): FilterSubscribeRpc {
public static decode(bytes: Uint8Array): FilterSubscribeRpc {
const res = proto.FilterSubscribeRequest.decode(bytes);
return new FilterSubscribeRpc(res);
}

encode(): Uint8Array {
public encode(): Uint8Array {
return proto.FilterSubscribeRequest.encode(this.proto);
}

get filterSubscribeType(): proto.FilterSubscribeRequest.FilterSubscribeType {
public get filterSubscribeType(): proto.FilterSubscribeRequest.FilterSubscribeType {
return this.proto.filterSubscribeType;
}

get requestId(): string {
public get requestId(): string {
return this.proto.requestId;
}

get pubsubTopic(): string | undefined {
public get pubsubTopic(): string | undefined {
return this.proto.pubsubTopic;
}

get contentTopics(): string[] {
public get contentTopics(): string[] {
return this.proto.contentTopics;
}
}

export class FilterSubscribeResponse {
public constructor(public proto: proto.FilterSubscribeResponse) {}

static decode(bytes: Uint8Array): FilterSubscribeResponse {
public static decode(bytes: Uint8Array): FilterSubscribeResponse {
const res = proto.FilterSubscribeResponse.decode(bytes);
return new FilterSubscribeResponse(res);
}

encode(): Uint8Array {
public encode(): Uint8Array {
return proto.FilterSubscribeResponse.encode(this.proto);
}

get statusCode(): number {
public get statusCode(): number {
return this.proto.statusCode;
}

get statusDesc(): string | undefined {
public get statusDesc(): string | undefined {
return this.proto.statusDesc;
}

get requestId(): string {
public get requestId(): string {
return this.proto.requestId;
}
}
90 changes: 45 additions & 45 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export const FilterCodecs = {
};

export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
constructor(
public constructor(
private handleIncomingMessage: (
pubsubTopic: PubsubTopic,
wakuMessage: WakuMessage,
Expand All @@ -58,47 +58,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
});
}

private onRequest(streamData: IncomingStreamData): void {
const { connection, stream } = streamData;
const { remotePeer } = connection;
log.info(`Received message from ${remotePeer.toString()}`);
try {
pipe(stream, lp.decode, async (source) => {
for await (const bytes of source) {
const response = FilterPushRpc.decode(bytes.slice());

const { pubsubTopic, wakuMessage } = response;

if (!wakuMessage) {
log.error("Received empty message");
return;
}

if (!pubsubTopic) {
log.error("Pubsub topic missing from push message");
return;
}

await this.handleIncomingMessage(
pubsubTopic,
wakuMessage,
connection.remotePeer.toString()
);
}
}).then(
() => {
log.info("Receiving pipe closed.");
},
(e) => {
log.error("Error with receiving pipe", e);
}
);
} catch (e) {
log.error("Error decoding message", e);
}
}

async subscribe(
public async subscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
Expand Down Expand Up @@ -152,7 +112,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
};
}

async unsubscribe(
public async unsubscribe(
pubsubTopic: PubsubTopic,
peer: Peer,
contentTopics: ContentTopic[]
Expand Down Expand Up @@ -198,7 +158,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
};
}

async unsubscribeAll(
public async unsubscribeAll(
pubsubTopic: PubsubTopic,
peer: Peer
): Promise<CoreProtocolResult> {
Expand Down Expand Up @@ -246,7 +206,7 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
};
}

async ping(peer: Peer): Promise<CoreProtocolResult> {
public async ping(peer: Peer): Promise<CoreProtocolResult> {
let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
Expand Down Expand Up @@ -316,4 +276,44 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
failure: null
};
}

private onRequest(streamData: IncomingStreamData): void {
const { connection, stream } = streamData;
const { remotePeer } = connection;
log.info(`Received message from ${remotePeer.toString()}`);
try {
pipe(stream, lp.decode, async (source) => {
for await (const bytes of source) {
const response = FilterPushRpc.decode(bytes.slice());

const { pubsubTopic, wakuMessage } = response;

if (!wakuMessage) {
log.error("Received empty message");
return;
}

if (!pubsubTopic) {
log.error("Pubsub topic missing from push message");
return;
}

await this.handleIncomingMessage(
pubsubTopic,
wakuMessage,
connection.remotePeer.toString()
);
}
}).then(
() => {
log.info("Receiving pipe closed.");
},
(e) => {
log.error("Error with receiving pipe", e);
}
);
} catch (e) {
log.error("Error decoding message", e);
}
}
}
6 changes: 5 additions & 1 deletion packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ export class KeepAliveManager {
private relayKeepAliveTimers: Map<PeerId, ReturnType<typeof setInterval>[]> =
new Map();

constructor({ options, relay, libp2p }: CreateKeepAliveManagerOptions) {
public constructor({
options,
relay,
libp2p
}: CreateKeepAliveManagerOptions) {
this.options = options;
this.relay = relay;
this.libp2p = libp2p;
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type PreparePushMessageResult = ThisOrThat<"query", PushRpc>;
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
LightPushCodec,
libp2p.components,
Expand Down Expand Up @@ -78,7 +78,7 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
}
}

async send(
public async send(
encoder: IEncoder,
message: IMessage,
peer: Peer
Expand Down
10 changes: 5 additions & 5 deletions packages/core/src/lib/light_push/push_rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { v4 as uuid } from "uuid";
export class PushRpc {
public constructor(public proto: proto.PushRpc) {}

static createRequest(
public static createRequest(
message: proto.WakuMessage,
pubsubTopic: string
): PushRpc {
Expand All @@ -19,20 +19,20 @@ export class PushRpc {
});
}

static decode(bytes: Uint8ArrayList): PushRpc {
public static decode(bytes: Uint8ArrayList): PushRpc {
const res = proto.PushRpc.decode(bytes);
return new PushRpc(res);
}

encode(): Uint8Array {
public encode(): Uint8Array {
return proto.PushRpc.encode(this.proto);
}

get query(): proto.PushRequest | undefined {
public get query(): proto.PushRequest | undefined {
return this.proto.request;
}

get response(): proto.PushResponse | undefined {
public get response(): proto.PushResponse | undefined {
return this.proto.response;
}
}
Loading

0 comments on commit 0aeed00

Please sign in to comment.