Skip to content

Commit

Permalink
Merge branch 'stage' into override-spec-beacon-config
Browse files Browse the repository at this point in the history
# Conflicts:
#	operator/duties/sync_committee_test.go
  • Loading branch information
nkryuchkov committed Jan 24, 2025
2 parents a9d701e + 9ad77c8 commit c5454c9
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 41 deletions.
7 changes: 7 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: 2

updates:
- package-ecosystem: github-actions
directory: /
schedule:
interval: weekly
2 changes: 1 addition & 1 deletion .github/workflows/spec-alignment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:

- name: Upload output.diff
if: failure()
uses: actions/upload-artifact@v3
uses: actions/upload-artifact@v4
with:
name: output.diff
path: ./scripts/spec-alignment/output.diff
75 changes: 47 additions & 28 deletions eth/eventsyncer/event_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,36 +105,55 @@ func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int)

// SyncHistory reads and processes historical events since the given fromBlock.
func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastProcessedBlock uint64, err error) {
fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock)
if errors.Is(err, executionclient.ErrNothingToSync) {
// Nothing to sync, should keep ongoing sync from the given fromBlock.
return 0, executionclient.ErrNothingToSync
const maxTries = 3
var prevProcessedBlock uint64
for i := 0; i < maxTries; i++ {
fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock)
if errors.Is(err, executionclient.ErrNothingToSync) {
// Nothing to sync, should keep ongoing sync from the given fromBlock.
return 0, executionclient.ErrNothingToSync
}
if err != nil {
return 0, fmt.Errorf("failed to fetch historical events: %w", err)
}

lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false)
if err != nil {
return 0, fmt.Errorf("handle historical block events: %w", err)
}
// TODO: (Alan) should it really be here?
if err := <-fetchError; err != nil {
return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err)
}
if lastProcessedBlock == 0 {
return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0")
}
if lastProcessedBlock < fromBlock {
// Event replay: this should never happen!
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
}

if lastProcessedBlock == prevProcessedBlock {
// Not advancing, so can't sync any further.
break
}
prevProcessedBlock = lastProcessedBlock

err = es.blockBelowThreshold(ctx, new(big.Int).SetUint64(lastProcessedBlock))
if err == nil {
// Successfully synced up to a fresh block.
es.logger.Info("finished syncing historical events",
zap.Uint64("from_block", fromBlock),
zap.Uint64("last_processed_block", lastProcessedBlock))

return lastProcessedBlock, nil
}

fromBlock = lastProcessedBlock + 1
es.logger.Info("finished syncing up to a stale block, resuming", zap.Uint64("from_block", fromBlock))
}
if err != nil {
return 0, fmt.Errorf("failed to fetch historical events: %w", err)
}

lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false)
if err != nil {
return 0, fmt.Errorf("handle historical block events: %w", err)
}
// TODO: (Alan) should it really be here?
if err := <-fetchError; err != nil {
return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err)
}
if lastProcessedBlock == 0 {
return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0")
}
if lastProcessedBlock < fromBlock {
// Event replay: this should never happen!
return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
}

es.logger.Info("finished syncing historical events",
zap.Uint64("from_block", fromBlock),
zap.Uint64("last_processed_block", lastProcessedBlock))

return lastProcessedBlock, nil
return 0, fmt.Errorf("highest block is too old (%d)", lastProcessedBlock)
}

// SyncOngoing streams and processes ongoing events as they come since the given fromBlock.
Expand Down
7 changes: 6 additions & 1 deletion network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,12 @@ func newDiscV5Service(pctx context.Context, logger *zap.Logger, discOpts *Option
publishLock: make(chan struct{}, 1),
}

logger.Debug("configuring discv5 discovery", zap.Any("discOpts", discOpts))
logger.Debug(
"configuring discv5 discovery",
zap.Any("discV5Opts", discOpts.DiscV5Opts),
zap.Any("hostAddress", discOpts.HostAddress),
zap.Any("hostDNS", discOpts.HostDNS),
)
if err := dvs.initDiscV5Listener(logger, discOpts); err != nil {
return nil, err
}
Expand Down
21 changes: 11 additions & 10 deletions network/p2p/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ type Config struct {

TCPPort uint16 `yaml:"TcpPort" env:"TCP_PORT" env-default:"13001" env-description:"TCP port for p2p transport"`
UDPPort uint16 `yaml:"UdpPort" env:"UDP_PORT" env-default:"12001" env-description:"UDP port for discovery"`
HostAddress string `yaml:"HostAddress" env:"HOST_ADDRESS" env-description:"External ip node is exposed for discovery"`
HostDNS string `yaml:"HostDNS" env:"HOST_DNS" env-description:"External DNS node is exposed for discovery"`
HostAddress string `yaml:"HostAddress" env:"HOST_ADDRESS" env-description:"External ip node is exposed for discovery, can be overridden by HostDNS"`
HostDNS string `yaml:"HostDNS" env:"HOST_DNS" env-description:"External DNS node is exposed for discovery, overrides HostAddress if both are specified"`

RequestTimeout time.Duration `yaml:"RequestTimeout" env:"P2P_REQUEST_TIMEOUT" env-default:"10s"`
MaxBatchResponse uint64 `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"25" env-description:"Maximum number of returned objects in a batch"`
Expand Down Expand Up @@ -149,24 +149,25 @@ func (c *Config) configureAddrs(logger *zap.Logger, opts []libp2p.Option) ([]lib
}
opts = append(opts, libp2p.ListenAddrs(addrs...))

// AddrFactory for host address if provided
if c.HostAddress != "" {
// note, only one of (HostDNS, HostAddress) can be used with libp2p - if multiple of these
// are set we have to prioritize between them.
if c.HostDNS != "" {
// AddrFactory for DNS address if provided
opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
external, err := commons.BuildMultiAddress(c.HostAddress, "tcp", uint(c.TCPPort), "")
external, err := ma.NewMultiaddr(fmt.Sprintf("/dns4/%s/tcp/%d", c.HostDNS, c.TCPPort))
if err != nil {
logger.Error("unable to create external multiaddress", zap.Error(err))
} else {
addrs = append(addrs, external)
}
return addrs
}))
}
// AddrFactory for DNS address if provided
if c.HostDNS != "" {
} else if c.HostAddress != "" {
// AddrFactory for host address if provided
opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
external, err := ma.NewMultiaddr(fmt.Sprintf("/dns4/%s/tcp/%d", c.HostDNS, c.TCPPort))
external, err := commons.BuildMultiAddress(c.HostAddress, "tcp", uint(c.TCPPort), "")
if err != nil {
logger.Warn("unable to create external multiaddress", zap.Error(err))
logger.Error("unable to create external multiaddress", zap.Error(err))
} else {
addrs = append(addrs, external)
}
Expand Down
2 changes: 1 addition & 1 deletion operator/duties/sync_committee_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func TestScheduler_SyncCommittee_Early_Block(t *testing.T) {
}
scheduler.HandleHeadEvent(logger)(e)
waitForDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected)
require.Greater(t, time.Since(startTime), scheduler.network.SlotDuration()/3)
require.Greater(t, time.Since(startTime), time.Duration(float64(scheduler.network.SlotDuration()/3)*0.90)) // 10% margin due to flakiness of the test

// Stop scheduler & wait for graceful exit.
cancel()
Expand Down

0 comments on commit c5454c9

Please sign in to comment.