Skip to content

Commit

Permalink
draft
Browse files Browse the repository at this point in the history
  • Loading branch information
moshe-blox committed Jan 21, 2025
1 parent de770f6 commit 3976971
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions eth/eventsyncer/event_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,9 @@ func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int)

// SyncHistory reads and processes historical events since the given fromBlock.
func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastProcessedBlock uint64, err error) {
const maxTries = 3
var prevProcessedBlock uint64
for {
for i := 0; i < maxTries; i++ {
fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock)
if errors.Is(err, executionclient.ErrNothingToSync) {
// Nothing to sync, should keep ongoing sync from the given fromBlock.
Expand All @@ -132,25 +133,27 @@ func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastP
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
}

Check warning on line 134 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L132-L134

Added lines #L132 - L134 were not covered by tests

if lastProcessedBlock == prevProcessedBlock {
break
}
prevProcessedBlock = lastProcessedBlock

err = es.blockBelowThreshold(ctx, new(big.Int).SetUint64(lastProcessedBlock))
if err == nil {
// Successfully synced up to a fresh block.
es.logger.Info("finished syncing historical events",
zap.Uint64("from_block", fromBlock),
zap.Uint64("last_processed_block", lastProcessedBlock))

return lastProcessedBlock, nil
}

if lastProcessedBlock == prevProcessedBlock {
// Not advancing, so can't sync any further.
break

Check warning on line 148 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L146-L148

Added lines #L146 - L148 were not covered by tests
}
prevProcessedBlock = lastProcessedBlock

fromBlock = lastProcessedBlock + 1
es.logger.Info("finished syncing on an old block, retrying", zap.Uint64("from_block", fromBlock))
es.logger.Info("finished syncing up to a stale block, resuming", zap.Uint64("from_block", fromBlock))

Check warning on line 153 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L150-L153

Added lines #L150 - L153 were not covered by tests
}

es.logger.Info("finished syncing historical events",
zap.Uint64("from_block", fromBlock),
zap.Uint64("last_processed_block", lastProcessedBlock))

return lastProcessedBlock, nil
return 0, fmt.Errorf("highest block is too old (%d)", lastProcessedBlock)

Check warning on line 156 in eth/eventsyncer/event_syncer.go

View check run for this annotation

Codecov / codecov/patch

eth/eventsyncer/event_syncer.go#L156

Added line #L156 was not covered by tests
}

// SyncOngoing streams and processes ongoing events as they come since the given fromBlock.
Expand Down

0 comments on commit 3976971

Please sign in to comment.