Skip to content

Commit

Permalink
Feature/optional ws url (#1527)
Browse files Browse the repository at this point in the history
Optional WS URL for RPCs to unblock new chain integration.
Charry-picks from:
smartcontractkit/chainlink#14354
smartcontractkit/chainlink#14373
smartcontractkit/chainlink#14534
smartcontractkit/chainlink#14364
smartcontractkit/chainlink#14929

---------

Co-authored-by: Joe Huang <[email protected]>
  • Loading branch information
dhaidashenko and huangzhen1997 authored Nov 14, 2024
1 parent 31925b8 commit 96b7ec5
Show file tree
Hide file tree
Showing 36 changed files with 695 additions and 103 deletions.
5 changes: 5 additions & 0 deletions .changeset/four-kangaroos-appear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Add config validation so it requires ws url when http polling disabled #bugfix
11 changes: 11 additions & 0 deletions .changeset/happy-feet-rhyme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
"chainlink": minor
---

This PR introduce few changes:
- Add a new config option `EVM.NodePool.NewHeadsPollInterval` (0 by default indicate disabled), which is an interval for polling new block periodically using http client rather than subscribe to ws feed.
- Updated new head handler for polling new head over http, and register the subscription in node lifecycle logic.
- If the polling new heads is enabled, WS new heads subscription will be replaced with the new http based polling.

Note: There will be another PR for making WS URL optional with some extra condition.
#added
13 changes: 13 additions & 0 deletions .changeset/kind-numbers-melt.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
---
"chainlink": minor
---

Adding feature flag for `LogBroadcaster` called `LogBroadcasterEnabled`, which is `true` by default to support backwards compatibility.
Adding `LogBroadcasterEnabled` allows certain chains to completely disable the `LogBroadcaster` feature, which is an old feature (getting replaced by logPoller) that only few products are using it:
* OCR1 Median
* *OCR2 Median when ChainReader is disabled
* *pre-OCR2 Keeper
* Flux Monitor
* Direct RequestOCR1 Median

#added
8 changes: 8 additions & 0 deletions .changeset/moody-rules-agree.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": patch
---

- register polling subscription to avoid subscription leaking when rpc client gets closed.
- add a temporary special treatment for SubscribeNewHead before we replace it with SubscribeToHeads. Add a goroutine that forwards new head from poller to caller channel.
- fix a deadlock in poller, by using a new lock for subs slice in rpc client.
#bugfix
8 changes: 8 additions & 0 deletions .changeset/silly-lies-boil.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"chainlink": minor
---

Make websocket URL `WSURL` for `EVM.Nodes` optional, and apply logic so that:
* If WS URL was not provided, SubscribeFilterLogs should fail with an explicit error
* If WS URL was not provided LogBroadcaster should be disabled
#nops
5 changes: 3 additions & 2 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ jobs:
only-new-issues: false # disabled for PRs due to unreliability
args: --out-format colored-line-number,checkstyle:golangci-lint-report.xml
working-directory: ${{ matrix.project.path }}
continue-on-error: true

build-chainlink:
environment: integration
Expand Down Expand Up @@ -584,7 +585,7 @@ jobs:
test_config_chainlink_version: ${{ inputs.evm-ref || github.sha }}
test_config_selected_networks: ${{ env.SELECTED_NETWORKS }}
test_config_logging_run_id: ${{ github.run_id }}
test_config_logstream_log_targets: ${{ vars.LOGSTREAM_LOG_TARGETS }}
test_config_logstream_log_targets: "file"
test_config_test_log_collect: ${{ vars.TEST_LOG_COLLECT }}
cl_repo: ${{ env.CHAINLINK_IMAGE }}
cl_image_tag: ${{ inputs.evm-ref || github.sha }}${{ matrix.product.tag_suffix }}
Expand All @@ -606,7 +607,7 @@ jobs:
go_coverage_src_dir: /var/tmp/go-coverage
go_coverage_dest_dir: ${{ github.workspace }}/.covdata
DEFAULT_CHAINLINK_IMAGE: ${{ env.CHAINLINK_IMAGE }}
DEFAULT_LOKI_TENANT_ID: ${{ secrets.GRAFANA_INTERNAL_TENANT_ID }}
DEFAULT_LOKI_TENANT_ID: "promtail"
DEFAULT_LOKI_ENDPOINT: https://${{ secrets.GRAFANA_INTERNAL_HOST }}/loki/api/v1/push
DEFAULT_LOKI_BASIC_AUTH: ${{ secrets.GRAFANA_INTERNAL_BASIC_AUTH }}
DEFAULT_GRAFANA_BASE_URL: "http://localhost:8080/primary"
Expand Down
18 changes: 12 additions & 6 deletions common/client/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type NodeConfig interface {
FinalizedBlockPollInterval() time.Duration
EnforceRepeatableRead() bool
DeathDeclarationDelay() time.Duration
NewHeadsPollInterval() time.Duration
}

type ChainConfig interface {
Expand Down Expand Up @@ -90,14 +91,14 @@ type node[
services.StateMachine
lfcLog logger.Logger
name string
id int32
id int
chainID CHAIN_ID
nodePoolCfg NodeConfig
chainCfg ChainConfig
order int32
chainFamily string

ws url.URL
ws *url.URL
http *url.URL

rpc RPC
Expand All @@ -120,10 +121,10 @@ func NewNode[
nodeCfg NodeConfig,
chainCfg ChainConfig,
lggr logger.Logger,
wsuri url.URL,
wsuri *url.URL,
httpuri *url.URL,
name string,
id int32,
id int,
chainID CHAIN_ID,
nodeOrder int32,
rpc RPC,
Expand All @@ -135,8 +136,10 @@ func NewNode[
n.chainID = chainID
n.nodePoolCfg = nodeCfg
n.chainCfg = chainCfg
n.ws = wsuri
n.order = nodeOrder
if wsuri != nil {
n.ws = wsuri
}
if httpuri != nil {
n.http = httpuri
}
Expand All @@ -156,7 +159,10 @@ func NewNode[
}

func (n *node[CHAIN_ID, HEAD, RPC]) String() string {
s := fmt.Sprintf("(%s)%s:%s", Primary.String(), n.name, n.ws.String())
s := fmt.Sprintf("(%s)%s", Primary.String(), n.name)
if n.ws != nil {
s = s + fmt.Sprintf(":%s", n.ws.String())
}
if n.http != nil {
s = s + fmt.Sprintf(":%s", n.http.String())
}
Expand Down
9 changes: 7 additions & 2 deletions common/client/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ type testNodeConfig struct {
enforceRepeatableRead bool
finalizedBlockPollInterval time.Duration
deathDeclarationDelay time.Duration
newHeadsPollInterval time.Duration
}

func (n testNodeConfig) NewHeadsPollInterval() time.Duration {
return n.newHeadsPollInterval
}

func (n testNodeConfig) PollFailureThreshold() uint32 {
Expand Down Expand Up @@ -62,10 +67,10 @@ type testNodeOpts struct {
config testNodeConfig
chainConfig clientMocks.ChainConfig
lggr logger.Logger
wsuri url.URL
wsuri *url.URL
httpuri *url.URL
name string
id int32
id int
chainID types.ID
nodeOrder int32
rpc *mockNodeClient[types.ID, Head]
Expand Down
21 changes: 14 additions & 7 deletions core/chains/evm/client/config_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func NewClientConfigs(
deathDeclarationDelay time.Duration,
noNewFinalizedHeadsThreshold time.Duration,
finalizedBlockPollInterval time.Duration,

newHeadsPollInterval time.Duration,
) (commonclient.ChainConfig, evmconfig.NodePool, []*toml.Node, error) {
nodes, err := parseNodeConfigs(nodeCfgs)
if err != nil {
Expand All @@ -59,6 +59,7 @@ func NewClientConfigs(
EnforceRepeatableRead: enforceRepeatableRead,
DeathDeclarationDelay: commonconfig.MustNewDuration(deathDeclarationDelay),
FinalizedBlockPollInterval: commonconfig.MustNewDuration(finalizedBlockPollInterval),
NewHeadsPollInterval: commonconfig.MustNewDuration(newHeadsPollInterval),
}
nodePoolCfg := &evmconfig.NodePoolConfig{C: nodePool}
chainConfig := &evmconfig.EVMConfig{
Expand All @@ -79,15 +80,21 @@ func NewClientConfigs(
func parseNodeConfigs(nodeCfgs []NodeConfig) ([]*toml.Node, error) {
nodes := make([]*toml.Node, len(nodeCfgs))
for i, nodeCfg := range nodeCfgs {
if nodeCfg.WSURL == nil || nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing WS or HTTP URL", i)
var wsURL, httpURL *commonconfig.URL
// wsUrl requirement will be checked in EVMConfig validation
if nodeCfg.WSURL != nil {
wsURL = commonconfig.MustParseURL(*nodeCfg.WSURL)
}

if nodeCfg.HTTPURL == nil {
return nil, fmt.Errorf("node config [%d]: missing HTTP URL", i)
}
wsUrl := commonconfig.MustParseURL(*nodeCfg.WSURL)
httpUrl := commonconfig.MustParseURL(*nodeCfg.HTTPURL)

httpURL = commonconfig.MustParseURL(*nodeCfg.HTTPURL)
node := &toml.Node{
Name: nodeCfg.Name,
WSURL: wsUrl,
HTTPURL: httpUrl,
WSURL: wsURL,
HTTPURL: httpURL,
SendOnly: nodeCfg.SendOnly,
Order: nodeCfg.Order,
}
Expand Down
9 changes: 6 additions & 3 deletions core/chains/evm/client/config_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ func TestClientConfigBuilder(t *testing.T) {
finalityDepth := ptr(uint32(10))
finalityTagEnabled := ptr(true)
noNewHeadsThreshold := time.Second
newHeadsPollInterval := 0 * time.Second
chainCfg, nodePool, nodes, err := client.NewClientConfigs(selectionMode, leaseDuration, chainTypeStr, nodeConfigs,
pollFailureThreshold, pollInterval, syncThreshold, nodeIsSyncingEnabled, noNewHeadsThreshold, finalityDepth,
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold, pollInterval)
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold,
pollInterval, newHeadsPollInterval)
require.NoError(t, err)

// Validate node pool configs
Expand All @@ -52,6 +54,7 @@ func TestClientConfigBuilder(t *testing.T) {
require.Equal(t, *enforceRepeatableRead, nodePool.EnforceRepeatableRead())
require.Equal(t, deathDeclarationDelay, nodePool.DeathDeclarationDelay())
require.Equal(t, pollInterval, nodePool.FinalizedBlockPollInterval())
require.Equal(t, newHeadsPollInterval, nodePool.NewHeadsPollInterval())

// Validate node configs
require.Equal(t, *nodeConfigs[0].Name, *nodes[0].Name)
Expand Down Expand Up @@ -90,15 +93,15 @@ func TestNodeConfigs(t *testing.T) {
require.Len(t, tomlNodes, len(nodeConfigs))
})

t.Run("parsing missing ws url fails", func(t *testing.T) {
t.Run("ws can be optional", func(t *testing.T) {
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo1"),
HTTPURL: ptr("http://foo1.test"),
},
}
_, err := client.ParseTestNodeConfigs(nodeConfigs)
require.Error(t, err)
require.Nil(t, err)
})

t.Run("parsing missing http url fails", func(t *testing.T) {
Expand Down
15 changes: 9 additions & 6 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,25 @@ import (
)

func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, clientErrors evmconfig.ClientErrors, lggr logger.Logger, chainID *big.Int, nodes []*toml.Node, chainType chaintype.ChainType) Client {
var empty url.URL
var primaries []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]
var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
largePayloadRPCTimeout, defaultRPCTimeout := getRPCTimeouts(chainType)
for i, node := range nodes {
var ws *url.URL
if node.WSURL != nil {
ws = (*url.URL)(node.WSURL)
}
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
rpc := NewRPCClient(lggr, nil, (*url.URL)(node.HTTPURL), *node.Name, i, chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i),
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
rpc := NewRPCClient(lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i,
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), cfg.NewHeadsPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
lggr, ws, (*url.URL)(node.HTTPURL), *node.Name, i, chainID, *node.Order,
rpc, "EVM")
primaries = append(primaries, primaryNode)
}
Expand Down
4 changes: 3 additions & 1 deletion core/chains/evm/client/evm_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestNewEvmClient(t *testing.T) {
deathDeclarationDelay := time.Second * 3
noNewFinalizedBlocksThreshold := time.Second * 5
finalizedBlockPollInterval := time.Second * 4
newHeadsPollInterval := time.Second * 4
nodeConfigs := []client.NodeConfig{
{
Name: ptr("foo"),
Expand All @@ -40,7 +41,8 @@ func TestNewEvmClient(t *testing.T) {
finalityTagEnabled := ptr(true)
chainCfg, nodePool, nodes, err := client.NewClientConfigs(selectionMode, leaseDuration, chainTypeStr, nodeConfigs,
pollFailureThreshold, pollInterval, syncThreshold, nodeIsSyncingEnabled, noNewHeadsThreshold, finalityDepth,
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold, finalizedBlockPollInterval)
finalityTagEnabled, finalizedBlockOffset, enforceRepeatableRead, deathDeclarationDelay, noNewFinalizedBlocksThreshold,
finalizedBlockPollInterval, newHeadsPollInterval)
require.NoError(t, err)

client := client.NewEvmClient(nodePool, chainCfg, nil, logger.Test(t), testutils.FixtureChainID, nodes, chaintype.ChainType(chainTypeStr))
Expand Down
15 changes: 10 additions & 5 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type TestNodePoolConfig struct {
NodeErrors config.ClientErrors
EnforceRepeatableReadVal bool
NodeDeathDeclarationDelay time.Duration
NodeNewHeadsPollInterval time.Duration
}

func (tc TestNodePoolConfig) PollFailureThreshold() uint32 { return tc.NodePollFailureThreshold }
Expand All @@ -107,6 +108,10 @@ func (tc TestNodePoolConfig) FinalizedBlockPollInterval() time.Duration {
return tc.NodeFinalizedBlockPollInterval
}

func (tc TestNodePoolConfig) NewHeadsPollInterval() time.Duration {
return tc.NodeNewHeadsPollInterval
}

func (tc TestNodePoolConfig) Errors() config.ClientErrors {
return tc.NodeErrors
}
Expand All @@ -127,7 +132,7 @@ func NewChainClientWithTestNode(
rpcUrl string,
rpcHTTPURL *url.URL,
sendonlyRPCURLs []url.URL,
id int32,
id int,
chainID *big.Int,
) (Client, error) {
parsed, err := url.ParseRequestURI(rpcUrl)
Expand All @@ -140,10 +145,10 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}

var sendonlys []commonclient.SendOnlyNode[*big.Int, RPCClient]
Expand All @@ -152,7 +157,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
rpc := NewRPCClient(lggr, &empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down Expand Up @@ -198,7 +203,7 @@ func NewChainClientWithMockedRpc(
parsed, _ := url.ParseRequestURI("ws://test")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
cfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, parsed, nil, "eth-primary-node-0", 1, chainID, 1, rpc, "EVM")
primaries := []commonclient.Node[*big.Int, *evmtypes.Head, RPCClient]{n}
clientErrors := NewTestClientErrors()
c := NewChainClient(lggr, selectionMode, leaseDuration, noNewHeadsThreshold, primaries, nil, chainID, chainType, &clientErrors, 0)
Expand Down
Loading

0 comments on commit 96b7ec5

Please sign in to comment.