From d2aa4461db8a5571917d5396f34d534934c25615 Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Fri, 20 Dec 2024 12:08:02 +0100
Subject: [PATCH] Add debug log when downscoring a peer for bad response.

---
 CHANGELOG.md                           |  3 +-
 beacon-chain/p2p/discovery.go          |  2 +-
 beacon-chain/p2p/handshake.go          |  6 +++-
 .../p2p/peers/scorers/bad_responses.go | 10 +++++--
 beacon-chain/p2p/service.go            |  8 +++---
 beacon-chain/p2p/subnets.go            |  2 +-
 beacon-chain/sync/backfill/service.go  |  8 ++----
 beacon-chain/sync/batch_verifier.go    |  2 +-
 .../sync/initial-sync/blocks_queue.go  |  6 ++--
 beacon-chain/sync/rate_limiter.go      |  9 +++---
 beacon-chain/sync/rpc.go               | 23 ++++++++++-----
 .../sync/rpc_beacon_blocks_by_range.go |  6 ++--
 .../sync/rpc_beacon_blocks_by_root.go  |  4 +--
 .../sync/rpc_blob_sidecars_by_range.go |  4 +--
 .../sync/rpc_blob_sidecars_by_root.go  |  4 +--
 beacon-chain/sync/rpc_metadata.go      | 12 ++++----
 beacon-chain/sync/rpc_ping.go          | 10 ++++---
 beacon-chain/sync/rpc_status.go        | 28 ++++++++++++-------
 18 files changed, 86 insertions(+), 61 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 036f80f883aa..a2ea5432c434 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,7 +14,8 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
 - `Finished building block`: Display error only if not nil.
 - Added support to update target and max blob count to different values per hard fork config.
 - Log before blob filesystem cache warm-up.
--
+- Debug log when downscoring a peer for bad response reason.
+
 ### Changed
 
 - Process light client finality updates only for new finalized epochs instead of doing it for every block.
diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go
index e5689efbdecd..8f2e8f2d87f0 100644
--- a/beacon-chain/p2p/discovery.go
+++ b/beacon-chain/p2p/discovery.go
@@ -349,7 +349,7 @@ func (s *Service) listenForNewNodes() {
 		wg.Add(1)
 		go func(info *peer.AddrInfo) {
 			if err := s.connectWithPeer(s.ctx, *info); err != nil {
-				log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+				log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with peer")
 			}
 			wg.Done()
 		}(peerInfo)
diff --git a/beacon-chain/p2p/handshake.go b/beacon-chain/p2p/handshake.go
index df19f861ee5d..4e44080a4d26 100644
--- a/beacon-chain/p2p/handshake.go
+++ b/beacon-chain/p2p/handshake.go
@@ -214,7 +214,11 @@ func (s *Service) AddDisconnectionHandler(handler func(ctx context.Context, id p
 				// Only log disconnections if we were fully connected.
 				if priorState == peers.Connected {
 					activePeersCount := len(s.peers.Active())
-					log.WithField("remainingActivePeers", activePeersCount).Debug("Peer disconnected")
+					log.WithFields(logrus.Fields{
+						"remainingActivePeers": activePeersCount,
+						"direction":            conn.Stat().Direction.String(),
+						"peerID":               peerID,
+					}).Debug("Peer disconnected")
 				}
 			}()
 		},
diff --git a/beacon-chain/p2p/peers/scorers/bad_responses.go b/beacon-chain/p2p/peers/scorers/bad_responses.go
index 9e834e25780f..a3a8f73bb214 100644
--- a/beacon-chain/p2p/peers/scorers/bad_responses.go
+++ b/beacon-chain/p2p/peers/scorers/bad_responses.go
@@ -101,18 +101,22 @@ func (s *BadResponsesScorer) countNoLock(pid peer.ID) (int, error) {
 
 // Increment increments the number of bad responses we have received from the given remote peer.
 // If peer doesn't exist this method is no-op.
-func (s *BadResponsesScorer) Increment(pid peer.ID) {
+func (s *BadResponsesScorer) Increment(pid peer.ID) int {
+	const defaultBadResponses = 1
+
 	s.store.Lock()
 	defer s.store.Unlock()
 
 	peerData, ok := s.store.PeerData(pid)
 	if !ok {
 		s.store.SetPeerData(pid, &peerdata.PeerData{
-			BadResponses: 1,
+			BadResponses: defaultBadResponses,
 		})
-		return
+		return defaultBadResponses
 	}
 	peerData.BadResponses++
+
+	return peerData.BadResponses
 }
 
 // IsBadPeer states if the peer is to be considered bad.
diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go
index 877ec4a0c012..8b2d4f1a8746 100644
--- a/beacon-chain/p2p/service.go
+++ b/beacon-chain/p2p/service.go
@@ -443,7 +443,7 @@ func (s *Service) connectWithAllTrustedPeers(multiAddrs []multiaddr.Multiaddr) {
 		// make each dial non-blocking
 		go func(info peer.AddrInfo) {
 			if err := s.connectWithPeer(s.ctx, info); err != nil {
-				log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+				log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with trusted peer")
 			}
 		}(info)
 	}
@@ -459,7 +459,7 @@ func (s *Service) connectWithAllPeers(multiAddrs []multiaddr.Multiaddr) {
 		// make each dial non-blocking
 		go func(info peer.AddrInfo) {
 			if err := s.connectWithPeer(s.ctx, info); err != nil {
-				log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+				log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with peer")
 			}
 		}(info)
 	}
@@ -478,8 +478,8 @@ func (s *Service) connectWithPeer(ctx context.Context, info peer.AddrInfo) error
 	ctx, cancel := context.WithTimeout(ctx, maxDialTimeout)
 	defer cancel()
 	if err := s.host.Connect(ctx, info); err != nil {
-		s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
-		return err
+		score := s.Peers().Scorers().BadResponsesScorer().Increment(info.ID)
+		return errors.Wrapf(err, "connect to peer %s, new bad responses score: %d", info.ID, score)
 	}
 	return nil
 }
diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go
index 74eb898bfd31..121e13d1fc2f 100644
--- a/beacon-chain/p2p/subnets.go
+++ b/beacon-chain/p2p/subnets.go
@@ -113,7 +113,7 @@ func (s *Service) dialPeer(ctx context.Context, wg *sync.WaitGroup, node *enode.
 	wg.Add(1)
 	go func() {
 		if err := s.connectWithPeer(ctx, *info); err != nil {
-			log.WithError(err).Tracef("Could not connect with peer %s", info.String())
+			log.WithError(err).WithField("peerID", info.ID).Debug("Could not connect with peer")
 		}
 
 		wg.Done()
diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go
index 0dc0df6dddd0..a40f8adb4da8 100644
--- a/beacon-chain/sync/backfill/service.go
+++ b/beacon-chain/sync/backfill/service.go
@@ -208,8 +208,8 @@ func (s *Service) importBatches(ctx context.Context) {
 		}
 		_, err := s.batchImporter(ctx, current, ib, s.store)
 		if err != nil {
-			log.WithError(err).WithFields(ib.logFields()).Debug("Backfill batch failed to import")
-			s.downscore(ib)
+			score := s.p2p.Peers().Scorers().BadResponsesScorer().Increment(ib.blockPid)
+			log.WithError(err).WithFields(ib.logFields()).WithField("newBlockPidBadResponsesScore", score).Debug("Backfill batch failed to import")
 			s.batchSeq.update(ib.withState(batchErrRetryable))
 			// If a batch fails, the subsequent batches are no longer considered importable.
 			break
@@ -330,10 +326,6 @@ func (s *Service) initBatches() error {
 	return nil
 }
 
-func (s *Service) downscore(b batch) {
-	s.p2p.Peers().Scorers().BadResponsesScorer().Increment(b.blockPid)
-}
-
 func (*Service) Stop() error {
 	return nil
 }
diff --git a/beacon-chain/sync/batch_verifier.go b/beacon-chain/sync/batch_verifier.go
index 0edb70524553..26121f1ed385 100644
--- a/beacon-chain/sync/batch_verifier.go
+++ b/beacon-chain/sync/batch_verifier.go
@@ -62,7 +62,7 @@ func (s *Service) validateWithBatchVerifier(ctx context.Context, message string,
 	// If verification fails we fallback to individual verification
 	// of each signature set.
 	if resErr != nil {
-		log.WithError(resErr).Tracef("Could not perform batch verification of %s", message)
+		log.WithError(resErr).Debugf("Could not perform batch verification of %s", message)
 		verified, err := set.Verify()
 		if err != nil {
 			verErr := errors.Wrapf(err, "Could not verify %s", message)
diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go
index 44461bd4fd7b..40ed90e397d9 100644
--- a/beacon-chain/sync/initial-sync/blocks_queue.go
+++ b/beacon-chain/sync/initial-sync/blocks_queue.go
@@ -337,8 +337,10 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
 		}
 		if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) {
 			// Peer returned invalid data, penalize.
-			q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid)
-			log.WithField("pid", response.pid).Debug("Peer is penalized for invalid blocks")
+			score := q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid)
+			log.
+				WithFields(logrus.Fields{"pid": response.pid, "newBadResponsesScore": score}).
+				Debug("Peer is penalized for invalid blocks")
 		}
 		return m.state, response.err
 	}
diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go
index 5d088f5002a1..8f17fcd7653a 100644
--- a/beacon-chain/sync/rate_limiter.go
+++ b/beacon-chain/sync/rate_limiter.go
@@ -112,9 +112,8 @@ func (l *limiter) validateRequest(stream network.Stream, amt uint64) error {
 		amt = 1
 	}
 	if amt > uint64(remaining) {
-		l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
-		writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
-		return p2ptypes.ErrRateLimited
+		score := l.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
+		return errors.Wrapf(p2ptypes.ErrRateLimited, "new bad responses score: %d", score)
 	}
 	return nil
 }
@@ -135,9 +134,9 @@ func (l *limiter) validateRawRpcRequest(stream network.Stream) error {
 	// Treat each request as a minimum of 1.
 	amt := int64(1)
 	if amt > remaining {
-		l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		score := l.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
 		writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrRateLimited.Error(), stream, l.p2p)
-		return p2ptypes.ErrRateLimited
+		return errors.Wrapf(p2ptypes.ErrRateLimited, "new bad responses score: %d", score)
 	}
 	return nil
 }
diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go
index 5c0d73ddbb38..56aa73722648 100644
--- a/beacon-chain/sync/rpc.go
+++ b/beacon-chain/sync/rpc.go
@@ -9,6 +9,7 @@ import (
 
 	libp2pcore "github.com/libp2p/go-libp2p/core"
 	"github.com/libp2p/go-libp2p/core/network"
+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/pkg/errors"
 	ssz "github.com/prysmaticlabs/fastssz"
 	"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
@@ -19,6 +20,7 @@ import (
 	"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
 	"github.com/prysmaticlabs/prysm/v5/runtime/version"
 	"github.com/prysmaticlabs/prysm/v5/time/slots"
+	"github.com/sirupsen/logrus"
 )
 
 var (
@@ -252,9 +254,9 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
 				return
 			}
 			if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
-				logStreamErrors(err, topic)
 				tracing.AnnotateError(span, err)
-				s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+				score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
+				logStreamErrors(err, topic, remotePeer, score)
 				return
 			}
 			if err := handle(ctx, msg, stream); err != nil {
@@ -272,9 +274,9 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
 				return
 			}
 			if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
-				logStreamErrors(err, topic)
+				score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
+				logStreamErrors(err, topic, remotePeer, score)
 				tracing.AnnotateError(span, err)
-				s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
 				return
 			}
 			if err := handle(ctx, nTyp.Elem().Interface(), stream); err != nil {
@@ -288,13 +290,20 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
 	})
 }
 
-func logStreamErrors(err error, topic string) {
+func logStreamErrors(err error, topic string, remotePeer peer.ID, badResponsesScore int) {
+	log := log.WithFields(logrus.Fields{
+		"topic":                topic,
+		"peer":                 remotePeer.String(),
+		"newBadResponsesScore": badResponsesScore,
+	})
 	if isUnwantedError(err) {
+		log.WithError(err).Debug("Unwanted error")
 		return
 	}
+
 	if strings.Contains(topic, p2p.RPCGoodByeTopicV1) {
-		log.WithError(err).WithField("topic", topic).Trace("Could not decode goodbye stream message")
+		log.WithError(err).Debug("Could not decode goodbye stream message")
 		return
 	}
-	log.WithError(err).WithField("topic", topic).Debug("Could not decode stream message")
+	log.WithError(err).Debug("Could not decode stream message")
 }
diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go
index 1bc9ad7f1b4c..c5c6071d40ef 100644
--- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go
+++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go
@@ -43,13 +43,13 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
 	rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot())
 	if err != nil {
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
 		tracing.AnnotateError(span, err)
-		return err
+		return errors.Wrapf(err, "new bad responses score: %d", score)
 	}
 	available := s.validateRangeAvailability(rp)
 	if !available {
-		log.Debug("error in validating range availability")
+		log.Debug("Error in validating range availability")
 		s.writeErrorResponseToStream(responseCodeResourceUnavailable, p2ptypes.ErrResourceUnavailable.Error(), stream)
 		tracing.AnnotateError(span, err)
 		return nil
diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
index ea643ac2b3c4..edc8c1b25bff 100644
--- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go
+++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go
@@ -94,9 +94,9 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
 
 	currentEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
 	if uint64(len(blockRoots)) > params.MaxRequestBlock(currentEpoch) {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, "requested more than the max block limit", stream)
-		return errors.New("requested more than the max block limit")
+		return errors.Errorf("requested more than the max block limit - new bad responses score: %d", score)
 	}
 
 	s.rateLimiter.add(stream, int64(len(blockRoots)))
diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_range.go b/beacon-chain/sync/rpc_blob_sidecars_by_range.go
index a5e179513d65..1c8c675e48eb 100644
--- a/beacon-chain/sync/rpc_blob_sidecars_by_range.go
+++ b/beacon-chain/sync/rpc_blob_sidecars_by_range.go
@@ -81,9 +81,9 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
 	rp, err := validateBlobsByRange(r, s.cfg.chain.CurrentSlot())
 	if err != nil {
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
 		tracing.AnnotateError(span, err)
-		return err
+		return errors.Wrapf(err, "new bad responses score: %d", score)
 	}
 
 	// Ticker to stagger out large requests.
diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root.go b/beacon-chain/sync/rpc_blob_sidecars_by_root.go
index d49040776b32..067674860b68 100644
--- a/beacon-chain/sync/rpc_blob_sidecars_by_root.go
+++ b/beacon-chain/sync/rpc_blob_sidecars_by_root.go
@@ -35,9 +35,9 @@ func (s *Service) blobSidecarByRootRPCHandler(ctx context.Context, msg interface
 	blobIdents := *ref
 	if err := validateBlobByRootRequest(blobIdents); err != nil {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
-		return err
+		return errors.Wrapf(err, "new bad responses score: %d", score)
 	}
 
 	// Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
 	sort.Sort(blobIdents)
diff --git a/beacon-chain/sync/rpc_metadata.go b/beacon-chain/sync/rpc_metadata.go
index 475b2c9c1179..11e93e3a1c00 100644
--- a/beacon-chain/sync/rpc_metadata.go
+++ b/beacon-chain/sync/rpc_metadata.go
@@ -139,13 +139,13 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
 	// Read the METADATA response from the peer.
 	code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
 	if err != nil {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
-		return nil, errors.Wrap(err, "read status code")
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
+		return nil, errors.Wrapf(err, "read status code for metadata request, new bad responses score: %d", score)
 	}
 
 	if code != 0 {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
-		return nil, errors.New(errMsg)
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
+		return nil, errors.Errorf("%s, new bad responses score: %d", errMsg, score)
 	}
 
 	// Get the genesis validators root.
@@ -179,8 +179,8 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, peerID peer.ID) (meta
 
 	// Decode the metadata from the peer.
 	if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
-		return nil, err
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		return nil, errors.Wrapf(err, "decode metadata, new bad responses score: %d", score)
 	}
 
 	return msg, nil
diff --git a/beacon-chain/sync/rpc_ping.go b/beacon-chain/sync/rpc_ping.go
index ec7e2c448a8d..e142ff8d88e5 100644
--- a/beacon-chain/sync/rpc_ping.go
+++ b/beacon-chain/sync/rpc_ping.go
@@ -43,7 +43,8 @@ func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pc
 	if err != nil {
 		// Descore peer for giving us a bad sequence number.
 		if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
-			s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
+			score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
+			err = errors.Wrapf(err, "new bad responses score: %d", score)
 			s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
 		}
 
@@ -141,8 +142,8 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
 
 	// If the peer responded with an error, increment the bad responses scorer.
 	if code != 0 {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
-		return errors.Errorf("code: %d - %s", code, errMsg)
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
+		return errors.Errorf("code: %d, new bad responses score: %d - %s", code, score, errMsg)
 	}
 
 	// Decode the sequence number from the peer.
@@ -156,7 +157,8 @@ func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
 	if err != nil {
 		// Descore peer for giving us a bad sequence number.
 		if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
-			s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
+			score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
+			err = errors.Wrapf(err, "new bad responses score: %d", score)
 		}
 
 		return errors.Wrap(err, "validate sequence number")
diff --git a/beacon-chain/sync/rpc_status.go b/beacon-chain/sync/rpc_status.go
index 34be4c402079..70df6856a9b5 100644
--- a/beacon-chain/sync/rpc_status.go
+++ b/beacon-chain/sync/rpc_status.go
@@ -62,8 +62,12 @@ func (s *Service) maintainPeerStatuses() {
 			}
 			if prysmTime.Now().After(lastUpdated.Add(interval)) {
 				if err := s.reValidatePeer(s.ctx, id); err != nil {
-					log.WithField("peer", id).WithError(err).Debug("Could not revalidate peer")
-					s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
+					score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
+					log.
+						WithFields(logrus.Fields{
+							"peer":                 id,
+							"newBadResponsesScore": score,
+						}).WithError(err).Debug("Could not revalidate peer")
 				}
 			}
 		}(pid)
@@ -161,18 +165,18 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
 	code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
 	if err != nil {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
-		return err
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		return errors.Wrapf(err, "read status code for status request, new bad responses score: %d", score)
 	}
 
 	if code != 0 {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
-		return errors.New(errMsg)
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(id)
+		return errors.Errorf("%s, new bad responses score: %d", errMsg, score)
 	}
 
 	msg := &pb.Status{}
 	if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
-		return err
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
+		return errors.Wrapf(err, "decode with max length, new bad responses score: %d", score)
 	}
 
 	// If validation fails, validation error is logged, and peer status scorer will mark peer as bad.
@@ -187,7 +191,7 @@ func (s *Service) reValidatePeer(ctx context.Context, id peer.ID) error {
 	s.cfg.p2p.Peers().Scorers().PeerStatusScorer().SetHeadSlot(s.cfg.chain.HeadSlot())
 	if err := s.sendRPCStatusRequest(ctx, id); err != nil {
-		return err
+		return errors.Wrap(err, "revalidate peer")
 	}
 
 	// Do not return an error for ping requests.
 	if err := s.sendPingRequest(ctx, id); err != nil && !isUnwantedError(err) {
@@ -237,7 +241,11 @@ func (s *Service) statusRPCHandler(ctx context.Context, msg interface{}, stream
 		return nil
 	default:
 		respCode = responseCodeInvalidRequest
-		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
+		score := s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
+		log.WithError(err).WithFields(logrus.Fields{
+			"peer":                 remotePeer,
+			"newBadResponsesScore": score,
+		}).Debug("Could not validate status message")
 	}
 
 	originalErr := err