diff --git a/eth/eventsyncer/event_syncer.go b/eth/eventsyncer/event_syncer.go index 4ee6335efc..fa7c2ea74c 100644 --- a/eth/eventsyncer/event_syncer.go +++ b/eth/eventsyncer/event_syncer.go @@ -105,8 +105,9 @@ func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int) // SyncHistory reads and processes historical events since the given fromBlock. func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastProcessedBlock uint64, err error) { + const maxTries = 3 var prevProcessedBlock uint64 - for { + for i := 0; i < maxTries; i++ { fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock) if errors.Is(err, executionclient.ErrNothingToSync) { // Nothing to sync, should keep ongoing sync from the given fromBlock. @@ -132,25 +133,27 @@ func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastP return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock) } - if lastProcessedBlock == prevProcessedBlock { - break - } - prevProcessedBlock = lastProcessedBlock - err = es.blockBelowThreshold(ctx, new(big.Int).SetUint64(lastProcessedBlock)) if err == nil { + // Successfully synced up to a fresh block. + es.logger.Info("finished syncing historical events", + zap.Uint64("from_block", fromBlock), + zap.Uint64("last_processed_block", lastProcessedBlock)) + + return lastProcessedBlock, nil + } + + if lastProcessedBlock == prevProcessedBlock { + // Not advancing, so can't sync any further. break } + prevProcessedBlock = lastProcessedBlock fromBlock = lastProcessedBlock + 1 - es.logger.Info("finished syncing on an old block, retrying", zap.Uint64("from_block", fromBlock)) + es.logger.Info("finished syncing up to a stale block, resuming", zap.Uint64("from_block", fromBlock)) } - es.logger.Info("finished syncing historical events", - zap.Uint64("from_block", fromBlock), - zap.Uint64("last_processed_block", lastProcessedBlock)) - - return lastProcessedBlock, nil + return 0, fmt.Errorf("highest block is too old (%d)", lastProcessedBlock) } // SyncOngoing streams and processes ongoing events as they come since the given fromBlock.