Skip to content

Commit

Permalink
chore: use websocket to wait for new block
Browse files Browse the repository at this point in the history
Previous behaviour used to be
HTTP polling busy loop.
  • Loading branch information
foxpy committed Dec 23, 2022
1 parent affda7e commit 3942479
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 61 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ CONTRACTS_PATH - path to contracts that will be used in tests
NEUTRON_ADDRESS_PREFIX - address prefix for neutron controller network
COSMOS_ADDRESS_PREFIX - address prefix for gaia (cosmoshub) host network
NODE1_URL - url to the first node
NODE1_WS_URL - url to websocket of the first node
NODE2_URL - url to the second node
NODE2_WS_URL - url to websocket of the second node
BLOCKS_COUNT_BEFORE_START - how many blocks we wait before start first test
NO_DOCKER - do not start cosmopark for tests
NO_REBUILD - skip containers rebuilding
Expand Down
19 changes: 13 additions & 6 deletions src/helpers/cosmos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import axios from 'axios';
import { CodeId, Wallet } from '../types';
import Long from 'long';
import path from 'path';
import { waitBlocks } from './wait';
import {
CosmosTxV1beta1GetTxResponse,
InlineResponse20075TxResponse,
} from '@cosmos-client/core/cjs/openapi/api';
import { cosmos, google } from '@cosmos-client/core/cjs/proto';
import { CosmosSDK } from '@cosmos-client/core/cjs/sdk';
import { ibc } from '@cosmos-client/ibc/cjs/proto';
import crypto from 'crypto';
import ICoin = cosmos.base.v1beta1.ICoin;
import { ibc } from '@cosmos-client/ibc/cjs/proto';
import IHeight = ibc.core.client.v1.IHeight;
import crypto from 'crypto';
import { BlockWaiter } from './wait';

export const NEUTRON_DENOM = process.env.NEUTRON_DENOM || 'stake';
export const COSMOS_DENOM = process.env.COSMOS_DENOM || 'uatom';
Expand Down Expand Up @@ -134,12 +134,19 @@ cosmosclient.codec.register(

export class CosmosWrapper {
sdk: cosmosclient.CosmosSDK;
blockWaiter: BlockWaiter;
wallet: Wallet;
denom: string;

constructor(sdk: cosmosclient.CosmosSDK, wallet: Wallet, denom: string) {
constructor(
sdk: cosmosclient.CosmosSDK,
blockWaiter: BlockWaiter,
wallet: Wallet,
denom: string,
) {
this.denom = denom;
this.sdk = sdk;
this.blockWaiter = blockWaiter;
this.wallet = wallet;
}

Expand Down Expand Up @@ -193,7 +200,7 @@ export class CosmosWrapper {
const txhash = res.data?.tx_response.txhash;
let error = null;
while (numAttempts > 0) {
await waitBlocks(this.sdk, 1);
await this.blockWaiter.next();
numAttempts--;
const data = await rest.tx
.getTx(this.sdk as CosmosSDK, txhash)
Expand Down Expand Up @@ -303,7 +310,7 @@ export class CosmosWrapper {
}

numAttempts--;
await waitBlocks(this.sdk, 1);
await this.blockWaiter.next();
}

throw new Error('failed to query contract');
Expand Down
14 changes: 9 additions & 5 deletions src/helpers/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const BLOCKS_COUNT_BEFORE_START = process.env.BLOCKS_COUNT_BEFORE_START

let alreadySetUp = false;

export const setup = async (host: string) => {
export const setup = async (host1: string, host2: string) => {
if (alreadySetUp) {
console.log('already set up');
return;
Expand All @@ -32,8 +32,12 @@ export const setup = async (host: string) => {
showVersions();
await showContractsHashes();

await waitForHTTP(host);
await waitForChannel(host);
await waitForHTTP(host1);
await waitForChannel(host1);
await waitForHTTP(host2);
await waitForChannel(host2);
await wait(20); // FIXME: this hardcoded sleep is here to wait until hermes is fully initialized.
// proper fix would be to monitor hermes status events.
alreadySetUp = true;
};

Expand All @@ -53,7 +57,7 @@ export const waitForHTTP = async (
}
// eslint-disable-next-line no-empty
} catch (e) {}
await wait(10);
await wait(1);
}
throw new Error('No port opened');
};
Expand All @@ -80,7 +84,7 @@ export const waitForChannel = async (
}
// eslint-disable-next-line no-empty
} catch (e) {}
await wait(10);
await wait(1);
}

throw new Error('No channel opened');
Expand Down
2 changes: 1 addition & 1 deletion src/helpers/ica.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export const getIca = (
numAttempts = 20,
) =>
getWithAttempts(
cm.sdk,
cm,
() =>
cm.queryContract<{
interchain_account_address: string;
Expand Down
4 changes: 2 additions & 2 deletions src/helpers/icq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export const waitForICQResultWithRemoteHeight = (
numAttempts = 20,
) =>
getWithAttempts(
cm.sdk,
cm,
() => getRegisteredQuery(cm, contractAddress, queryId),
async (query) =>
query.registered_query.last_submitted_result_remote_height >=
Expand Down Expand Up @@ -80,7 +80,7 @@ export const waitForTransfersAmount = (
numAttempts = 50,
) =>
getWithAttempts(
cm.sdk,
cm,
async () =>
(await queryTransfersNumber(cm, contractAddress)).transfers_number,
async (amount) => amount == expectedTransfersAmount,
Expand Down
46 changes: 35 additions & 11 deletions src/helpers/wait.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { rest } from '@cosmos-client/core';
import { rest, websocket } from '@cosmos-client/core';

(global as any).WebSocket = require('ws');

export const wait = async (seconds: number) =>
new Promise((r) => {
Expand All @@ -16,23 +18,45 @@ export const getRemoteHeight = async (sdk: any) => {
return +block.data.block.header.height;
};

export const waitBlocks = async (sdk: any, n: number) => {
const targetHeight = (await getRemoteHeight(sdk)) + n;
for (;;) {
await wait(1);
const currentHeight = await getRemoteHeight(sdk);
if (currentHeight >= targetHeight) {
break;
export class BlockWaiter {
url;

constructor(url: string) {
this.url = url;
}

next() {
return new Promise((r) => {
const ws = websocket.connect(this.url);
ws.next({
id: '1',
jsonrpc: '2.0',
method: 'subscribe',
params: ["tm.event='NewBlock'"],
});
ws.subscribe((x) => {
if (Object.entries((x as any).result).length !== 0) {
ws.unsubscribe();
r(x);
}
});
});
}

async waitBlocks(n: number) {
while (n > 0) {
await this.next();
n--;
}
}
};
}

/**
* getWithAttempts waits until readyFunc(getFunc()) returns true
* and only then returns result of getFunc()
*/
export const getWithAttempts = async <T>(
sdk: any,
cm: any,
getFunc: () => Promise<T>,
readyFunc: (t: T) => Promise<boolean>,
numAttempts = 20,
Expand All @@ -48,7 +72,7 @@ export const getWithAttempts = async <T>(
} catch (e) {
error = e;
}
await waitBlocks(sdk, 1);
await cm.blockWaiter.next();
}
throw error != null ? error : new Error('getWithAttempts: no attempts left');
};
12 changes: 11 additions & 1 deletion src/testcases/common_localcosmosnet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { cosmosclient } from '@cosmos-client/core';
import { Wallet } from '../types';
import { mnemonicToWallet } from '../helpers/cosmos';
import { setup } from '../helpers/env';
import { BlockWaiter } from '../helpers/wait';

const config = require('../config.json');

Expand Down Expand Up @@ -56,6 +57,8 @@ const walletSet = async (
export class TestStateLocalCosmosTestNet {
sdk1: cosmosclient.CosmosSDK;
sdk2: cosmosclient.CosmosSDK;
blockWaiter1: BlockWaiter;
blockWaiter2: BlockWaiter;
wallets: Record<string, Record<string, Wallet>>;
icq_web_host: string;
init = async () => {
Expand All @@ -70,7 +73,14 @@ export class TestStateLocalCosmosTestNet {

this.icq_web_host = 'http://localhost:9999';

await setup(host1);
this.blockWaiter1 = new BlockWaiter(
process.env.NODE1_WS_URL || 'ws://localhost:26657',
);
this.blockWaiter2 = new BlockWaiter(
process.env.NODE2_WS_URL || 'ws://localhost:16657',
);

await setup(host1, host2);

this.wallets = {};
this.wallets.neutron = await walletSet(this.sdk1, neutron_prefix);
Expand Down
16 changes: 9 additions & 7 deletions src/testcases/interchain_kv_query.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { proto, rest } from '@cosmos-client/core';
import {
CosmosWrapper,
COSMOS_DENOM,
CosmosWrapper,
NEUTRON_DENOM,
NeutronContract,
} from '../helpers/cosmos';
import { TestStateLocalCosmosTestNet } from './common_localcosmosnet';
import { getRemoteHeight, getWithAttempts, waitBlocks } from '../helpers/wait';
import { getRemoteHeight, getWithAttempts } from '../helpers/wait';
import { AccAddress, ValAddress } from '@cosmos-client/core/cjs/types';
import { CosmosSDK } from '@cosmos-client/core/cjs/sdk';
import {
Expand Down Expand Up @@ -174,10 +174,10 @@ const acceptInterchainqueriesParamsChangeProposal = async (
const proposalId = parseInt(attribute);
expect(proposalId).toBeGreaterThanOrEqual(0);

await waitBlocks(cm.sdk, 1);
await cm[1].blockWaiter.next();
await cm.voteYes(proposalId, wallet.address.toString());

await waitBlocks(cm.sdk, 1);
await cm[1].blockWaiter.next();
await cm.executeProposal(proposalId, wallet.address.toString());

await getWithAttempts(
Expand Down Expand Up @@ -271,11 +271,13 @@ describe('Neutron / Interchain KV Query', () => {
cm = {
1: new CosmosWrapper(
testState.sdk1,
testState.blockWaiter1,
testState.wallets.neutron.demo1,
NEUTRON_DENOM,
),
2: new CosmosWrapper(
testState.sdk2,
testState.blockWaiter2,
testState.wallets.cosmos.demo2,
COSMOS_DENOM,
),
Expand Down Expand Up @@ -599,7 +601,7 @@ describe('Neutron / Interchain KV Query', () => {
for (const j of res) {
expect(j).not.toEqual(0);
}
await waitBlocks(cm[1].sdk, 1);
await cm[1].blockWaiter.next();
}
const end = await Promise.all(
[2, 3, 4].map((i) => getKvCallbackStatus(cm[1], contractAddress, i)),
Expand Down Expand Up @@ -660,7 +662,7 @@ describe('Neutron / Interchain KV Query', () => {
testState.wallets.cosmos.demo2.address,
);

await waitBlocks(cm[1].sdk, 1);
await cm[1].blockWaiter.next();

const queryResult = await getRegisteredQuery(
cm[1],
Expand Down Expand Up @@ -720,7 +722,7 @@ describe('Neutron / Interchain KV Query', () => {
testState.wallets.cosmos.demo2.address,
);

await waitBlocks(cm[1].sdk, 1);
await cm[1].blockWaiter.next();

const queryResult = await getRegisteredQuery(
cm[1],
Expand Down
15 changes: 8 additions & 7 deletions src/testcases/interchain_tx_query.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import {
CosmosWrapper,
COSMOS_DENOM,
CosmosWrapper,
NEUTRON_DENOM,
NeutronContract,
} from '../helpers/cosmos';
import { proto } from '@cosmos-client/core';
import { TestStateLocalCosmosTestNet } from './common_localcosmosnet';
import { waitBlocks } from '../helpers/wait';
import Long from 'long';
import {
getRegisteredQuery,
Expand All @@ -28,11 +27,13 @@ describe('Neutron / Interchain TX Query', () => {
await testState.init();
cm = new CosmosWrapper(
testState.sdk1,
testState.blockWaiter1,
testState.wallets.neutron.demo1,
NEUTRON_DENOM,
);
cm2 = new CosmosWrapper(
testState.sdk2,
testState.blockWaiter2,
testState.wallets.cosmos.demo2,
COSMOS_DENOM,
);
Expand Down Expand Up @@ -139,7 +140,7 @@ describe('Neutron / Interchain TX Query', () => {
expect(balances.balances).toEqual([
{ amount: addr2ExpectedBalance.toString(), denom: cm2.denom },
]);
await waitBlocks(cm.sdk, query1UpdatePeriod * 2); // we are waiting for quite a big time just to be sure
await cm.blockWaiter.waitBlocks(query1UpdatePeriod * 2); // we are waiting for quite a big time just to be sure

// the different address is not registered by the contract, so its receivings aren't tracked
let deposits = await queryRecipientTxs(
Expand Down Expand Up @@ -168,7 +169,7 @@ describe('Neutron / Interchain TX Query', () => {
expect(balances.balances).toEqual([
{ amount: addr1ExpectedBalance.toString(), denom: cm2.denom }, // balance hasn't changed thus tx failed
]);
await waitBlocks(cm.sdk, query1UpdatePeriod * 2 + 1); // we are waiting for quite a big time just to be sure
await cm.blockWaiter.waitBlocks(query1UpdatePeriod * 2 + 1); // we are waiting for quite a big time just to be sure

// the watched address receivings are not changed
const deposits = await queryRecipientTxs(
Expand Down Expand Up @@ -471,7 +472,7 @@ describe('Neutron / Interchain TX Query', () => {
const watchedAddr5: string = addr5;
const query5UpdatePeriod = 12;

// by this checks we ensure the transactions will be processed in the desired order
// by these checks we ensure the transactions will be processed in the desired order
test('validate update periods', async () => {
expect(query5UpdatePeriod).toBeGreaterThanOrEqual(9);
expect(query5UpdatePeriod).toBeGreaterThanOrEqual(query4UpdatePeriod * 3);
Expand All @@ -494,7 +495,7 @@ describe('Neutron / Interchain TX Query', () => {
query5UpdatePeriod,
watchedAddr5,
);
await waitBlocks(cm.sdk, 2); // wait for queries handling on init
await cm.blockWaiter.waitBlocks(2); // wait for queries handling on init
});

test('make older sending', async () => {
Expand Down Expand Up @@ -613,7 +614,7 @@ describe('Neutron / Interchain TX Query', () => {
});

test('check that transfer has not been recorded', async () => {
await waitBlocks(cm.sdk, query4UpdatePeriod * 2 + 1); // we are waiting for quite a big time just to be sure
await cm.blockWaiter.waitBlocks(query4UpdatePeriod * 2 + 1); // we are waiting for quite a big time just to be sure
const deposits = await queryRecipientTxs(
cm,
contractAddress,
Expand Down
Loading

0 comments on commit 3942479

Please sign in to comment.