Skip to content

Commit

Permalink
retry if historical sync finished on old block
Browse files Browse the repository at this point in the history
  • Loading branch information
anatolie-ssv committed Jan 17, 2025
1 parent 6b27b3a commit de770f6
Showing 1 changed file with 39 additions and 23 deletions.
62 changes: 39 additions & 23 deletions eth/eventsyncer/event_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,29 +105,45 @@ func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int)

// SyncHistory reads and processes historical events since the given fromBlock.
func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastProcessedBlock uint64, err error) {
fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock)
if errors.Is(err, executionclient.ErrNothingToSync) {
// Nothing to sync, should keep ongoing sync from the given fromBlock.
return 0, executionclient.ErrNothingToSync
}
if err != nil {
return 0, fmt.Errorf("failed to fetch historical events: %w", err)
}

lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false)
if err != nil {
return 0, fmt.Errorf("handle historical block events: %w", err)
}
// TODO: (Alan) should it really be here?
if err := <-fetchError; err != nil {
return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err)
}
if lastProcessedBlock == 0 {
return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0")
}
if lastProcessedBlock < fromBlock {
// Event replay: this should never happen!
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
var prevProcessedBlock uint64
for {
fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock)
if errors.Is(err, executionclient.ErrNothingToSync) {
// Nothing to sync, should keep ongoing sync from the given fromBlock.
return 0, executionclient.ErrNothingToSync
}

Check warning on line 114 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L112-L114

Added lines #L112 - L114 were not covered by tests
if err != nil {
return 0, fmt.Errorf("failed to fetch historical events: %w", err)
}

Check warning on line 117 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L116-L117

Added lines #L116 - L117 were not covered by tests

lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false)
if err != nil {
return 0, fmt.Errorf("handle historical block events: %w", err)
}

Check warning on line 122 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L121-L122

Added lines #L121 - L122 were not covered by tests
// TODO: (Alan) should it really be here?
if err := <-fetchError; err != nil {
return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err)
}

Check warning on line 126 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L125-L126

Added lines #L125 - L126 were not covered by tests
if lastProcessedBlock == 0 {
return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0")
}

Check warning on line 129 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L128-L129

Added lines #L128 - L129 were not covered by tests
if lastProcessedBlock < fromBlock {
// Event replay: this should never happen!
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
}

Check warning on line 133 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L131-L133

Added lines #L131 - L133 were not covered by tests

if lastProcessedBlock == prevProcessedBlock {
break

Check warning on line 136 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L136

Added line #L136 was not covered by tests
}
prevProcessedBlock = lastProcessedBlock

err = es.blockBelowThreshold(ctx, new(big.Int).SetUint64(lastProcessedBlock))
if err == nil {
break
}

fromBlock = lastProcessedBlock + 1
es.logger.Info("finished syncing on an old block, retrying", zap.Uint64("from_block", fromBlock))

Check warning on line 146 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L145-L146

Added lines #L145 - L146 were not covered by tests
}

es.logger.Info("finished syncing historical events",
Expand Down

0 comments on commit de770f6

Please sign in to comment.