diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 28a73b925c..6cde980607 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -3,6 +3,7 @@ package executionclient import ( "context" "fmt" + "math" "math/big" "sync" "sync/atomic" @@ -192,7 +193,7 @@ func (mc *MultiClient) FetchHistoricalLogs(ctx context.Context, fromBlock uint64 return nil, nil } - _, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f) + _, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f, false) if err != nil { return nil, nil, err } @@ -233,7 +234,7 @@ func (mc *MultiClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan return nil, nil } - _, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f) + _, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f, true) if err != nil && !errors.Is(err, ErrClosed) && !errors.Is(err, context.Canceled) { // NOTE: There are unit tests that trigger Fatal and override its behavior. // Therefore, the code must call `return` afterward. @@ -288,7 +289,7 @@ func (mc *MultiClient) BlockByNumber(ctx context.Context, blockNumber *big.Int) f := func(client SingleClientProvider) (any, error) { return client.BlockByNumber(ctx, blockNumber) } - res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f) + res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f, false) if err != nil { return nil, err } @@ -301,7 +302,7 @@ func (mc *MultiClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) f := func(client SingleClientProvider) (any, error) { return client.HeaderByNumber(ctx, blockNumber) } - res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f) + res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f, false) if err != nil { return nil, err } @@ -313,7 +314,7 @@ func (mc *MultiClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filte f := func(client SingleClientProvider) (any, error) { return client.SubscribeFilterLogs(ctx, q, ch) } - res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f) + res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f, false) if err != nil { return nil, err } @@ -325,7 +326,7 @@ func (mc *MultiClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ( f := func(client SingleClientProvider) (any, error) { return client.FilterLogs(ctx, q) } - res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f) + res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f, false) if err != nil { return nil, err } @@ -361,7 +362,12 @@ func (mc *MultiClient) Close() error { return multiErr } -func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error)) (any, error) { +// call calls f for all clients until it succeeds. +// If forever is false, it tries all clients only once and if no client is available then it returns an error. +// If forever is true, it iterates clients forever. +// It's used in StreamLogs because it's called once per the node lifetime, +// and it's possible that clients go up and down several times, therefore there's no limit. +func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error), forever bool) (any, error) { if len(mc.clients) == 1 { return f(mc.clients[0]) // no need for mutex because one client is always non-nil } @@ -370,7 +376,11 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi // starting from the most likely healthy client (currentClientIndex). startingIndex := int(mc.currentClientIndex.Load()) var allErrs error - for i := range mc.clients { + limit := len(mc.clients) + if forever { + limit = math.MaxInt32 + } + for i := 0; i < limit; i++ { clientIndex := (startingIndex + i) % len(mc.clients) nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging. diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go index 8e85a8a482..c0aca9852a 100644 --- a/eth/executionclient/multi_client_test.go +++ b/eth/executionclient/multi_client_test.go @@ -1330,7 +1330,7 @@ func TestMultiClient_Call_AllClientsFail(t *testing.T) { return client.streamLogsToChan(context.TODO(), nil, 200) } - _, err := mc.call(context.Background(), f) + _, err := mc.call(context.Background(), f, false) require.Error(t, err) require.Contains(t, err.Error(), "all clients failed") }