Skip to content

Commit

Permalink
Observability - refactor peer Discovery metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
oleg-ssvlabs committed Dec 4, 2024
1 parent 23e58e1 commit edaf353
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 18 deletions.
11 changes: 6 additions & 5 deletions network/discovery/dv5_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (dvs *DiscV5Service) Bootstrap(logger *zap.Logger, handler HandleNewPeer) e
fields.ENR(e.Node),
fields.PeerID(e.AddrInfo.ID),
)
err := dvs.checkPeer(logger, e)
err := dvs.checkPeer(dvs.ctx, logger, e)
if err != nil {
if skippedPeers%logFrequency == 0 {
logger.Debug("skipped discovered peer", zap.Error(err))
Expand All @@ -167,9 +167,10 @@ func (dvs *DiscV5Service) Bootstrap(logger *zap.Logger, handler HandleNewPeer) e

var zeroSubnets, _ = records.Subnets{}.FromString(records.ZeroSubnets)

func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
func (dvs *DiscV5Service) checkPeer(ctx context.Context, logger *zap.Logger, e PeerEvent) error {
// Get the peer's domain type, skipping if it mismatches ours.
// TODO: uncomment errors once there are sufficient nodes with domain type.
peerDiscoveriesCounter.Add(ctx, 1)
nodeDomainType, err := records.GetDomainTypeEntry(e.Node.Record(), records.KeyDomainType)
if err != nil {
return errors.Wrap(err, "could not read domain type")
Expand All @@ -190,22 +191,22 @@ func (dvs *DiscV5Service) checkPeer(logger *zap.Logger, e PeerEvent) error {
return fmt.Errorf("could not read subnets: %w", err)
}
if bytes.Equal(zeroSubnets, nodeSubnets) {
recordPeerRejection(ctx, zeroSubnetsReason)
return errors.New("zero subnets")
}

dvs.subnetsIdx.UpdatePeerSubnets(e.AddrInfo.ID, nodeSubnets)

// Filters
if !dvs.limitNodeFilter(e.Node) {
peerRejectionsCounter.Add(dvs.ctx, 1)
recordPeerRejection(ctx, reachedLimitReason)
return errors.New("reached limit")
}
if !dvs.sharedSubnetsFilter(1)(e.Node) {
peerRejectionsCounter.Add(dvs.ctx, 1)
recordPeerRejection(ctx, noSharedSubnetsReason)
return errors.New("no shared subnets")
}

peerDiscoveriesCounter.Add(dvs.ctx, 1)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion network/discovery/dv5_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func TestCheckPeer(t *testing.T) {
for _, test := range tests {
test := test
t.Run(test.name+":run", func(t *testing.T) {
err := dvs.checkPeer(logger, PeerEvent{
err := dvs.checkPeer(context.TODO(), logger, PeerEvent{
Node: test.localNode.Node(),
})
if test.expectedError != nil {
Expand Down
23 changes: 21 additions & 2 deletions network/discovery/observability.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,29 @@
package discovery

import (
"context"
"fmt"

"github.com/ssvlabs/ssv/observability"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/ssvlabs/ssv/observability"
)

const (
observabilityName = "github.com/ssvlabs/ssv/network/discovery"
observabilityNamespace = "ssv.p2p.discovery"
)

type rejectionReason string

const (
reachedLimitReason rejectionReason = "reachedLimit"
noSharedSubnetsReason rejectionReason = "noSharedSubnets"
zeroSubnetsReason rejectionReason = "zeroSubnets"
)

var (
meter = otel.Meter(observabilityName)

Expand All @@ -24,11 +35,19 @@ var (

peerRejectionsCounter = observability.NewMetric(
meter.Int64Counter(
metricName("rejections"),
metricName("peers.rejected"),
metric.WithUnit("{peer}"),
metric.WithDescription("total number of peers rejected during discovery")))
)

func metricName(name string) string {
return fmt.Sprintf("%s.%s", observabilityNamespace, name)
}

func recordPeerRejection(ctx context.Context, reason rejectionReason) {
peerRejectionsCounter.Add(ctx, 1, metric.WithAttributes(peerRejectionReasonAttribute(reason)))
}

func peerRejectionReasonAttribute(reason rejectionReason) attribute.KeyValue {
return attribute.String("ssv.p2p.discovery.rejection_reason", string(reason))
}
20 changes: 10 additions & 10 deletions network/discovery/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,47 +243,47 @@ func TestDiscV5Service_checkPeer(t *testing.T) {
}()

// Valid peer
err := dvs.checkPeer(testLogger, ToPeerEvent(NewTestingNode(t)))
err := dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NewTestingNode(t)))
require.NoError(t, err)

// No domain
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithoutDomain(t)))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithoutDomain(t)))
require.ErrorContains(t, err, "could not read domain type: not found")

// No next domain. No error since it's not enforced
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithoutNextDomain(t)))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithoutNextDomain(t)))
require.NoError(t, err)

// Matching main domain
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithCustomDomains(t, testNetConfig.DomainType(), spectypes.DomainType{})))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithCustomDomains(t, testNetConfig.DomainType(), spectypes.DomainType{})))
require.NoError(t, err)

// Matching next domain
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithCustomDomains(t, spectypes.DomainType{}, testNetConfig.DomainType())))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithCustomDomains(t, spectypes.DomainType{}, testNetConfig.DomainType())))
require.NoError(t, err)

// Mismatching domains
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithCustomDomains(t, spectypes.DomainType{}, spectypes.DomainType{})))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithCustomDomains(t, spectypes.DomainType{}, spectypes.DomainType{})))
require.ErrorContains(t, err, "mismatched domain type: neither 00000000 nor 00000000 match 00000302")

// No subnets
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithoutSubnets(t)))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithoutSubnets(t)))
require.ErrorContains(t, err, "could not read subnets: not found")

// Zero subnets
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithZeroSubnets(t)))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithZeroSubnets(t)))
require.ErrorContains(t, err, "zero subnets")

// Valid peer but reached limit
dvs.conns.(*MockConnection).SetAtLimit(true)
err = dvs.checkPeer(testLogger, ToPeerEvent(NewTestingNode(t)))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NewTestingNode(t)))
require.ErrorContains(t, err, "reached limit")
dvs.conns.(*MockConnection).SetAtLimit(false)

// Valid peer but no common subnet
subnets := make([]byte, len(records.ZeroSubnets))
subnets[10] = 1
err = dvs.checkPeer(testLogger, ToPeerEvent(NodeWithCustomSubnets(t, subnets)))
err = dvs.checkPeer(context.TODO(), testLogger, ToPeerEvent(NodeWithCustomSubnets(t, subnets)))
require.ErrorContains(t, err, "no shared subnets")
}

Expand Down

0 comments on commit edaf353

Please sign in to comment.