From c55c696a40eb0386c8cb03e69b6b7f0de2af5cd6 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 22 Jan 2025 01:38:05 +0400 Subject: [PATCH] v2.2.0 feat(cl/el): support multiple nodes (#1964) * beacon/goclient: support multiple nodes * eth/executionclient: support multiple nodes * eth/executionclient: implement round-robin client switch on failure * eth/executionclient: assert node configs are same * eth/executionclient: unit tests for doCall method * eth/executionclient: fix linter * temporarily use ";" as separator because "," is used as separator by deployment bot * return which spec field mismatches * don't check INTERVALS_PER_SLOT * check all genesis fields * remove struct comparison * eth/executionclient: fix a bug with address * k8s: temporary tcpproxy pod * k8s: temporary tcpproxy binary * fix linter * beacon/goclient: allow delayed start * beacon/goclient: move version check and log * eth/executionclient: get rid of reconnect * eth/executionclient: attempt to fix incorrect removal of reconnect * eth/executionclient: try another implementation * eth/executionclient: handle sync tolerance * beacon/goclient: fix unit tests * fix linter * eth/executionclient: fail if any client cannot be dialed on start * eth/executionclient: consider sync distance when checking if node is healthy * eth/executionclient: disconnect if stream fails * eth/executionclient: add some managed client tests * fix linter * initial implementation of execution multi client * delete tcpproxy * multi client options * implement ethereum.LogFilterer * log call failure * make FetchHistoricalLogs asynchronous * fix linter * behave like single client if there's only one client * add some multi client unit tests * more unit tests * call reconnect * cleanup debug logs * add HeaderByNumber to multi client * resolve PR review comments * add a comment about a closure * simplify condition for Fatal log * change context methods to functions * fix bug with no logs in chan * make sure execution client is healthy before fetching logs * fix linter * switch to the next client if the current one is not healthy * more unit tests * draft: healthy streaming * fix build * ec -> mc * returned multi err in call * is_healthy -> isHealthy * Revert "draft: healthy streaming" This reverts commit 1c5d7080 * simplify FetchHistoricalLogs * fix tests * get rid of reconnect call * add a comment for StreamLogs * add healthy channel * update comment about separator * refactor MultiClient to use atomic operations and concurrency pool for health checks * fix lint * Healthy(): don't fail if at least one is healthy * don't switch client if no error * undo second client index bump removal * fix client skipping * fix * no shadowing * advance currentClientIndex to next client on error * comment * add a comment about Fatal log * wait for other clients in SubmitBlindedBeaconBlock * refactor multi client health check * fix atLeastOneSubmitted usage * refactor * refactor Healthy() * assert only same genesis * remove double return * double default syncDistanceTolerance to 8 * reduce sync distance tolerance to 6 * use time.Equal --------- Co-authored-by: zippoxer Co-authored-by: y0sher --- beacon/goclient/aggregator.go | 8 +- beacon/goclient/attest.go | 12 +- beacon/goclient/attest_test.go | 28 +- beacon/goclient/committee_subscribe.go | 8 +- beacon/goclient/goclient.go | 190 ++- beacon/goclient/goclient_test.go | 7 +- beacon/goclient/proposer.go | 92 +- beacon/goclient/signing.go | 12 +- beacon/goclient/sync_committee.go | 12 +-
.../goclient/sync_committee_contribution.go | 12 +- beacon/goclient/validator.go | 4 +- beacon/goclient/voluntary_exit.go | 2 +- cli/operator/node.go | 60 +- eth/executionclient/config.go | 4 +- eth/executionclient/defaults.go | 4 + eth/executionclient/execution_client.go | 105 +- eth/executionclient/execution_client_test.go | 5 +- eth/executionclient/mocks.go | 395 +++++ eth/executionclient/multi_client.go | 365 +++++ eth/executionclient/multi_client_test.go | 1452 +++++++++++++++++ eth/executionclient/options.go | 67 + operator/duties/scheduler.go | 2 +- operator/node.go | 4 +- protocol/v2/blockchain/beacon/client.go | 2 +- 24 files changed, 2707 insertions(+), 145 deletions(-) create mode 100644 eth/executionclient/mocks.go create mode 100644 eth/executionclient/multi_client.go create mode 100644 eth/executionclient/multi_client_test.go diff --git a/beacon/goclient/aggregator.go b/beacon/goclient/aggregator.go index 5a9c2652db..27a8adc958 100644 --- a/beacon/goclient/aggregator.go +++ b/beacon/goclient/aggregator.go @@ -41,11 +41,11 @@ func (gc *GoClient) SubmitAggregateSelectionProof(slot phase0.Slot, committeeInd } aggDataReqStart := time.Now() - aggDataResp, err := gc.client.AggregateAttestation(gc.ctx, &api.AggregateAttestationOpts{ + aggDataResp, err := gc.multiClient.AggregateAttestation(gc.ctx, &api.AggregateAttestationOpts{ Slot: slot, AttestationDataRoot: root, }) - recordRequestDuration(gc.ctx, "AggregateAttestation", gc.client.Address(), http.MethodGet, time.Since(aggDataReqStart), err) + recordRequestDuration(gc.ctx, "AggregateAttestation", gc.multiClient.Address(), http.MethodGet, time.Since(aggDataReqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "AggregateAttestation"), @@ -79,8 +79,8 @@ func (gc *GoClient) SubmitAggregateSelectionProof(slot phase0.Slot, committeeInd // SubmitSignedAggregateSelectionProof broadcasts a signed aggregator msg func (gc *GoClient) SubmitSignedAggregateSelectionProof(msg *phase0.SignedAggregateAndProof) error { start := time.Now() - err := gc.client.SubmitAggregateAttestations(gc.ctx, []*phase0.SignedAggregateAndProof{msg}) - recordRequestDuration(gc.ctx, "SubmitAggregateAttestations", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitAggregateAttestations(gc.ctx, []*phase0.SignedAggregateAndProof{msg}) + recordRequestDuration(gc.ctx, "SubmitAggregateAttestations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) return err } diff --git a/beacon/goclient/attest.go b/beacon/goclient/attest.go index edca50866d..7561e89fd9 100644 --- a/beacon/goclient/attest.go +++ b/beacon/goclient/attest.go @@ -17,11 +17,11 @@ import ( // AttesterDuties returns attester duties for a given epoch. 
func (gc *GoClient) AttesterDuties(ctx context.Context, epoch phase0.Epoch, validatorIndices []phase0.ValidatorIndex) ([]*eth2apiv1.AttesterDuty, error) { start := time.Now() - resp, err := gc.client.AttesterDuties(ctx, &api.AttesterDutiesOpts{ + resp, err := gc.multiClient.AttesterDuties(ctx, &api.AttesterDutiesOpts{ Epoch: epoch, Indices: validatorIndices, }) - recordRequestDuration(gc.ctx, "AttesterDuties", gc.client.Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "AttesterDuties", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "AttesterDuties"), @@ -61,11 +61,11 @@ func (gc *GoClient) GetAttestationData(slot phase0.Slot, committeeIndex phase0.C // Have to make beacon node request and cache the result. result, err, _ := gc.attestationReqInflight.Do(slot, func() (*phase0.AttestationData, error) { attDataReqStart := time.Now() - resp, err := gc.client.AttestationData(gc.ctx, &api.AttestationDataOpts{ + resp, err := gc.multiClient.AttestationData(gc.ctx, &api.AttestationDataOpts{ Slot: slot, }) - recordRequestDuration(gc.ctx, "AttestationData", gc.client.Address(), http.MethodGet, time.Since(attDataReqStart), err) + recordRequestDuration(gc.ctx, "AttestationData", gc.multiClient.Address(), http.MethodGet, time.Since(attDataReqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, @@ -123,8 +123,8 @@ func withCommitteeIndex(data *phase0.AttestationData, committeeIndex phase0.Comm // SubmitAttestations implements Beacon interface func (gc *GoClient) SubmitAttestations(attestations []*phase0.Attestation) error { start := time.Now() - err := gc.client.SubmitAttestations(gc.ctx, attestations) - recordRequestDuration(gc.ctx, "SubmitAttestations", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitAttestations(gc.ctx, attestations) + recordRequestDuration(gc.ctx, "SubmitAttestations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitAttestations"), diff --git a/beacon/goclient/attest_test.go b/beacon/goclient/attest_test.go index e3f653e760..9f1b52d52d 100644 --- a/beacon/goclient/attest_test.go +++ b/beacon/goclient/attest_test.go @@ -14,13 +14,14 @@ import ( "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/sourcegraph/conc/pool" "github.com/ssvlabs/ssv-spec/types" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + operatordatastore "github.com/ssvlabs/ssv/operator/datastore" "github.com/ssvlabs/ssv/operator/slotticker" "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon" registrystorage "github.com/ssvlabs/ssv/registry/storage" "github.com/ssvlabs/ssv/utils/hashmap" - "github.com/stretchr/testify/require" - "go.uber.org/zap" ) func TestGoClient_GetAttestationData(t *testing.T) { @@ -44,6 +45,29 @@ func TestGoClient_GetAttestationData(t *testing.T) { t.Logf("mock server handling request: %s", r.URL.Path) expInitRequests := map[string][]byte{ + "/eth/v1/config/spec": []byte(`{ + "data": { + "CONFIG_NAME": "holesky", + "GENESIS_FORK_VERSION": "0x01017000", + "CAPELLA_FORK_VERSION": "0x04017000", + "MIN_GENESIS_TIME": "1695902100", + "SECONDS_PER_SLOT": "12", + "SLOTS_PER_EPOCH": "32", + "EPOCHS_PER_SYNC_COMMITTEE_PERIOD": "256", + "SYNC_COMMITTEE_SIZE": "512", + "SYNC_COMMITTEE_SUBNET_COUNT": "4", + "TARGET_AGGREGATORS_PER_COMMITTEE": "16", + "TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE": "16", + "INTERVALS_PER_SLOT": 
"3" + } + }`), + "/eth/v1/beacon/genesis": []byte(`{ + "data": { + "genesis_time": "1695902400", + "genesis_validators_root": "0x9143aa7c615a7f7115e2b6aac319c03529df8242ae705fba9df39b79c59fa8b1", + "genesis_fork_version": "0x01017000" + } + }`), "/eth/v1/node/syncing": []byte(`{ "data": { "head_slot": "4239945", diff --git a/beacon/goclient/committee_subscribe.go b/beacon/goclient/committee_subscribe.go index 91783c0952..3e10d761ea 100644 --- a/beacon/goclient/committee_subscribe.go +++ b/beacon/goclient/committee_subscribe.go @@ -12,8 +12,8 @@ import ( // SubmitBeaconCommitteeSubscriptions is implementation for subscribing committee to subnet (p2p topic) func (gc *GoClient) SubmitBeaconCommitteeSubscriptions(ctx context.Context, subscription []*eth2apiv1.BeaconCommitteeSubscription) error { start := time.Now() - err := gc.client.SubmitBeaconCommitteeSubscriptions(ctx, subscription) - recordRequestDuration(gc.ctx, "SubmitBeaconCommitteeSubscriptions", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitBeaconCommitteeSubscriptions(ctx, subscription) + recordRequestDuration(gc.ctx, "SubmitBeaconCommitteeSubscriptions", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitBeaconCommitteeSubscriptions"), @@ -26,8 +26,8 @@ func (gc *GoClient) SubmitBeaconCommitteeSubscriptions(ctx context.Context, subs // SubmitSyncCommitteeSubscriptions is implementation for subscribing sync committee to subnet (p2p topic) func (gc *GoClient) SubmitSyncCommitteeSubscriptions(ctx context.Context, subscription []*eth2apiv1.SyncCommitteeSubscription) error { start := time.Now() - err := gc.client.SubmitSyncCommitteeSubscriptions(ctx, subscription) - recordRequestDuration(gc.ctx, "SubmitSyncCommitteeSubscriptions", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitSyncCommitteeSubscriptions(ctx, subscription) + recordRequestDuration(gc.ctx, "SubmitSyncCommitteeSubscriptions", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitSyncCommitteeSubscriptions"), diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index 73dc90fbaf..d7c77a48cf 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -12,15 +12,16 @@ import ( "github.com/attestantio/go-eth2-client/api" apiv1 "github.com/attestantio/go-eth2-client/api/v1" eth2clienthttp "github.com/attestantio/go-eth2-client/http" + eth2clientmulti "github.com/attestantio/go-eth2-client/multi" "github.com/attestantio/go-eth2-client/spec" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/jellydator/ttlcache/v3" "github.com/pkg/errors" "github.com/rs/zerolog" + spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" "tailscale.com/util/singleflight" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/logging/fields" operatordatastore "github.com/ssvlabs/ssv/operator/datastore" "github.com/ssvlabs/ssv/operator/slotticker" @@ -69,9 +70,15 @@ func ParseNodeClient(version string) NodeClient { // Client defines all go-eth2-client interfaces used in ssv type Client interface { - eth2client.Service + MultiClient + eth2client.NodeVersionProvider eth2client.NodeClientProvider + eth2client.BlindedProposalSubmitter +} + +type MultiClient interface { + eth2client.Service eth2client.SpecProvider eth2client.GenesisProvider @@ -87,7 +94,6 @@ type Client 
interface { eth2client.NodeSyncingProvider eth2client.ProposalProvider eth2client.ProposalSubmitter - eth2client.BlindedProposalSubmitter eth2client.DomainProvider eth2client.SyncCommitteeMessagesSubmitter eth2client.BeaconBlockRootProvider @@ -100,20 +106,13 @@ type Client interface { eth2client.VoluntaryExitSubmitter } -type NodeClientProvider interface { - NodeClient() NodeClient -} - -var _ NodeClientProvider = (*GoClient)(nil) - // GoClient implementing Beacon struct type GoClient struct { log *zap.Logger ctx context.Context network beaconprotocol.Network - client Client - nodeVersion string - nodeClient NodeClient + clients []Client + multiClient MultiClient syncDistanceTolerance phase0.Slot nodeSyncingFn func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) @@ -155,27 +154,72 @@ func New( longTimeout = DefaultLongTimeout } - httpClient, err := eth2clienthttp.New(opt.Context, - // WithAddress supplies the address of the beacon node, in host:port format. - eth2clienthttp.WithAddress(opt.BeaconNodeAddr), - // LogLevel supplies the level of logging to carry out. - eth2clienthttp.WithLogLevel(zerolog.DebugLevel), - eth2clienthttp.WithTimeout(commonTimeout), - eth2clienthttp.WithReducedMemoryUsage(true), + beaconAddrList := strings.Split(opt.BeaconNodeAddr, ";") // TODO: Decide what symbol to use as a separator. Bootnodes are currently separated by ";". Deployment bot currently uses ",". + if len(beaconAddrList) == 0 { + return nil, fmt.Errorf("no beacon node address provided") + } + + var consensusClients []Client + var consensusClientsAsServices []eth2client.Service + for _, beaconAddr := range beaconAddrList { + httpClient, err := setupHTTPClient(opt.Context, logger, beaconAddr, commonTimeout) + if err != nil { + return nil, err + } + + nodeVersionResp, err := httpClient.NodeVersion(opt.Context, &api.NodeVersionOpts{}) + if err != nil { + logger.Error(clResponseErrMsg, + zap.String("api", "NodeVersion"), + zap.Error(err), + ) + return nil, fmt.Errorf("failed to get node version: %w", err) + } + if nodeVersionResp == nil { + logger.Error(clNilResponseErrMsg, + zap.String("api", "NodeVersion"), + ) + return nil, fmt.Errorf("node version response is nil") + } + + logger.Info("consensus client connected", + fields.Name(httpClient.Name()), + fields.Address(httpClient.Address()), + zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))), + zap.String("version", nodeVersionResp.Data), + ) + + consensusClients = append(consensusClients, httpClient) + consensusClientsAsServices = append(consensusClientsAsServices, httpClient) + } + + err := assertSameGenesis(opt.Context, consensusClients...) 
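// assertSameGenesis (defined below) cross-checks every configured beacon node: per sameGenesis, they must all report the same genesis time, genesis validators root and genesis fork version, so New fails fast at startup instead of mixing nodes from different networks.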
+ if err != nil { + return nil, fmt.Errorf("assert same genesis: %w", err) + } + + multiClient, err := eth2clientmulti.New( + opt.Context, + eth2clientmulti.WithClients(consensusClientsAsServices), + eth2clientmulti.WithLogLevel(zerolog.DebugLevel), + eth2clientmulti.WithTimeout(commonTimeout), ) if err != nil { - logger.Error("Consensus client initialization failed", + logger.Error("Consensus multi client initialization failed", zap.String("address", opt.BeaconNodeAddr), zap.Error(err), ) - return nil, fmt.Errorf("failed to create http client: %w", err) + + return nil, fmt.Errorf("create multi client: %w", err) } + consensusClient := multiClient.(*eth2clientmulti.Service) client := &GoClient{ log: logger, ctx: opt.Context, network: opt.Network, - client: httpClient.(*eth2clienthttp.Service), + clients: consensusClients, + multiClient: consensusClient, syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance), operatorDataStore: operatorDataStore, registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{}, @@ -190,43 +234,79 @@ func New( client.nodeSyncingFn = client.nodeSyncing - nodeVersionResp, err := client.client.NodeVersion(opt.Context, &api.NodeVersionOpts{}) + go client.registrationSubmitter(slotTickerProvider) + // Start automatic expired item deletion for attestationDataCache. + go client.attestationDataCache.Start() + + return client, nil +} + +func setupHTTPClient(ctx context.Context, logger *zap.Logger, addr string, commonTimeout time.Duration) (*eth2clienthttp.Service, error) { + httpClient, err := eth2clienthttp.New( + ctx, + // WithAddress supplies the address of the beacon node, in host:port format. + eth2clienthttp.WithAddress(addr), + // LogLevel supplies the level of logging to carry out. + eth2clienthttp.WithLogLevel(zerolog.DebugLevel), + eth2clienthttp.WithTimeout(commonTimeout), + eth2clienthttp.WithReducedMemoryUsage(true), + eth2clienthttp.WithAllowDelayedStart(true), + ) if err != nil { - logger.Error(clResponseErrMsg, - zap.String("api", "NodeVersion"), + logger.Error("Consensus http client initialization failed", + zap.String("address", addr), zap.Error(err), ) - return nil, fmt.Errorf("failed to get node version: %w", err) + + return nil, fmt.Errorf("create http client: %w", err) } - if nodeVersionResp == nil { - logger.Error(clNilResponseErrMsg, - zap.String("api", "NodeVersion"), - ) - return nil, fmt.Errorf("node version response is nil") + + return httpClient.(*eth2clienthttp.Service), nil +} + +// assertSameGenesis should receive a non-empty list +func assertSameGenesis(ctx context.Context, services ...Client) error { + firstGenesis, err := services[0].Genesis(ctx, &api.GenesisOpts{}) + if err != nil { + return fmt.Errorf("get first genesis: %w", err) } - client.nodeVersion = nodeVersionResp.Data - client.nodeClient = ParseNodeClient(nodeVersionResp.Data) - - logger.Info("consensus client connected", - fields.Name(httpClient.Name()), - fields.Address(httpClient.Address()), - zap.String("client", string(client.nodeClient)), - zap.String("version", client.nodeVersion), - ) - go client.registrationSubmitter(slotTickerProvider) - // Start automatic expired item deletion for attestationDataCache.
- go client.attestationDataCache.Start() + for _, service := range services[1:] { + srvGenesis, err := service.Genesis(ctx, &api.GenesisOpts{}) + if err != nil { + return fmt.Errorf("get service genesis: %w", err) + } - return client, nil + if err := sameGenesis(firstGenesis.Data, srvGenesis.Data); err != nil { + return fmt.Errorf("different genesis: %w", err) + } + } + + return nil } -func (gc *GoClient) nodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) { - return gc.client.NodeSyncing(ctx, opts) +func sameGenesis(a, b *apiv1.Genesis) error { + if a == nil || b == nil { // Input parameters should never be nil; fail the check if either of them is + return fmt.Errorf("genesis is nil") + } + + if !a.GenesisTime.Equal(b.GenesisTime) { + return fmt.Errorf("genesis time mismatch, got %v and %v", a.GenesisTime, b.GenesisTime) + } + + if a.GenesisValidatorsRoot != b.GenesisValidatorsRoot { + return fmt.Errorf("genesis validators root mismatch, got %v and %v", a.GenesisValidatorsRoot, b.GenesisValidatorsRoot) + } + + if a.GenesisForkVersion != b.GenesisForkVersion { + return fmt.Errorf("genesis fork version mismatch, got %v and %v", a.GenesisForkVersion, b.GenesisForkVersion) + } + + return nil } -func (gc *GoClient) NodeClient() NodeClient { - return gc.nodeClient +func (gc *GoClient) nodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) { + return gc.multiClient.NodeSyncing(ctx, opts) } var errSyncing = errors.New("syncing") @@ -241,26 +321,26 @@ func (gc *GoClient) Healthy(ctx context.Context) error { zap.Error(err), ) // TODO: get rid of global variable, pass metrics to goClient - recordBeaconClientStatus(ctx, statusUnknown, gc.client.Address()) + recordBeaconClientStatus(ctx, statusUnknown, gc.multiClient.Address()) return fmt.Errorf("failed to obtain node syncing status: %w", err) } if nodeSyncingResp == nil { gc.log.Error(clNilResponseErrMsg, zap.String("api", "NodeSyncing"), ) - recordBeaconClientStatus(ctx, statusUnknown, gc.client.Address()) + recordBeaconClientStatus(ctx, statusUnknown, gc.multiClient.Address()) return fmt.Errorf("node syncing response is nil") } if nodeSyncingResp.Data == nil { gc.log.Error(clNilResponseDataErrMsg, zap.String("api", "NodeSyncing"), ) - recordBeaconClientStatus(ctx, statusUnknown, gc.client.Address()) + recordBeaconClientStatus(ctx, statusUnknown, gc.multiClient.Address()) return fmt.Errorf("node syncing data is nil") } syncState := nodeSyncingResp.Data - recordBeaconClientStatus(ctx, statusSyncing, gc.client.Address()) - recordSyncDistance(ctx, syncState.SyncDistance, gc.client.Address()) + recordBeaconClientStatus(ctx, statusSyncing, gc.multiClient.Address()) + recordSyncDistance(ctx, syncState.SyncDistance, gc.multiClient.Address()) // TODO: also check if syncState.ElOffline when github.com/attestantio/go-eth2-client supports it if syncState.IsSyncing && syncState.SyncDistance > gc.syncDistanceTolerance { @@ -272,7 +352,7 @@ func (gc *GoClient) Healthy(ctx context.Context) error { return fmt.Errorf("optimistic") } - recordBeaconClientStatus(ctx, statusSynced, gc.client.Address()) + recordBeaconClientStatus(ctx, statusSynced, gc.multiClient.Address()) return nil } @@ -291,7 +371,7 @@ func (gc *GoClient) slotStartTime(slot phase0.Slot) time.Time { } func (gc *GoClient) Events(ctx context.Context, topics []string, handler eth2client.EventHandlerFunc) error { - if err := gc.client.Events(ctx, topics, handler); err != nil { + if err :=
gc.multiClient.Events(ctx, topics, handler); err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "Events"), zap.Error(err), diff --git a/beacon/goclient/goclient_test.go b/beacon/goclient/goclient_test.go index b9c0e4b51a..7f608d9b1e 100644 --- a/beacon/goclient/goclient_test.go +++ b/beacon/goclient/goclient_test.go @@ -14,12 +14,13 @@ import ( v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/ssvlabs/ssv-spec/types" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + operatordatastore "github.com/ssvlabs/ssv/operator/datastore" "github.com/ssvlabs/ssv/operator/slotticker" "github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon" registrystorage "github.com/ssvlabs/ssv/registry/storage" - "github.com/stretchr/testify/require" - "go.uber.org/zap" ) func TestHealthy(t *testing.T) { @@ -87,7 +88,7 @@ func TestTimeouts(t *testing.T) { return nil }) _, err := mockClient(ctx, undialableServer.URL, commonTimeout, longTimeout) - require.ErrorContains(t, err, "client is not active") + require.ErrorContains(t, err, "context deadline exceeded") } // Too slow to respond to the Validators request. diff --git a/beacon/goclient/proposer.go b/beacon/goclient/proposer.go index 798a713a5e..df93c6ff85 100644 --- a/beacon/goclient/proposer.go +++ b/beacon/goclient/proposer.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "sync/atomic" "time" "github.com/attestantio/go-eth2-client/api" @@ -16,10 +17,12 @@ import ( "github.com/attestantio/go-eth2-client/spec/deneb" "github.com/attestantio/go-eth2-client/spec/phase0" ssz "github.com/ferranbt/fastssz" + "github.com/sourcegraph/conc/pool" spectypes "github.com/ssvlabs/ssv-spec/types" + "go.uber.org/zap" + "github.com/ssvlabs/ssv/logging/fields" "github.com/ssvlabs/ssv/operator/slotticker" - "go.uber.org/zap" ) const ( @@ -29,11 +32,11 @@ const ( // ProposerDuties returns proposer duties for the given epoch. 
func (gc *GoClient) ProposerDuties(ctx context.Context, epoch phase0.Epoch, validatorIndices []phase0.ValidatorIndex) ([]*eth2apiv1.ProposerDuty, error) { start := time.Now() - resp, err := gc.client.ProposerDuties(ctx, &api.ProposerDutiesOpts{ + resp, err := gc.multiClient.ProposerDuties(ctx, &api.ProposerDutiesOpts{ Epoch: epoch, Indices: validatorIndices, }) - recordRequestDuration(gc.ctx, "ProposerDuties", gc.client.Address(), http.MethodGet, time.Since(start), err) + recordRequestDuration(gc.ctx, "ProposerDuties", gc.multiClient.Address(), http.MethodGet, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, @@ -61,13 +64,13 @@ func (gc *GoClient) GetBeaconBlock(slot phase0.Slot, graffitiBytes, randao []byt copy(graffiti[:], graffitiBytes[:]) reqStart := time.Now() - proposalResp, err := gc.client.Proposal(gc.ctx, &api.ProposalOpts{ + proposalResp, err := gc.multiClient.Proposal(gc.ctx, &api.ProposalOpts{ Slot: slot, RandaoReveal: sig, Graffiti: graffiti, SkipRandaoVerification: false, }) - recordRequestDuration(gc.ctx, "Proposal", gc.client.Address(), http.MethodGet, time.Since(reqStart), err) + recordRequestDuration(gc.ctx, "Proposal", gc.multiClient.Address(), http.MethodGet, time.Since(reqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, @@ -187,17 +190,64 @@ func (gc *GoClient) SubmitBlindedBeaconBlock(block *api.VersionedBlindedProposal Proposal: signedBlock, } - start := time.Now() - err := gc.client.SubmitBlindedProposal(gc.ctx, opts) - recordRequestDuration(gc.ctx, "SubmitBlindedProposal", gc.client.Address(), http.MethodPost, time.Since(start), err) - if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitBlindedProposal"), - zap.Error(err), - ) + // As gc.multiClient doesn't have a method for blinded block submission + // (because it must be submitted to the same node that returned that block), + // we need to submit it to client(s) directly. + if len(gc.clients) == 1 { + start := time.Now() + err := gc.clients[0].SubmitBlindedProposal(gc.ctx, opts) + recordRequestDuration(gc.ctx, "SubmitBlindedProposal", gc.clients[0].Address(), http.MethodPost, time.Since(start), err) + if err != nil { + gc.log.Error(clResponseErrMsg, + zap.String("api", "SubmitBlindedProposal"), + zap.Error(err), + ) + return err + } + + return nil } - return err + // Although we got a blinded block from one node and that node has to submit it, + // other nodes might know this payload too and have a chance to submit it successfully. + // + // So we do the following: + // + // Submit the blinded proposal to all clients concurrently. + // If any client succeeds, cancel the remaining submissions. + // Wait for all submissions to finish or timeout after 1 minute. + // + // TODO: Make sure the above is correct. Should we submit only to the node that returned the block? + + logger := gc.log.With(zap.String("api", "SubmitBlindedProposal")) + + submissions := atomic.Int32{} + p := pool.New().WithErrors().WithContext(gc.ctx) + for _, client := range gc.clients { + client := client + p.Go(func(ctx context.Context) error { + if err := client.SubmitBlindedProposal(ctx, opts); err != nil { + logger.Debug("consensus client returned an error while submitting blinded proposal. 
As at least one node must submit successfully, it's expected that some nodes may fail to submit.", + zap.String("client_addr", client.Address()), + zap.Error(err)) + return err + } + + submissions.Add(1) + return nil + }) + } + err := p.Wait() + if submissions.Load() > 0 { + // At least one client has submitted the proposal successfully, + // so we can return without error. + return nil + } + if err != nil { + logger.Error("no consensus clients have been able to submit blinded proposal. See adjacent logs for error details.") + return fmt.Errorf("no consensus clients have been able to submit blinded proposal") + } + return nil } // SubmitBeaconBlock submit the block to the node @@ -244,8 +294,8 @@ func (gc *GoClient) SubmitBeaconBlock(block *api.VersionedProposal, sig phase0.B } start := time.Now() - err := gc.client.SubmitProposal(gc.ctx, opts) - recordRequestDuration(gc.ctx, "SubmitProposal", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitProposal(gc.ctx, opts) + recordRequestDuration(gc.ctx, "SubmitProposal", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitProposal"), @@ -268,8 +318,8 @@ func (gc *GoClient) SubmitProposalPreparation(feeRecipients map[phase0.Validator }) } start := time.Now() - err := gc.client.SubmitProposalPreparations(gc.ctx, preparations) - recordRequestDuration(gc.ctx, "SubmitProposalPreparations", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitProposalPreparations(gc.ctx, preparations) + recordRequestDuration(gc.ctx, "SubmitProposalPreparations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitProposalPreparations"), @@ -363,10 +413,10 @@ func (gc *GoClient) submitBatchedRegistrations(slot phase0.Slot, registrations [ bs = len(registrations) } + // TODO: Do we need to submit them to all nodes? 
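// Unlike the blinded-proposal fan-out above, these registrations are submitted via gc.multiClient, which routes the call to one of the underlying beacon nodes; the TODO above leaves open whether an explicit fan-out to every node is needed here.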
start := time.Now() - err := gc.client.SubmitValidatorRegistrations(gc.ctx, registrations[0:bs]) - - recordRequestDuration(gc.ctx, "SubmitValidatorRegistrations", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitValidatorRegistrations(gc.ctx, registrations[0:bs]) + recordRequestDuration(gc.ctx, "SubmitValidatorRegistrations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitValidatorRegistrations"), diff --git a/beacon/goclient/signing.go b/beacon/goclient/signing.go index 03ef3ddd30..034121a8b1 100644 --- a/beacon/goclient/signing.go +++ b/beacon/goclient/signing.go @@ -19,8 +19,8 @@ import ( func (gc *GoClient) computeVoluntaryExitDomain(ctx context.Context) (phase0.Domain, error) { start := time.Now() - specResponse, err := gc.client.Spec(gc.ctx, &api.SpecOpts{}) - recordRequestDuration(gc.ctx, "Spec", gc.client.Address(), http.MethodGet, time.Since(start), err) + specResponse, err := gc.multiClient.Spec(gc.ctx, &api.SpecOpts{}) + recordRequestDuration(gc.ctx, "Spec", gc.multiClient.Address(), http.MethodGet, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "Spec"), @@ -58,8 +58,8 @@ func (gc *GoClient) computeVoluntaryExitDomain(ctx context.Context) (phase0.Doma } start = time.Now() - genesisResponse, err := gc.client.Genesis(ctx, &api.GenesisOpts{}) - recordRequestDuration(gc.ctx, "Genesis", gc.client.Address(), http.MethodGet, time.Since(start), err) + genesisResponse, err := gc.multiClient.Genesis(ctx, &api.GenesisOpts{}) + recordRequestDuration(gc.ctx, "Genesis", gc.multiClient.Address(), http.MethodGet, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "Genesis"), @@ -113,8 +113,8 @@ func (gc *GoClient) DomainData(epoch phase0.Epoch, domain phase0.DomainType) (ph } start := time.Now() - data, err := gc.client.Domain(gc.ctx, domain, epoch) - recordRequestDuration(gc.ctx, "Domain", gc.client.Address(), http.MethodGet, time.Since(start), err) + data, err := gc.multiClient.Domain(gc.ctx, domain, epoch) + recordRequestDuration(gc.ctx, "Domain", gc.multiClient.Address(), http.MethodGet, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "Domain"), diff --git a/beacon/goclient/sync_committee.go b/beacon/goclient/sync_committee.go index 0b9e8494be..ac36b66766 100644 --- a/beacon/goclient/sync_committee.go +++ b/beacon/goclient/sync_committee.go @@ -18,11 +18,11 @@ import ( // SyncCommitteeDuties returns sync committee duties for a given epoch func (gc *GoClient) SyncCommitteeDuties(ctx context.Context, epoch phase0.Epoch, validatorIndices []phase0.ValidatorIndex) ([]*eth2apiv1.SyncCommitteeDuty, error) { reqStart := time.Now() - resp, err := gc.client.SyncCommitteeDuties(ctx, &api.SyncCommitteeDutiesOpts{ + resp, err := gc.multiClient.SyncCommitteeDuties(ctx, &api.SyncCommitteeDutiesOpts{ Epoch: epoch, Indices: validatorIndices, }) - recordRequestDuration(gc.ctx, "SyncCommitteeDuties", gc.client.Address(), http.MethodPost, time.Since(reqStart), err) + recordRequestDuration(gc.ctx, "SyncCommitteeDuties", gc.multiClient.Address(), http.MethodPost, time.Since(reqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SyncCommitteeDuties"), @@ -43,10 +43,10 @@ func (gc *GoClient) SyncCommitteeDuties(ctx context.Context, epoch phase0.Epoch, // GetSyncMessageBlockRoot returns beacon block root for sync committee func (gc 
*GoClient) GetSyncMessageBlockRoot(slot phase0.Slot) (phase0.Root, spec.DataVersion, error) { reqStart := time.Now() - resp, err := gc.client.BeaconBlockRoot(gc.ctx, &api.BeaconBlockRootOpts{ + resp, err := gc.multiClient.BeaconBlockRoot(gc.ctx, &api.BeaconBlockRootOpts{ Block: "head", }) - recordRequestDuration(gc.ctx, "BeaconBlockRoot", gc.client.Address(), http.MethodGet, time.Since(reqStart), err) + recordRequestDuration(gc.ctx, "BeaconBlockRoot", gc.multiClient.Address(), http.MethodGet, time.Since(reqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "BeaconBlockRoot"), @@ -74,8 +74,8 @@ func (gc *GoClient) GetSyncMessageBlockRoot(slot phase0.Slot) (phase0.Root, spec // SubmitSyncMessages submits a signed sync committee msg func (gc *GoClient) SubmitSyncMessages(msgs []*altair.SyncCommitteeMessage) error { reqStart := time.Now() - err := gc.client.SubmitSyncCommitteeMessages(gc.ctx, msgs) - recordRequestDuration(gc.ctx, "SubmitSyncCommitteeMessages", gc.client.Address(), http.MethodPost, time.Since(reqStart), err) + err := gc.multiClient.SubmitSyncCommitteeMessages(gc.ctx, msgs) + recordRequestDuration(gc.ctx, "SubmitSyncCommitteeMessages", gc.multiClient.Address(), http.MethodPost, time.Since(reqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitSyncCommitteeMessages"), diff --git a/beacon/goclient/sync_committee_contribution.go b/beacon/goclient/sync_committee_contribution.go index 26276823c1..a665d4ee78 100644 --- a/beacon/goclient/sync_committee_contribution.go +++ b/beacon/goclient/sync_committee_contribution.go @@ -45,10 +45,10 @@ func (gc *GoClient) GetSyncCommitteeContribution(slot phase0.Slot, selectionProo gc.waitForOneThirdSlotDuration(slot) scDataReqStart := time.Now() - beaconBlockRootResp, err := gc.client.BeaconBlockRoot(gc.ctx, &api.BeaconBlockRootOpts{ + beaconBlockRootResp, err := gc.multiClient.BeaconBlockRoot(gc.ctx, &api.BeaconBlockRootOpts{ Block: fmt.Sprint(slot), }) - recordRequestDuration(gc.ctx, "BeaconBlockRoot", gc.client.Address(), http.MethodGet, time.Since(scDataReqStart), err) + recordRequestDuration(gc.ctx, "BeaconBlockRoot", gc.multiClient.Address(), http.MethodGet, time.Since(scDataReqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "BeaconBlockRoot"), @@ -82,12 +82,12 @@ func (gc *GoClient) GetSyncCommitteeContribution(slot phase0.Slot, selectionProo index := i g.Go(func() error { start := time.Now() - syncCommitteeContrResp, err := gc.client.SyncCommitteeContribution(gc.ctx, &api.SyncCommitteeContributionOpts{ + syncCommitteeContrResp, err := gc.multiClient.SyncCommitteeContribution(gc.ctx, &api.SyncCommitteeContributionOpts{ Slot: slot, SubcommitteeIndex: subnetIDs[index], BeaconBlockRoot: *blockRoot, }) - recordRequestDuration(gc.ctx, "SyncCommitteeContribution", gc.client.Address(), http.MethodGet, time.Since(start), err) + recordRequestDuration(gc.ctx, "SyncCommitteeContribution", gc.multiClient.Address(), http.MethodGet, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SyncCommitteeContribution"), @@ -126,8 +126,8 @@ func (gc *GoClient) GetSyncCommitteeContribution(slot phase0.Slot, selectionProo // SubmitSignedContributionAndProof broadcasts to the network func (gc *GoClient) SubmitSignedContributionAndProof(contribution *altair.SignedContributionAndProof) error { start := time.Now() - err := gc.client.SubmitSyncCommitteeContributions(gc.ctx, []*altair.SignedContributionAndProof{contribution}) - 
recordRequestDuration(gc.ctx, "SubmitSyncCommitteeContributions", gc.client.Address(), http.MethodPost, time.Since(start), err) + err := gc.multiClient.SubmitSyncCommitteeContributions(gc.ctx, []*altair.SignedContributionAndProof{contribution}) + recordRequestDuration(gc.ctx, "SubmitSyncCommitteeContributions", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitSyncCommitteeContributions"), diff --git a/beacon/goclient/validator.go b/beacon/goclient/validator.go index b1825c3ef5..872af09623 100644 --- a/beacon/goclient/validator.go +++ b/beacon/goclient/validator.go @@ -14,12 +14,12 @@ import ( // GetValidatorData returns metadata (balance, index, status, more) for each pubkey from the node func (gc *GoClient) GetValidatorData(validatorPubKeys []phase0.BLSPubKey) (map[phase0.ValidatorIndex]*eth2apiv1.Validator, error) { reqStart := time.Now() - resp, err := gc.client.Validators(gc.ctx, &api.ValidatorsOpts{ + resp, err := gc.multiClient.Validators(gc.ctx, &api.ValidatorsOpts{ State: "head", // TODO maybe need to get the chainId (head) as var PubKeys: validatorPubKeys, Common: api.CommonOpts{Timeout: gc.longTimeout}, }) - recordRequestDuration(gc.ctx, "Validators", gc.client.Address(), http.MethodPost, time.Since(reqStart), err) + recordRequestDuration(gc.ctx, "Validators", gc.multiClient.Address(), http.MethodPost, time.Since(reqStart), err) if err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "Validators"), diff --git a/beacon/goclient/voluntary_exit.go b/beacon/goclient/voluntary_exit.go index 54883fced6..fb6d333f3f 100644 --- a/beacon/goclient/voluntary_exit.go +++ b/beacon/goclient/voluntary_exit.go @@ -6,7 +6,7 @@ import ( ) func (gc *GoClient) SubmitVoluntaryExit(voluntaryExit *phase0.SignedVoluntaryExit) error { - if err := gc.client.SubmitVoluntaryExit(gc.ctx, voluntaryExit); err != nil { + if err := gc.multiClient.SubmitVoluntaryExit(gc.ctx, voluntaryExit); err != nil { gc.log.Error(clResponseErrMsg, zap.String("api", "SubmitVoluntaryExit"), zap.Error(err), diff --git a/cli/operator/node.go b/cli/operator/node.go index 84fc9ab048..26b0f37751 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -19,9 +19,9 @@ import ( "github.com/ilyakaznacheev/cleanenv" "github.com/pkg/errors" "github.com/spf13/cobra" + spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/api/handlers" apiserver "github.com/ssvlabs/ssv/api/server" "github.com/ssvlabs/ssv/beacon/goclient" @@ -205,19 +205,49 @@ var StartNodeCmd = &cobra.Command{ consensusClient := setupConsensusClient(logger, operatorDataStore, slotTickerProvider) - executionClient, err := executionclient.New( - cmd.Context(), - cfg.ExecutionClient.Addr, - ethcommon.HexToAddress(networkConfig.RegistryContractAddr), - executionclient.WithLogger(logger), - executionclient.WithFollowDistance(executionclient.DefaultFollowDistance), - executionclient.WithConnectionTimeout(cfg.ExecutionClient.ConnectionTimeout), - executionclient.WithReconnectionInitialInterval(executionclient.DefaultReconnectionInitialInterval), - executionclient.WithReconnectionMaxInterval(executionclient.DefaultReconnectionMaxInterval), - executionclient.WithSyncDistanceTolerance(cfg.ExecutionClient.SyncDistanceTolerance), - ) - if err != nil { - logger.Fatal("could not connect to execution client", zap.Error(err)) + executionAddrList := strings.Split(cfg.ExecutionClient.Addr, ";") // TODO: 
Decide what symbol to use as a separator. Bootnodes are currently separated by ";". Deployment bot currently uses ",". + if len(executionAddrList) == 0 { + logger.Fatal("no execution node address provided") + } + + var executionClient executionclient.Provider + + if len(executionAddrList) == 1 { + ec, err := executionclient.New( + cmd.Context(), + executionAddrList[0], + ethcommon.HexToAddress(networkConfig.RegistryContractAddr), + executionclient.WithLogger(logger), + executionclient.WithFollowDistance(executionclient.DefaultFollowDistance), + executionclient.WithConnectionTimeout(cfg.ExecutionClient.ConnectionTimeout), + executionclient.WithReconnectionInitialInterval(executionclient.DefaultReconnectionInitialInterval), + executionclient.WithReconnectionMaxInterval(executionclient.DefaultReconnectionMaxInterval), + executionclient.WithHealthInvalidationInterval(executionclient.DefaultHealthInvalidationInterval), + executionclient.WithSyncDistanceTolerance(cfg.ExecutionClient.SyncDistanceTolerance), + ) + if err != nil { + logger.Fatal("could not connect to execution client", zap.Error(err)) + } + + executionClient = ec + } else { + ec, err := executionclient.NewMulti( + cmd.Context(), + executionAddrList, + ethcommon.HexToAddress(networkConfig.RegistryContractAddr), + executionclient.WithLoggerMulti(logger), + executionclient.WithFollowDistanceMulti(executionclient.DefaultFollowDistance), + executionclient.WithConnectionTimeoutMulti(cfg.ExecutionClient.ConnectionTimeout), + executionclient.WithReconnectionInitialIntervalMulti(executionclient.DefaultReconnectionInitialInterval), + executionclient.WithReconnectionMaxIntervalMulti(executionclient.DefaultReconnectionMaxInterval), + executionclient.WithHealthInvalidationIntervalMulti(executionclient.DefaultHealthInvalidationInterval), + executionclient.WithSyncDistanceToleranceMulti(cfg.ExecutionClient.SyncDistanceTolerance), + ) + if err != nil { + logger.Fatal("could not connect to execution client", zap.Error(err)) + } + + executionClient = ec } cfg.P2pNetworkConfig.NodeStorage = nodeStorage @@ -680,7 +710,7 @@ func setupConsensusClient( func syncContractEvents( ctx context.Context, logger *zap.Logger, - executionClient *executionclient.ExecutionClient, + executionClient executionclient.Provider, validatorCtrl validator.Controller, networkConfig networkconfig.NetworkConfig, nodeStorage operatorstorage.Storage, diff --git a/eth/executionclient/config.go b/eth/executionclient/config.go index f28fe89a55..7f40854d50 100644 --- a/eth/executionclient/config.go +++ b/eth/executionclient/config.go @@ -8,7 +8,7 @@ import ( // ExecutionOptions contains config configurations related to Ethereum execution client. type ExecutionOptions struct { - Addr string `yaml:"ETH1Addr" env:"ETH_1_ADDR" env-required:"true" env-description:"Execution client WebSocket address"` + Addr string `yaml:"ETH1Addr" env:"ETH_1_ADDR" env-required:"true" env-description:"Execution client WebSocket address. 
Supports multiple semicolon-separated addresses"` ConnectionTimeout time.Duration `yaml:"ETH1ConnectionTimeout" env:"ETH_1_CONNECTION_TIMEOUT" env-default:"10s" env-description:"Execution client connection timeout"` - SyncDistanceTolerance uint64 `yaml:"ETH1SyncDistanceTolerance" env:"ETH_1_SYNC_DISTANCE_TOLERANCE" env-default:"4" env-description:"The number of out-of-sync blocks we can tolerate"` + SyncDistanceTolerance uint64 `yaml:"ETH1SyncDistanceTolerance" env:"ETH_1_SYNC_DISTANCE_TOLERANCE" env-default:"5" env-description:"The number of out-of-sync blocks we can tolerate"` } diff --git a/eth/executionclient/defaults.go b/eth/executionclient/defaults.go index bc845dcf1d..8f42a11e13 100644 --- a/eth/executionclient/defaults.go +++ b/eth/executionclient/defaults.go @@ -8,8 +8,12 @@ const ( DefaultConnectionTimeout = 10 * time.Second DefaultReconnectionInitialInterval = 1 * time.Second DefaultReconnectionMaxInterval = 64 * time.Second + DefaultHealthInvalidationInterval = 10 * time.Second // TODO: decide on this value, for now choosing a bit less than block interval DefaultFollowDistance = 8 // TODO ALAN: revert DefaultHistoricalLogsBatchSize = 500 defaultLogBuf = 8 * 1024 + maxReconnectionAttempts = 5000 + reconnectionBackoffFactor = 2 + healthCheckInterval = 30 * time.Second ) diff --git a/eth/executionclient/execution_client.go b/eth/executionclient/execution_client.go index 1c3c744a04..f76196e6eb 100644 --- a/eth/executionclient/execution_client.go +++ b/eth/executionclient/execution_client.go @@ -6,6 +6,8 @@ import ( "errors" "fmt" "math/big" + "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum" @@ -22,8 +24,32 @@ import ( "github.com/ssvlabs/ssv/utils/tasks" ) +//go:generate mockgen -package=executionclient -destination=./mocks.go -source=./execution_client.go + +type Provider interface { + FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (logs <-chan BlockLogs, errors <-chan error, err error) + StreamLogs(ctx context.Context, fromBlock uint64) <-chan BlockLogs + Filterer() (*contract.ContractFilterer, error) + BlockByNumber(ctx context.Context, number *big.Int) (*ethtypes.Block, error) + HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Header, error) + ChainID(ctx context.Context) (*big.Int, error) + Healthy(ctx context.Context) error + SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- ethtypes.Log) (ethereum.Subscription, error) + FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]ethtypes.Log, error) + Close() error +} + +type SingleClientProvider interface { + Provider + SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) + streamLogsToChan(ctx context.Context, logs chan<- BlockLogs, fromBlock uint64) (lastBlock uint64, err error) +} + +var _ Provider = &ExecutionClient{} + var ( ErrClosed = fmt.Errorf("closed") + ErrUnhealthy = fmt.Errorf("unhealthy") ErrNotConnected = fmt.Errorf("not connected") ErrBadInput = fmt.Errorf("bad input") ErrNothingToSync = errors.New("nothing to sync") @@ -45,14 +71,18 @@ type ExecutionClient struct { connectionTimeout time.Duration reconnectionInitialInterval time.Duration reconnectionMaxInterval time.Duration + healthInvalidationInterval time.Duration logBatchSize uint64 syncDistanceTolerance uint64 syncProgressFn func(context.Context) (*ethereum.SyncProgress, error) // variables - client *ethclient.Client - closed chan struct{} + client *ethclient.Client + closed chan struct{} + lastSyncedTime atomic.Int64 + healthyChMu sync.Mutex + healthyCh chan struct{} } // 
New creates a new instance of ExecutionClient. @@ -65,8 +95,10 @@ func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, o connectionTimeout: DefaultConnectionTimeout, reconnectionInitialInterval: DefaultReconnectionInitialInterval, reconnectionMaxInterval: DefaultReconnectionMaxInterval, + healthInvalidationInterval: DefaultHealthInvalidationInterval, logBatchSize: DefaultHistoricalLogsBatchSize, // TODO Make batch of logs adaptive depending on "websocket: read limit" closed: make(chan struct{}), + healthyCh: make(chan struct{}), } for _, opt := range opts { opt(client) @@ -81,6 +113,11 @@ func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, o return client, nil } +// TODO: add comments about SyncProgress, syncProgress, syncProgressFn +func (ec *ExecutionClient) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { + return ec.syncProgressFn(ctx) +} + func (ec *ExecutionClient) syncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { return ec.client.SyncProgress(ctx) } @@ -209,6 +246,8 @@ func (ec *ExecutionClient) StreamLogs(ctx context.Context, fromBlock uint64) <-c return case <-ec.closed: return + case <-ec.healthyCh: + return default: lastBlock, err := ec.streamLogsToChan(ctx, logs, fromBlock) if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { @@ -232,7 +271,7 @@ func (ec *ExecutionClient) StreamLogs(ctx context.Context, fromBlock uint64) <-c } ec.logger.Error("failed to stream registry events, reconnecting", zap.Error(err)) - ec.reconnect(ctx) + ec.reconnect(ctx) // TODO: ethclient implements reconnection, consider removing this logic after thorough testing fromBlock = lastBlock + 1 } } @@ -249,11 +288,36 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error { return ErrClosed } + lastHealthyTime := time.Unix(ec.lastSyncedTime.Load(), 0) + if ec.healthInvalidationInterval != 0 && time.Since(lastHealthyTime) <= ec.healthInvalidationInterval { + // Synced recently, reuse the result (only if ec.healthInvalidationInterval is set). + return nil + } + + ec.healthyChMu.Lock() + defer ec.healthyChMu.Unlock() + + if err := ec.healthy(ctx); err != nil { + close(ec.healthyCh) + return fmt.Errorf("unhealthy: %w", err) + } + + // Reset the healthyCh channel if it was closed. + select { + case <-ec.healthyCh: + default: + ec.healthyCh = make(chan struct{}) + } + + return nil +} + +func (ec *ExecutionClient) healthy(ctx context.Context) error { ctx, cancel := context.WithTimeout(ctx, ec.connectionTimeout) defer cancel() start := time.Now() - sp, err := ec.syncProgressFn(ctx) + sp, err := ec.SyncProgress(ctx) if err != nil { recordExecutionClientStatus(ctx, statusFailure, ec.nodeAddr) ec.logger.Error(elResponseErrMsg, @@ -277,6 +341,8 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error { } recordExecutionClientStatus(ctx, statusReady, ec.nodeAddr) + ec.lastSyncedTime.Store(time.Now().Unix()) + return nil } @@ -304,6 +370,30 @@ func (ec *ExecutionClient) HeaderByNumber(ctx context.Context, blockNumber *big. 
return h, nil } +func (ec *ExecutionClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- ethtypes.Log) (ethereum.Subscription, error) { + logs, err := ec.client.SubscribeFilterLogs(ctx, q, ch) + if err != nil { + ec.logger.Error(elResponseErrMsg, + zap.String("method", "EthSubscribe"), + zap.Error(err)) + return nil, err + } + + return logs, nil +} + +func (ec *ExecutionClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]ethtypes.Log, error) { + logs, err := ec.client.FilterLogs(ctx, q) + if err != nil { + ec.logger.Error(elResponseErrMsg, + zap.String("method", "eth_getLogs"), + zap.Error(err)) + return nil, err + } + + return logs, nil +} + func (ec *ExecutionClient) isClosed() bool { select { case <-ec.closed: @@ -336,6 +426,9 @@ func (ec *ExecutionClient) streamLogsToChan(ctx context.Context, logs chan<- Blo case <-ec.closed: return fromBlock, ErrClosed + case <-ec.healthyCh: + return fromBlock, ErrUnhealthy + case err := <-sub.Err(): if err == nil { return fromBlock, ErrClosed @@ -417,3 +510,7 @@ func (ec *ExecutionClient) reconnect(ctx context.Context) { func (ec *ExecutionClient) Filterer() (*contract.ContractFilterer, error) { return contract.NewContractFilterer(ec.contractAddress, ec.client) } + +func (ec *ExecutionClient) ChainID(ctx context.Context) (*big.Int, error) { + return ec.client.ChainID(ctx) +} diff --git a/eth/executionclient/execution_client_test.go b/eth/executionclient/execution_client_test.go index a677a4ed34..40da1fd450 100644 --- a/eth/executionclient/execution_client_test.go +++ b/eth/executionclient/execution_client_test.go @@ -637,7 +637,7 @@ func TestSyncProgress(t *testing.T) { require.NotEmpty(t, contractCode) // Create a client and connect to the simulator - client, err := New(ctx, addr, contractAddr) + client, err := New(ctx, addr, contractAddr, WithHealthInvalidationInterval(0)) require.NoError(t, err) err = client.Healthy(ctx) @@ -650,11 +650,9 @@ func TestSyncProgress(t *testing.T) { p.HighestBlock = 6 return p, nil } - err = client.Healthy(ctx) require.ErrorIs(t, err, errSyncing) }) - t.Run("within tolerable limits", func(t *testing.T) { client, err := New(ctx, addr, contractAddr, WithSyncDistanceTolerance(2)) require.NoError(t, err) @@ -665,7 +663,6 @@ func TestSyncProgress(t *testing.T) { p.HighestBlock = 7 return p, nil } - err = client.Healthy(ctx) require.NoError(t, err) }) diff --git a/eth/executionclient/mocks.go b/eth/executionclient/mocks.go new file mode 100644 index 0000000000..f9171642d4 --- /dev/null +++ b/eth/executionclient/mocks.go @@ -0,0 +1,395 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: ./execution_client.go +// +// Generated by this command: +// +// mockgen -package=executionclient -destination=./mocks.go -source=./execution_client.go +// + +// Package executionclient is a generated GoMock package. +package executionclient + +import ( + context "context" + big "math/big" + reflect "reflect" + + ethereum "github.com/ethereum/go-ethereum" + types "github.com/ethereum/go-ethereum/core/types" + contract "github.com/ssvlabs/ssv/eth/contract" + gomock "go.uber.org/mock/gomock" +) + +// MockProvider is a mock of Provider interface. +type MockProvider struct { + ctrl *gomock.Controller + recorder *MockProviderMockRecorder + isgomock struct{} +} + +// MockProviderMockRecorder is the mock recorder for MockProvider. +type MockProviderMockRecorder struct { + mock *MockProvider +} + +// NewMockProvider creates a new mock instance. 
+func NewMockProvider(ctrl *gomock.Controller) *MockProvider { + mock := &MockProvider{ctrl: ctrl} + mock.recorder = &MockProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockProvider) EXPECT() *MockProviderMockRecorder { + return m.recorder +} + +// BlockByNumber mocks base method. +func (m *MockProvider) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BlockByNumber", ctx, number) + ret0, _ := ret[0].(*types.Block) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BlockByNumber indicates an expected call of BlockByNumber. +func (mr *MockProviderMockRecorder) BlockByNumber(ctx, number any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockByNumber", reflect.TypeOf((*MockProvider)(nil).BlockByNumber), ctx, number) +} + +// ChainID mocks base method. +func (m *MockProvider) ChainID(ctx context.Context) (*big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainID", ctx) + ret0, _ := ret[0].(*big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainID indicates an expected call of ChainID. +func (mr *MockProviderMockRecorder) ChainID(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainID", reflect.TypeOf((*MockProvider)(nil).ChainID), ctx) +} + +// Close mocks base method. +func (m *MockProvider) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockProviderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockProvider)(nil).Close)) +} + +// FetchHistoricalLogs mocks base method. +func (m *MockProvider) FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (<-chan BlockLogs, <-chan error, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchHistoricalLogs", ctx, fromBlock) + ret0, _ := ret[0].(<-chan BlockLogs) + ret1, _ := ret[1].(<-chan error) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// FetchHistoricalLogs indicates an expected call of FetchHistoricalLogs. +func (mr *MockProviderMockRecorder) FetchHistoricalLogs(ctx, fromBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchHistoricalLogs", reflect.TypeOf((*MockProvider)(nil).FetchHistoricalLogs), ctx, fromBlock) +} + +// FilterLogs mocks base method. +func (m *MockProvider) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FilterLogs", ctx, q) + ret0, _ := ret[0].([]types.Log) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FilterLogs indicates an expected call of FilterLogs. +func (mr *MockProviderMockRecorder) FilterLogs(ctx, q any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterLogs", reflect.TypeOf((*MockProvider)(nil).FilterLogs), ctx, q) +} + +// Filterer mocks base method. +func (m *MockProvider) Filterer() (*contract.ContractFilterer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Filterer") + ret0, _ := ret[0].(*contract.ContractFilterer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Filterer indicates an expected call of Filterer. 
+func (mr *MockProviderMockRecorder) Filterer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filterer", reflect.TypeOf((*MockProvider)(nil).Filterer)) +} + +// HeaderByNumber mocks base method. +func (m *MockProvider) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HeaderByNumber", ctx, blockNumber) + ret0, _ := ret[0].(*types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HeaderByNumber indicates an expected call of HeaderByNumber. +func (mr *MockProviderMockRecorder) HeaderByNumber(ctx, blockNumber any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeaderByNumber", reflect.TypeOf((*MockProvider)(nil).HeaderByNumber), ctx, blockNumber) +} + +// Healthy mocks base method. +func (m *MockProvider) Healthy(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Healthy", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Healthy indicates an expected call of Healthy. +func (mr *MockProviderMockRecorder) Healthy(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Healthy", reflect.TypeOf((*MockProvider)(nil).Healthy), ctx) +} + +// StreamLogs mocks base method. +func (m *MockProvider) StreamLogs(ctx context.Context, fromBlock uint64) <-chan BlockLogs { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StreamLogs", ctx, fromBlock) + ret0, _ := ret[0].(<-chan BlockLogs) + return ret0 +} + +// StreamLogs indicates an expected call of StreamLogs. +func (mr *MockProviderMockRecorder) StreamLogs(ctx, fromBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamLogs", reflect.TypeOf((*MockProvider)(nil).StreamLogs), ctx, fromBlock) +} + +// SubscribeFilterLogs mocks base method. +func (m *MockProvider) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeFilterLogs", ctx, q, ch) + ret0, _ := ret[0].(ethereum.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SubscribeFilterLogs indicates an expected call of SubscribeFilterLogs. +func (mr *MockProviderMockRecorder) SubscribeFilterLogs(ctx, q, ch any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeFilterLogs", reflect.TypeOf((*MockProvider)(nil).SubscribeFilterLogs), ctx, q, ch) +} + +// MockSingleClientProvider is a mock of SingleClientProvider interface. +type MockSingleClientProvider struct { + ctrl *gomock.Controller + recorder *MockSingleClientProviderMockRecorder + isgomock struct{} +} + +// MockSingleClientProviderMockRecorder is the mock recorder for MockSingleClientProvider. +type MockSingleClientProviderMockRecorder struct { + mock *MockSingleClientProvider +} + +// NewMockSingleClientProvider creates a new mock instance. +func NewMockSingleClientProvider(ctrl *gomock.Controller) *MockSingleClientProvider { + mock := &MockSingleClientProvider{ctrl: ctrl} + mock.recorder = &MockSingleClientProviderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockSingleClientProvider) EXPECT() *MockSingleClientProviderMockRecorder { + return m.recorder +} + +// BlockByNumber mocks base method. 
+func (m *MockSingleClientProvider) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BlockByNumber", ctx, number) + ret0, _ := ret[0].(*types.Block) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// BlockByNumber indicates an expected call of BlockByNumber. +func (mr *MockSingleClientProviderMockRecorder) BlockByNumber(ctx, number any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockByNumber", reflect.TypeOf((*MockSingleClientProvider)(nil).BlockByNumber), ctx, number) +} + +// ChainID mocks base method. +func (m *MockSingleClientProvider) ChainID(ctx context.Context) (*big.Int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ChainID", ctx) + ret0, _ := ret[0].(*big.Int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ChainID indicates an expected call of ChainID. +func (mr *MockSingleClientProviderMockRecorder) ChainID(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainID", reflect.TypeOf((*MockSingleClientProvider)(nil).ChainID), ctx) +} + +// Close mocks base method. +func (m *MockSingleClientProvider) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockSingleClientProviderMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSingleClientProvider)(nil).Close)) +} + +// FetchHistoricalLogs mocks base method. +func (m *MockSingleClientProvider) FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (<-chan BlockLogs, <-chan error, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchHistoricalLogs", ctx, fromBlock) + ret0, _ := ret[0].(<-chan BlockLogs) + ret1, _ := ret[1].(<-chan error) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// FetchHistoricalLogs indicates an expected call of FetchHistoricalLogs. +func (mr *MockSingleClientProviderMockRecorder) FetchHistoricalLogs(ctx, fromBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchHistoricalLogs", reflect.TypeOf((*MockSingleClientProvider)(nil).FetchHistoricalLogs), ctx, fromBlock) +} + +// FilterLogs mocks base method. +func (m *MockSingleClientProvider) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FilterLogs", ctx, q) + ret0, _ := ret[0].([]types.Log) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FilterLogs indicates an expected call of FilterLogs. +func (mr *MockSingleClientProviderMockRecorder) FilterLogs(ctx, q any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterLogs", reflect.TypeOf((*MockSingleClientProvider)(nil).FilterLogs), ctx, q) +} + +// Filterer mocks base method. +func (m *MockSingleClientProvider) Filterer() (*contract.ContractFilterer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Filterer") + ret0, _ := ret[0].(*contract.ContractFilterer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Filterer indicates an expected call of Filterer. 
+func (mr *MockSingleClientProviderMockRecorder) Filterer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filterer", reflect.TypeOf((*MockSingleClientProvider)(nil).Filterer)) +} + +// HeaderByNumber mocks base method. +func (m *MockSingleClientProvider) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*types.Header, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "HeaderByNumber", ctx, blockNumber) + ret0, _ := ret[0].(*types.Header) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// HeaderByNumber indicates an expected call of HeaderByNumber. +func (mr *MockSingleClientProviderMockRecorder) HeaderByNumber(ctx, blockNumber any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HeaderByNumber", reflect.TypeOf((*MockSingleClientProvider)(nil).HeaderByNumber), ctx, blockNumber) +} + +// Healthy mocks base method. +func (m *MockSingleClientProvider) Healthy(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Healthy", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Healthy indicates an expected call of Healthy. +func (mr *MockSingleClientProviderMockRecorder) Healthy(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Healthy", reflect.TypeOf((*MockSingleClientProvider)(nil).Healthy), ctx) +} + +// StreamLogs mocks base method. +func (m *MockSingleClientProvider) StreamLogs(ctx context.Context, fromBlock uint64) <-chan BlockLogs { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StreamLogs", ctx, fromBlock) + ret0, _ := ret[0].(<-chan BlockLogs) + return ret0 +} + +// StreamLogs indicates an expected call of StreamLogs. +func (mr *MockSingleClientProviderMockRecorder) StreamLogs(ctx, fromBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamLogs", reflect.TypeOf((*MockSingleClientProvider)(nil).StreamLogs), ctx, fromBlock) +} + +// SubscribeFilterLogs mocks base method. +func (m *MockSingleClientProvider) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeFilterLogs", ctx, q, ch) + ret0, _ := ret[0].(ethereum.Subscription) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SubscribeFilterLogs indicates an expected call of SubscribeFilterLogs. +func (mr *MockSingleClientProviderMockRecorder) SubscribeFilterLogs(ctx, q, ch any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeFilterLogs", reflect.TypeOf((*MockSingleClientProvider)(nil).SubscribeFilterLogs), ctx, q, ch) +} + +// SyncProgress mocks base method. +func (m *MockSingleClientProvider) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncProgress", ctx) + ret0, _ := ret[0].(*ethereum.SyncProgress) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncProgress indicates an expected call of SyncProgress. +func (mr *MockSingleClientProviderMockRecorder) SyncProgress(ctx any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncProgress", reflect.TypeOf((*MockSingleClientProvider)(nil).SyncProgress), ctx) +} + +// streamLogsToChan mocks base method. 
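+// streamLogsToChan is unexported yet part of SingleClientProvider so that MultiClient
+// can resume a failed stream from the last processed block; mocking it lets the
+// multi-client tests below simulate that failover handoff.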
+func (m *MockSingleClientProvider) streamLogsToChan(ctx context.Context, logs chan<- BlockLogs, fromBlock uint64) (uint64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "streamLogsToChan", ctx, logs, fromBlock) + ret0, _ := ret[0].(uint64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// streamLogsToChan indicates an expected call of streamLogsToChan. +func (mr *MockSingleClientProviderMockRecorder) streamLogsToChan(ctx, logs, fromBlock any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "streamLogsToChan", reflect.TypeOf((*MockSingleClientProvider)(nil).streamLogsToChan), ctx, logs, fromBlock) +} diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go new file mode 100644 index 0000000000..2f29cfc8bf --- /dev/null +++ b/eth/executionclient/multi_client.go @@ -0,0 +1,365 @@ +package executionclient + +import ( + "context" + "fmt" + "math/big" + "sync/atomic" + "time" + + "github.com/cockroachdb/errors" + "github.com/ethereum/go-ethereum" + ethcommon "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/sourcegraph/conc/pool" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/ssvlabs/ssv/eth/contract" +) + +var _ Provider = &MultiClient{} + +type MultiClient struct { + // optional + logger *zap.Logger + // followDistance defines an offset into the past from the head block such that the block + // at this offset will be considered as very likely finalized. + followDistance uint64 // TODO: consider reading the finalized checkpoint from consensus layer + connectionTimeout time.Duration + reconnectionInitialInterval time.Duration + reconnectionMaxInterval time.Duration + healthInvalidationInterval time.Duration + logBatchSize uint64 + syncDistanceTolerance uint64 + + contractAddress ethcommon.Address + chainID *big.Int + closed chan struct{} + + nodeAddrs []string + clients []SingleClientProvider + currentClientIndex atomic.Int64 +} + +// NewMulti creates a new instance of MultiClient. +func NewMulti(ctx context.Context, nodeAddrs []string, contractAddr ethcommon.Address, opts ...OptionMulti) (*MultiClient, error) { + if len(nodeAddrs) == 0 { + return nil, fmt.Errorf("no node address provided") + } + + multiClient := &MultiClient{ + nodeAddrs: nodeAddrs, + contractAddress: contractAddr, + logger: zap.NewNop(), + followDistance: DefaultFollowDistance, + connectionTimeout: DefaultConnectionTimeout, + reconnectionInitialInterval: DefaultReconnectionInitialInterval, + reconnectionMaxInterval: DefaultReconnectionMaxInterval, + logBatchSize: DefaultHistoricalLogsBatchSize, + } + + for _, opt := range opts { + opt(multiClient) + } + + // The underlying client may call Fatal on unsuccessful reconnection attempt. + // Therefore, we need to override Fatal's behavior to avoid crashing. 
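+	// zapcore.WriteThenNoop writes the Fatal entry and then does nothing instead of the
+	// default os.Exit(1), so a dead node only surfaces as a log line and MultiClient can
+	// fail over to the next client.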
+ logger := multiClient.logger.WithOptions(zap.WithFatalHook(zapcore.WriteThenNoop)) + + for _, nodeAddr := range nodeAddrs { + singleClient, err := New( + ctx, + nodeAddr, + contractAddr, + WithLogger(logger), + WithFollowDistance(multiClient.followDistance), + WithConnectionTimeout(multiClient.connectionTimeout), + WithReconnectionInitialInterval(multiClient.reconnectionInitialInterval), + WithReconnectionMaxInterval(multiClient.reconnectionMaxInterval), + WithHealthInvalidationInterval(multiClient.healthInvalidationInterval), + WithSyncDistanceTolerance(multiClient.syncDistanceTolerance), + ) + if err != nil { + return nil, fmt.Errorf("create single client: %w", err) + } + + multiClient.clients = append(multiClient.clients, singleClient) + } + + same, err := multiClient.assertSameChainIDs(ctx) + if err != nil { + return nil, fmt.Errorf("assert same chain IDs: %w", err) + } + if !same { + return nil, fmt.Errorf("execution clients' chain IDs are not the same") + } + + return multiClient, nil +} + +// assertSameChainIDs checks that all clients report the same chain ID. +// It caches the chain ID of the first client in mc.chainID and compares each remaining client against it. +func (mc *MultiClient) assertSameChainIDs(ctx context.Context) (bool, error) { + for i, client := range mc.clients { + addr := mc.nodeAddrs[i] + + chainID, err := client.ChainID(ctx) + if err != nil { + mc.logger.Error("failed to get chain ID", zap.String("address", addr), zap.Error(err)) + return false, fmt.Errorf("get chain ID: %w", err) + } + if mc.chainID == nil { + mc.chainID = chainID + continue + } + if mc.chainID.Cmp(chainID) != 0 { + mc.logger.Error("chain ID mismatch", + zap.String("observed_chain_id", mc.chainID.String()), + zap.String("checked_chain_id", chainID.String()), + zap.String("address", addr)) + return false, nil + } + } + + return true, nil +} + +// FetchHistoricalLogs retrieves historical logs emitted by the contract starting from fromBlock. +// It tries each client in turn until one of them returns a result without error. +// It doesn't handle errors in the error channel to simplify logic. +// In that case, the caller should call Panic/Fatal to restart the node. +func (mc *MultiClient) FetchHistoricalLogs(ctx context.Context, fromBlock uint64) (<-chan BlockLogs, <-chan error, error) { + var logCh <-chan BlockLogs + var errCh <-chan error + + f := func(client SingleClientProvider) (any, error) { + singleLogsCh, singleErrCh, err := client.FetchHistoricalLogs(ctx, fromBlock) + if err != nil { + return nil, err + } + + logCh = singleLogsCh + errCh = singleErrCh + return nil, nil + } + + _, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f) + if err != nil { + return nil, nil, err + } + + return logCh, errCh, nil +} + +// StreamLogs subscribes to events emitted by the contract. +// NOTE: StreamLogs spawns a goroutine which calls os.Exit(1) if no client is available. +func (mc *MultiClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan BlockLogs { + logs := make(chan BlockLogs) + + go func() { + defer close(logs) + for { + select { + case <-ctx.Done(): + return + case <-mc.closed: + return + default: + // Refresh the cached health state of all nodes and make sure at least one of them is available.
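+ // Healthy fans out to all clients concurrently and returns nil if at least one node
+ // responds and is within the configured sync distance tolerance, so streaming only
+ // aborts here when the entire set is unavailable.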
+ if err := mc.Healthy(ctx); err != nil { + mc.logger.Fatal("no healthy clients", zap.Error(err)) + } + + f := func(client SingleClientProvider) (any, error) { + lastBlock, err := client.streamLogsToChan(ctx, logs, fromBlock) + if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { + return lastBlock, err + } + if err != nil { + // fromBlock's value in the outer scope is updated here, so this function needs to be a closure + fromBlock = max(fromBlock, lastBlock+1) + return nil, err + } + + return nil, nil + } + + _, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f) + if err != nil && !errors.Is(err, ErrClosed) && !errors.Is(err, context.Canceled) { + // NOTE: There are unit tests that trigger Fatal and override its behavior. + // Therefore, the code must call `return` afterward. + mc.logger.Fatal("failed to stream registry events", zap.Error(err)) + } + return + } + } + }() + + return logs +} + +// Healthy returns nil if at least one execution client is currently healthy: it responds to requests and is not in the syncing state. +func (mc *MultiClient) Healthy(ctx context.Context) error { + healthyClients := atomic.Int32{} + p := pool.New().WithErrors().WithContext(ctx) + for i, client := range mc.clients { + p.Go(func(ctx context.Context) error { + err := client.Healthy(ctx) + if err != nil { + mc.logger.Warn("client is not healthy", + zap.String("addr", mc.nodeAddrs[i]), + zap.Error(err)) + return err + } + healthyClients.Add(1) + return nil + }) + } + err := p.Wait() + if healthyClients.Load() > 0 { + return nil + } + return fmt.Errorf("no healthy clients: %w", err) +} + +// BlockByNumber retrieves a block by its number. +func (mc *MultiClient) BlockByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Block, error) { + f := func(client SingleClientProvider) (any, error) { + return client.BlockByNumber(ctx, blockNumber) + } + res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f) + if err != nil { + return nil, err + } + + return res.(*ethtypes.Block), nil +} + +// HeaderByNumber retrieves a block header by its number.
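+// Like BlockByNumber, it is routed through mc.call, so an unhealthy or failing node is
+// skipped transparently. A minimal usage sketch (nil requests the latest header, following
+// go-ethereum conventions):
+//
+//	header, err := mc.HeaderByNumber(ctx, nil)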
+func (mc *MultiClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) (*ethtypes.Header, error) { + f := func(client SingleClientProvider) (any, error) { + return client.HeaderByNumber(ctx, blockNumber) + } + res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f) + if err != nil { + return nil, err + } + + return res.(*ethtypes.Header), nil +} + +func (mc *MultiClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- ethtypes.Log) (ethereum.Subscription, error) { + f := func(client SingleClientProvider) (any, error) { + return client.SubscribeFilterLogs(ctx, q, ch) + } + res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f) + if err != nil { + return nil, err + } + + return res.(ethereum.Subscription), nil +} + +func (mc *MultiClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]ethtypes.Log, error) { + f := func(client SingleClientProvider) (any, error) { + return client.FilterLogs(ctx, q) + } + res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f) + if err != nil { + return nil, err + } + + return res.([]ethtypes.Log), nil +} + +func (mc *MultiClient) Filterer() (*contract.ContractFilterer, error) { + return contract.NewContractFilterer(mc.contractAddress, mc) +} + +func (mc *MultiClient) ChainID(_ context.Context) (*big.Int, error) { + return mc.chainID, nil +} + +func (mc *MultiClient) Close() error { + close(mc.closed) + + var multiErr error + for i, client := range mc.clients { + if err := client.Close(); err != nil { + mc.logger.Debug("Failed to close client", zap.String("address", mc.nodeAddrs[i]), zap.Error(err)) + multiErr = errors.Join(multiErr, err) + } + } + + return multiErr +} + +func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error)) (any, error) { + if len(mc.clients) == 1 { + return f(mc.clients[0]) + } + + // Iterate over the clients in round-robin fashion, + // starting from the most likely healthy client (currentClientIndex). + var startingIndex = int(mc.currentClientIndex.Load()) + var allErrs error + for i := 0; i < len(mc.clients); i++ { + clientIndex := (startingIndex + i) % len(mc.clients) + nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging. + client := mc.clients[clientIndex] + + logger := mc.logger.With( + zap.String("addr", mc.nodeAddrs[clientIndex]), + zap.String("method", methodFromContext(ctx))) + + // Make sure this client is healthy. This shouldn't cause too many requests because the result is cached. + // TODO: Make sure the allowed tolerance doesn't cause issues in log streaming. + if err := client.Healthy(ctx); err != nil { + logger.Warn("client is not healthy, switching to next client", + zap.String("next_addr", mc.nodeAddrs[nextClientIndex]), + zap.Error(err)) + allErrs = errors.Join(allErrs, err) + mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. + continue + } + + logger.Debug("calling client") + + v, err := f(client) + if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { + mc.logger.Debug("received graceful closure from client", zap.Error(err)) + return v, err + } + + if err != nil { + mc.logger.Error("call failed, trying another client", + zap.String("next_addr", mc.nodeAddrs[nextClientIndex]), + zap.Error(err)) + + allErrs = errors.Join(allErrs, err) + mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. + continue + } + + // Update currentClientIndex to the successful client. 
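+ // Example: with clients [A, B, C] and startingIndex 1, an error from B advances the
+ // index so the next call starts at C, while a success pins the index to the client
+ // that served the request.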
+ mc.currentClientIndex.Store(int64(clientIndex)) + return v, nil + } + + return nil, fmt.Errorf("all clients failed: %w", allErrs) +} + +type methodContextKey struct{} + +func contextWithMethod(ctx context.Context, method string) context.Context { + return context.WithValue(ctx, methodContextKey{}, method) +} + +func methodFromContext(ctx context.Context) string { + v, ok := ctx.Value(methodContextKey{}).(string) + if !ok { + return "" + } + return v +} diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go new file mode 100644 index 0000000000..7edf729662 --- /dev/null +++ b/eth/executionclient/multi_client_test.go @@ -0,0 +1,1452 @@ +package executionclient + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "math/big" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum" + ethcommon "github.com/ethereum/go-ethereum/common" + ethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +func TestNewMulti(t *testing.T) { + t.Run("no node addresses", func(t *testing.T) { + ctx := context.Background() + + mc, err := NewMulti(ctx, []string{}, ethcommon.Address{}) + + require.Nil(t, mc, "MultiClient should be nil on error") + require.Error(t, err, "expected an error due to no node addresses") + require.Contains(t, err.Error(), "no node address provided") + }) + + t.Run("error creating single client", func(t *testing.T) { + ctx := context.Background() + addr := "invalid-addr" + addresses := []string{addr} + + mc, err := NewMulti(ctx, addresses, ethcommon.Address{}) + + require.Nil(t, mc, "MultiClient should be nil on error") + require.Error(t, err) + require.Contains(t, err.Error(), "create single client") + }) + + t.Run("chain ID mismatch", func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + ChainID(gomock.Any()). + Return(big.NewInt(1), nil). + AnyTimes() + + mockClient2. + EXPECT(). + ChainID(gomock.Any()). + Return(big.NewInt(2), nil). 
+ AnyTimes() + + mc := &MultiClient{ + nodeAddrs: []string{"mock1", "mock2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + } + + same, err := mc.assertSameChainIDs(context.Background()) + + require.NoError(t, err) + require.False(t, same) + }) +} + +func TestNewMulti_WithOptions(t *testing.T) { + ctx := context.Background() + + sim := simTestBackend(testAddr) + + rpcServer, _ := sim.Node().RPCHandler() + httpsrv := httptest.NewServer(rpcServer.WebsocketHandler([]string{"*"})) + defer rpcServer.Stop() + defer httpsrv.Close() + addr := httpToWebSocketURL(httpsrv.URL) + + addresses := []string{addr} + contractAddr := ethcommon.HexToAddress("0x1234") + + customLogger := zap.NewExample() + const customFollowDistance = uint64(10) + const customTimeout = 100 * time.Millisecond + const customReconnectionInterval = 10 * time.Millisecond + const customReconnectionMaxInterval = 1 * time.Second + const customHealthInvalidationInterval = 50 * time.Millisecond + const customLogBatchSize = 11 + const customSyncDistanceTolerance = 12 + + mc, err := NewMulti( + ctx, + addresses, + contractAddr, + WithLoggerMulti(customLogger), + WithFollowDistanceMulti(customFollowDistance), + WithConnectionTimeoutMulti(customTimeout), + WithReconnectionInitialIntervalMulti(customReconnectionInterval), + WithReconnectionMaxIntervalMulti(customReconnectionMaxInterval), + WithHealthInvalidationIntervalMulti(customHealthInvalidationInterval), + WithLogBatchSizeMulti(customLogBatchSize), + WithSyncDistanceToleranceMulti(customSyncDistanceTolerance), + ) + require.NoError(t, err) + require.NotNil(t, mc) + require.Equal(t, customLogger.Named("execution_client_multi"), mc.logger) + require.EqualValues(t, customFollowDistance, mc.followDistance) + require.EqualValues(t, customTimeout, mc.connectionTimeout) + require.EqualValues(t, customReconnectionInterval, mc.reconnectionInitialInterval) + require.EqualValues(t, customReconnectionMaxInterval, mc.reconnectionMaxInterval) + require.EqualValues(t, customHealthInvalidationInterval, mc.healthInvalidationInterval) + require.EqualValues(t, customLogBatchSize, mc.logBatchSize) + require.EqualValues(t, customSyncDistanceTolerance, mc.syncDistanceTolerance) +} + +func TestMultiClient_assertSameChainIDs(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + ChainID(gomock.Any()). + Return(big.NewInt(5), nil). + Times(1) + + mockClient2. + EXPECT(). + ChainID(gomock.Any()). + Return(big.NewInt(5), nil). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1", "mock2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + } + + same, err := mc.assertSameChainIDs(context.Background()) + require.NoError(t, err) + require.True(t, same, "expected chain IDs to match") + + chainID, err := mc.ChainID(context.Background()) + require.NoError(t, err) + require.NotNil(t, chainID) + require.Equal(t, int64(5), chainID.Int64()) +} + +func TestMultiClient_assertSameChainIDs_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + nodeAddr1 := "mockNode1" + nodeAddr2 := "mockNode2" + + mockClient1. + EXPECT(). + ChainID(gomock.Any()). + Return(big.NewInt(1), nil). + Times(1) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). 
+ AnyTimes() + + fetchErr := fmt.Errorf("failed to get chain ID") + mockClient2. + EXPECT(). + ChainID(gomock.Any()). + Return(nil, fetchErr). + Times(1) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + AnyTimes() + + mc := &MultiClient{ + nodeAddrs: []string{nodeAddr1, nodeAddr2}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + same, err := mc.assertSameChainIDs(context.Background()) + + require.False(t, same, "Expected chain IDs to not match due to an error") + require.Error(t, err, "Expected an error when fetching chain ID from a client") + require.Contains(t, err.Error(), "get chain ID: failed to get chain ID", "Error message should indicate the chain ID retrieval failure") +} + +func TestMultiClient_FetchHistoricalLogs(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient := NewMockSingleClientProvider(ctrl) + + logCh := make(chan BlockLogs, 1) + errCh := make(chan error, 1) + + mockClient. + EXPECT(). + FetchHistoricalLogs(gomock.Any(), uint64(100)). + DoAndReturn(func(ctx context.Context, fromBlock uint64) (<-chan BlockLogs, <-chan error, error) { + go func() { + logCh <- BlockLogs{BlockNumber: 100} + close(logCh) + close(errCh) + }() + return logCh, errCh, nil + }). + Times(1) + + mockClient. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + mc := &MultiClient{ + nodeAddrs: []string{"mockaddr"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logs, errs, err := mc.FetchHistoricalLogs(ctx, 100) + require.NoError(t, err) + require.NotNil(t, logs) + require.NotNil(t, errs) + + firstLog, ok1 := <-logs + require.True(t, ok1, "expected to receive the first log from channel") + require.Equal(t, uint64(100), firstLog.BlockNumber) + + _, open := <-logs + require.False(t, open, "logs channel should be closed once done") + + errVal, openErr := <-errs + require.False(t, openErr, "errors channel should be closed") + require.Nil(t, errVal, "expected no errors") +} + +func TestMultiClient_FetchHistoricalLogs_AllClientsNothingToSync(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + FetchHistoricalLogs(gomock.Any(), uint64(100)). + Return((<-chan BlockLogs)(nil), (<-chan error)(nil), ErrNothingToSync). + Times(1) + + mockClient2. + EXPECT(). + FetchHistoricalLogs(gomock.Any(), uint64(100)). + Return((<-chan BlockLogs)(nil), (<-chan error)(nil), ErrNothingToSync). + Times(1) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + AnyTimes() + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). 
+ AnyTimes() + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logs, errs, err := mc.FetchHistoricalLogs(ctx, 100) + require.Error(t, err) + require.ErrorContains(t, err, ErrNothingToSync.Error()) + require.Nil(t, logs) + require.Nil(t, errs) +} + +func TestMultiClient_FetchHistoricalLogs_MixedErrors(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + FetchHistoricalLogs(gomock.Any(), uint64(100)). + Return((<-chan BlockLogs)(nil), (<-chan error)(nil), fmt.Errorf("unexpected error")). + Times(1) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + AnyTimes() + + mockClient2. + EXPECT(). + FetchHistoricalLogs(gomock.Any(), uint64(100)). + Return((<-chan BlockLogs)(nil), (<-chan error)(nil), ErrNothingToSync). + Times(1) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + AnyTimes() + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logs, errs, err := mc.FetchHistoricalLogs(ctx, 100) + require.Error(t, err) + require.ErrorContains(t, err, "all clients failed") + require.Nil(t, logs) + require.Nil(t, errs) +} + +func TestMultiClient_StreamLogs(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). + DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 200} + out <- BlockLogs{BlockNumber: 201} + return 201, nil // Success + }). + Times(1) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + mockClient2. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(202)). // fromBlock=202 + Return(uint64(202), nil). // Should not be called + Times(0) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). 
+ AnyTimes() + + hook := &fatalHook{} + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), + closed: make(chan struct{}), + } + + logsCh := mc.StreamLogs(ctx, 200) + + var wg sync.WaitGroup + wg.Add(2) // Expecting two logs: 200, 201 + + var receivedLogs []BlockLogs + var mu sync.Mutex + + go func() { + for blk := range logsCh { + mu.Lock() + receivedLogs = append(receivedLogs, blk) + mu.Unlock() + wg.Done() + } + }() + + wg.Wait() + + require.Len(t, receivedLogs, 2, "expected to receive two logs") + require.Equal(t, uint64(200), receivedLogs[0].BlockNumber) + require.Equal(t, uint64(201), receivedLogs[1].BlockNumber) + + _, open := <-logsCh + require.False(t, open, "logs channel should be closed after all logs are received") + + // Make sure Fatal was not called since the first client succeeded + require.False(t, hook.called, "did not expect Fatal to be called") +} + +func TestMultiClient_StreamLogs_Success(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient := NewMockSingleClientProvider(ctrl) + + mockClient. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). + DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 200} + out <- BlockLogs{BlockNumber: 201} + return 201, nil + }). + Times(1) + + mockClient. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + hook := &fatalHook{} + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), + closed: make(chan struct{}), + } + + logsCh := mc.StreamLogs(ctx, 200) + + var wg sync.WaitGroup + wg.Add(2) // Expecting two logs: 200, 201 + + var receivedLogs []BlockLogs + var mu sync.Mutex + + go func() { + for blk := range logsCh { + mu.Lock() + receivedLogs = append(receivedLogs, blk) + mu.Unlock() + wg.Done() + } + }() + + wg.Wait() + + require.Len(t, receivedLogs, 2, "expected to receive two logs") + require.Equal(t, uint64(200), receivedLogs[0].BlockNumber) + require.Equal(t, uint64(201), receivedLogs[1].BlockNumber) + + _, open := <-logsCh + require.False(t, open, "logs channel should be closed after all logs are received") + + // Make sure Fatal was not called since the client succeeded + require.False(t, hook.called, "did not expect Fatal to be called") +} + +func TestMultiClient_StreamLogs_Failover(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + gomock.InOrder( + // First client: mockClient1 with fromBlock=200 + mockClient1. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). 
+ DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 200} + out <- BlockLogs{BlockNumber: 201} + return 201, errors.New("network error") // Triggers failover + }). + Times(1), + + // Second client: mockClient2 with fromBlock=202 + mockClient2. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(202)). + DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 202} + return 202, ErrClosed // Reference exported ErrClosed + }). + Times(1), + ) + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockClient2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logsCh := mc.StreamLogs(ctx, 200) + + var wg sync.WaitGroup + wg.Add(3) // Expecting three logs: 200, 201, 202 + + var receivedLogs []BlockLogs + var mu sync.Mutex + + go func() { + for blk := range logsCh { + mu.Lock() + receivedLogs = append(receivedLogs, blk) + mu.Unlock() + wg.Done() + } + }() + + wg.Wait() + + require.Len(t, receivedLogs, 3, "expected to receive three logs") + require.Equal(t, uint64(200), receivedLogs[0].BlockNumber) + require.Equal(t, uint64(201), receivedLogs[1].BlockNumber) + require.Equal(t, uint64(202), receivedLogs[2].BlockNumber) +} + +func TestMultiClient_StreamLogs_AllClientsFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + // Setup both clients to fail + mockClient1. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). + DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 200} + return 200, errors.New("network error") // Triggers failover + }). + Times(1) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + mockClient2. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(201)). // Updated fromBlock to 201 + DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 201} + return 201, errors.New("network error") // All clients failed + }). + Times(1) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). 
+ AnyTimes() + + hook := &fatalHook{} + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), + closed: make(chan struct{}), + } + + logsCh := mc.StreamLogs(ctx, 200) + + var wg sync.WaitGroup + wg.Add(2) // Expecting two logs: 200, 201 + + var receivedLogs []BlockLogs + var mu sync.Mutex + + go func() { + for blk := range logsCh { + mu.Lock() + receivedLogs = append(receivedLogs, blk) + mu.Unlock() + wg.Done() + } + }() + + wg.Wait() + + require.Len(t, receivedLogs, 2, "expected to receive two logs") + require.Equal(t, uint64(200), receivedLogs[0].BlockNumber) + require.Equal(t, uint64(201), receivedLogs[1].BlockNumber) + + _, open := <-logsCh + require.False(t, open, "logs channel should be closed after all logs are received") + + // Make sure Fatal was called due to all clients failing + require.True(t, hook.called, "expected Fatal to be called due to all clients failing") +} + +func TestMultiClient_StreamLogs_SameFromBlock(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + // Setup mockClient1 to return lastBlock=200 without errors + mockClient1. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). + DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 200} + return 200, nil + }). + Times(1) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + // Setup mockClient2 to not be called + mockClient2. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), gomock.Any()). + Times(0) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logsCh := mc.StreamLogs(ctx, 200) + + blk1, ok1 := <-logsCh + require.True(t, ok1, "expected to receive the first log from channel") + require.Equal(t, uint64(200), blk1.BlockNumber) + + _, open := <-logsCh + require.False(t, open, "logs channel should be closed after all logs are received") +} + +func TestMultiClient_StreamLogs_MultipleFailoverAttempts(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + mockClient3 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + mockClient3. + EXPECT(). + Healthy(gomock.Any()). + DoAndReturn(func(ctx context.Context) error { + return nil + }). + AnyTimes() + + gomock.InOrder( + // Setup mockClient1 to fail with fromBlock=200 + mockClient1. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). + Return(uint64(0), errors.New("network error")). 
+ Times(1), + + // Setup mockClient2 to fail with fromBlock=200 + mockClient2. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). + Return(uint64(0), errors.New("network error")). + Times(1), + + // Setup mockClient3 to handle fromBlock=200 + mockClient3. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). + DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { + out <- BlockLogs{BlockNumber: 200} + return 200, nil + }). + Times(1), + ) + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2", "mockNode3"}, + clients: []SingleClientProvider{mockClient1, mockClient2, mockClient3}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logsCh := mc.StreamLogs(ctx, 200) + + blk1, ok1 := <-logsCh + require.True(t, ok1, "expected to receive the first log from channel") + require.Equal(t, uint64(200), blk1.BlockNumber) + + _, open := <-logsCh + require.False(t, open, "logs channel should be closed after all logs are received") +} + +func TestMultiClient_StreamLogs_NoHealthyClients(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.Background() + nodeAddrs := []string{"mockNode1", "mockNode2"} + contractAddr := ethcommon.HexToAddress("0x1234") + fromBlock := uint64(100) + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + healthErr := errors.New("client1 unhealthy") + mockClient1.EXPECT().Healthy(gomock.Any()).Return(healthErr).AnyTimes() + mockClient2.EXPECT().Healthy(gomock.Any()).Return(healthErr).AnyTimes() + + hook := &fatalHook{} + + mc := &MultiClient{ + logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), + nodeAddrs: nodeAddrs, + clients: []SingleClientProvider{mockClient1, mockClient2}, + closed: make(chan struct{}), + contractAddress: contractAddr, + } + + logsCh := mc.StreamLogs(ctx, fromBlock) + + _, open := <-logsCh + require.False(t, open, "logs channel should be closed after fatal log") + + require.True(t, hook.called, "expected Fatal to be called") +} + +func TestMultiClient_Healthy(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + mockClient. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + err := mc.Healthy(context.Background()) + require.NoError(t, err) +} + +func TestMultiClient_Healthy_MultiClient(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + // Setup only one client to be healthy to iterate all of them + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + Return(fmt.Errorf("not healthy")). + Times(1) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). 
+ Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + err := mc.Healthy(context.Background()) + require.NoError(t, err, "expected all clients to be healthy") +} + +func TestMultiClient_Healthy_AllClientsUnhealthy(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + Return(fmt.Errorf("client1 unhealthy")). + Times(1) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + Return(fmt.Errorf("client2 unhealthy")). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + err := mc.Healthy(context.Background()) + require.Error(t, err) + require.Contains(t, err.Error(), "client1 unhealthy") + require.Contains(t, err.Error(), "client2 unhealthy") +} + +func TestMultiClient_BlockByNumber(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + mockClient. + EXPECT(). + BlockByNumber(gomock.Any(), big.NewInt(1234)). + Return(ðtypes.Block{}, nil). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + blk, err := mc.BlockByNumber(context.Background(), big.NewInt(1234)) + require.NoError(t, err) + require.NotNil(t, blk) +} + +func TestMultiClient_BlockByNumber_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + mockClient. + EXPECT(). + BlockByNumber(gomock.Any(), big.NewInt(1234)). + Return((*ethtypes.Block)(nil), fmt.Errorf("block not found")). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + blk, err := mc.BlockByNumber(context.Background(), big.NewInt(1234)) + require.Error(t, err) + require.Nil(t, blk) + require.Contains(t, err.Error(), "block not found") +} + +func TestMultiClient_HeaderByNumber(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + mockClient. + EXPECT(). + HeaderByNumber(gomock.Any(), big.NewInt(1234)). + Return(ðtypes.Header{}, nil). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + blk, err := mc.HeaderByNumber(context.Background(), big.NewInt(1234)) + require.NoError(t, err) + require.NotNil(t, blk) +} + +func TestMultiClient_HeaderByNumber_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + mockClient. + EXPECT(). + HeaderByNumber(gomock.Any(), big.NewInt(1234)). + Return((*ethtypes.Header)(nil), fmt.Errorf("header not found")). 
+ Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + blk, err := mc.HeaderByNumber(context.Background(), big.NewInt(1234)) + require.Error(t, err) + require.Nil(t, blk) + require.Contains(t, err.Error(), "header not found") +} + +func TestMultiClient_SubscribeFilterLogs(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + sub := &dummySubscription{} + + query := ethereum.FilterQuery{ + Addresses: []ethcommon.Address{ethcommon.HexToAddress("0x1234")}, + FromBlock: big.NewInt(100), + ToBlock: big.NewInt(200), + } + + logCh := make(chan ethtypes.Log) + + mockClient. + EXPECT(). + SubscribeFilterLogs(gomock.Any(), query, logCh). + Return(sub, nil). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + subscription, err := mc.SubscribeFilterLogs(context.Background(), query, logCh) + require.NoError(t, err) + require.Equal(t, sub, subscription) +} + +func TestMultiClient_SubscribeFilterLogs_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + query := ethereum.FilterQuery{ + Addresses: []ethcommon.Address{ethcommon.HexToAddress("0x1234")}, + FromBlock: big.NewInt(100), + ToBlock: big.NewInt(200), + } + + logCh := make(chan ethtypes.Log) + + mockClient. + EXPECT(). + SubscribeFilterLogs(gomock.Any(), query, logCh). + Return((ethereum.Subscription)(nil), fmt.Errorf("subscription error")). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + subscription, err := mc.SubscribeFilterLogs(context.Background(), query, logCh) + require.Error(t, err) + require.Nil(t, subscription) + require.Contains(t, err.Error(), "subscription error") +} + +func TestMultiClient_FilterLogs(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + query := ethereum.FilterQuery{Addresses: []ethcommon.Address{ethcommon.HexToAddress("0x1234")}} + + expectedLogs := []ethtypes.Log{ + {Address: ethcommon.HexToAddress("0x1234")}, + } + + mockClient. + EXPECT(). + FilterLogs(gomock.Any(), query). + Return(expectedLogs, nil). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logs, err := mc.FilterLogs(context.Background(), query) + require.NoError(t, err) + require.Equal(t, expectedLogs, logs) +} + +func TestMultiClient_FilterLogs_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + query := ethereum.FilterQuery{Addresses: []ethcommon.Address{ethcommon.HexToAddress("0x1234")}} + + mockClient. + EXPECT(). + FilterLogs(gomock.Any(), query). + Return(nil, fmt.Errorf("filtering error")). 
+ Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + logs, err := mc.FilterLogs(context.Background(), query) + require.Error(t, err) + require.Nil(t, logs) + require.Contains(t, err.Error(), "filtering error") +} + +func TestMultiClient_Filterer(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mc := &MultiClient{ + contractAddress: ethcommon.HexToAddress("0x1234"), + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + filterer, err := mc.Filterer() + require.NoError(t, err) + require.NotNil(t, filterer) +} + +func TestMultiClient_Filterer_Integration(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + contractAddr := ethcommon.HexToAddress("0x1234") + mc := &MultiClient{ + contractAddress: contractAddr, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + filterer, err := mc.Filterer() + require.NoError(t, err) + require.NotNil(t, filterer) + + const event = `{ + "address": "0x4b133c68a084b8a88f72edcd7944b69c8d545f03", + "topics": [ + "0x48a3ea0796746043948f6341d17ff8200937b99262a0b48c2663b951ed7114e5", + "0x00000000000000000000000077fc6e8b24a623725d935bc88057098d0bca6eb3" + ], + "data": "0x000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001a0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000000030000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000017c4071580000000000000000000000000000000000000000000000000000000000000001000000000000000000000000000000000000000000000000532d024bb0158000000000000000000000000000000000000000000000000000000000000000000400000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000000300000000000000000000000000000000000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000030b24454393691331ee6eba4ffa2dbb2600b9859f908c3e648b6c6de9e1dea3e9329866015d08355c8d451427762b913d1000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000520823f86dfcae5771e9b847c07ed6dda49211274db079f3397b3b06ab7291cebd688f63cb5648ca535c4ec4f9e2b89a48515a035dc24f4738ada942b72d32f75a7e72ac8a8c39d73a6fd633d0f58cadc4618dc70c8160cab5573d541c88a6aebada0acbbeacd49f2931c8c0c548d1b0f69cd468c803ec3fe06bddf08186ae3b64e0b5f1762feb141a06e71c828cedd3c878a08a40fd84d3a0449308c458fd324e67eb6df89e28bf6c304a1e71dcb8c9823b85c2dcca82a980cffb994da194e70b487d02db1ff328b6a8d5f6b519ffc1b524753ce8ed56d4ec1a3cdddc3b24f427b22caa351a1fa0d9f523bd279756ec38080c4850691c1f520dee49a9f4267c0a7a53c7818d165681c9a615a391ee5e4cc9c0e51725c1a92f7e92a393afe4f8f3f35503f241288822a1d721f5c164ea7f85d2b43b638678593d670e79c17a0e1398ac6bbd3ef7ccbdf67e38f6d058ffc5319280868c9a44529b1a4ea7f73792680e67693c502ad9053935edc312c48a94a0d18c71c18af8eb46ae8979d30f176969063430c14ef18a51b3dca4f8775551f4e1fc6a651ee909fc4f7b872c041514a01b4d88b972be86960e3472419ef1577c92af61d572e4e07b32bb38a0e52a1c75f03e1cee80d053b97e3e238c022521c48c6c1dc0f0def8fa0472d7669095b0e9304e63af8b5a9928d9fcf4de166029d88891d10ae6abafe150cc6e9aa6464d76064b16a19b09dad4556dffa580d14cd6755fa2274022abbc917eb7a50f296a153781742c2f101cf280b7f095bf443d51be4dcdd114804fb40ba496c16a3c3a3e82d8645833e5
1c22cebb78ce4b18b6b9eb3b480f5478b3ed97b5a93b705f41d05ed8423f424c5d05317c4e6e53e954570a46027361f7f3f18a581860720dac25afc00f4378a35439fd860ccbc0f0586ae6cf44d53f828faf77c1949bfe58c428de7263d48b1f7ecbb24a25c6abc11aa6105fe41a9a1f608c6895d808e8ca805efd306ec8774201a63e7d9220e031c5e8abdde49f5d56590637a5234b4b35a20875d5e0a5b06c2834dae47dd50633c371ef1071ea3d79a8ce727c2e83d3fae85a596112404875e847c743ddde50bc13b5d661f558e4d02f29b972188418d2f875d0603abbe9ab5c1fc19ae80636d9aa3a6c80be21b2970b84aa4659244424f943b3a99c8ec73304bcc8fc51519f1655ad6f75954af3cb238e946ac50aa365fd6538a7190b6e64b320f822a0010e92e1e4f3d773d25c4e29b3d590e75b4ebfecd6c059df2060f44344dda27f2f794aeb3dfcde62c7b24b80ff95ff1246d05805a12028d9840316f6b8368b60d2a824cc14b02d25a46b689e4519dd9963b5786ff9fa0c695fbdff455499a8f6cc88261b498e90223c0abca38dfe188eef0ac4680f6ac172fdf6b4a343cb1f090a8ce427ef4c745a2408f9ffe67c5b8eb17e7cc2e5851db5f5c75c0658afea00dc1552caf7ef745d2e5e057ccd3b177de22d989fe97bcac17471d0e8ee330a6d9788c6927b1991e784ec61deef91afad21895718e3fa751a782cf66c5911e3f2148dffb01e7e09dcbce8053e060df505f1121202017b34010ddbf02e63b40b8e88a73ac75eb239c401f136b255aded2201de167c9b6ee140de2d307712c8db8e958c5bab3de27d6a40e0b1211ccb634ca9204ce1bda71064f3bee1546f97979c9ec07cd5b4cb5befd7cf8ac930ad74111f381c72d18d3cf1aefef073630cc7bfef722650023032d6493fea494b3b01c95790b08609c9c039a1849fbe47042a29e98ce92f87641647db7d610357c087de95218ea357284828925c21ff7685f01f1b0e5ebadef7d1e763c64ee06d29f4ded10075d39", + "blockNumber": "0x89EBFF", + "transactionHash": "0x921a3f836fb873a40aa4f83097e52b69225334c49674dc262b2bb90d27e3a801" + }` + + var log ethtypes.Log + require.NoError(t, json.Unmarshal([]byte(event), &log)) + + logs, err := filterer.ParseValidatorAdded(log) + require.NoError(t, err) + require.NotNil(t, logs) + +} + +func TestMultiClient_ChainID(t *testing.T) { + mc := &MultiClient{ + chainID: big.NewInt(5), + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + cid, err := mc.ChainID(context.Background()) + require.NoError(t, err) + require.Equal(t, big.NewInt(5), cid) +} + +func TestMultiClient_ChainID_NotSet(t *testing.T) { + mc := &MultiClient{ + chainID: nil, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + cid, err := mc.ChainID(context.Background()) + require.NoError(t, err) + require.Nil(t, cid, "expected ChainID to be nil when not set") +} + +func TestMultiClient_Close(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + Close(). + Return(nil). + Times(1) + + mockClient2. + EXPECT(). + Close(). + Return(errors.New("close error")). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1", "mock2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + err := mc.Close() + // Should combine errors if multiple close calls fail. + require.Error(t, err) + require.Contains(t, err.Error(), "close error") +} + +func TestMultiClient_Close_MultiClient(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + mockClient3 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + Close(). + Return(nil). + Times(1) + + mockClient2. + EXPECT(). + Close(). + Return(errors.New("close error")). + Times(1) + + mockClient3. + EXPECT(). + Close(). + Return(nil). 
+ Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2", "mockNode3"}, + clients: []SingleClientProvider{mockClient1, mockClient2, mockClient3}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + err := mc.Close() + // Should combine errors if multiple close calls fail. + require.Error(t, err) + require.Contains(t, err.Error(), "close error") +} + +func TestMultiClient_Call_Concurrency(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient := NewMockSingleClientProvider(ctrl) + + mockClient. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + AnyTimes() + + mockClient. + EXPECT(). + BlockByNumber(gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, num *big.Int) (*ethtypes.Block, error) { + return &ethtypes.Block{}, nil + }). + Times(10) + + mc := &MultiClient{ + nodeAddrs: []string{"mock1"}, + clients: []SingleClientProvider{mockClient}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + var wg sync.WaitGroup + wg.Add(10) + + for i := 0; i < 10; i++ { + go func() { + defer wg.Done() + _, err := mc.BlockByNumber(context.Background(), big.NewInt(1234)) + require.NoError(t, err) + }() + } + + wg.Wait() +} + +func TestMultiClient_Call_AllClientsFail(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + Times(1) + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + Times(1) + + mockClient1. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), gomock.Any()). + Return(uint64(0), fmt.Errorf("streaming error")). + Times(1) + + mockClient2. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), gomock.Any()). + Return(uint64(201), fmt.Errorf("another streaming error")). + Times(1) + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + // Define a simple function to simulate a call + f := func(client SingleClientProvider) (any, error) { + return client.streamLogsToChan(context.TODO(), nil, 200) + } + + _, err := mc.call(context.Background(), f) + require.Error(t, err) + require.Contains(t, err.Error(), "all clients failed") +} + +func TestMultiClient_ReconnectionLimit(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockClient1 := NewMockSingleClientProvider(ctrl) + mockClient2 := NewMockSingleClientProvider(ctrl) + + mockClient1. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + AnyTimes() + + mockClient1. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), gomock.Any()). + Return(uint64(200), fmt.Errorf("streaming error")). + AnyTimes() + + mockClient2. + EXPECT(). + Healthy(gomock.Any()). + Return(nil). + AnyTimes() + + mockClient2. + EXPECT(). + streamLogsToChan(gomock.Any(), gomock.Any(), gomock.Any()). + Return(uint64(200), fmt.Errorf("streaming error")).
+ Times(1) + + hook := &fatalHook{} + + mc := &MultiClient{ + nodeAddrs: []string{"mockNode1", "mockNode2"}, + clients: []SingleClientProvider{mockClient1, mockClient2}, + logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), + closed: make(chan struct{}), + } + + logsCh := mc.StreamLogs(ctx, 200) + + _, open := <-logsCh + require.False(t, open, "logs channel should be closed after reconnection attempts limit") +} + +type fatalHook struct { + mu sync.Mutex + called bool +} + +func (hook *fatalHook) OnWrite(*zapcore.CheckedEntry, []zapcore.Field) { + hook.mu.Lock() + defer hook.mu.Unlock() + hook.called = true +} + +// dummySubscription is a stub implementing ethereum.Subscription. +type dummySubscription struct{} + +func (d *dummySubscription) Unsubscribe() {} +func (d *dummySubscription) Err() <-chan error { + return make(chan error) +} diff --git a/eth/executionclient/options.go b/eth/executionclient/options.go index 0fed3ba73f..c8e8d134e9 100644 --- a/eth/executionclient/options.go +++ b/eth/executionclient/options.go @@ -9,6 +9,9 @@ import ( // Option defines an ExecutionClient configuration option. type Option func(*ExecutionClient) +// OptionMulti defines a MultiClient configuration option. +type OptionMulti func(client *MultiClient) + // WithLogger enables logging. func WithLogger(logger *zap.Logger) Option { return func(s *ExecutionClient) { @@ -16,6 +19,13 @@ func WithLogger(logger *zap.Logger) Option { } } +// WithLoggerMulti enables logging. +func WithLoggerMulti(logger *zap.Logger) OptionMulti { + return func(s *MultiClient) { + s.logger = logger.Named("execution_client_multi") + } +} + // WithFollowDistance sets finalization offset (a block at this offset into the past // from the head block will be considered as very likely finalized). func WithFollowDistance(offset uint64) Option { @@ -24,6 +34,14 @@ func WithFollowDistance(offset uint64) Option { } } +// WithFollowDistanceMulti sets finalization offset (a block at this offset into the past +// from the head block will be considered as very likely finalized). +func WithFollowDistanceMulti(offset uint64) OptionMulti { + return func(s *MultiClient) { + s.followDistance = offset + } +} + // WithConnectionTimeout sets timeout for network connection to eth1 node. func WithConnectionTimeout(timeout time.Duration) Option { return func(s *ExecutionClient) { @@ -31,6 +49,13 @@ func WithConnectionTimeout(timeout time.Duration) Option { } } +// WithConnectionTimeoutMulti sets timeout for network connection to eth1 node. +func WithConnectionTimeoutMulti(timeout time.Duration) OptionMulti { + return func(s *MultiClient) { + s.connectionTimeout = timeout + } +} + // WithReconnectionInitialInterval sets initial reconnection interval. func WithReconnectionInitialInterval(interval time.Duration) Option { return func(s *ExecutionClient) { @@ -38,6 +63,13 @@ func WithReconnectionInitialInterval(interval time.Duration) Option { } } +// WithReconnectionInitialIntervalMulti sets initial reconnection interval. +func WithReconnectionInitialIntervalMulti(interval time.Duration) OptionMulti { + return func(s *MultiClient) { + s.reconnectionInitialInterval = interval + } +} + // WithReconnectionMaxInterval sets max reconnection interval. func WithReconnectionMaxInterval(interval time.Duration) Option { return func(s *ExecutionClient) { @@ -45,6 +77,27 @@ func WithReconnectionMaxInterval(interval time.Duration) Option { } } +// WithReconnectionMaxIntervalMulti sets max reconnection interval. 
+func WithReconnectionMaxIntervalMulti(interval time.Duration) OptionMulti { + return func(s *MultiClient) { + s.reconnectionMaxInterval = interval + } +} + +// WithHealthInvalidationInterval sets health invalidation interval. 0 disables caching. +func WithHealthInvalidationInterval(interval time.Duration) Option { + return func(s *ExecutionClient) { + s.healthInvalidationInterval = interval + } +} + +// WithHealthInvalidationIntervalMulti sets health invalidation interval. 0 disables caching. +func WithHealthInvalidationIntervalMulti(interval time.Duration) OptionMulti { + return func(s *MultiClient) { + s.healthInvalidationInterval = interval + } +} + // WithLogBatchSize sets log batch size. func WithLogBatchSize(size uint64) Option { return func(s *ExecutionClient) { @@ -52,9 +105,23 @@ } } +// WithLogBatchSizeMulti sets log batch size. +func WithLogBatchSizeMulti(size uint64) OptionMulti { + return func(s *MultiClient) { + s.logBatchSize = size + } +} + // WithSyncDistanceTolerance sets the number of blocks that is acceptable to lag behind. func WithSyncDistanceTolerance(count uint64) Option { return func(s *ExecutionClient) { s.syncDistanceTolerance = count } } + +// WithSyncDistanceToleranceMulti sets the number of blocks that is acceptable to lag behind. +func WithSyncDistanceToleranceMulti(count uint64) OptionMulti { + return func(s *MultiClient) { + s.syncDistanceTolerance = count + } +} diff --git a/operator/duties/scheduler.go index 5362191ccd..dd470f1f81 100644 --- a/operator/duties/scheduler.go +++ b/operator/duties/scheduler.go @@ -14,9 +14,9 @@ import ( ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/prysmaticlabs/prysm/v4/async/event" "github.com/sourcegraph/conc/pool" + spectypes "github.com/ssvlabs/ssv-spec/types" "go.uber.org/zap" - spectypes "github.com/ssvlabs/ssv-spec/types" "github.com/ssvlabs/ssv/beacon/goclient" "github.com/ssvlabs/ssv/logging" "github.com/ssvlabs/ssv/logging/fields" diff --git a/operator/node.go index 22b44a0f1f..6bdc12c51d 100644 --- a/operator/node.go +++ b/operator/node.go @@ -31,7 +31,7 @@ type Options struct { CustomDomainType string `yaml:"CustomDomainType" env:"CUSTOM_DOMAIN_TYPE" env-default:"" env-description:"Override the SSV domain type. This is used to isolate the node from the rest of the network. Do not set unless you know what you are doing.
This would be incremented by 1 for Alan, for example: 0x01020304 becomes 0x01020305 post-fork."` Network networkconfig.NetworkConfig BeaconNode beaconprotocol.BeaconNode // TODO: consider renaming to ConsensusClient - ExecutionClient *executionclient.ExecutionClient + ExecutionClient executionclient.Provider P2PNetwork network.P2PNetwork Context context.Context DB basedb.Database @@ -49,7 +49,7 @@ type Node struct { validatorsCtrl validator.Controller validatorOptions validator.ControllerOptions consensusClient beaconprotocol.BeaconNode - executionClient *executionclient.ExecutionClient + executionClient executionclient.Provider net network.P2PNetwork storage storage.Storage qbftStorage *qbftstorage.ParticipantStores diff --git a/protocol/v2/blockchain/beacon/client.go index 74deb615a2..7bbbf1212c 100644 --- a/protocol/v2/blockchain/beacon/client.go +++ b/protocol/v2/blockchain/beacon/client.go @@ -63,7 +63,7 @@ type BeaconNode interface { type Options struct { Context context.Context Network Network - BeaconNodeAddr string `yaml:"BeaconNodeAddr" env:"BEACON_NODE_ADDR" env-required:"true"` + BeaconNodeAddr string `yaml:"BeaconNodeAddr" env:"BEACON_NODE_ADDR" env-required:"true" env-description:"Beacon node address. Supports multiple comma-separated addresses"` SyncDistanceTolerance uint64 `yaml:"SyncDistanceTolerance" env:"BEACON_SYNC_DISTANCE_TOLERANCE" env-default:"4" env-description:"The number of out-of-sync slots we can tolerate"` CommonTimeout time.Duration // Optional.
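Reviewer note: TestMultiClient_Call_AllClientsFail and TestMultiClient_ReconnectionLimit above pin down the failover contract of MultiClient.call: clients are tried in order, a failing or unhealthy client's error is recorded and the next client is attempted, and only after every client has failed does the aggregated "all clients failed" error surface. The following is a rough schematic of that contract only, not the implementation in multi_client.go (which additionally advances currentClientIndex atomically); callSketch is a hypothetical name.

func (mc *MultiClient) callSketch(ctx context.Context, f func(SingleClientProvider) (any, error)) (any, error) {
	var errs error
	for _, client := range mc.clients {
		// Skip clients that report themselves unhealthy, but remember why.
		if err := client.Healthy(ctx); err != nil {
			errs = errors.Join(errs, err)
			continue
		}
		res, err := f(client)
		if err != nil {
			// Record the failure and fall through to the next client.
			errs = errors.Join(errs, err)
			continue
		}
		return res, nil
	}
	// This is the substring asserted by TestMultiClient_Call_AllClientsFail.
	return nil, fmt.Errorf("all clients failed: %w", errs)
}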
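The OptionMulti setters in options.go follow the same functional-options pattern as the existing Option type. A minimal sketch of how they compose, assuming package executionclient; newMultiClientForReview is a hypothetical helper, not part of this patch, and the field names mirror the struct literals used in multi_client_test.go.

func newMultiClientForReview(opts ...OptionMulti) *MultiClient {
	mc := &MultiClient{
		logger: zap.NewNop(), // replaced if WithLoggerMulti is passed
		closed: make(chan struct{}),
	}
	for _, opt := range opts {
		opt(mc) // each OptionMulti mutates the client in place
	}
	return mc
}

Usage would then look like:

mc := newMultiClientForReview(
	WithLoggerMulti(logger),
	WithConnectionTimeoutMulti(10*time.Second),
	WithSyncDistanceToleranceMulti(6),
)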