Skip to content

Commit

Permalink
Merge pull request #33 from ora-io/dev
Browse files Browse the repository at this point in the history
Dev
  • Loading branch information
murongg authored Feb 26, 2025
2 parents 1f0286a + f59394b commit ce3bbb5
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 30 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ora-stack",
"version": "0.3.3",
"version": "0.3.4",
"private": true,
"packageManager": "[email protected]",
"description": "",
Expand Down
2 changes: 1 addition & 1 deletion packages/orap/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ora-io/orap",
"type": "module",
"version": "0.3.3",
"version": "0.3.4",
"packageManager": "[email protected]",
"description": "",
"author": "Norman (nom4dv3), MuRong",
Expand Down
35 changes: 35 additions & 0 deletions packages/orap/tests/test-case.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import dotenv from 'dotenv'
import { RekuProviderManager } from '@ora-io/reku'
// import { sleep } from '@ora-io/utils'
import { startDemo } from '../examples/declarativeDemo/app'

dotenv.config({ path: './packages/orap/tests/.env' })

const chain = 'mainnet'

const wsProvider: RekuProviderManager = new RekuProviderManager(
process.env[`${chain.toUpperCase()}_WSS`]!,
{
// heartbeatInterval: 100,
disabledHeartbeat: true,
},
)
const httpProvider: RekuProviderManager = new RekuProviderManager(
process.env[`${chain.toUpperCase()}_HTTP`]!,
{
// heartbeatInterval: 500,
},
)
const storeConfig = { port: parseInt(process.env.REDIS_PORT!), host: process.env.REDIS_HOST }

setTimeout(async () => {
// for (let i = 0; i < 50; i++) {
// wsProvider.reconnect()
// await sleep(1000)
// }
// setInterval(() => {
// httpProvider.reconnect()
// }, 2000)
}, 10000 * 2)

startDemo({ wsProvider, httpProvider }, storeConfig)
2 changes: 1 addition & 1 deletion packages/reku/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ora-io/reku",
"type": "module",
"version": "0.3.4",
"version": "0.3.5",
"packageManager": "[email protected]",
"description": "",
"author": "Norman (nom4dv3), MuRong",
Expand Down
16 changes: 13 additions & 3 deletions packages/reku/src/event/crosschecker/autochecker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ export class AutoCrossChecker extends BaseCrossChecker {

this.cache = new CrossCheckerCacheManager(options?.store, { keyPrefix: options?.storeKeyPrefix, ttl: options?.storeTtl })

let latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3)
let latestBlockNum = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider?.getBlockNumber()
}, 15 * 1000, 3)

// resume checkpoint priority: options.fromBlock > cache > latestBlockNum + 1
const defaultInitCheckpoint = await this.cache.getCheckpoint() ?? (latestBlockNum)
Expand Down Expand Up @@ -87,7 +91,11 @@ export class AutoCrossChecker extends BaseCrossChecker {
}

const waitNextCrosscheck = async (): Promise<boolean> => {
latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3)
latestBlockNum = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider?.getBlockNumber()
}, 15 * 1000, 3)

// If auto-follow is enabled, update toBlock and check block range
if (options.autoFollowLatestBlock) {
Expand Down Expand Up @@ -145,7 +153,9 @@ export class AutoCrossChecker extends BaseCrossChecker {
else {
debug('Because the latest block %d is too old, skip this cross check', latestBlockNum)
}
return endingCondition()
const end = endingCondition()
debug('polling ending condition: %s', end)
return end
}, pollingInterval)
}

Expand Down
18 changes: 15 additions & 3 deletions packages/reku/src/event/crosschecker/basechecker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ export class BaseCrossChecker {

// define from, to
// TODO: use blockNumber for performance
const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3)
const block = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider.getBlock('latest')
}, 15 * 1000, 3)
if (!block) {
console.warn('crosscheck failed to get latest block')
return
Expand All @@ -58,7 +62,11 @@ export class BaseCrossChecker {
ccfOptions: CrossCheckFromParam,
) {
// TODO: use blockNumber for performance
const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3)
const block = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider.getBlock('latest')
}, 15 * 1000, 3)
if (!block) {
console.warn('crosscheck failed to get latest block')
return
Expand Down Expand Up @@ -121,7 +129,11 @@ export class BaseCrossChecker {
...(topics && { topics }),
}
if (this.provider.provider) {
const logs = await timeoutWithRetry(() => this.provider.provider.getLogs(params), 15 * 1000, 3)
const logs = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider.getLogs(params)
}, 15 * 1000, 3)
// get ignoreLogs keys
const ignoreLogs = options.ignoreLogs

Expand Down
80 changes: 63 additions & 17 deletions packages/reku/src/provider/provider.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { EventEmitter } from 'node:events'
import type { InterfaceAbi } from 'ethers'
import { Interface, WebSocketProvider, ethers } from 'ethers'
import type { ErrorEvent, WebSocket } from 'ws'
import type { ContractAddress } from '@ora-io/utils'
import { WebSocket } from 'ws'
import type { ErrorEvent } from 'ws'
import { type ContractAddress, isInstanceof, to } from '@ora-io/utils'
import { debug } from '../debug'
import { RekuContractManager } from './contract'

Expand Down Expand Up @@ -39,13 +40,17 @@ export class RekuProviderManager {
}

connect() {
const url = new URL(this.providerUrl)
if (url.protocol === 'ws:' || url.protocol === 'wss:')
if (this.isWebSocketProviderUrl)
this._provider = new ethers.WebSocketProvider(this.providerUrl)
else
this._provider = new ethers.JsonRpcProvider(this.providerUrl)
}

get isWebSocketProviderUrl() {
const url = new URL(this.providerUrl)
return url.protocol === 'ws:' || url.protocol === 'wss:'
}

get provider() {
return this._provider as ethers.JsonRpcProvider | WebSocketProvider
}
Expand All @@ -54,6 +59,16 @@ export class RekuProviderManager {
return this._contracts
}

get websocket() {
if (isInstanceof(this._provider, ethers.WebSocketProvider))
return this._provider.websocket
return undefined
}

get destroyed() {
return this._provider?.destroyed
}

addContract(address: ContractAddress, contract: ethers.Contract): RekuContractManager | undefined
addContract(address: ContractAddress, abi: Interface | InterfaceAbi): RekuContractManager | undefined
addContract(address: ContractAddress, abi: Interface | InterfaceAbi | ethers.Contract): RekuContractManager | undefined {
Expand Down Expand Up @@ -156,7 +171,18 @@ export class RekuProviderManager {
socket.onerror = null
debug('remove all listeners of websocket provider')
}
this._provider?.destroy()
debug('reconnect destroyed: %s', this._provider?.destroyed)
if (this._provider && !this._provider.destroyed) {
if (isInstanceof(this._provider, ethers.WebSocketProvider)) {
debug('reconnect websocket readyState: %s', this.websocket?.readyState)
if (this.websocket?.readyState !== WebSocket.CONNECTING)
to(Promise.resolve(this._provider.destroy()))
}
else {
to(Promise.resolve(this._provider.destroy()))
}
}

this._provider = undefined

setTimeout(() => {
Expand Down Expand Up @@ -186,21 +212,41 @@ export class RekuProviderManager {
if (this._options?.disabledHeartbeat)
return
debug('start heartbeat')
this._heartbeatTimer = setInterval(() => {
debug('heartbeat running...')
debug('heartbeat has provider: %s', !!this._provider)
this._provider?.send('net_version', [])
.then((res) => {
debug('heartbeat response: %s', res)
})
.catch((err) => {
this.reconnect()
this._event?.emit('error', err)
debug('heartbeat error: %s', err)
})
this._heartbeatTimer = setInterval(async () => {
if (!this.destroyed) {
debug('heartbeat running...')
const hasProvider = this._hasProvider()
debug('heartbeat has provider: %s', hasProvider)
this._provider?.send('net_version', [])
.then((res) => {
debug('heartbeat response: %s', res)
})
.catch((err) => {
this.reconnect()
this._event?.emit('error', err)
debug('heartbeat error: %s', err)
})
.finally(() => {
debug('heartbeat finally')
})
}
else {
debug('heartbeat destroyed')
}
}, this._heartbeatInterval)
}

private _hasProvider() {
const hasProvider = !!this._provider && !!this._provider.provider
let isInstance = false
if (this.isWebSocketProviderUrl)
isInstance = isInstanceof(this._provider, ethers.WebSocketProvider) && isInstanceof(this._provider.provider, ethers.WebSocketProvider)
else
isInstance = isInstanceof(this._provider, ethers.JsonRpcProvider) && isInstanceof(this._provider.provider, ethers.JsonRpcProvider)

return hasProvider && isInstance && !this._provider?.destroyed
}

private _clearHeartbeat() {
if (this._heartbeatTimer) {
debug('clear heartbeat')
Expand Down
4 changes: 2 additions & 2 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ora-io/utils",
"type": "module",
"version": "0.3.2",
"version": "0.3.3",
"packageManager": "[email protected]",
"description": "",
"author": "Norman (nom4dv3), MuRong",
Expand Down Expand Up @@ -43,7 +43,7 @@
"ethers": ">=6.13.0"
},
"dependencies": {
"@murongg/utils": "^0.1.28",
"@murongg/utils": "^0.2.0",
"cache-manager": "5.7.6",
"cache-manager-ioredis-yet": "2.1.1",
"debug": "^4.3.7",
Expand Down
9 changes: 7 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ce3bbb5

Please sign in to comment.