diff --git a/eth/eventsyncer/event_syncer.go b/eth/eventsyncer/event_syncer.go index 06498834f9..4ee6335efc 100644 --- a/eth/eventsyncer/event_syncer.go +++ b/eth/eventsyncer/event_syncer.go @@ -105,29 +105,45 @@ func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int) // SyncHistory reads and processes historical events since the given fromBlock. func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastProcessedBlock uint64, err error) { - fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock) - if errors.Is(err, executionclient.ErrNothingToSync) { - // Nothing to sync, should keep ongoing sync from the given fromBlock. - return 0, executionclient.ErrNothingToSync - } - if err != nil { - return 0, fmt.Errorf("failed to fetch historical events: %w", err) - } - - lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false) - if err != nil { - return 0, fmt.Errorf("handle historical block events: %w", err) - } - // TODO: (Alan) should it really be here? - if err := <-fetchError; err != nil { - return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err) - } - if lastProcessedBlock == 0 { - return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0") - } - if lastProcessedBlock < fromBlock { - // Event replay: this should never happen! - return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock) + var prevProcessedBlock uint64 + for { + fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock) + if errors.Is(err, executionclient.ErrNothingToSync) { + // Nothing to sync, should keep ongoing sync from the given fromBlock. + return 0, executionclient.ErrNothingToSync + } + if err != nil { + return 0, fmt.Errorf("failed to fetch historical events: %w", err) + } + + lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false) + if err != nil { + return 0, fmt.Errorf("handle historical block events: %w", err) + } + // TODO: (Alan) should it really be here? + if err := <-fetchError; err != nil { + return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err) + } + if lastProcessedBlock == 0 { + return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0") + } + if lastProcessedBlock < fromBlock { + // Event replay: this should never happen! + return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock) + } + + if lastProcessedBlock == prevProcessedBlock { + break + } + prevProcessedBlock = lastProcessedBlock + + err = es.blockBelowThreshold(ctx, new(big.Int).SetUint64(lastProcessedBlock)) + if err == nil { + break + } + + fromBlock = lastProcessedBlock + 1 + es.logger.Info("finished syncing on an old block, retrying", zap.Uint64("from_block", fromBlock)) } es.logger.Info("finished syncing historical events",