Skip to content

Commit

Permalink
Merge branch 'master' into feat/map-correct-fleet
Browse files Browse the repository at this point in the history
  • Loading branch information
weboko authored Jun 3, 2024
2 parents ab6f7ef + c5302fd commit d8ed83f
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 74 deletions.
10 changes: 7 additions & 3 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ export class FilterCore extends BaseProtocol implements IBaseProtocolCore {
options
);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
});
libp2p
.handle(FilterCodecs.PUSH, this.onRequest.bind(this), {
maxInboundStreams: 100
})
.catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
});
}

private onRequest(streamData: IncomingStreamData): void {
Expand Down
11 changes: 4 additions & 7 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,15 @@ export class LightPushCore extends BaseProtocol implements IBaseProtocolCore {
};
}

let stream: Stream | undefined;
let stream: Stream;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error(
`Failed to get a stream for remote peer${peer.id.toString()}`,
err
);
} catch (error) {
log.error("Failed to get stream", error);
return {
success: null,
failure: {
error: ProtocolError.REMOTE_PEER_FAULT,
error: ProtocolError.NO_STREAM_AVAILABLE,
peerId: peer.id
}
};
Expand Down
11 changes: 10 additions & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,16 @@ class Metadata extends BaseProtocol implements IMetadata {
};
}

const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (error) {
log.error("Failed to get stream", error);
return {
shardInfo: null,
error: ProtocolError.NO_STREAM_AVAILABLE
};
}

const encodedResponse = await pipe(
[request],
Expand Down
8 changes: 7 additions & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,13 @@ export class StoreCore extends BaseProtocol implements IStoreCore {

const historyRpcQuery = HistoryRpc.createQuery(queryOpts);

const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (e) {
log.error("Failed to get stream", e);
break;
}

const res = await pipe(
[historyRpcQuery.encode()],
Expand Down
91 changes: 66 additions & 25 deletions packages/core/src/lib/stream_manager.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
import type { PeerUpdate, Stream } from "@libp2p/interface";
import { Peer } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { selectConnection } from "@waku/utils/libp2p";

const CONNECTION_TIMEOUT = 5_000;
const RETRY_BACKOFF_BASE = 1_000;
const MAX_RETRIES = 3;

export class StreamManager {
private streamPool: Map<string, Promise<Stream | void>>;
private readonly streamPool: Map<string, Promise<Stream | void>>;
private readonly log: Logger;

constructor(
Expand All @@ -14,60 +18,97 @@ export class StreamManager {
public addEventListener: Libp2p["addEventListener"]
) {
this.log = new Logger(`stream-manager:${multicodec}`);
this.addEventListener(
"peer:update",
this.handlePeerUpdateStreamPool.bind(this)
);
this.getStream = this.getStream.bind(this);
this.streamPool = new Map();
this.addEventListener("peer:update", this.handlePeerUpdateStreamPool);
}

public async getStream(peer: Peer): Promise<Stream> {
const peerIdStr = peer.id.toString();
const streamPromise = this.streamPool.get(peerIdStr);

if (!streamPromise) {
return this.newStream(peer); // fallback by creating a new stream on the spot
return this.createStream(peer);
}

// We have the stream, let's remove it from the map
this.streamPool.delete(peerIdStr);
this.prepareStream(peer);

this.prepareNewStream(peer);

const stream = await streamPromise;

if (!stream || stream.status === "closed") {
return this.newStream(peer); // fallback by creating a new stream on the spot
try {
const stream = await streamPromise;
if (stream && stream.status !== "closed") {
return stream;
}
} catch (error) {
this.log.warn(`Failed to get stream for ${peerIdStr} -- `, error);
this.log.warn("Attempting to create a new stream for the peer");
}

return stream;
return this.createStream(peer);
}

private async newStream(peer: Peer): Promise<Stream> {
private async createStream(peer: Peer, retries = 0): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connection = selectConnection(connections);

if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);

try {
return await connection.newStream(this.multicodec);
} catch (error) {
if (retries < MAX_RETRIES) {
const backoff = RETRY_BACKOFF_BASE * Math.pow(2, retries);
await new Promise((resolve) => setTimeout(resolve, backoff));
return this.createStream(peer, retries + 1);
}
throw new Error(
`Failed to create a new stream for ${peer.id.toString()} -- ` + error
);
}
}

private prepareNewStream(peer: Peer): void {
const streamPromise = this.newStream(peer).catch(() => {
// No error thrown as this call is not triggered by the user
private prepareStream(peer: Peer): void {
const timeoutPromise = new Promise<void>((resolve) =>
setTimeout(resolve, CONNECTION_TIMEOUT)
);

const streamPromise = Promise.race([
this.createStream(peer),
timeoutPromise.then(() => {
throw new Error("Connection timeout");
})
]).catch((error) => {
this.log.error(
`Failed to prepare a new stream for ${peer.id.toString()}`
`Failed to prepare a new stream for ${peer.id.toString()} -- `,
error
);
});

this.streamPool.set(peer.id.toString(), streamPromise);
}

private handlePeerUpdateStreamPool = (evt: CustomEvent<PeerUpdate>): void => {
const peer = evt.detail.peer;
const { peer } = evt.detail;

if (peer.protocols.includes(this.multicodec)) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareNewStream(peer);
const isConnected = this.isConnectedTo(peer.id);

if (isConnected) {
this.log.info(`Preemptively opening a stream to ${peer.id.toString()}`);
this.prepareStream(peer);
} else {
const peerIdStr = peer.id.toString();
this.streamPool.delete(peerIdStr);
this.log.info(
`Removed pending stream for disconnected peer ${peerIdStr}`
);
}
}
};

private isConnectedTo(peerId: PeerId): boolean {
const connections = this.getConnections(peerId);
return connections.some((connection) => connection.status === "open");
}
}
11 changes: 10 additions & 1 deletion packages/discovery/src/peer-exchange/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,16 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
};
}

const stream = await this.getStream(peer);
let stream;
try {
stream = await this.getStream(peer);
} catch (err) {
log.error("Failed to get stream", err);
return {
peerInfos: null,
error: ProtocolError.NO_STREAM_AVAILABLE
};
}

const res = await pipe(
[rpcQuery.encode()],
Expand Down
5 changes: 5 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,11 @@ export enum ProtocolError {
* on the connection manager before retrying.
*/
NO_PEER_AVAILABLE = "No peer available",
/**
* Failure to find a stream to the peer. This may be because the connection with the peer is not still alive.
* Mitigation can be: retrying after a given time period, or mitigation for `NO_PEER_AVAILABLE` can be used.
*/
NO_STREAM_AVAILABLE = "No stream available",
/**
* The remote peer did not behave as expected. Mitigation for `NO_PEER_AVAILABLE`
* or `DECODE_FAILED` can be used.
Expand Down
26 changes: 9 additions & 17 deletions packages/tests/tests/filter/single_node/subscribe.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});

it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(50000);
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });

Expand All @@ -262,23 +262,15 @@ describe("Waku Filter V2: Subscribe: Single Service Node", function () {
});
}

// Open issue here: https://github.com/waku-org/js-waku/issues/1790
// That's why we use the try catch block
try {
// Verify that each message was received on the corresponding topic.
expect(await messageCollector.waitForMessages(topicCount)).to.eq(true);
td.contentTopics.forEach((topic, index) => {
messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
// Verify that each message was received on the corresponding topic.
expect(await messageCollector.waitForMessages(topicCount)).to.eq(true);
td.contentTopics.forEach((topic, index) => {
messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
} catch (error) {
console.warn(
"This test still fails because of https://github.com/waku-org/js-waku/issues/1790"
);
}
});
});

it("Error when try to subscribe to more than 101 topics (new limit)", async function () {
Expand Down
30 changes: 11 additions & 19 deletions packages/tests/tests/filter/subscribe.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ const runTests = (strictCheckNodes: boolean): void => {
});

it("Subscribe to 100 topics (new limit) at once and receives messages", async function () {
this.timeout(50000);
this.timeout(100_000);
const topicCount = 100;
const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic });

Expand All @@ -309,25 +309,17 @@ const runTests = (strictCheckNodes: boolean): void => {
});
}

// Open issue here: https://github.com/waku-org/js-waku/issues/1790
// That's why we use the try catch block
try {
// Verify that each message was received on the corresponding topic.
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
// Verify that each message was received on the corresponding topic.
expect(
await serviceNodes.messageCollector.waitForMessages(topicCount)
).to.eq(true);
td.contentTopics.forEach((topic, index) => {
serviceNodes.messageCollector.verifyReceivedMessage(index, {
expectedContentTopic: topic,
expectedMessageText: `Message for Topic ${index + 1}`,
expectedPubsubTopic: TestPubsubTopic
});
} catch (error) {
console.warn(
"This test still fails because of https://github.com/waku-org/js-waku/issues/1790"
);
}
});
});

it("Error when try to subscribe to more than 101 topics (new limit)", async function () {
Expand Down

0 comments on commit d8ed83f

Please sign in to comment.