Skip to content

Commit

Permalink
chore: update API
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Apr 30, 2024
1 parent 2da0e44 commit b9b2ef0
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 12 deletions.
7 changes: 5 additions & 2 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,11 @@ export class BaseProtocol implements IBaseProtocolCore {
this.addLibp2pEventListener
);
}
protected async getStream(peer: Peer): Promise<Stream> {
return this.streamManager.getStream(peer);
protected async getStream(peer: Peer): Promise<Stream | null> {
const stream = await this.streamManager.getStream(peer);

if (!stream) return null;
return stream;
}

public get peerStore(): PeerStore {
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
contentTopics: ContentTopic[]
): Promise<void> {
const stream = await this.getStream(peer);
if (!stream) {
throw new Error(`Failed to get stream for peer ${peer.id.toString()}`);
}

const request = FilterSubscribeRpc.createSubscribeRequest(
pubsubTopic,
Expand Down Expand Up @@ -128,6 +131,10 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
contentTopics: ContentTopic[]
): Promise<void> {
const stream = await this.getStream(peer);
if (!stream) {
throw new Error(`Failed to get stream for peer ${peer.id.toString()}`);
}

const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest(
pubsubTopic,
contentTopics
Expand All @@ -138,6 +145,9 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {

async unsubscribeAll(pubsubTopic: PubsubTopic, peer: Peer): Promise<void> {
const stream = await this.getStream(peer);
if (!stream) {
throw new Error(`Failed to get stream for peer ${peer.id.toString()}`);
}

const request = FilterSubscribeRpc.createUnsubscribeAllRequest(pubsubTopic);

Expand Down Expand Up @@ -167,6 +177,9 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {

async ping(peer: Peer): Promise<void> {
const stream = await this.getStream(peer);
if (!stream) {
throw new Error(`Failed to get stream for peer ${peer.id.toString()}`);
}

const request = FilterSubscribeRpc.createSubscriberPingRequest();

Expand Down
14 changes: 4 additions & 10 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Peer, PeerId, Stream } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import {
Failure,
IBaseProtocolCore,
Expand Down Expand Up @@ -100,18 +100,12 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
};
}

let stream: Stream | undefined;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
err
);
const stream = await this.getStream(peer);
if (!stream) {
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peer.id
}
};
Expand Down
6 changes: 6 additions & 0 deletions packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ class Metadata extends BaseProtocol implements IMetadata {
}

const stream = await this.getStream(peer);
if (!stream) {
return {
shardInfo: null,
error: ProtocolError.NO_STREAM_AVAILABLE
};
}

const encodedResponse = await pipe(
[request],
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
const historyRpcQuery = HistoryRpc.createQuery(queryOpts);

const stream = await this.getStream(peer);
if (!stream) {
throw new Error(`Failed to get stream to peer ${peer.id.toString()}`);
}

const res = await pipe(
[historyRpcQuery.encode()],
Expand Down
6 changes: 6 additions & 0 deletions packages/discovery/src/peer-exchange/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
}

const stream = await this.getStream(peer);
if (!stream) {
return {
error: ProtocolError.NO_STREAM_AVAILABLE,
peerInfos: null
};
}

const res = await pipe(
[rpcQuery.encode()],
Expand Down
5 changes: 5 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ export enum ProtocolError {
* on the connection manager before retrying.
*/
NO_PEER_AVAILABLE = "No peer available",
/**
* Failure to find a stream to the peer. This may be because the connection with the peer is not still alive.
* Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used.
*/
NO_STREAM_AVAILABLE = "No stream available",
/**
* The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
Expand Down

0 comments on commit b9b2ef0

Please sign in to comment.