Skip to content

Commit

Permalink
Add debug log when downscoring a peer for bad response.
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae committed Dec 20, 2024
1 parent 96b31a9 commit 7e71977
Show file tree
Hide file tree
Showing 18 changed files with 86 additions and 61 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- `Finished building block`: Display error only if not nil.
- Added support to update target and max blob count to different values per hard fork config.
- Log before blob filesystem cache warm-up.
-
- Debug log when downscoring a peer for bad response reason.

### Changed

- Process light client finality updates only for new finalized epochs instead of doing it for every block.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (s *Service) listenForNewNodes() {
wg.Add(1)
go func(info *peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with peer")
}
wg.Done()
}(peerInfo)
Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/p2p/handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
// Only log disconnections if we were fully connected.
if priorState == peers.Connected {
activePeersCount := len(s.peers.Active())
log.WithField("remainingActivePeers", activePeersCount).Debug("Peer disconnected")
log.WithFields(logrus.Fields{
"remainingActivePeers": activePeersCount,
"direction": conn.Stat().Direction.String(),
"peerID": peerID,
}).Debug("Peer disconnected")
}
}()
},
Expand Down
10 changes: 7 additions & 3 deletions beacon-chain/p2p/peers/scorers/bad_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,22 @@ func (s *BadResponsesScorer) countNoLock(pid peer.ID) (int, error) {

// Increment increments the number of bad responses we have received from the given remote peer.
// If peer doesn't exist this method is no-op.
func (s *BadResponsesScorer) Increment(pid peer.ID) {
func (s *BadResponsesScorer) Increment(pid peer.ID) int {
const defaultBadResponses = 1

s.store.Lock()
defer s.store.Unlock()

peerData, ok := s.store.PeerData(pid)
if !ok {
s.store.SetPeerData(pid, &peerdata.PeerData{
BadResponses: 1,
BadResponses: defaultBadResponses,
})
return
return defaultBadResponses
}
peerData.BadResponses++

return peerData.BadResponses
}

// IsBadPeer states if the peer is to be considered bad.
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/p2p/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func (s *Service) connectWithAllTrustedPeers(multiAddrs []multiaddr.Multiaddr) {
// make each dial non-blocking
go func(info peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with trusted peer")
}
}(info)
}
Expand All @@ -459,7 +459,7 @@ func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr) {
// make each dial non-blocking
go func(info peer.AddrInfo) {
if err := s.connectWithPeer(s.ctx, info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with peer")
}
}(info)
}
Expand All @@ -478,8 +478,8 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error
ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
defer cancel()
if err := s.host.Connect(ctx, info); err != nil {
s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
return err
score := s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
return errors.Wrapf(err, "connect to peer %s - new bad responses score: %d", info.ID, score)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/subnets.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.
wg.Add(1)
go func() {
if err := s.connectWithPeer(ctx, *info); err != nil {
log.WithError(err).Tracef("Could not connect with peer %s", info.String())
log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with peer")
}

wg.Done()
Expand Down
8 changes: 2 additions & 6 deletions beacon-chain/sync/backfill/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ func (s *Service) importBatches(ctx context.Context) {
}
_, err := s.batchImporter(ctx, current, ib, s.store)
if err != nil {
log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import")
s.downscore(ib)
score := s.p2p.Peers().Scorers().BadResponsesScorer().Increment(ib.blockPid)
log.WithError(err).WithFields(ib.logFields()).WithField("newBlockPidBadResponsesScore", score).Debug("Backfill batch failed to import")
s.batchSeq.update(ib.withState(batchErrRetryable))
// If a batch fails, the subsequent batches are no longer considered importable.
break
Expand Down Expand Up @@ -330,10 +330,6 @@ func (s *Service) initBatches() error {
return nil
}

func (s *Service) downscore(b batch) {
s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid)
}

func (*Service) Stop() error {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/batch_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *Service) validateWithBatchVerifier(ctx context.Context, message string,
// If verification fails we fallback to individual verification
// of each signature set.
if resErr != nil {
log.WithError(resErr).Tracef("Could not perform batch verification of %s", message)
log.WithError(resErr).Debugf("Could not perform batch verification of %s", message)
verified, err := set.Verify()
if err != nil {
verErr := errors.Wrapf(err, "Could not verify %s", message)
Expand Down
6 changes: 4 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,10 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
}
if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) {
// Peer returned invalid data, penalize.
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid)
log.WithField("pid", response.pid).Debug("Peer is penalized for invalid blocks")
score := q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid)
log.
WithFields(logrus.Fields{"pid": response.pid, "newBadResponsesScore": score}).
Debug("Peer is penalized for invalid blocks")
}
return m.state, response.err
}
Expand Down
9 changes: 4 additions & 5 deletions beacon-chain/sync/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,8 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
amt = 1
}
if amt > uint64(remaining) {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
score := l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
return errors.Wrapf(p2ptypes.ErrRateLimited, "new bad responses score: %d", score)
}
return nil
}
Expand All @@ -135,9 +134,9 @@ func (l *limiter) validateRawRpcRequest(stream network.Stream) error {
// Treat each request as a minimum of 1.
amt := int64(1)
if amt > remaining {
l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
score := l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
return p2ptypes.ErrRateLimited
return errors.Wrapf(p2ptypes.ErrRateLimited, "new bad responses score: %d", score)
}
return nil
}
Expand Down
23 changes: 16 additions & 7 deletions beacon-chain/sync/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)

var (
Expand Down Expand Up @@ -252,9 +254,9 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
logStreamErrors(err, topic)
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
logStreamErrors(err, topic, remotePeer, score)
return
}
if err := handle(ctx, msg, stream); err != nil {
Expand All @@ -272,9 +274,9 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
return
}
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
logStreamErrors(err, topic)
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
logStreamErrors(err, topic, remotePeer, score)
tracing.AnnotateError(span, err)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return
}
if err := handle(ctx, nTyp.Elem().Interface(), stream); err != nil {
Expand All @@ -288,13 +290,20 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
})
}

func logStreamErrors(err error, topic string) {
func logStreamErrors(err error, topic string, remotePeer peer.ID, badResponsesScore int) {
log := log.WithFields(logrus.Fields{
"topic": topic,
"peer": remotePeer.String(),
"newBadResponsesScore": badResponsesScore,
})
if isUnwantedError(err) {
log.WithError(err).Debug("Unwanted error")
return
}

if strings.Contains(topic, p2p.RPCGoodByeTopicV1) {
log.WithError(err).WithField("topic", topic).Trace("Could not decode goodbye stream message")
log.WithError(err).Debug("Could not decode goodbye stream message")
return
}
log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
log.WithError(err).Debug("Could not decode stream message")
}
6 changes: 3 additions & 3 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,13 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
tracing.AnnotateError(span, err)
return err
return errors.Wrapf(err, "new bad responses score: %d", score)
}
available := s.validateRangeAvailability(rp)
if !available {
log.Debug("error in validating range availability")
log.Debug("Error in validating range availability")
s.writeErrorResponseToStream(responseCodeResourceUnavailable, p2ptypes.ErrResourceUnavailable.Error(), stream)
tracing.AnnotateError(span, err)
return nil
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc_beacon_blocks_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{

currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
if uint64(len(blockRoots)) > params.MaxRequestBlock(currentEpoch) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
return errors.New("requested more than the max block limit")
return errors.Errorf("requested more than the max block limit - new bad responses score: %d", score)
}
s.rateLimiter.add(stream, int64(len(blockRoots)))

Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc_blob_sidecars_by_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
rp, err := validateBlobsByRange(r, s.cfg.chain.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
tracing.AnnotateError(span, err)
return err
return errors.Wrapf(err, "new bad responses score: %d", score)
}

// Ticker to stagger out large requests.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/rpc_blob_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface

blobIdents := *ref
if err := validateBlobByRootRequest(blobIdents); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
return err
return errors.Wrapf(err, "new bad responses score: %d", score)
}
// Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
sort.Sort(blobIdents)
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/sync/rpc_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
// Read the METADATA response from the peer.
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
if err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return nil, errors.Wrap(err, "read status code")
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return nil, errors.Wrapf(err, "read status code for metadata request, new bad responses score: %d", score)
}

if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return nil, errors.New(errMsg)
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return nil, errors.Errorf("%s, new bad responses score: %d", errMsg, score)
}

// Get the genesis validators root.
Expand Down Expand Up @@ -179,8 +179,8 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta

// Decode the metadata from the peer.
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return nil, err
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return nil, errors.Wrapf(err, "decode metadata, new bad responses score: %d", score)
}

return msg, nil
Expand Down
10 changes: 6 additions & 4 deletions beacon-chain/sync/rpc_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
err = errors.Wrapf(err, "new bad responses score: %d", score)
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
}

Expand Down Expand Up @@ -141,8 +142,8 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {

// If the peer responded with an error, increment the bad responses scorer.
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return errors.Errorf("code: %d - %s", code, errMsg)
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return errors.Errorf("code: %d, new bad responses score: %d - %s", code, score, errMsg)
}

// Decode the sequence number from the peer.
Expand All @@ -156,7 +157,8 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
err = errors.Wrapf(err, "new bad responses score: %d", score)
}

return errors.Wrap(err, "validate sequence number")
Expand Down
Loading

0 comments on commit 7e71977

Please sign in to comment.