Skip to content

Commit

Permalink
iterate clients forever in StreamLogs
Browse files Browse the repository at this point in the history
  • Loading branch information
nkryuchkov committed Jan 27, 2025
1 parent ab91668 commit 99405c9
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
26 changes: 18 additions & 8 deletions eth/executionclient/multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package executionclient
import (
"context"
"fmt"
"math"
"math/big"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -192,7 +193,7 @@ func (mc *MultiClient) FetchHistoricalLogs(ctx context.Context, fromBlock uint64
return nil, nil
}

_, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f)
_, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f, false)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -233,7 +234,7 @@ func (mc *MultiClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan
return nil, nil
}

_, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f)
_, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f, true)
if err != nil && !errors.Is(err, ErrClosed) && !errors.Is(err, context.Canceled) {
// NOTE: There are unit tests that trigger Fatal and override its behavior.
// Therefore, the code must call `return` afterward.
Expand Down Expand Up @@ -288,7 +289,7 @@ func (mc *MultiClient) BlockByNumber(ctx context.Context, blockNumber *big.Int)
f := func(client SingleClientProvider) (any, error) {
return client.BlockByNumber(ctx, blockNumber)
}
res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f)
res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f, false)
if err != nil {
return nil, err
}
Expand All @@ -301,7 +302,7 @@ func (mc *MultiClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int)
f := func(client SingleClientProvider) (any, error) {
return client.HeaderByNumber(ctx, blockNumber)
}
res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f)
res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f, false)
if err != nil {
return nil, err
}
Expand All @@ -313,7 +314,7 @@ func (mc *MultiClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filte
f := func(client SingleClientProvider) (any, error) {
return client.SubscribeFilterLogs(ctx, q, ch)
}
res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f)
res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f, false)
if err != nil {
return nil, err
}
Expand All @@ -325,7 +326,7 @@ func (mc *MultiClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (
f := func(client SingleClientProvider) (any, error) {
return client.FilterLogs(ctx, q)
}
res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f)
res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -361,7 +362,12 @@ func (mc *MultiClient) Close() error {
return multiErr
}

func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error)) (any, error) {
// call calls f for all clients until it succeeds.
// If forever is false, it tries all clients only once and if no client is available then it returns an error.
// If forever is true, it iterates clients forever.
// It's used in StreamLogs because it's called once per the node lifetime,
// and it's possible that clients go up and down several times, therefore there's no limit.
func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error), forever bool) (any, error) {
if len(mc.clients) == 1 {
return f(mc.clients[0]) // no need for mutex because one client is always non-nil
}
Expand All @@ -370,7 +376,11 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi
// starting from the most likely healthy client (currentClientIndex).
startingIndex := int(mc.currentClientIndex.Load())
var allErrs error
for i := range mc.clients {
limit := len(mc.clients)
if forever {
limit = math.MaxInt32
}
for i := 0; i < limit; i++ {
clientIndex := (startingIndex + i) % len(mc.clients)
nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging.

Expand Down
2 changes: 1 addition & 1 deletion eth/executionclient/multi_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ func TestMultiClient_Call_AllClientsFail(t *testing.T) {
return client.streamLogsToChan(context.TODO(), nil, 200)
}

_, err := mc.call(context.Background(), f)
_, err := mc.call(context.Background(), f, false)
require.Error(t, err)
require.Contains(t, err.Error(), "all clients failed")
}
Expand Down

0 comments on commit 99405c9

Please sign in to comment.