Skip to content

Commit

Permalink
discovery: propose peers based on their synergy
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Jan 30, 2025
1 parent 14d4c71 commit 82415ed
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 13 deletions.
124 changes: 112 additions & 12 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"math/bits"
"math/rand"
"os"
"slices"
Expand Down Expand Up @@ -174,16 +175,16 @@ func (n *p2pNetwork) PeersByTopic() ([]peer.ID, map[string][]peer.ID) {
for _, tpc := range tpcs {
peerz[tpc], err = n.topicsCtrl.Peers(tpc)
if err != nil {
n.interfaceLogger.Error("Cant get peers from topics")
n.interfaceLogger.Error("Cant get peers for specified topic", zap.String("topic", tpc), zap.Error(err))

Check warning on line 178 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L178

Added line #L178 was not covered by tests
return nil, nil
}
}
allpeers, err := n.topicsCtrl.Peers("")
allPeers, err := n.topicsCtrl.Peers("")
if err != nil {
n.interfaceLogger.Error("Cant all peers")
n.interfaceLogger.Error("Cant list all peers", zap.Error(err))

Check warning on line 184 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L184

Added line #L184 was not covered by tests
return nil, nil
}
return allpeers, peerz
return allPeers, peerz
}

// Close implements io.Closer
Expand Down Expand Up @@ -335,24 +336,123 @@ func (n *p2pNetwork) Start(logger *zap.Logger) error {
// leaves some vacant slots for the next iteration - on the next iteration better
// peers might show up (so we don't want to "spend" all of these vacant slots at once)
peersToProposeCnt := max(vacantOutboundSlotCnt/2, 1)
peersToProposeCnt = min(peersToProposeCnt, int(peersByPriority.Size())) // nolint: gosec
minScore, maxScore := math.MaxFloat64, 0.0
for i := 0; i < peersToProposeCnt; i++ {
// also limit how many peers we want to propose accounting for "peer synergy" - which
// is a way to pick peers such that they don't have too many overlapping subnets and
// instead cover more dead/solo subnets for us (this value can't too high for performance
// reasons)
const peersToProposeMaxWithSynergy = 12
peersToProposeCnt = min(peersToProposeCnt, peersToProposeMaxWithSynergy)
candidatePeersToProposeCnt := 2 * peersToProposeCnt

// make sure we don't exceed peersByPriority size, and terminate early if there is no peers
// to propose
peersToProposeCnt = min(peersToProposeCnt, int(peersByPriority.Size())) // nolint: gosec
candidatePeersToProposeCnt = min(candidatePeersToProposeCnt, int(peersByPriority.Size())) // nolint: gosec
if peersToProposeCnt < 1 {
n.interfaceLogger.Info("Not gonna propose discovered peers: no suitable peer candidates")
return
}

Check warning on line 354 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L338-L354

Added lines #L338 - L354 were not covered by tests

candidatePeersToPropose := make([]peers.DiscoveredPeer, 0, candidatePeersToProposeCnt)
minScore, maxScore := math.MaxFloat64, 0.0 // used as extra debugging info
for i := 0; i < candidatePeersToProposeCnt; i++ {
peerCandidate, priority, _ := peersByPriority.Pop()
if minScore > priority {
minScore = priority
}
if maxScore < priority {
maxScore = priority
}
candidatePeersToPropose = append(candidatePeersToPropose, peerCandidate)

Check warning on line 366 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L356-L366

Added lines #L356 - L366 were not covered by tests
}

// SubnetSum represents a sum of 0 or more subnets, each byte at index K corresponds to
// how many of that particular subnet at index K the subnet-sets summed had
type SubnetSum []byte
// addSubnets combines sums a and b to calculate the resulting subnet sum
addSubnets := func(a SubnetSum, b SubnetSum) SubnetSum {
result := SubnetSum{}
for i := 0; i < commons.SubnetsCount; i++ {
result[i] = a[i] + b[i]
}
return result

Check warning on line 378 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L371-L378

Added lines #L371 - L378 were not covered by tests
}
// scoreSubnetSumSynergy estimates how good/bad a SubnetSum is based on how many unhealthy
// (we are mostly really interested in dead and solo subnets) subnets it has, it allows us
// to sum up and estimate the synergy of a bunch of subnets to make sure they don't "overlap"
// too much
scoreSubnetSumSynergy := func(s SubnetSum) float64 {
const deadPriorityMultiplier = 5 // we value resolving dead subnets 5x higher over resolving solo subnets
const maxPossibleScore = commons.SubnetsCount * deadPriorityMultiplier
deadSubnetCnt := 0 // how many dead subnets SubnetSum has
soloSubnetCnt := 0 // how many solo subnets SubnetSum has
for i := 0; i < commons.SubnetsCount; i++ {
if s[i] == 0 {
deadSubnetCnt++
}
if s[i] == 1 {
soloSubnetCnt++
}

Check warning on line 395 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L384-L395

Added lines #L384 - L395 were not covered by tests
}
return float64(maxPossibleScore - (deadPriorityMultiplier*deadSubnetCnt + soloSubnetCnt))

Check warning on line 397 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L397

Added line #L397 was not covered by tests
}
// ownSubnetSum represents subnet sum of peers we already have open connections with
ownSubnetSum := SubnetSum{}
allPeerIDs, err := n.topicsCtrl.Peers("")
if err != nil {
n.interfaceLogger.Error("Cant list all peers", zap.Error(err))
return
}
for _, pID := range allPeerIDs {
pSubnets := n.idx.GetPeerSubnets(pID)
ownSubnetSum = addSubnets(ownSubnetSum, SubnetSum(pSubnets))
}

Check warning on line 409 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L400-L409

Added lines #L400 - L409 were not covered by tests

// iterate over all possible peer sets of max size candidatePeersToProposeCnt (peer-set is
// represented by uint64 number, each 1-value bit represents peer presence while each 0-bit
// represents peer absence in such peer-set) in the range 0 - 2^candidatePeersToProposeCnt
// and find peer-set of size peersToProposeCnt that has the best synergy
bestSynergyScore := 0.0
var bestSynergyPeers []peers.DiscoveredPeer
// candidatePeerSetsLast is the last peer-set represented by 2^candidatePeersToProposeCnt
candidatePeerSetsLast := uint64(1) << candidatePeersToProposeCnt
for candidatePeerSet := uint64(0); candidatePeerSet < candidatePeerSetsLast; candidatePeerSet++ {
// we are only interested in candidates that represent exactly peersToProposeCnt peers
candidateCnt := bits.OnesCount64(candidatePeerSet)
if candidateCnt != peersToProposeCnt {
continue // not interested in this candidate

Check warning on line 423 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L415-L423

Added lines #L415 - L423 were not covered by tests
}

peerMask := uint64(1)
tmpSubnetSum := ownSubnetSum
tmpSynergyPeers := make([]peers.DiscoveredPeer, 0, peersToProposeCnt)
for peerIdx := 0; peerIdx < candidatePeersToProposeCnt; peerIdx, peerMask = peerIdx+1, peerMask<<1 {
candidatePresent := candidatePeerSet & peerMask
if candidatePresent == uint64(0) {
continue // this peerIdx isn't part of candidate

Check warning on line 432 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L426-L432

Added lines #L426 - L432 were not covered by tests
}
p := candidatePeersToPropose[peerIdx]
tmpSynergyPeers = append(tmpSynergyPeers, p)
pSubnets := n.idx.GetPeerSubnets(p.ID)
tmpSubnetSum = addSubnets(tmpSubnetSum, SubnetSum(pSubnets))

Check warning on line 437 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L434-L437

Added lines #L434 - L437 were not covered by tests
}
tmpSynergyScore := scoreSubnetSumSynergy(tmpSubnetSum)

if tmpSynergyScore > bestSynergyScore {
bestSynergyScore = tmpSynergyScore
bestSynergyPeers = tmpSynergyPeers
}

Check warning on line 444 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L439-L444

Added lines #L439 - L444 were not covered by tests
}

// finally, offer best-synergy peers we came up with to connector so it tries to connect these
for _, p := range bestSynergyPeers {
// update retry counter for this peer so we eventually skip it after certain number of retries
peers.DiscoveredPeersPool.Set(peerCandidate.ID, peers.DiscoveredPeer{
AddrInfo: peerCandidate.AddrInfo,
ConnectRetries: peerCandidate.ConnectRetries + 1,
peers.DiscoveredPeersPool.Set(p.ID, peers.DiscoveredPeer{
AddrInfo: p.AddrInfo,
ConnectRetries: p.ConnectRetries + 1,
})
connector <- peerCandidate.AddrInfo // try to connect to best peer
connector <- p.AddrInfo // try to connect to best peer
}

n.interfaceLogger.Info(
"Proposed discovered peers",
zap.Int("count", peersToProposeCnt),
Expand Down
3 changes: 2 additions & 1 deletion network/topics/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ type Controller interface {
Subscribe(logger *zap.Logger, name string) error
// Unsubscribe unsubscribes from the given topic
Unsubscribe(logger *zap.Logger, topicName string, hard bool) error
// Peers returns a list of peers we are connected to in the given topic.
// Peers returns a list of peers we are connected to in the given topic, if topicName
// param is an empty string it returns a list of all peers we are connected to.
Peers(topicName string) ([]peer.ID, error)
// Topics lists all topics this node is subscribed to
Topics() []string
Expand Down

0 comments on commit 82415ed

Please sign in to comment.