diff --git a/.github/dependabot.yml b/.github/dependabot.yml
new file mode 100644
index 0000000000..d91d61ebbb
--- /dev/null
+++ b/.github/dependabot.yml
@@ -0,0 +1,7 @@
+version: 2
+
+updates:
+  - package-ecosystem: github-actions
+    directory: /
+    schedule:
+      interval: weekly
\ No newline at end of file
diff --git a/.github/workflows/spec-alignment.yml b/.github/workflows/spec-alignment.yml
index 2bd8418be9..dafe6d3914 100644
--- a/.github/workflows/spec-alignment.yml
+++ b/.github/workflows/spec-alignment.yml
@@ -28,7 +28,7 @@ jobs:
 
       - name: Upload output.diff
         if: failure()
-        uses: actions/upload-artifact@v3
+        uses: actions/upload-artifact@v4
         with:
           name: output.diff
          path: ./scripts/spec-alignment/output.diff
diff --git a/eth/eventsyncer/event_syncer.go b/eth/eventsyncer/event_syncer.go
index 06498834f9..ee6cb55b9a 100644
--- a/eth/eventsyncer/event_syncer.go
+++ b/eth/eventsyncer/event_syncer.go
@@ -105,36 +105,55 @@ func (es *EventSyncer) blockBelowThreshold(ctx context.Context, block *big.Int)
 
 // SyncHistory reads and processes historical events since the given fromBlock.
 func (es *EventSyncer) SyncHistory(ctx context.Context, fromBlock uint64) (lastProcessedBlock uint64, err error) {
-	fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock)
-	if errors.Is(err, executionclient.ErrNothingToSync) {
-		// Nothing to sync, should keep ongoing sync from the given fromBlock.
-		return 0, executionclient.ErrNothingToSync
+	const maxTries = 3
+	var prevProcessedBlock uint64
+	for i := 0; i < maxTries; i++ {
+		fetchLogs, fetchError, err := es.executionClient.FetchHistoricalLogs(ctx, fromBlock)
+		if errors.Is(err, executionclient.ErrNothingToSync) {
+			// Nothing to sync, should keep ongoing sync from the given fromBlock.
+			return 0, executionclient.ErrNothingToSync
+		}
+		if err != nil {
+			return 0, fmt.Errorf("failed to fetch historical events: %w", err)
+		}
+
+		lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false)
+		if err != nil {
+			return 0, fmt.Errorf("handle historical block events: %w", err)
+		}
+		// TODO: (Alan) should it really be here?
+		if err := <-fetchError; err != nil {
+			return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err)
+		}
+		if lastProcessedBlock == 0 {
+			return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0")
+		}
+		if lastProcessedBlock < fromBlock {
+			// Event replay: this should never happen!
+			return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
+		}
+
+		if lastProcessedBlock == prevProcessedBlock {
+			// Not advancing, so can't sync any further.
+			break
+		}
+		prevProcessedBlock = lastProcessedBlock
+
+		err = es.blockBelowThreshold(ctx, new(big.Int).SetUint64(lastProcessedBlock))
+		if err == nil {
+			// Successfully synced up to a fresh block.
+			es.logger.Info("finished syncing historical events",
+				zap.Uint64("from_block", fromBlock),
+				zap.Uint64("last_processed_block", lastProcessedBlock))
+
+			return lastProcessedBlock, nil
+		}
+
+		fromBlock = lastProcessedBlock + 1
+		es.logger.Info("finished syncing up to a stale block, resuming", zap.Uint64("from_block", fromBlock))
 	}
-	if err != nil {
-		return 0, fmt.Errorf("failed to fetch historical events: %w", err)
-	}
-
-	lastProcessedBlock, err = es.eventHandler.HandleBlockEventsStream(ctx, fetchLogs, false)
-	if err != nil {
-		return 0, fmt.Errorf("handle historical block events: %w", err)
-	}
-	// TODO: (Alan) should it really be here?
-	if err := <-fetchError; err != nil {
-		return 0, fmt.Errorf("error occurred while fetching historical logs: %w", err)
-	}
-	if lastProcessedBlock == 0 {
-		return 0, fmt.Errorf("handle historical block events: lastProcessedBlock is 0")
-	}
-	if lastProcessedBlock < fromBlock {
-		// Event replay: this should never happen!
-		return 0, fmt.Errorf("event replay: lastProcessedBlock (%d) is lower than fromBlock (%d)", lastProcessedBlock, fromBlock)
-	}
-
-	es.logger.Info("finished syncing historical events",
-		zap.Uint64("from_block", fromBlock),
-		zap.Uint64("last_processed_block", lastProcessedBlock))
-	return lastProcessedBlock, nil
+	return 0, fmt.Errorf("highest block is too old (%d)", lastProcessedBlock)
 }
 
 // SyncOngoing streams and processes ongoing events as they come since the given fromBlock.
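SyncHistory now retries up to three times, advancing fromBlock past each processed batch, and only returns successfully once blockBelowThreshold accepts the last processed block; if it stops advancing or every attempt ends on a stale block, it fails. A minimal caller-side sketch of that contract, assuming a hypothetical startEventSync wrapper and a local historySyncer interface (the actual node wiring is not part of this diff):

```go
package eventsyncexample

import (
	"context"
	"errors"
	"fmt"
)

// errNothingToSync stands in for executionclient.ErrNothingToSync from the diff.
var errNothingToSync = errors.New("nothing to sync")

// historySyncer is an illustrative subset of the EventSyncer API used here.
type historySyncer interface {
	SyncHistory(ctx context.Context, fromBlock uint64) (lastProcessedBlock uint64, err error)
	SyncOngoing(ctx context.Context, fromBlock uint64) error
}

// startEventSync shows how the retrying SyncHistory is meant to be consumed:
// "nothing to sync" is not fatal, any other error (including the new
// "highest block is too old" failure after maxTries) is, and ongoing sync
// resumes one block after the last block that history processing reached.
func startEventSync(ctx context.Context, s historySyncer, fromBlock uint64) error {
	ongoingFromBlock := fromBlock
	lastBlock, err := s.SyncHistory(ctx, fromBlock)
	switch {
	case errors.Is(err, errNothingToSync):
		// No historical logs to process; keep ongoing sync at the original fromBlock.
	case err != nil:
		return fmt.Errorf("sync history: %w", err)
	default:
		ongoingFromBlock = lastBlock + 1
	}
	return s.SyncOngoing(ctx, ongoingFromBlock)
}
```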
diff --git a/network/discovery/dv5_service.go b/network/discovery/dv5_service.go
index 8a0fc7b585..f26bd98b09 100644
--- a/network/discovery/dv5_service.go
+++ b/network/discovery/dv5_service.go
@@ -80,7 +80,12 @@ func newDiscV5Service(pctx context.Context, logger *zap.Logger, discOpts *Option
 		publishLock: make(chan struct{}, 1),
 	}
 
-	logger.Debug("configuring discv5 discovery", zap.Any("discOpts", discOpts))
+	logger.Debug(
+		"configuring discv5 discovery",
+		zap.Any("discV5Opts", discOpts.DiscV5Opts),
+		zap.Any("hostAddress", discOpts.HostAddress),
+		zap.Any("hostDNS", discOpts.HostDNS),
+	)
 	if err := dvs.initDiscV5Listener(logger, discOpts); err != nil {
 		return nil, err
 	}
diff --git a/network/p2p/config.go b/network/p2p/config.go
index bbfb7d3ae7..74149daeba 100644
--- a/network/p2p/config.go
+++ b/network/p2p/config.go
@@ -40,8 +40,8 @@ type Config struct {
 	TCPPort     uint16 `yaml:"TcpPort" env:"TCP_PORT" env-default:"13001" env-description:"TCP port for p2p transport"`
 	UDPPort     uint16 `yaml:"UdpPort" env:"UDP_PORT" env-default:"12001" env-description:"UDP port for discovery"`
-	HostAddress string `yaml:"HostAddress" env:"HOST_ADDRESS" env-description:"External ip node is exposed for discovery"`
-	HostDNS     string `yaml:"HostDNS" env:"HOST_DNS" env-description:"External DNS node is exposed for discovery"`
+	HostAddress string `yaml:"HostAddress" env:"HOST_ADDRESS" env-description:"External ip node is exposed for discovery, can be overridden by HostDNS"`
+	HostDNS     string `yaml:"HostDNS" env:"HOST_DNS" env-description:"External DNS node is exposed for discovery, overrides HostAddress if both are specified"`
 
 	RequestTimeout   time.Duration `yaml:"RequestTimeout" env:"P2P_REQUEST_TIMEOUT" env-default:"10s"`
 	MaxBatchResponse uint64        `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"25" env-description:"Maximum number of returned objects in a batch"`
@@ -149,10 +149,12 @@ func (c *Config) configureAddrs(logger *zap.Logger, opts []libp2p.Option) ([]lib
 	}
 	opts = append(opts, libp2p.ListenAddrs(addrs...))
 
-	// AddrFactory for host address if provided
-	if c.HostAddress != "" {
+	// note, only one of (HostDNS, HostAddress) can be used with libp2p - if multiple of these
+	// are set we have to prioritize between them.
+	if c.HostDNS != "" {
+		// AddrFactory for DNS address if provided
 		opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
-			external, err := commons.BuildMultiAddress(c.HostAddress, "tcp", uint(c.TCPPort), "")
+			external, err := ma.NewMultiaddr(fmt.Sprintf("/dns4/%s/tcp/%d", c.HostDNS, c.TCPPort))
 			if err != nil {
 				logger.Error("unable to create external multiaddress", zap.Error(err))
 			} else {
@@ -160,13 +162,12 @@ func (c *Config) configureAddrs(logger *zap.Logger, opts []libp2p.Option) ([]lib
 			}
 			return addrs
 		}))
-	}
-	// AddrFactory for DNS address if provided
-	if c.HostDNS != "" {
+	} else if c.HostAddress != "" {
+		// AddrFactory for host address if provided
 		opts = append(opts, libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
-			external, err := ma.NewMultiaddr(fmt.Sprintf("/dns4/%s/tcp/%d", c.HostDNS, c.TCPPort))
+			external, err := commons.BuildMultiAddress(c.HostAddress, "tcp", uint(c.TCPPort), "")
 			if err != nil {
-				logger.Warn("unable to create external multiaddress", zap.Error(err))
+				logger.Error("unable to create external multiaddress", zap.Error(err))
 			} else {
 				addrs = append(addrs, external)
 			}
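As the new comment notes, go-libp2p only allows a single AddrsFactory option, so the old code, which appended one factory for HostAddress and another for HostDNS, could not honor both when both were set; the diff makes the precedence explicit, with HostDNS winning. A standalone sketch of the same prioritization, using a plain /ip4 multiaddr as a simplified stand-in for commons.BuildMultiAddress (function and package names here are illustrative assumptions):

```go
package p2pexample

import (
	"fmt"

	"github.com/libp2p/go-libp2p"
	ma "github.com/multiformats/go-multiaddr"
)

// externalAddrsOption mirrors the precedence introduced in configureAddrs:
// since only one AddrsFactory can be installed, HostDNS takes priority over
// HostAddress when both are configured.
func externalAddrsOption(hostDNS, hostAddress string, tcpPort uint16) (libp2p.Option, error) {
	var external ma.Multiaddr
	var err error
	switch {
	case hostDNS != "":
		external, err = ma.NewMultiaddr(fmt.Sprintf("/dns4/%s/tcp/%d", hostDNS, tcpPort))
	case hostAddress != "":
		external, err = ma.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%d", hostAddress, tcpPort))
	default:
		// Nothing configured: keep libp2p's default advertised addresses.
		return libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr { return addrs }), nil
	}
	if err != nil {
		return nil, fmt.Errorf("build external multiaddress: %w", err)
	}
	return libp2p.AddrsFactory(func(addrs []ma.Multiaddr) []ma.Multiaddr {
		// Advertise the configured external address alongside the listen addresses.
		return append(addrs, external)
	}), nil
}
```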
diff --git a/operator/duties/sync_committee_test.go b/operator/duties/sync_committee_test.go
index 1ad8cdd49a..371ffb1f94 100644
--- a/operator/duties/sync_committee_test.go
+++ b/operator/duties/sync_committee_test.go
@@ -685,7 +685,7 @@ func TestScheduler_SyncCommittee_Early_Block(t *testing.T) {
 	}
 	scheduler.HandleHeadEvent(logger)(e)
 	waitForDutiesExecution(t, logger, fetchDutiesCall, executeDutiesCall, timeout, expected)
-	require.Greater(t, time.Since(startTime), scheduler.network.SlotDuration()/3)
+	require.Greater(t, time.Since(startTime), time.Duration(float64(scheduler.network.SlotDuration()/3)*0.90)) // 10% margin due to flakiness of the test
 
 	// Stop scheduler & wait for graceful exit.
 	cancel()
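The relaxed assertion above lets the elapsed time undershoot a third of the slot by up to 10%, absorbing timer jitter that made the exact bound flaky. Assuming a mainnet-style 12s slot (the test actually reads the value from scheduler.network), the arithmetic is:

```go
package dutiesexample

import "time"

// relaxedBound reproduces the arithmetic of the new assertion: a third of the
// slot, scaled down by 10% to tolerate timer jitter.
// With slotDuration = 12s: 12s/3 = 4s, and 90% of 4s = 3.6s.
func relaxedBound(slotDuration time.Duration) time.Duration {
	return time.Duration(float64(slotDuration/3) * 0.90)
}
```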