Skip to content
This repository has been archived by the owner on Feb 26, 2024. It is now read-only.

fix: implement backoff retry policy for websocket handler #3600

Open
wants to merge 12 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 117 additions & 40 deletions src/chains/ethereum/ethereum/src/forking/handlers/ws-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ import { JsonRpcResponse, JsonRpcError } from "@ganache/utils";

const { JSONRPC_PREFIX } = BaseHandler;

export type RetryConfiguration = {
retryIntervalBaseInSeconds: number;
retryCounter: number;
};

export class WsHandler extends BaseHandler implements Handler {
private open: Promise<unknown>;
private connection: WebSocket;
Expand All @@ -20,50 +25,80 @@ export class WsHandler extends BaseHandler implements Handler {
}>
>();

constructor(options: EthereumInternalOptions, abortSignal: AbortSignal) {
// queue requests when connection is closed.
private delayedRequestsQueue = [];
// flag to identify if adhoc reconnection attempt.
private adhocReconnectionRequest = false;

// retry configuration
private retryCounter: number = 3;
private retryIntervalBaseInSeconds: number = 2;
private initialRetryCounter: number;
private retryTimeoutId: NodeJS.Timeout;

// socket configuration
private url: string;
private origin: string;
private logging: EthereumInternalOptions["logging"];

constructor(
options: EthereumInternalOptions,
abortSignal: AbortSignal,
retryConfiguration?: RetryConfiguration | undefined
) {
super(options, abortSignal);

const {
fork: { url, origin },
logging
} = options;
this.url = url.toString();
this.origin = origin;
this.logging = logging;

// set retry configuration values
if (retryConfiguration) {
this.retryCounter = retryConfiguration.retryCounter;
this.initialRetryCounter = retryConfiguration.retryIntervalBaseInSeconds;
}
this.initialRetryCounter = this.retryCounter;

this.connection = new WebSocket(url.toString(), {
origin,
headers: this.headers
});
// `nodebuffer` is already the default, but I just wanted to be explicit
// here because when `nodebuffer` is the binaryType the `message` event's
// data type is guaranteed to be a `Buffer`. We don't need to check for
// different types of data.
// I mention all this because if `arraybuffer` or `fragment` is used for the
// binaryType the `"message"` event's `data` may end up being
// `ArrayBuffer | Buffer`, or `Buffer[] | Buffer`, respectively.
// If you need to change this, you probably need to change our `onMessage`
// handler too.
this.connection.binaryType = "nodebuffer";

this.open = this.connect(this.connection, logging);
this.connection.onclose = () => {
const onCloseEvent = () => {
// try to connect again...
// Issue: https://github.com/trufflesuite/ganache/issues/3476
// TODO: backoff and eventually fail
// Issue: https://github.com/trufflesuite/ganache/issues/3477
this.open = this.connect(this.connection, logging);
// backoff and eventually fail
// do not schedule reconnection for adhoc reconnection requests
if (this.retryCounter === 0) {
this.logging.logger.log("Connection to Infura has failed. Try again");
} else {
if (!this.adhocReconnectionRequest) {
this.retryCounter--;
clearTimeout(this.retryTimeoutId);
this.retryTimeoutId = setTimeout(async () => {
this.reconnect(this.url, this.origin, false);
}, Math.pow(this.retryIntervalBaseInSeconds, this.initialRetryCounter - this.retryCounter) * 1000);
}
}
};
this.open = this.connect(this.url, this.origin, onCloseEvent);
this.abortSignal.addEventListener("abort", () => {
this.connection.onclose = null;
this.connection.close(1000);
});
this.connection.onmessage = this.onMessage.bind(this);
}

public async request<T>(
method: string,
params: unknown[],
options = { disableCache: false }
) {
await this.open;
try {
await this.open;
} catch (er) {
this.logging.logger.log("Connection to Infura has failed");
// skip the reconnection if connection is being made
if (this.connection.readyState !== this.connection.CONNECTING)
this.reconnect(this.url, this.origin, true);
}
if (this.abortSignal.aborted) return Promise.reject(new AbortError());

const key = JSON.stringify({ method, params });
Expand All @@ -81,7 +116,13 @@ export class WsHandler extends BaseHandler implements Handler {
// Issue: https://github.com/trufflesuite/ganache/issues/3478
this.inFlightRequests.set(messageId, deferred);

this.connection.send(`${JSONRPC_PREFIX}${messageId},${key.slice(1)}`);
// if connection is alive send request else delay the request
const data = `${JSONRPC_PREFIX}${messageId},${key.slice(1)}`;
if (this.connection && this.connection.readyState === 1) {
this.connection.send(data);
} else {
this.delayRequest(data);
}
return deferred.promise.finally(() => this.requestCache.delete(key));
};
return await this.queueRequest<T>(method, params, key, send, options);
Expand All @@ -105,26 +146,62 @@ export class WsHandler extends BaseHandler implements Handler {
}
}

private connect(
connection: WebSocket,
logging: EthereumInternalOptions["logging"]
) {
private connect(url: string, origin: string, onCloseEvent: any) {
this.connection = new WebSocket(url, {
origin,
headers: this.headers
});
// `nodebuffer` is already the default, but I just wanted to be explicit
// here because when `nodebuffer` is the binaryType the `message` event's
// data type is guaranteed to be a `Buffer`. We don't need to check for
// different types of data.
// I mention all this because if `arraybuffer` or `fragment` is used for the
// binaryType the `"message"` event's `data` may end up being
// `ArrayBuffer | Buffer`, or `Buffer[] | Buffer`, respectively.
// If you need to change this, you probably need to change our `onMessage`
// handler too.
this.connection.binaryType = "nodebuffer";
this.connection.onclose = onCloseEvent;
this.connection.onmessage = this.onMessage.bind(this);
let open = new Promise((resolve, reject) => {
connection.onopen = resolve;
connection.onerror = reject;
this.connection.onopen = resolve;
this.connection.onerror = reject;
});
open.then(() => {
this.connection.onopen = null;
this.connection.onerror = null;
// reset the retry counter and any timeouts scheduled for retries
this.retryCounter = this.initialRetryCounter;
clearTimeout(this.retryTimeoutId);

this.adhocReconnectionRequest = false;
// process delayed requests which were queued at the time of connection failure
this.sendDelayedRequests();
});
open.then(
() => {
connection.onopen = null;
connection.onerror = null;
},
err => {
logging.logger.log(err);
}
);
return open;
}

private reconnect(
url: string,
origin: string,
adhocReconnectionRequest: boolean = false
) {
this.adhocReconnectionRequest = adhocReconnectionRequest;
const onCloseEvent = this.connection.onclose;
this.open = this.connect(url, origin, onCloseEvent);
}

private delayRequest(request: any) {
this.delayedRequestsQueue.push(request);
}

private sendDelayedRequests() {
while (this.delayedRequestsQueue.length > 0) {
const request = this.delayedRequestsQueue.pop();
this.connection.send(request);
}
}

public async close() {
await super.close();
this.connection.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import assert from "assert";
import AbortController from "abort-controller";
import { WsHandler } from "../../../src/forking/handlers/ws-handler";
import {
EthereumOptionsConfig,
EthereumProviderOptions
} from "@ganache/ethereum-options";
import WebSocket from "ws";

const createWebSocketServer = (port: number): WebSocket.Server => {
let wsServer = new WebSocket.Server({ port });
wsServer.on("connection", async ws => {
ws.on("message", data => {
const message = JSON.parse(data.toString());
ws.send(
Buffer.from(
JSON.stringify({
id: message.id,
jsonrpc: "2.0",
result: "0x0"
}),
"utf-8"
)
);
if (message.method === "client-disconnect") {
setTimeout(() => {
ws.terminate();
}, 10);
}
});
});
return wsServer;
};

// create test server
const URL = "ws://localhost:8888/";
let wsServer: WebSocket.Server;
let wsHandler: WsHandler;
wsServer = createWebSocketServer(8888);

describe("ws-handler", function () {
describe("retries", function () {
before(() => {
const providerOptions = EthereumOptionsConfig.normalize({
fork: {
url: URL,
origin: "test"
}
} as EthereumProviderOptions);
const abortController: AbortController = new AbortController();
wsHandler = new WsHandler(providerOptions, abortController.signal, {
retryCounter: 4,
retryIntervalBaseInSeconds: 3
});
});

after(() => {
wsHandler.close();
wsServer.close();
});

it("should attempt to reconnect the server when connection is terminated", async () => {
// send a request to websocket server to get connection termination.
await wsHandler.request<any>("client-disconnect", [], {
disableCache: true
});
await new Promise(resolve => setTimeout(resolve, 100));

// send request after connection is terminated
const retryPromise = wsHandler.request<any>("retry", [], {
disableCache: true
});

// assert the result
const response = await retryPromise;
assert.equal(response, "0x0");
}).timeout(10000);
});
});