Skip to content

Commit

Permalink
concurrent processing of get blocks. batches by 50 default
Browse files Browse the repository at this point in the history
  • Loading branch information
gatsbyz committed Jan 3, 2024
1 parent dc40b6d commit acc52ae
Showing 1 changed file with 51 additions and 15 deletions.
66 changes: 51 additions & 15 deletions cmd/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var (
zero = big.NewInt(0)
observedPendingTxs historicalRange
maxDataPoints = 1000
maxConcurrency = 10
semaphore = make(chan struct{}, maxConcurrency)
)

type (
Expand Down Expand Up @@ -277,26 +279,60 @@ func (ms *monitorStatus) getBlockRange(ctx context.Context, to *big.Int, rpc *et
return nil
}

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 3 * time.Minute
retryable := func() error {
return rpc.BatchCallContext(ctx, blms)
}
if err := backoff.Retry(retryable, b); err != nil {
err := ms.processBatchesConcurrently(ctx, rpc, blms)
if err != nil {
log.Error().Err(err).Msg("Error processing batches concurrently")
return err
}

ms.BlocksLock.Lock()
defer ms.BlocksLock.Unlock()
for _, b := range blms {
if b.Error != nil {
continue
}
pb := rpctypes.NewPolyBlock(b.Result.(*rpctypes.RawBlockResponse))
ms.BlockCache.Add(pb.Number().String(), pb)
return nil
}

func (ms *monitorStatus) processBatchesConcurrently(ctx context.Context, rpc *ethrpc.Client, blms []ethrpc.BatchElem) error {
subBatchSize := 50
var wg sync.WaitGroup
var batchErr error
batchErrLock := sync.Mutex{}

for i := 0; i < len(blms); i += subBatchSize {
wg.Add(1)
semaphore <- struct{}{}
go func(i int) {
defer wg.Done()
end := i + subBatchSize
if end > len(blms) {
end = len(blms)
}
subBatch := blms[i:end]

b := backoff.NewExponentialBackOff()
b.MaxElapsedTime = 3 * time.Minute
retryable := func() error {
return rpc.BatchCallContext(ctx, subBatch)
}
if err := backoff.Retry(retryable, b); err != nil {
batchErrLock.Lock()
if batchErr == nil {
batchErr = err
}
batchErrLock.Unlock()
}

for _, elem := range subBatch {
if elem.Error != nil {
log.Error().Str("Method", elem.Method).Interface("Args", elem.Args).Err(elem.Error).Msg("Failed batch element")
} else {
pb := rpctypes.NewPolyBlock(elem.Result.(*rpctypes.RawBlockResponse))
ms.BlockCache.Add(pb.Number().String(), pb)
}
}

<-semaphore
}(i)
}

return nil
wg.Wait()
return batchErr
}

func renderMonitorUI(ctx context.Context, ec *ethclient.Client, ms *monitorStatus, rpc *ethrpc.Client) error {
Expand Down

0 comments on commit acc52ae

Please sign in to comment.