From 8b8369c1f44eb2d7de5803a77cf58bde1d42f066 Mon Sep 17 00:00:00 2001
From: Dylan Tinianov
Date: Fri, 15 Nov 2024 14:11:42 -0500
Subject: [PATCH] MultiNode Soak Testing (#894)

* Add defaults
* Add latest block methods
* Address comments
* lint
* Fix lint overflow issues
* Update transaction_sender.go
* Fix lint
* Validate node config
* Update toml.go
* Add SendOnly nodes
* Use pointers on config
* Add test outlines
* Use test context
* Use configured selection mode
* Set defaults
* lint
* Add nil check
* Add client test
* Add subscription test
* tidy
* Fix imports
* Update chain_test.go
* Update multinode.go
* Add comments
* Update multinode.go
* Wrap multinode config
* Fix imports
* Update .golangci.yml
* Use MultiNode
* Add multinode to txm
* Use MultiNode
* Update chain.go
* Update balance_test.go
* Add retries
* Fix head
* Update client.go
* lint
* lint
* Use MultiNode TxSender
* Update txm_internal_test.go
* Address comments
* Remove total difficulty
* Register polling subs
* Extract MultiNodeClient
* Remove caching changes
* Undo cache changes
* Fix tests
* Update chain.go
* Fix variables
* Move classify errors
* Fix imports
* lint
* Update txm_internal_test.go
* Update txm_internal_test.go
* lint
* Fix error classification
* Update txm_internal_test.go
* Update multinode_client.go
* lint
* Update classify_errors.go
* Update classify_errors.go
* Add tests
* Add test coverage
* lint
* Add dial comment
* CTF bump for image build
* Update pkg/solana/client/multinode_client.go

Co-authored-by: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com>

* Update txm.go
* Create loader
* Update transaction_sender.go
* Fix tests
* Update txm_internal_test.go
* lint
* Update txm.go
* Add ctx
* Fix imports
* Add SendTxResult to TxSender
* Update chain_test.go
* Enable MultiNode
* Move error classification
* Add MultiNode config
* Use loader
* Update multinode.go
* Update multinode.go
* Use loader in txm tests
* lint
* Update testconfig.go
* Update loader
* Use single RPC
* Fix tests
* lint
* Use default thresholds
* Address comments
* Update classify_errors.go
* Update testconfig.go
* Update errors
* lint
* Fix SendTransaction
* Update chain.go
* Update sendTx
* Fix ctx issues
* Enable multiple RPCs in soak tests
* Update defaults for testing
* Add health check tags
* Increase sync threshold
* Validate heads
* Use latestChainInfo
* Fix AliveLoop bug
* Update configurations
* Update transaction_sender.go
* Get chain info
* Update ctx
* Update transaction_sender.go
* Update transaction_sender.go
* Increase tx timeout
* Update transaction_sender.go
* Update ctx
* Add timer
* Update transaction_sender.go
* Update transaction_sender.go
* Update testconfig.go
* Fix ctx
* Remove debug logging
* Update run_soak_test.sh
* lint
* Add debugging logs
* Fix ctx cancel
* Fix ctx cancel
* Fix DoAll ctx
* Remove debugging logs
* Remove logs
* defer reportWg
* Add result ctx logging
* log on close
* Update transaction_sender.go
* add cancel func
* Update transaction_sender.go
* Update transaction_sender.go
* Add ctx to reportSendTxAnomalies
* Update comments
* Fix comments
* Address comments
* lint
* lint
* Pass context
* Update node_lifecycle.go
* Use get reader function
* Make rpcurls plural
* Fix reader getters
* lint
* fix imports
* Update transaction_sender.go
* Remove TxError
* Rename getReader
* lint
* Update chain_test.go
* Update transmissions_cache.go
* Update run_soak_test.sh
* Fix deprecated method
* Clean up getReader
* Use AccountReader

---------

Co-authored-by: Damjan Smickovski
Co-authored-by: Dmytro Haidashenko <34754799+dhaidashenko@users.noreply.github.com>
---
 integration-tests/common/common.go | 14 +-
 integration-tests/common/test_common.go | 16 +--
 integration-tests/config/config.go | 12 +-
 integration-tests/scripts/run_soak_test.sh | 59 ++++++++
 integration-tests/solclient/solclient.go | 2 +-
 integration-tests/solclient/store.go | 4 +-
 integration-tests/testconfig/testconfig.go | 72 ++++++----
 pkg/monitoring/chain_reader.go | 7 +-
 pkg/solana/cache_test.go | 26 ++--
 pkg/solana/chain.go | 5 +-
 pkg/solana/chain_test.go | 6 +-
 pkg/solana/client/multinode/multi_node.go | 2 +-
 pkg/solana/client/multinode/node_lifecycle.go | 34 +++--
 pkg/solana/client/multinode/poller.go | 2 +-
 .../client/multinode/transaction_sender.go | 132 +++++++++---------
 pkg/solana/client/multinode_client.go | 46 +++---
 pkg/solana/config/multinode.go | 24 ++--
 pkg/solana/config_tracker.go | 10 +-
 pkg/solana/config_tracker_test.go | 4 +-
 pkg/solana/relay.go | 16 +--
 pkg/solana/state_cache.go | 16 ++-
 pkg/solana/transmissions_cache.go | 16 ++-
 pkg/solana/transmitter.go | 11 +-
 pkg/solana/transmitter_test.go | 3 +-
 24 files changed, 332 insertions(+), 207 deletions(-)
 create mode 100755 integration-tests/scripts/run_soak_test.sh

diff --git a/integration-tests/common/common.go index c4de45aea..05ccabbad 100644 --- a/integration-tests/common/common.go +++ b/integration-tests/common/common.go @@ -51,7 +51,7 @@ type TestEnvDetails struct { type ChainDetails struct { ChainName string ChainID string - RPCUrl string + RPCUrls []string RPCURLExternal string WSURLExternal string ProgramAddresses *chainConfig.ProgramAddresses @@ -116,9 +116,9 @@ func New(testConfig *tc.TestConfig) *Common { config = chainConfig.DevnetConfig() privateKeyString = *testConfig.Common.PrivateKey - if *testConfig.Common.RPCURL != "" { - config.RPCUrl = *testConfig.Common.RPCURL - config.WSUrl = *testConfig.Common.WsURL + if len(*testConfig.Common.RPCURLs) > 0 { + config.RPCUrls = *testConfig.Common.RPCURLs + config.WSUrls = *testConfig.Common.WsURLs config.ProgramAddresses = &chainConfig.ProgramAddresses{ OCR2: *testConfig.SolanaConfig.OCR2ProgramID, AccessController: *testConfig.SolanaConfig.AccessControllerProgramID, @@ -130,7 +130,7 @@ func New(testConfig *tc.TestConfig) *Common { c = &Common{ ChainDetails: &ChainDetails{ ChainID: config.ChainID, - RPCUrl: config.RPCUrl, + RPCUrls: config.RPCUrls, ChainName: config.ChainName, ProgramAddresses: config.ProgramAddresses, }, @@ -146,7 +146,7 @@ func New(testConfig *tc.TestConfig) *Common { } // provide getters for TestConfig (pointers to chain details) c.TestConfig.GetChainID = func() string { return c.ChainDetails.ChainID } - c.TestConfig.GetURL = func() string { return c.ChainDetails.RPCUrl } + c.TestConfig.GetURL = func() []string { return c.ChainDetails.RPCUrls } return c } @@ -298,7 +298,7 @@ func (c *Common) CreateJobsForContract(contractNodeInfo *ContractNodeInfo) error bootstrapNodeInternalIP = contractNodeInfo.BootstrapNode.InternalIP() } relayConfig := job.JSONConfig{ - "nodeEndpointHTTP": c.ChainDetails.RPCUrl, + "nodeEndpointHTTP": c.ChainDetails.RPCUrls, "ocr2ProgramID": contractNodeInfo.OCR2.ProgramAddress(), "transmissionsID": contractNodeInfo.Store.TransmissionsAddress(), "storeProgramID": contractNodeInfo.Store.ProgramAddress(), diff --git a/integration-tests/common/test_common.go index a775a5199..b351ee73d 100644 --- a/integration-tests/common/test_common.go +++
b/integration-tests/common/test_common.go @@ -118,9 +118,9 @@ func (m *OCRv2TestState) DeployCluster(contractsDir string) { m.Common.ChainDetails.WSURLExternal = m.Common.Env.URLs["sol"][1] if *m.Config.TestConfig.Common.Network == "devnet" { - m.Common.ChainDetails.RPCUrl = *m.Config.TestConfig.Common.RPCURL - m.Common.ChainDetails.RPCURLExternal = *m.Config.TestConfig.Common.RPCURL - m.Common.ChainDetails.WSURLExternal = *m.Config.TestConfig.Common.WsURL + m.Common.ChainDetails.RPCUrls = *m.Config.TestConfig.Common.RPCURLs + m.Common.ChainDetails.RPCURLExternal = (*m.Config.TestConfig.Common.RPCURLs)[0] + m.Common.ChainDetails.WSURLExternal = (*m.Config.TestConfig.Common.WsURLs)[0] } m.Common.ChainDetails.MockserverURLInternal = m.Common.Env.URLs["qa_mock_adapter_internal"][0] @@ -133,14 +133,14 @@ func (m *OCRv2TestState) DeployCluster(contractsDir string) { require.NoError(m.Config.T, err) // Setting the External RPC url for Gauntlet - m.Common.ChainDetails.RPCUrl = sol.InternalHTTPURL + m.Common.ChainDetails.RPCUrls = []string{sol.InternalHTTPURL} m.Common.ChainDetails.RPCURLExternal = sol.ExternalHTTPURL m.Common.ChainDetails.WSURLExternal = sol.ExternalWsURL if *m.Config.TestConfig.Common.Network == "devnet" { - m.Common.ChainDetails.RPCUrl = *m.Config.TestConfig.Common.RPCURL - m.Common.ChainDetails.RPCURLExternal = *m.Config.TestConfig.Common.RPCURL - m.Common.ChainDetails.WSURLExternal = *m.Config.TestConfig.Common.WsURL + m.Common.ChainDetails.RPCUrls = *m.Config.TestConfig.Common.RPCURLs + m.Common.ChainDetails.RPCURLExternal = (*m.Config.TestConfig.Common.RPCURLs)[0] + m.Common.ChainDetails.WSURLExternal = (*m.Config.TestConfig.Common.WsURLs)[0] } b, err := test_env.NewCLTestEnvBuilder(). @@ -273,7 +273,7 @@ func (m *OCRv2TestState) CreateJobs() { require.NoError(m.Config.T, err, "Error connecting to websocket client") relayConfig := job.JSONConfig{ - "nodeEndpointHTTP": m.Common.ChainDetails.RPCUrl, + "nodeEndpointHTTP": m.Common.ChainDetails.RPCUrls, "ocr2ProgramID": m.Common.ChainDetails.ProgramAddresses.OCR2, "transmissionsID": m.Gauntlet.FeedAddress, "storeProgramID": m.Common.ChainDetails.ProgramAddresses.Store, diff --git a/integration-tests/config/config.go b/integration-tests/config/config.go index 232dfa5d3..1b96b1f77 100644 --- a/integration-tests/config/config.go +++ b/integration-tests/config/config.go @@ -3,8 +3,8 @@ package config type Config struct { ChainName string ChainID string - RPCUrl string - WSUrl string + RPCUrls []string + WSUrls []string ProgramAddresses *ProgramAddresses PrivateKey string } @@ -20,8 +20,8 @@ func DevnetConfig() *Config { ChainName: "solana", ChainID: "devnet", // Will be overridden if set in toml - RPCUrl: "https://api.devnet.solana.com", - WSUrl: "wss://api.devnet.solana.com/", + RPCUrls: []string{"https://api.devnet.solana.com"}, + WSUrls: []string{"wss://api.devnet.solana.com/"}, } } @@ -30,8 +30,8 @@ func LocalNetConfig() *Config { ChainName: "solana", ChainID: "localnet", // Will be overridden if set in toml - RPCUrl: "http://sol:8899", - WSUrl: "ws://sol:8900", + RPCUrls: []string{"http://sol:8899"}, + WSUrls: []string{"ws://sol:8900"}, ProgramAddresses: &ProgramAddresses{ OCR2: "E3j24rx12SyVsG6quKuZPbQqZPkhAUCh8Uek4XrKYD2x", AccessController: "2ckhep7Mvy1dExenBqpcdevhRu7CLuuctMcx7G9mWEvo", diff --git a/integration-tests/scripts/run_soak_test.sh b/integration-tests/scripts/run_soak_test.sh new file mode 100755 index 000000000..7e5490859 --- /dev/null +++ b/integration-tests/scripts/run_soak_test.sh @@ -0,0 +1,59 @@ 
+#!/bin/bash + +NODE_VERSION=18 + +cd ../smoke || exit + +echo "Switching to required Node.js version $NODE_VERSION..." +export NVM_DIR="$HOME/.nvm" +[ -s "$NVM_DIR/nvm.sh" ] && \. "$NVM_DIR/nvm.sh" +nvm use $NODE_VERSION + +echo "Initializing soak test..." +terminated_by_script=false +while IFS= read -r line; do + echo "$line" + # Check if the line contains the target string + if echo "$line" | grep -q "ocr2:inspect:responses"; then + # Send SIGINT (Ctrl+C) to the 'go test' process + sudo pkill -INT -P $$ go 2>/dev/null + terminated_by_script=true + break + fi +done < <(sudo go test -timeout 24h -count=1 -run TestSolanaOCRV2Smoke/embedded -test.timeout 30m 2>&1) + +# Capture the PID of the background process +READER_PID=$! + +# Start a background timer (sleeps for 15 minutes, then sends SIGALRM to the script) +( sleep 900 && kill -s ALRM $$ ) & +TIMER_PID=$! + +# Set a trap to catch the SIGALRM signal for timeout +trap 'on_timeout' ALRM + +# Function to handle timeout +on_timeout() { + echo "Error: failed to start soak test: timeout exceeded (15 minutes)." + # Send SIGINT to the 'go test' process + pkill -INT -P $$ go 2>/dev/null + # Clean up + kill "$TIMER_PID" 2>/dev/null + kill "$READER_PID" 2>/dev/null + exit 1 +} + +# Wait for the reader process to finish +wait "$READER_PID" +EXIT_STATUS=$? + +# Clean up: kill the timer process if it's still running +kill "$TIMER_PID" 2>/dev/null + +if [ "$terminated_by_script" = true ]; then + echo "Soak test started successfully" + exit 0 +else + echo "Soak test failed to start" + exit 1 +fi diff --git a/integration-tests/solclient/solclient.go b/integration-tests/solclient/solclient.go index 7b3921c19..2d5f52ac7 100644 --- a/integration-tests/solclient/solclient.go +++ b/integration-tests/solclient/solclient.go @@ -481,7 +481,7 @@ func SendFunds(senderPrivateKey string, receiverPublicKey string, lamports uint6 accountTo := solana.MustPublicKeyFromBase58(receiverPublicKey) // Get recent blockhash - recent, err := rpcClient.GetRecentBlockhash(context.Background(), rpc.CommitmentFinalized) + recent, err := rpcClient.GetLatestBlockhash(context.Background(), rpc.CommitmentFinalized) if err != nil { return err } diff --git a/integration-tests/solclient/store.go b/integration-tests/solclient/store.go index 238d5cc31..3bc48bec9 100644 --- a/integration-tests/solclient/store.go +++ b/integration-tests/solclient/store.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-solana/contracts/generated/store" relaySol "github.com/smartcontractkit/chainlink-solana/pkg/solana" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" ) type Store struct { @@ -19,7 +20,8 @@ type Store struct { } func (m *Store) GetLatestRoundData() (uint64, uint64, uint64, error) { - a, _, err := relaySol.GetLatestTransmission(context.Background(), m.Client.RPC, m.Feed.PublicKey(), rpc.CommitmentConfirmed) + getReader := func() (client.AccountReader, error) { return m.Client.RPC, nil } + a, _, err := relaySol.GetLatestTransmission(context.Background(), getReader, m.Feed.PublicKey(), rpc.CommitmentConfirmed) if err != nil { return 0, 0, 0, err } diff --git a/integration-tests/testconfig/testconfig.go b/integration-tests/testconfig/testconfig.go index 394d2bcee..1f482b7f5 100644 --- a/integration-tests/testconfig/testconfig.go +++ b/integration-tests/testconfig/testconfig.go @@ -44,7 +44,7 @@ type TestConfig struct { // getter funcs for passing parameters GetChainID func() string - GetURL func() string + GetURL func() []string } const ( @@ -188,22 +188,22 @@ 
func (c *TestConfig) ReadFromEnvVar() error { c.Network.RpcWsUrls = rpcWsUrls } - commonRPCURL := ctf_config.MustReadEnvVar_String(E2E_TEST_COMMON_RPC_URL_ENV) - if commonRPCURL != "" { + commonRPCURL := ctf_config.MustReadEnvVar_Strings(E2E_TEST_COMMON_RPC_URL_ENV, ",") + if len(commonRPCURL) > 0 { if c.Common == nil { c.Common = &Common{} } - logger.Info().Msgf("Using %s env var to override Common.RPCURL", E2E_TEST_COMMON_RPC_URL_ENV) - c.Common.RPCURL = &commonRPCURL + logger.Info().Msgf("Using %s env var to override Common.RPCURLs", E2E_TEST_COMMON_RPC_URL_ENV) + c.Common.RPCURLs = &commonRPCURL } - commonWSURL := ctf_config.MustReadEnvVar_String(E2E_TEST_COMMON_WS_URL_ENV) - if commonWSURL != "" { + commonWSURL := ctf_config.MustReadEnvVar_Strings(E2E_TEST_COMMON_WS_URL_ENV, ",") + if len(commonWSURL) > 0 { if c.Common == nil { c.Common = &Common{} } - logger.Info().Msgf("Using %s env var to override Common.WsURL", E2E_TEST_COMMON_WS_URL_ENV) - c.Common.WsURL = &commonWSURL + logger.Info().Msgf("Using %s env var to override Common.WsURLs", E2E_TEST_COMMON_WS_URL_ENV) + c.Common.WsURLs = &commonWSURL } commonPrivateKey := ctf_config.MustReadEnvVar_String(E2E_TEST_COMMON_PRIVATE_KEY_ENV) @@ -256,7 +256,8 @@ func (c *TestConfig) GetNodeConfig() *ctf_config.NodeConfig { } func (c *TestConfig) GetNodeConfigTOML() (string, error) { - var chainID, url string + var chainID string + var url []string if c.GetChainID != nil { chainID = c.GetChainID() } @@ -264,16 +265,35 @@ func (c *TestConfig) GetNodeConfigTOML() (string, error) { url = c.GetURL() } - solConfig := solcfg.TOMLConfig{ - Enabled: ptr.Ptr(true), - ChainID: ptr.Ptr(chainID), - Nodes: []*solcfg.Node{ - { - Name: ptr.Ptr("primary"), - URL: config.MustParseURL(url), - }, + mnConfig := solcfg.MultiNodeConfig{ + MultiNode: solcfg.MultiNode{ + Enabled: ptr.Ptr(true), + SyncThreshold: ptr.Ptr(uint32(170)), }, } + mnConfig.SetDefaults() + + var nodes []*solcfg.Node + for i, u := range url { + nodes = append(nodes, &solcfg.Node{ + Name: ptr.Ptr(fmt.Sprintf("primary-%d", i)), + URL: config.MustParseURL(u), + }) + } + + chainCfg := solcfg.Chain{ + // Increase timeout for TransactionSender + TxTimeout: config.MustNewDuration(2 * time.Minute), + } + chainCfg.SetDefaults() + + solConfig := solcfg.TOMLConfig{ + Enabled: ptr.Ptr(true), + ChainID: ptr.Ptr(chainID), + Nodes: nodes, + MultiNode: mnConfig, + Chain: chainCfg, + } baseConfig := node.NewBaseConfig() baseConfig.Solana = solcfg.TOMLConfigs{ &solConfig, @@ -357,12 +377,12 @@ type Common struct { InsideK8s *bool `toml:"inside_k8"` User *string `toml:"user"` // if rpc requires api key to be passed as an HTTP header - RPCURL *string `toml:"-"` - WsURL *string `toml:"-"` - PrivateKey *string `toml:"-"` - Stateful *bool `toml:"stateful_db"` - InternalDockerRepo *string `toml:"internal_docker_repo"` - DevnetImage *string `toml:"devnet_image"` + RPCURLs *[]string `toml:"-"` + WsURLs *[]string `toml:"-"` + PrivateKey *string `toml:"-"` + Stateful *bool `toml:"stateful_db"` + InternalDockerRepo *string `toml:"internal_docker_repo"` + DevnetImage *string `toml:"devnet_image"` } type SolanaConfig struct { @@ -410,10 +430,10 @@ func (c *Common) Validate() error { if c.PrivateKey == nil { return fmt.Errorf("private_key must be set") } - if c.RPCURL == nil { + if c.RPCURLs == nil { return fmt.Errorf("rpc_url must be set") } - if c.WsURL == nil { + if c.WsURLs == nil { return fmt.Errorf("rpc_url must be set") } diff --git a/pkg/monitoring/chain_reader.go b/pkg/monitoring/chain_reader.go index 
eb4d4b8e5..9b8c8ebff 100644 --- a/pkg/monitoring/chain_reader.go +++ b/pkg/monitoring/chain_reader.go @@ -7,6 +7,7 @@ import ( "github.com/gagliardetto/solana-go/rpc" pkgSolana "github.com/smartcontractkit/chainlink-solana/pkg/solana" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" ) //go:generate mockery --name ChainReader --output ./mocks/ @@ -31,11 +32,13 @@ type chainReader struct { } func (c *chainReader) GetState(ctx context.Context, account solana.PublicKey, commitment rpc.CommitmentType) (state pkgSolana.State, blockHeight uint64, err error) { - return pkgSolana.GetState(ctx, c.client, account, commitment) + getReader := func() (client.AccountReader, error) { return c.client, nil } + return pkgSolana.GetState(ctx, getReader, account, commitment) } func (c *chainReader) GetLatestTransmission(ctx context.Context, account solana.PublicKey, commitment rpc.CommitmentType) (answer pkgSolana.Answer, blockHeight uint64, err error) { - return pkgSolana.GetLatestTransmission(ctx, c.client, account, commitment) + getReader := func() (client.AccountReader, error) { return c.client, nil } + return pkgSolana.GetLatestTransmission(ctx, getReader, account, commitment) } func (c *chainReader) GetTokenAccountBalance(ctx context.Context, account solana.PublicKey, commitment rpc.CommitmentType) (out *rpc.GetTokenAccountBalanceResult, err error) { diff --git a/pkg/solana/cache_test.go b/pkg/solana/cache_test.go index e39bb52ad..a9126d0ac 100644 --- a/pkg/solana/cache_test.go +++ b/pkg/solana/cache_test.go @@ -106,8 +106,10 @@ func TestGetState(t *testing.T) { })) defer mockServer.Close() + reader := testSetupReader(t, mockServer.URL) + getReader := func() (client.AccountReader, error) { return reader, nil } // happy path does not error (actual state decoding handled in types_test) - _, _, err := GetState(context.TODO(), testSetupReader(t, mockServer.URL), solana.PublicKey{}, "") + _, _, err := GetState(context.TODO(), getReader, solana.PublicKey{}, "") require.NoError(t, err) } @@ -133,17 +135,18 @@ func TestGetLatestTransmission(t *testing.T) { defer mockServer.Close() reader := testSetupReader(t, mockServer.URL) - a, _, err := GetLatestTransmission(context.TODO(), reader, solana.PublicKey{}, "") + getReader := func() (client.AccountReader, error) { return reader, nil } + a, _, err := GetLatestTransmission(context.TODO(), getReader, solana.PublicKey{}, "") assert.NoError(t, err) assert.Equal(t, expectedTime, a.Timestamp) assert.Equal(t, expectedAns, a.Data.String()) // fail if returned transmission header is too short - _, _, err = GetLatestTransmission(context.TODO(), reader, solana.PublicKey{}, "") + _, _, err = GetLatestTransmission(context.TODO(), getReader, solana.PublicKey{}, "") assert.Error(t, err) // fail if returned transmission is too short - _, _, err = GetLatestTransmission(context.TODO(), reader, solana.PublicKey{}, "") + _, _, err = GetLatestTransmission(context.TODO(), getReader, solana.PublicKey{}, "") assert.Error(t, err) } @@ -166,12 +169,15 @@ func TestCache(t *testing.T) { w.Write(testTransmissionsResponse(t, body, 0)) //nolint:errcheck })) + reader := testSetupReader(t, mockServer.URL) + getAccountReader := func() (client.AccountReader, error) { return reader, nil } + lggr := logger.Test(t) stateCache := NewStateCache( solana.MustPublicKeyFromBase58("11111111111111111111111111111111"), "test-chain-id", config.NewDefault(), - testSetupReader(t, mockServer.URL), + getAccountReader, lggr, ) require.NoError(t, stateCache.Start(ctx)) @@ -186,7 +192,7 @@ func 
TestCache(t *testing.T) { solana.MustPublicKeyFromBase58("11111111111111111111111111111112"), "test-chain-id", config.NewDefault(), - testSetupReader(t, mockServer.URL), + getAccountReader, lggr, ) require.NoError(t, transmissionsCache.Start(ctx)) @@ -220,17 +226,19 @@ func TestNilPointerHandling(t *testing.T) { defer mockServer.Close() errString := "nil pointer returned in " + reader := testSetupReader(t, mockServer.URL) + getReader := func() (client.AccountReader, error) { return reader, nil } // fail on get state query - _, _, err := GetState(context.TODO(), reader, solana.PublicKey{}, "") + _, _, err := GetState(context.TODO(), getReader, solana.PublicKey{}, "") assert.EqualError(t, err, errString+"GetState.GetAccountInfoWithOpts") // fail on transmissions header query - _, _, err = GetLatestTransmission(context.TODO(), reader, solana.PublicKey{}, "") + _, _, err = GetLatestTransmission(context.TODO(), getReader, solana.PublicKey{}, "") assert.EqualError(t, err, errString+"GetLatestTransmission.GetAccountInfoWithOpts.Header") passFirst = true // allow proper response for header query, fail on transmission - _, _, err = GetLatestTransmission(context.TODO(), reader, solana.PublicKey{}, "") + _, _, err = GetLatestTransmission(context.TODO(), getReader, solana.PublicKey{}, "") assert.EqualError(t, err, errString+"GetLatestTransmission.GetAccountInfoWithOpts.Transmission") } diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 55b199912..ab901a548 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -296,10 +296,7 @@ func newChain(id string, cfg *config.TOMLConfig, ks loop.Keystore, lggr logger.L if result == nil { return solanago.Signature{}, errors.New("tx sender returned nil result") } - if result.Error() != nil { - return solanago.Signature{}, result.Error() - } - return result.Signature(), result.TxError() + return result.Signature(), result.Error() } tc = internal.NewLoader[client.ReaderWriter](func() (client.ReaderWriter, error) { return ch.multiNode.SelectRPC() }) diff --git a/pkg/solana/chain_test.go b/pkg/solana/chain_test.go index b5e9adaf8..db011d80e 100644 --- a/pkg/solana/chain_test.go +++ b/pkg/solana/chain_test.go @@ -472,8 +472,7 @@ func TestChain_MultiNode_TransactionSender(t *testing.T) { // Send tx using transaction sender result := c.txSender.SendTransaction(ctx, unsignedTx(receiver.PublicKey())) require.NotNil(t, result) - require.NoError(t, result.Error()) - require.Error(t, result.TxError()) + require.Error(t, result.Error()) require.Equal(t, mn.Fatal, result.Code()) require.Empty(t, result.Signature()) }) @@ -481,8 +480,7 @@ func TestChain_MultiNode_TransactionSender(t *testing.T) { t.Run("empty transaction", func(t *testing.T) { result := c.txSender.SendTransaction(ctx, &solana.Transaction{}) require.NotNil(t, result) - require.NoError(t, result.Error()) - require.Error(t, result.TxError()) + require.Error(t, result.Error()) require.Equal(t, mn.Fatal, result.Code()) require.Empty(t, result.Signature()) }) diff --git a/pkg/solana/client/multinode/multi_node.go b/pkg/solana/client/multinode/multi_node.go index bd97ebc7b..92a65912b 100644 --- a/pkg/solana/client/multinode/multi_node.go +++ b/pkg/solana/client/multinode/multi_node.go @@ -372,6 +372,6 @@ func (c *MultiNode[CHAIN_ID, RPC]) report(nodesStateInfo []nodeWithState) { c.lggr.Criticalw(rerr.Error(), "nodeStates", nodesStateInfo) c.SvcErrBuffer.Append(rerr) } else if dead > 0 { - c.lggr.Errorw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", 
nodesStateInfo) + c.lggr.Warnw(fmt.Sprintf("At least one primary node is dead: %d/%d nodes are alive", live, total), "nodeStates", nodesStateInfo) } } diff --git a/pkg/solana/client/multinode/node_lifecycle.go index d6b150690..bca637a22 100644 --- a/pkg/solana/client/multinode/node_lifecycle.go +++ b/pkg/solana/client/multinode/node_lifecycle.go @@ -128,6 +128,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { } } + // Get the latest chain info to use as local highest localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() var pollFailures uint32 @@ -164,10 +165,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareUnreachable() return } - _, latestChainInfo := n.StateAndLatest() - if outOfSync, liveNodes := n.isOutOfSyncWithPool(latestChainInfo); outOfSync { + if outOfSync, liveNodes := n.isOutOfSyncWithPool(); outOfSync { // note: there must be another live node for us to be out of sync - lggr.Errorw("RPC endpoint has fallen behind", "blockNumber", latestChainInfo.BlockNumber, "totalDifficulty", latestChainInfo.TotalDifficulty, "nodeState", n.getCachedState()) if liveNodes < 2 { lggr.Criticalf("RPC endpoint has fallen behind; %s %s", msgCannotDisable, msgDegradedState) continue } @@ -306,9 +305,9 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewFinalizedHead(lggr logger.SugaredLogger } latestFinalizedBN := latestFinalized.BlockNumber() - lggr.Tracew("Got latest finalized head", "latestFinalized", latestFinalized) + lggr.Debugw("Got latest finalized head", "latestFinalized", latestFinalized) if latestFinalizedBN <= chainInfo.FinalizedBlockNumber { - lggr.Tracew("Ignoring previously seen finalized block number") + lggr.Debugw("Ignoring previously seen finalized block number") return false } @@ -324,10 +323,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) onNewHead(lggr logger.SugaredLogger, chainIn } promPoolRPCNodeNumSeenBlocks.WithLabelValues(n.chainID.String(), n.name).Inc() - lggr.Tracew("Got head", "head", head) + lggr.Debugw("Got head", "head", head) lggr = lggr.With("latestReceivedBlockNumber", chainInfo.BlockNumber, "blockNumber", head.BlockNumber(), "nodeState", n.getCachedState()) if head.BlockNumber() <= chainInfo.BlockNumber { - lggr.Tracew("Ignoring previously seen block number") + lggr.Debugw("Ignoring previously seen block number") return false } @@ -354,7 +353,7 @@ const ( // isOutOfSyncWithPool returns outOfSync true if num or td is more than SyncThreshold behind the best node. // Always returns outOfSync false for SyncThreshold 0. // liveNodes is only included when outOfSync is true.
-func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (outOfSync bool, liveNodes int) { +func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveNodes int) { if n.poolInfoProvider == nil { n.lfcLog.Warn("skipping sync state against the pool - should only occur in tests") return // skip for tests @@ -365,16 +364,22 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool(localState ChainInfo) (o } // Check against best node ln, ci := n.poolInfoProvider.LatestChainInfo() + localChainInfo, _ := n.rpc.GetInterceptedChainInfo() mode := n.nodePoolCfg.SelectionMode() switch mode { case NodeSelectionModeHighestHead, NodeSelectionModeRoundRobin, NodeSelectionModePriorityLevel: - return localState.BlockNumber < ci.BlockNumber-int64(threshold), ln + outOfSync = localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold) case NodeSelectionModeTotalDifficulty: bigThreshold := big.NewInt(int64(threshold)) - return localState.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0, ln + outOfSync = localChainInfo.TotalDifficulty.Cmp(bigmath.Sub(ci.TotalDifficulty, bigThreshold)) < 0 default: panic("unrecognized NodeSelectionMode: " + mode) } + + if outOfSync && n.getCachedState() == NodeStateAlive { + n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty) + } + return outOfSync, ln } // outOfSyncLoop takes an OutOfSync node and waits until isOutOfSync returns false to go back to live status @@ -460,7 +465,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { // received a new head - clear NoNewHead flag syncIssues &= ^syncStatusNoNewHead - if outOfSync, _ := n.isOutOfSyncWithPool(localHighestChainInfo); !outOfSync { + if outOfSync, _ := n.isOutOfSyncWithPool(); !outOfSync { // we caught up with the pool - clear NotInSyncWithPool flag syncIssues &= ^syncStatusNotInSyncWithPool } else { @@ -511,7 +516,12 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { finalizedHeadsSub.ResetTimer(noNewFinalizedBlocksTimeoutThreshold) } - lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "syncIssues", syncIssues) + var highestSeen ChainInfo + if n.poolInfoProvider != nil { + highestSeen = n.poolInfoProvider.HighestUserObservations() + } + + lggr.Debugw(msgReceivedFinalizedBlock, "blockNumber", latestFinalized.BlockNumber(), "poolHighestBlockNumber", highestSeen.FinalizedBlockNumber, "syncIssues", syncIssues) case err := <-finalizedHeadsSub.Errors: lggr.Errorw("Finalized head subscription was terminated", "err", err) n.declareUnreachable() diff --git a/pkg/solana/client/multinode/poller.go b/pkg/solana/client/multinode/poller.go index 9ebe1dcfc..0ce87fade 100644 --- a/pkg/solana/client/multinode/poller.go +++ b/pkg/solana/client/multinode/poller.go @@ -65,7 +65,7 @@ func (p *Poller[T]) Err() <-chan error { } func (p *Poller[T]) pollingLoop(ctx context.Context) { - ticker := time.NewTicker(p.pollingInterval) + ticker := services.NewTicker(p.pollingInterval) defer ticker.Stop() for { diff --git a/pkg/solana/client/multinode/transaction_sender.go b/pkg/solana/client/multinode/transaction_sender.go index bd11a71a5..06b2e18be 100644 --- a/pkg/solana/client/multinode/transaction_sender.go +++ b/pkg/solana/client/multinode/transaction_sender.go @@ -26,7 +26,6 @@ var ( type SendTxResult interface { Code() SendTxReturnCode - TxError() error Error() 
error } @@ -92,89 +91,84 @@ type TransactionSender[TX any, RESULT SendTxResult, CHAIN_ID ID, RPC SendTxRPCCl // * If there is both success and terminal error - returns success and reports invariant violation // * Otherwise, returns any (effectively random) of the errors. func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) SendTransaction(ctx context.Context, tx TX) RESULT { - txResults := make(chan RESULT) - txResultsToReport := make(chan RESULT) - primaryNodeWg := sync.WaitGroup{} - - if txSender.State() != "Started" { - return txSender.newResult(errors.New("TransactionSender not started")) - } + var result RESULT + if !txSender.IfStarted(func() { + txResults := make(chan RESULT) + txResultsToReport := make(chan RESULT) + primaryNodeWg := sync.WaitGroup{} + + healthyNodesNum := 0 + err := txSender.multiNode.DoAll(ctx, func(ctx context.Context, rpc RPC, isSendOnly bool) { + if isSendOnly { + txSender.wg.Add(1) + go func(ctx context.Context) { + ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx)) + defer cancel() + defer txSender.wg.Done() + // Send-only nodes' results are ignored as they tend to return false-positive responses. + // Broadcast to them is necessary to speed up the propagation of TX in the network. + _ = txSender.broadcastTxAsync(ctx, rpc, tx) + }(ctx) + return + } - txSenderCtx, cancel := txSender.chStop.NewCtx() - reportWg := sync.WaitGroup{} - defer func() { + // Primary Nodes + healthyNodesNum++ + primaryNodeWg.Add(1) + go func(ctx context.Context) { + ctx, cancel := txSender.chStop.Ctx(context.WithoutCancel(ctx)) + defer cancel() + defer primaryNodeWg.Done() + r := txSender.broadcastTxAsync(ctx, rpc, tx) + select { + case <-ctx.Done(): + return + case txResults <- r: + } + + select { + case <-ctx.Done(): + return + case txResultsToReport <- r: + } + }(ctx) + }) + + // This needs to be done in parallel so the reporting knows when it's done (when the channel is closed) + txSender.wg.Add(1) go func() { - reportWg.Wait() - cancel() + defer txSender.wg.Done() + primaryNodeWg.Wait() + close(txResultsToReport) + close(txResults) }() - }() - - healthyNodesNum := 0 - err := txSender.multiNode.DoAll(txSenderCtx, func(ctx context.Context, rpc RPC, isSendOnly bool) { - if isSendOnly { - txSender.wg.Add(1) - go func() { - defer txSender.wg.Done() - // Send-only nodes' results are ignored as they tend to return false-positive responses. - // Broadcast to them is necessary to speed up the propagation of TX in the network. 
- _ = txSender.broadcastTxAsync(ctx, rpc, tx) - }() + + if err != nil { + result = txSender.newResult(err) return } - // Primary Nodes - healthyNodesNum++ - primaryNodeWg.Add(1) - go func() { - defer primaryNodeWg.Done() - r := txSender.broadcastTxAsync(ctx, rpc, tx) - select { - case <-ctx.Done(): - return - case txResults <- r: - } + txSender.wg.Add(1) + go txSender.reportSendTxAnomalies(ctx, tx, txResultsToReport) - select { - case <-ctx.Done(): - return - case txResultsToReport <- r: - } - }() - }) - - // This needs to be done in parallel so the reporting knows when it's done (when the channel is closed) - txSender.wg.Add(1) - go func() { - defer txSender.wg.Done() - primaryNodeWg.Wait() - close(txResultsToReport) - close(txResults) - }() - - if err != nil { - return txSender.newResult(err) + result = txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) + }) { + result = txSender.newResult(errors.New("TransactionSender not started")) } - txSender.wg.Add(1) - reportWg.Add(1) - go func() { - defer reportWg.Done() - txSender.reportSendTxAnomalies(tx, txResultsToReport) - }() - - return txSender.collectTxResults(ctx, tx, healthyNodesNum, txResults) + return result } func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) broadcastTxAsync(ctx context.Context, rpc RPC, tx TX) RESULT { result := rpc.SendTransaction(ctx, tx) - txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.TxError()) - if !slices.Contains(sendTxSuccessfulCodes, result.Code()) { - txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.TxError()) + txSender.lggr.Debugw("Node sent transaction", "tx", tx, "err", result.Error()) + if !slices.Contains(sendTxSuccessfulCodes, result.Code()) && ctx.Err() == nil { + txSender.lggr.Warnw("RPC returned error", "tx", tx, "err", result.Error()) } return result } -func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(tx TX, txResults <-chan RESULT) { +func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomalies(ctx context.Context, tx TX, txResults <-chan RESULT) { defer txSender.wg.Done() resultsByCode := sendTxResults[RESULT]{} // txResults eventually will be closed @@ -183,7 +177,7 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) reportSendTxAnomal } _, criticalErr := aggregateTxResults[RESULT](resultsByCode) - if criticalErr != nil { + if criticalErr != nil && ctx.Err() == nil { txSender.lggr.Criticalw("observed invariant violation on SendTransaction", "tx", tx, "resultsByCode", resultsByCode, "err", criticalErr) PromMultiNodeInvariantViolations.WithLabelValues(txSender.chainFamily, txSender.chainID.String(), criticalErr.Error()).Inc() } @@ -256,6 +250,7 @@ loop: // ignore critical error as it's reported in reportSendTxAnomalies result, _ := aggregateTxResults(errorsByCode) + txSender.lggr.Debugw("Collected results", "errorsByCode", errorsByCode, "result", result) return result } @@ -267,6 +262,7 @@ func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Start(ctx context. 
func (txSender *TransactionSender[TX, RESULT, CHAIN_ID, RPC]) Close() error { return txSender.StopOnce("TransactionSender", func() error { + txSender.lggr.Debug("Closing TransactionSender") close(txSender.chStop) txSender.wg.Wait() return nil diff --git a/pkg/solana/client/multinode_client.go b/pkg/solana/client/multinode_client.go index 0a68b78f6..e6a70de9c 100644 --- a/pkg/solana/client/multinode_client.go +++ b/pkg/solana/client/multinode_client.go @@ -37,7 +37,7 @@ func (h *Head) BlockDifficulty() *big.Int { } func (h *Head) IsValid() bool { - return h != nil && h.BlockHeight != nil && h.BlockHash != nil + return h != nil && h.BlockHeight != nil && *h.BlockHeight > 0 && h.BlockHash != nil } var _ mn.RPCClient[mn.StringID, *Head] = (*MultiNodeClient)(nil) @@ -102,12 +102,19 @@ func (m *MultiNodeClient) SubscribeToHeads(ctx context.Context) (<-chan *Head, m ctx, cancel, chStopInFlight, _ := m.acquireQueryCtx(ctx, m.cfg.TxTimeout()) defer cancel() - pollInterval := m.cfg.MultiNode.PollInterval() + // TODO: BCFR-1070 - Add BlockPollInterval + pollInterval := m.cfg.MultiNode.FinalizedBlockPollInterval() // Use same interval as finalized polling if pollInterval == 0 { return nil, nil, errors.New("PollInterval is 0") } timeout := pollInterval - poller, channel := mn.NewPoller[*Head](pollInterval, m.LatestBlock, timeout, m.log) + poller, channel := mn.NewPoller[*Head](pollInterval, func(pollRequestCtx context.Context) (*Head, error) { + if mn.CtxIsHeathCheckRequest(ctx) { + pollRequestCtx = mn.CtxAddHealthCheckFlag(pollRequestCtx) + } + return m.LatestBlock(pollRequestCtx) + }, timeout, m.log) + if err := poller.Start(ctx); err != nil { return nil, nil, err } @@ -130,7 +137,12 @@ func (m *MultiNodeClient) SubscribeToFinalizedHeads(ctx context.Context) (<-chan return nil, nil, errors.New("FinalizedBlockPollInterval is 0") } timeout := finalizedBlockPollInterval - poller, channel := mn.NewPoller[*Head](finalizedBlockPollInterval, m.LatestFinalizedBlock, timeout, m.log) + poller, channel := mn.NewPoller[*Head](finalizedBlockPollInterval, func(pollRequestCtx context.Context) (*Head, error) { + if mn.CtxIsHeathCheckRequest(ctx) { + pollRequestCtx = mn.CtxAddHealthCheckFlag(pollRequestCtx) + } + return m.LatestFinalizedBlock(pollRequestCtx) + }, timeout, m.log) if err := poller.Start(ctx); err != nil { return nil, nil, err } @@ -158,6 +170,10 @@ func (m *MultiNodeClient) LatestBlock(ctx context.Context) (*Head, error) { BlockHeight: &result.Value.LastValidBlockHeight, BlockHash: &result.Value.Blockhash, } + if !head.IsValid() { + return nil, errors.New("invalid head") + } + m.onNewHead(ctx, chStopInFlight, head) return head, nil } @@ -175,6 +191,10 @@ func (m *MultiNodeClient) LatestFinalizedBlock(ctx context.Context) (*Head, erro BlockHeight: &result.Value.LastValidBlockHeight, BlockHash: &result.Value.Blockhash, } + if !head.IsValid() { + return nil, errors.New("invalid head") + } + m.onNewFinalizedHead(ctx, chStopInFlight, head) return head, nil } @@ -301,18 +321,16 @@ func (m *MultiNodeClient) GetInterceptedChainInfo() (latest, highestUserObservat } type SendTxResult struct { - err error - txErr error - code mn.SendTxReturnCode - sig solana.Signature + err error + code mn.SendTxReturnCode + sig solana.Signature } var _ mn.SendTxResult = (*SendTxResult)(nil) func NewSendTxResult(err error) *SendTxResult { result := &SendTxResult{ - err: err, - txErr: err, + err: err, } result.code = ClassifySendError(nil, err) return result @@ -322,10 +340,6 @@ func (r *SendTxResult) Error() error { return r.err 
} -func (r *SendTxResult) TxError() error { - return r.txErr -} - func (r *SendTxResult) Code() mn.SendTxReturnCode { return r.code } @@ -336,7 +350,7 @@ func (r *SendTxResult) Signature() solana.Signature { func (m *MultiNodeClient) SendTransaction(ctx context.Context, tx *solana.Transaction) *SendTxResult { var sendTxResult = &SendTxResult{} - sendTxResult.sig, sendTxResult.txErr = m.SendTx(ctx, tx) - sendTxResult.code = ClassifySendError(tx, sendTxResult.txErr) + sendTxResult.sig, sendTxResult.err = m.SendTx(ctx, tx) + sendTxResult.code = ClassifySendError(tx, sendTxResult.err) return sendTxResult } diff --git a/pkg/solana/config/multinode.go b/pkg/solana/config/multinode.go index 0c49d8b22..d002d489e 100644 --- a/pkg/solana/config/multinode.go +++ b/pkg/solana/config/multinode.go @@ -97,17 +97,17 @@ func (c *MultiNodeConfig) SetDefaults() { if c.MultiNode.PollFailureThreshold == nil { c.MultiNode.PollFailureThreshold = ptr(uint32(5)) } - // Poll interval is set to 10 seconds to ensure timely updates while minimizing resource usage. + // Poll interval is set to 15 seconds to ensure timely updates while minimizing resource usage. if c.MultiNode.PollInterval == nil { - c.MultiNode.PollInterval = config.MustNewDuration(10 * time.Second) + c.MultiNode.PollInterval = config.MustNewDuration(15 * time.Second) } // Selection mode defaults to priority level to enable using node priorities if c.MultiNode.SelectionMode == nil { c.MultiNode.SelectionMode = ptr(mn.NodeSelectionModePriorityLevel) } - // The sync threshold is set to 5 to allow for some flexibility in node synchronization before considering it out of sync. + // The sync threshold is set to 10 to allow for some flexibility in node synchronization before considering it out of sync. if c.MultiNode.SyncThreshold == nil { - c.MultiNode.SyncThreshold = ptr(uint32(5)) + c.MultiNode.SyncThreshold = ptr(uint32(10)) } // Lease duration is set to 1 minute by default to allow node locks for a reasonable amount of time. if c.MultiNode.LeaseDuration == nil { @@ -125,19 +125,19 @@ func (c *MultiNodeConfig) SetDefaults() { if c.MultiNode.EnforceRepeatableRead == nil { c.MultiNode.EnforceRepeatableRead = ptr(true) } - // The delay before declaring a node dead is set to 10 seconds to give nodes time to recover from temporary issues. + // The delay before declaring a node dead is set to 20 seconds to give nodes time to recover from temporary issues. if c.MultiNode.DeathDeclarationDelay == nil { - c.MultiNode.DeathDeclarationDelay = config.MustNewDuration(10 * time.Second) + c.MultiNode.DeathDeclarationDelay = config.MustNewDuration(20 * time.Second) } /* Chain Configs */ - // Threshold for no new heads is set to 10 seconds, assuming that heads should update at a reasonable pace. + // Threshold for no new heads is set to 20 seconds, assuming that heads should update at a reasonable pace. if c.MultiNode.NodeNoNewHeadsThreshold == nil { - c.MultiNode.NodeNoNewHeadsThreshold = config.MustNewDuration(10 * time.Second) + c.MultiNode.NodeNoNewHeadsThreshold = config.MustNewDuration(20 * time.Second) } - // Similar to heads, finalized heads should be updated within 10 seconds. + // Similar to heads, finalized heads should be updated within 20 seconds. if c.MultiNode.NoNewFinalizedHeadsThreshold == nil { - c.MultiNode.NoNewFinalizedHeadsThreshold = config.MustNewDuration(10 * time.Second) + c.MultiNode.NoNewFinalizedHeadsThreshold = config.MustNewDuration(20 * time.Second) } // Finality tags are used in Solana and enabled by default. 
if c.MultiNode.FinalityTagEnabled == nil { @@ -147,9 +147,9 @@ func (c *MultiNodeConfig) SetDefaults() { if c.MultiNode.FinalityDepth == nil { c.MultiNode.FinalityDepth = ptr(uint32(0)) } - // Finalized block offset will not be used since finality tags are enabled. + // Finalized block offset allows for RPCs to be slightly behind the finalized block. if c.MultiNode.FinalizedBlockOffset == nil { - c.MultiNode.FinalizedBlockOffset = ptr(uint32(0)) + c.MultiNode.FinalizedBlockOffset = ptr(uint32(50)) } } diff --git a/pkg/solana/config_tracker.go b/pkg/solana/config_tracker.go index 998790b45..3ddff2715 100644 --- a/pkg/solana/config_tracker.go +++ b/pkg/solana/config_tracker.go @@ -5,13 +5,11 @@ import ( "github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median" "github.com/smartcontractkit/libocr/offchainreporting2/types" - - "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" ) type ConfigTracker struct { stateCache *StateCache - reader client.Reader + getReader GetReader } func (c *ConfigTracker) Notify() <-chan struct{} { @@ -75,5 +73,9 @@ func (c *ConfigTracker) LatestConfig(ctx context.Context, changedInBlock uint64) // LatestBlockHeight returns the height of the most recent block in the chain. func (c *ConfigTracker) LatestBlockHeight(ctx context.Context) (blockHeight uint64, err error) { - return c.reader.SlotHeight(ctx) // this returns the latest slot height through CommitmentProcessed + reader, err := c.getReader() + if err != nil { + return 0, err + } + return reader.SlotHeight(ctx) // this returns the latest slot height through CommitmentProcessed } diff --git a/pkg/solana/config_tracker_test.go b/pkg/solana/config_tracker_test.go index 1e88d4ecd..d0e2d8625 100644 --- a/pkg/solana/config_tracker_test.go +++ b/pkg/solana/config_tracker_test.go @@ -8,6 +8,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" ) func TestLatestBlockHeight(t *testing.T) { @@ -19,7 +21,7 @@ func TestLatestBlockHeight(t *testing.T) { ctx := context.Background() c := &ConfigTracker{ - reader: testSetupReader(t, mockServer.URL), + getReader: func() (client.Reader, error) { return testSetupReader(t, mockServer.URL), nil }, } h, err := c.LatestBlockHeight(ctx) diff --git a/pkg/solana/relay.go b/pkg/solana/relay.go index 8266293ef..d98ab0442 100644 --- a/pkg/solana/relay.go +++ b/pkg/solana/relay.go @@ -154,7 +154,8 @@ func (r *Relayer) NewMedianProvider(ctx context.Context, rargs relaytypes.RelayA } cfg := configWatcher.chain.Config() - transmissionsCache := NewTransmissionsCache(transmissionsID, relayConfig.ChainID, cfg, configWatcher.reader, r.lggr) + getReader := func() (client.AccountReader, error) { return configWatcher.chain.Reader() } + transmissionsCache := NewTransmissionsCache(transmissionsID, relayConfig.ChainID, cfg, getReader, r.lggr) return &medianProvider{ configProvider: configWatcher, transmissionsCache: transmissionsCache, @@ -169,7 +170,7 @@ func (r *Relayer) NewMedianProvider(ctx context.Context, rargs relaytypes.RelayA storeProgramID: configWatcher.storeProgramID, transmissionsID: transmissionsID, transmissionSigner: transmitterAccount, - reader: configWatcher.reader, + getReader: configWatcher.chain.Reader, stateCache: configWatcher.stateCache, lggr: r.lggr, txManager: configWatcher.chain.TxManager(), @@ -195,7 +196,6 @@ type configProvider struct { offchainConfigDigester types.OffchainConfigDigester configTracker types.ContractConfigTracker chain 
Chain - reader client.Reader } func newConfigProvider(_ context.Context, lggr logger.Logger, chain Chain, args relaytypes.RelayArgs) (*configProvider, error) { @@ -222,11 +222,8 @@ func newConfigProvider(_ context.Context, lggr logger.Logger, chain Chain, args StateID: stateID, } - reader, err := chain.Reader() - if err != nil { - return nil, fmt.Errorf("error in NewMedianProvider.chain.Reader: %w", err) - } - stateCache := NewStateCache(stateID, relayConfig.ChainID, chain.Config(), reader, lggr) + getAccountReader := func() (client.AccountReader, error) { return chain.Reader() } + stateCache := NewStateCache(stateID, relayConfig.ChainID, chain.Config(), getAccountReader, lggr) return &configProvider{ chainID: relayConfig.ChainID, stateID: stateID, @@ -234,9 +231,8 @@ func newConfigProvider(_ context.Context, lggr logger.Logger, chain Chain, args storeProgramID: storeProgramID, stateCache: stateCache, offchainConfigDigester: offchainConfigDigester, - configTracker: &ConfigTracker{stateCache: stateCache, reader: reader}, + configTracker: &ConfigTracker{stateCache: stateCache, getReader: chain.Reader}, chain: chain, - reader: reader, }, nil } diff --git a/pkg/solana/state_cache.go b/pkg/solana/state_cache.go index 9faa766d0..4f6f2b084 100644 --- a/pkg/solana/state_cache.go +++ b/pkg/solana/state_cache.go @@ -23,16 +23,24 @@ type StateCache struct { *client.Cache[State] } -func NewStateCache(stateID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *StateCache { +type GetReader func() (client.Reader, error) +type GetAccountReader func() (client.AccountReader, error) + +func NewStateCache(stateID solana.PublicKey, chainID string, cfg config.Config, getReader GetAccountReader, lggr logger.Logger) *StateCache { name := "ocr2_median_state" getter := func(ctx context.Context) (State, uint64, error) { - return GetState(ctx, reader, stateID, cfg.Commitment()) + return GetState(ctx, getReader, stateID, cfg.Commitment()) } return &StateCache{client.NewCache(name, stateID, chainID, cfg, getter, logger.With(lggr, "cache", name))} } -func GetState(ctx context.Context, reader client.AccountReader, account solana.PublicKey, commitment rpc.CommitmentType) (State, uint64, error) { - res, err := reader.GetAccountInfoWithOpts(ctx, account, &rpc.GetAccountInfoOpts{ +func GetState(ctx context.Context, getReader GetAccountReader, account solana.PublicKey, commitment rpc.CommitmentType) (State, uint64, error) { + r, err := getReader() + if err != nil { + return State{}, 0, fmt.Errorf("failed to get reader: %w", err) + } + + res, err := r.GetAccountInfoWithOpts(ctx, account, &rpc.GetAccountInfoOpts{ Commitment: commitment, Encoding: "base64", }) diff --git a/pkg/solana/transmissions_cache.go b/pkg/solana/transmissions_cache.go index 75ad30a6b..acc530cbb 100644 --- a/pkg/solana/transmissions_cache.go +++ b/pkg/solana/transmissions_cache.go @@ -19,19 +19,25 @@ type TransmissionsCache struct { *client.Cache[Answer] } -func NewTransmissionsCache(transmissionsID solana.PublicKey, chainID string, cfg config.Config, reader client.Reader, lggr logger.Logger) *TransmissionsCache { +func NewTransmissionsCache(transmissionsID solana.PublicKey, chainID string, cfg config.Config, getReader GetAccountReader, lggr logger.Logger) *TransmissionsCache { name := "ocr2_median_transmissions" getter := func(ctx context.Context) (Answer, uint64, error) { - return GetLatestTransmission(ctx, reader, transmissionsID, cfg.Commitment()) + return GetLatestTransmission(ctx, getReader, 
transmissionsID, cfg.Commitment()) } return &TransmissionsCache{client.NewCache(name, transmissionsID, chainID, cfg, getter, logger.With(lggr, "cache", name))} } -func GetLatestTransmission(ctx context.Context, reader client.AccountReader, account solana.PublicKey, commitment rpc.CommitmentType) (Answer, uint64, error) { +func GetLatestTransmission(ctx context.Context, getReader GetAccountReader, account solana.PublicKey, commitment rpc.CommitmentType) (Answer, uint64, error) { // query for transmission header headerStart := AccountDiscriminatorLen // skip account discriminator headerLen := TransmissionsHeaderLen - res, err := reader.GetAccountInfoWithOpts(ctx, account, &rpc.GetAccountInfoOpts{ + + r, err := getReader() + if err != nil { + return Answer{}, 0, fmt.Errorf("failed to get reader: %w", err) + } + + res, err := r.GetAccountInfoWithOpts(ctx, account, &rpc.GetAccountInfoOpts{ Encoding: "base64", Commitment: commitment, DataSlice: &rpc.DataSlice{ @@ -71,7 +77,7 @@ func GetLatestTransmission(ctx context.Context, reader client.AccountReader, acc transmissionOffset := AccountDiscriminatorLen + TransmissionsHeaderMaxSize + (uint64(cursor) * transmissionLen) - res, err = reader.GetAccountInfoWithOpts(ctx, account, &rpc.GetAccountInfoOpts{ + res, err = r.GetAccountInfoWithOpts(ctx, account, &rpc.GetAccountInfoOpts{ Encoding: "base64", Commitment: commitment, DataSlice: &rpc.DataSlice{ diff --git a/pkg/solana/transmitter.go b/pkg/solana/transmitter.go index a488730d0..951e9633e 100644 --- a/pkg/solana/transmitter.go +++ b/pkg/solana/transmitter.go @@ -11,15 +11,13 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils" - - "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" ) var _ types.ContractTransmitter = (*Transmitter)(nil) type Transmitter struct { stateID, programID, storeProgramID, transmissionsID, transmissionSigner solana.PublicKey - reader client.Reader + getReader GetReader stateCache *StateCache lggr logger.Logger txManager TxManager @@ -32,7 +30,12 @@ func (c *Transmitter) Transmit( report types.Report, sigs []types.AttributedOnchainSignature, ) error { - blockhash, err := c.reader.LatestBlockhash(ctx) + reader, err := c.getReader() + if err != nil { + return fmt.Errorf("error on Transmit.Reader: %w", err) + } + + blockhash, err := reader.LatestBlockhash(ctx) if err != nil { return fmt.Errorf("error on Transmit.GetRecentBlockhash: %w", err) } diff --git a/pkg/solana/transmitter_test.go b/pkg/solana/transmitter_test.go index 6aef6c921..1d058d36a 100644 --- a/pkg/solana/transmitter_test.go +++ b/pkg/solana/transmitter_test.go @@ -14,6 +14,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/client" clientmocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/client/mocks" "github.com/smartcontractkit/chainlink-solana/pkg/solana/fees" "github.com/smartcontractkit/chainlink-solana/pkg/solana/txm" @@ -68,7 +69,7 @@ func TestTransmitter_TxSize(t *testing.T) { storeProgramID: mustNewRandomPublicKey(), transmissionsID: mustNewRandomPublicKey(), transmissionSigner: signer.PublicKey(), - reader: rw, + getReader: func() (client.Reader, error) { return rw, nil }, stateCache: &StateCache{}, lggr: logger.Test(t), txManager: mockTxm,
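
A note on the reader-getter pattern that recurs through relay.go, config_tracker.go, state_cache.go, transmissions_cache.go and transmitter.go above: instead of capturing a single client.Reader at construction time, components now hold a func() (client.Reader, error) (the new GetReader/GetAccountReader types) and resolve it on every call, so each request can land on whichever RPC MultiNode currently considers healthy. A minimal self-contained sketch of the shape, with stub types standing in for the package's real Reader and loader:

package main

import (
	"context"
	"fmt"
)

// Reader is a stand-in for pkg/solana/client.Reader; only the one method
// this sketch needs.
type Reader interface {
	LatestBlockhash(ctx context.Context) (string, error)
}

// GetReader mirrors the new `type GetReader func() (client.Reader, error)`.
// In the real chain it is backed by a loader that calls multiNode.SelectRPC().
type GetReader func() (Reader, error)

// Transmitter holds the getter rather than a pinned Reader, so node
// selection happens per call instead of once at construction.
type Transmitter struct {
	getReader GetReader
}

func (t *Transmitter) Transmit(ctx context.Context) error {
	reader, err := t.getReader()
	if err != nil {
		return fmt.Errorf("error on Transmit.Reader: %w", err)
	}
	blockhash, err := reader.LatestBlockhash(ctx)
	if err != nil {
		return fmt.Errorf("error on Transmit.LatestBlockhash: %w", err)
	}
	fmt.Println("building tx against", blockhash)
	return nil
}

type fixedReader struct{}

func (fixedReader) LatestBlockhash(context.Context) (string, error) { return "fake-hash", nil }

func main() {
	t := &Transmitter{getReader: func() (Reader, error) { return fixedReader{}, nil }}
	_ = t.Transmit(context.Background())
}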
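On the reworked isOutOfSyncWithPool in node_lifecycle.go: for the HighestHead, RoundRobin and PriorityLevel selection modes a node counts as out of sync once its latest block number trails the pool's best node by more than SyncThreshold (whose default moves from 5 to 10 in config/multinode.go). A toy check with made-up block numbers:

package main

import "fmt"

func main() {
	// Illustrative values only; in the real code these come from
	// n.rpc.GetInterceptedChainInfo() and n.poolInfoProvider.LatestChainInfo().
	const syncThreshold int64 = 10 // new default in config/multinode.go
	bestBlock := int64(1000)       // pool's best node (ci.BlockNumber)
	localBlock := int64(989)       // this node's latest block

	// Mirrors: outOfSync = localChainInfo.BlockNumber < ci.BlockNumber-int64(threshold)
	outOfSync := localBlock < bestBlock-syncThreshold
	fmt.Println(outOfSync) // true: 989 < 990, so the node has fallen behind
}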
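The transaction_sender.go rework wraps every broadcast goroutine's context in txSender.chStop.Ctx(context.WithoutCancel(ctx)). The intent: a broadcast should not be aborted merely because the submitting caller's context is cancelled (slow primaries and send-only nodes should still receive the tx), but it must stop when the TransactionSender itself shuts down. A small sketch of that context shape, using a bare stop channel where the real code uses services.StopChan:

package main

import (
	"context"
	"fmt"
	"time"
)

// stopCtx builds a context that ignores the caller's cancellation but is
// cancelled when stopCh closes - the shape of chStop.Ctx(context.WithoutCancel(ctx)).
func stopCtx(parent context.Context, stopCh <-chan struct{}) (context.Context, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.WithoutCancel(parent)) // requires Go 1.21+
	go func() {
		select {
		case <-stopCh:
			cancel() // service shutdown still stops the broadcast
		case <-ctx.Done():
		}
	}()
	return ctx, cancel
}

func main() {
	stopCh := make(chan struct{})
	caller, callerCancel := context.WithCancel(context.Background())

	ctx, cancel := stopCtx(caller, stopCh)
	defer cancel()

	callerCancel()         // caller gives up on the send...
	fmt.Println(ctx.Err()) // <nil>: the broadcast keeps going

	close(stopCh) // ...but closing the sender still cancels it
	time.Sleep(10 * time.Millisecond)
	fmt.Println(ctx.Err()) // context canceled
}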
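Finally, on the plural RPC URLs: E2E_TEST_COMMON_RPC_URL and E2E_TEST_COMMON_WS_URL are now read as comma-separated lists (MustReadEnvVar_Strings with a "," separator), and GetNodeConfigTOML fans them out into one primary-N node per URL. A rough sketch of that fan-out with hypothetical endpoints and a trimmed-down node struct (the real solcfg.Node uses pointer fields and parsed URLs):

package main

import (
	"fmt"
	"os"
	"strings"
)

// node is a trimmed stand-in for solcfg.Node.
type node struct {
	Name string
	URL  string
}

func main() {
	// Hypothetical endpoints; the env var is split on "," as in ReadFromEnvVar.
	os.Setenv("E2E_TEST_COMMON_RPC_URL", "https://rpc-a.example,https://rpc-b.example")

	var nodes []node
	for i, u := range strings.Split(os.Getenv("E2E_TEST_COMMON_RPC_URL"), ",") {
		nodes = append(nodes, node{Name: fmt.Sprintf("primary-%d", i), URL: u})
	}
	fmt.Println(nodes) // [{primary-0 https://rpc-a.example} {primary-1 https://rpc-b.example}]
}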