Improve Peer Score Algorithm (#248)
* feat: improve peer scoring algo

* debug

* debug

* more debug

* debug TryDialNext

* remove log

* fix score type

* revert block sync logic

* revert block sync logic

* revert block sync logic

* Add block request log

* Add apply block latency

* add processPeerEvent log back

* update unit test

* update unit test

---------

Co-authored-by: yzang2019 <zymfrank@gmail.com>
2 people authored and Kbhat1 committed Nov 13, 2024
1 parent d877ef1 commit f6d399d
Showing 6 changed files with 128 additions and 70 deletions.
8 changes: 5 additions & 3 deletions internal/blocksync/pool.go
@@ -327,8 +327,11 @@ func (pool *BlockPool) AddBlock(peerID types.NodeID, block *types.Block, extComm
if peer != nil {
peer.decrPending(blockSize)
}
} else if setBlockResult < 0 {
err := errors.New("bpr requester peer is different from original peer")

// Increment the number of consecutive successful block syncs for the peer
pool.peerManager.IncrementBlockSyncs(peerID)
} else {
err := errors.New("requester is different or block already exists")
pool.sendError(err, peerID)
return fmt.Errorf("%w (peer: %s, requester: %s, block height: %d)", err, peerID, requester.getPeerID(), block.Height)
}
@@ -358,7 +361,6 @@ func (pool *BlockPool) SetPeerRange(peerID types.NodeID, base int64, height int6

blockSyncPeers := pool.peerManager.GetBlockSyncPeers()
if len(blockSyncPeers) > 0 && !blockSyncPeers[peerID] {
pool.logger.Info(fmt.Sprintf("Skip adding peer %s for blocksync, num of blocksync peers: %d, num of pool peers: %d", peerID, len(blockSyncPeers), len(pool.peers)))
return
}
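
The pool change above turns each successfully added block into a score signal: AddBlock calls IncrementBlockSyncs for the delivering peer, while a mismatched requester or duplicate block raises an error instead. A minimal standalone sketch of that streak-counter pattern (hypothetical names, not the actual BlockPool API):

package main

import "fmt"

// syncTracker mirrors the bookkeeping idea: reward uninterrupted
// block-sync success per peer, reset the streak on any failure.
type syncTracker struct {
	consecSuccess map[string]int64
}

func (t *syncTracker) recordBlock(peerID string, ok bool) {
	if ok {
		t.consecSuccess[peerID]++ // streak grows with each good block
		return
	}
	t.consecSuccess[peerID] = 0 // a bad block clears the streak
}

func main() {
	t := &syncTracker{consecSuccess: map[string]int64{}}
	for i := 0; i < 7; i++ {
		t.recordBlock("peerA", true)
	}
	fmt.Println(t.consecSuccess["peerA"]) // 7
	t.recordBlock("peerA", false)
	fmt.Println(t.consecSuccess["peerA"]) // 0
}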

9 changes: 9 additions & 0 deletions internal/blocksync/reactor.go
@@ -258,6 +258,10 @@ func (r *Reactor) handleMessage(ctx context.Context, envelope *p2p.Envelope, blo
return r.respondToPeer(ctx, msg, envelope.From, blockSyncCh)
case *bcproto.BlockResponse:
block, err := types.BlockFromProto(msg.Block)

r.logger.Info("received block response from peer",
"peer", envelope.From,
"height", block.Height)
if err != nil {
r.logger.Error("failed to convert block from proto",
"peer", envelope.From,
@@ -495,6 +499,7 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh
var (
trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)
switchToConsensusTicker = time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
lastApplyBlockTime = time.Now()

blocksSynced = uint64(0)

@@ -695,7 +700,11 @@ func (r *Reactor) poolRoutine(ctx context.Context, stateSynced bool, blockSyncCh

// TODO: Same thing for app - but we would need a way to get the hash
// without persisting the state.
r.logger.Info(fmt.Sprintf("Requesting block %d from peer took %s", first.Height, time.Since(lastApplyBlockTime)))
startTime := time.Now()
state, err = r.blockExec.ApplyBlock(ctx, state, firstID, first, nil)
r.logger.Info(fmt.Sprintf("ApplyBlock %d took %s", first.Height, time.Since(startTime)))
lastApplyBlockTime = time.Now()
if err != nil {
panic(fmt.Sprintf("failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
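
The reactor now instruments two intervals: the gap between finishing one apply and reaching the next block (lastApplyBlockTime) and the duration of ApplyBlock itself. A reduced sketch of that timing pattern, with illustrative names:

package main

import (
	"fmt"
	"time"
)

// syncLoop reports two intervals per block: the gap since the previous
// apply finished, and the duration of the apply itself.
func syncLoop(applyBlock func(height int64), heights []int64) {
	lastApply := time.Now()
	for _, h := range heights {
		fmt.Printf("block %d requested %s after last apply\n", h, time.Since(lastApply))
		start := time.Now()
		applyBlock(h)
		fmt.Printf("ApplyBlock %d took %s\n", h, time.Since(start))
		lastApply = time.Now()
	}
}

func main() {
	syncLoop(func(int64) { time.Sleep(5 * time.Millisecond) }, []int64{100, 101})
}
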
62 changes: 45 additions & 17 deletions internal/p2p/peermanager.go
@@ -25,8 +25,6 @@ import (
const (
// retryNever is returned by retryDelay() when retries are disabled.
retryNever time.Duration = math.MaxInt64
// DefaultMutableScore is the default score for a peer during initialization
DefaultMutableScore int64 = 10
)

// PeerStatus is a peer status.
@@ -47,9 +45,10 @@ const (
type PeerScore uint8

const (
PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers
PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers
MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1
PeerScoreUnconditional PeerScore = math.MaxUint8 // unconditional peers, 255
PeerScorePersistent PeerScore = PeerScoreUnconditional - 1 // persistent peers, 254
MaxPeerScoreNotPersistent PeerScore = PeerScorePersistent - 1 // not persistent peers, 253
DefaultMutableScore PeerScore = MaxPeerScoreNotPersistent - 10 // mutable score, 243
)

// PeerUpdate is a peer update event sent via PeerUpdates.
@@ -598,6 +597,7 @@ func (m *PeerManager) DialFailed(ctx context.Context, address NodeAddress) error

addressInfo.LastDialFailure = time.Now().UTC()
addressInfo.DialFailures++
peer.ConsecSuccessfulBlocks = 0
// We need to invalidate the cache after score changed
m.store.ranked = nil
if err := m.store.Set(peer); err != nil {
@@ -845,7 +845,15 @@ func (m *PeerManager) Disconnected(ctx context.Context, peerID types.NodeID) {

// Update score and invalidate cache if a peer got disconnected
if _, ok := m.store.peers[peerID]; ok {
m.store.peers[peerID].NumOfDisconnections++
// check for potential overflow
if m.store.peers[peerID].NumOfDisconnections < math.MaxInt64 {
m.store.peers[peerID].NumOfDisconnections++
} else {
fmt.Printf("Warning: NumOfDisconnections for peer %s has reached its maximum value\n", peerID)
m.store.peers[peerID].NumOfDisconnections = 0
}

m.store.peers[peerID].ConsecSuccessfulBlocks = 0
m.store.ranked = nil
}

@@ -992,16 +1000,16 @@ func (m *PeerManager) processPeerEvent(ctx context.Context, pu PeerUpdate) {
return
}

if _, ok := m.store.peers[pu.NodeID]; !ok {
m.store.peers[pu.NodeID] = &peerInfo{}
}

switch pu.Status {
case PeerStatusBad:
m.store.peers[pu.NodeID].MutableScore--
case PeerStatusGood:
m.store.peers[pu.NodeID].MutableScore++
}

if _, ok := m.store.peers[pu.NodeID]; !ok {
m.store.peers[pu.NodeID] = &peerInfo{}
}
// Invalidate the cache after score changed
m.store.ranked = nil
}
@@ -1350,7 +1358,9 @@ type peerInfo struct {
Height int64
FixedScore PeerScore // mainly for tests

MutableScore int64 // updated by router
MutableScore PeerScore // updated by router

ConsecSuccessfulBlocks int64
}

// peerInfoFromProto converts a Protobuf PeerInfo message to a peerInfo,
@@ -1408,27 +1418,35 @@ func (p *peerInfo) Copy() peerInfo {
// Score calculates a score for the peer. Higher-scored peers will be
// preferred over lower scores.
func (p *peerInfo) Score() PeerScore {
// Use predetermined scores if set
if p.FixedScore > 0 {
return p.FixedScore
}
if p.Unconditional {
return PeerScoreUnconditional
}

score := p.MutableScore
score := int64(p.MutableScore)
if p.Persistent || p.BlockSync {
score = int64(PeerScorePersistent)
}

// Add points for block sync performance
score += p.ConsecSuccessfulBlocks / 5

// Penalize for dial failures with time decay
for _, addr := range p.AddressInfo {
// DialFailures is reset when dials succeed, so this
// is either the number of dial failures or 0.
score -= int64(addr.DialFailures)
failureScore := float64(addr.DialFailures) * math.Exp(-0.1*float64(time.Since(addr.LastDialFailure).Hours()))
score -= int64(failureScore)
}

// We consider lowering the score for every 3 disconnection events
score -= p.NumOfDisconnections / 3
// Penalize for disconnections with time decay
timeSinceLastDisconnect := time.Since(p.LastConnected)
decayFactor := math.Exp(-0.1 * timeSinceLastDisconnect.Hours())
effectiveDisconnections := int64(float64(p.NumOfDisconnections) * decayFactor)
score -= effectiveDisconnections / 3

// Cap score for non-persistent peers
if !p.Persistent && score > int64(MaxPeerScoreNotPersistent) {
score = int64(MaxPeerScoreNotPersistent)
}
@@ -1535,3 +1553,13 @@ func (m *PeerManager) MarkReadyConnected(nodeId types.NodeID) {
m.ready[nodeId] = true
m.connected[nodeId] = true
}

func (m *PeerManager) IncrementBlockSyncs(peerID types.NodeID) {
m.mtx.Lock()
defer m.mtx.Unlock()

if peer, ok := m.store.peers[peerID]; ok {
peer.ConsecSuccessfulBlocks++
m.store.ranked = nil
}
}
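
The exponential factor above gives both penalties a half-life of roughly ln(2)/0.1 ≈ 6.9 hours, so stale dial failures and disconnections fade instead of depressing a peer's score indefinitely. A standalone sketch of the decay curve (helper name is illustrative):

package main

import (
	"fmt"
	"math"
	"time"
)

// decayedPenalty applies the same e^(-0.1h) decay that Score() uses for
// dial failures and disconnections.
func decayedPenalty(count int64, age time.Duration) int64 {
	return int64(float64(count) * math.Exp(-0.1*age.Hours()))
}

func main() {
	for _, hours := range []float64{0, 7, 24, 72} {
		age := time.Duration(hours * float64(time.Hour))
		fmt.Printf("3 failures %3.0fh old -> penalty %d\n", hours, decayedPenalty(3, age))
	}
	// Approximate output: 3, 1, 0, 0 — day-old failures barely count.
}
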
38 changes: 21 additions & 17 deletions internal/p2p/peermanager_scoring_test.go
@@ -2,11 +2,12 @@ package p2p

import (
"context"
"github.com/tendermint/tendermint/libs/log"
"strings"
"testing"
"time"

"github.com/tendermint/tendermint/libs/log"

"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tm-db"

@@ -44,26 +45,29 @@ func TestPeerScoring(t *testing.T) {
NodeID: id,
Status: PeerStatusGood,
})
require.EqualValues(t, defaultScore+int64(i), peerManager.Scores()[id])
require.EqualValues(t, defaultScore+PeerScore(i), peerManager.Scores()[id])
}
// watch the corresponding decreases respond to update
for i := 1; i < 10; i++ {
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusBad,
})
require.EqualValues(t, DefaultMutableScore+int64(9)-int64(i), peerManager.Scores()[id])
require.EqualValues(t, DefaultMutableScore+PeerScore(9)-PeerScore(i), peerManager.Scores()[id])
}

// Dial failure should decrease score
_ = peerManager.DialFailed(ctx, NodeAddress{NodeID: id, Protocol: "memory"})
require.EqualValues(t, DefaultMutableScore-1, peerManager.Scores()[id])
addr := NodeAddress{NodeID: id, Protocol: "memory"}
_ = peerManager.DialFailed(ctx, addr)
_ = peerManager.DialFailed(ctx, addr)
_ = peerManager.DialFailed(ctx, addr)
require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id])

// Disconnect every 3 times should also decrease score
for i := 1; i < 7; i++ {
peerManager.Disconnected(ctx, id)
}
require.EqualValues(t, DefaultMutableScore-3, peerManager.Scores()[id])
require.EqualValues(t, DefaultMutableScore-2, peerManager.Scores()[id])
})
t.Run("AsynchronousIncrement", func(t *testing.T) {
start := peerManager.Scores()[id]
@@ -92,18 +96,18 @@ func TestPeerScoring(t *testing.T) {
"startAt=%d score=%d", start, peerManager.Scores()[id])
})
t.Run("TestNonPersistantPeerUpperBound", func(t *testing.T) {
start := int64(peerManager.Scores()[id] + 1)
for i := start; i <= int64(PeerScorePersistent)+start; i++ {
peerManager.processPeerEvent(ctx, PeerUpdate{
NodeID: id,
Status: PeerStatusGood,
})
// Reset peer state to remove any previous penalties
peerManager.store.peers[id] = &peerInfo{
ID: id,
MutableScore: DefaultMutableScore,
}

if i >= int64(PeerScorePersistent) {
require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id])
} else {
require.EqualValues(t, i, peerManager.Scores()[id])
}
// Add successful blocks to increase score
for i := 0; i < 100; i++ {
peerManager.IncrementBlockSyncs(id)
}

// Score should be capped at MaxPeerScoreNotPersistent
require.EqualValues(t, MaxPeerScoreNotPersistent, peerManager.Scores()[id])
})
}
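
The DefaultMutableScore-2 assertion after three immediate dial failures follows from truncation: moments after the failures the penalty is 3·e^(-0.1·ε) ≈ 2.9997, and the int64 conversion rounds toward zero, shaving 2 points rather than 3. A quick check of that arithmetic:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Assume the three failures happened a fraction of a second ago.
	elapsedHours := 0.0001
	penalty := 3.0 * math.Exp(-0.1*elapsedHours)
	fmt.Println(penalty)        // 2.99997...
	fmt.Println(int64(penalty)) // 2 -> hence DefaultMutableScore-2 above
}
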
75 changes: 42 additions & 33 deletions internal/p2p/peermanager_test.go
@@ -172,7 +172,7 @@ func TestNewPeerManager_Persistence(t *testing.T) {
require.Equal(t, map[types.NodeID]p2p.PeerScore{
aID: p2p.PeerScorePersistent,
bID: 1,
cID: 10,
cID: p2p.DefaultMutableScore,
}, peerManager.Scores())

// Creating a new peer manager with the same database should retain the
@@ -198,7 +198,7 @@ func TestNewPeerManager_Persistence(t *testing.T) {
peerManager.DialFailed(ctx, bAddresses[0])
require.Equal(t, map[types.NodeID]p2p.PeerScore{
aID: 0,
bID: p2p.PeerScorePersistent - 1,
bID: p2p.PeerScorePersistent,
cID: 1,
}, peerManager.Scores())
}
@@ -242,7 +242,7 @@ func TestNewPeerManager_Unconditional(t *testing.T) {
require.Equal(t, map[types.NodeID]p2p.PeerScore{
aID: p2p.PeerScoreUnconditional,
bID: 1,
cID: 10,
cID: p2p.DefaultMutableScore,
}, peerManager.Scores())

// Creating a new peer manager with the same database should retain the
@@ -680,7 +680,7 @@ func TestPeerManager_TryDialNext_UpgradeReservesPeer(t *testing.T) {
c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}

peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11},
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1, c.NodeID: p2p.DefaultMutableScore + 1},
MaxConnected: 1,
MaxConnectedUpgrade: 2,
}, p2p.NopMetrics())
@@ -846,13 +846,17 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
c := p2p.NodeAddress{Protocol: "memory", NodeID: types.NodeID(strings.Repeat("c", 40))}

peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11},
PeerScores: map[types.NodeID]p2p.PeerScore{
a.NodeID: p2p.DefaultMutableScore - 1, // Set lower score for a to make it upgradeable
b.NodeID: p2p.DefaultMutableScore + 1, // Higher score for b to attempt upgrade
c.NodeID: p2p.DefaultMutableScore + 1, // Same high score for c to attempt upgrade after b fails
},
MaxConnected: 1,
MaxConnectedUpgrade: 2,
}, p2p.NopMetrics())
require.NoError(t, err)

// Add a and connect to it.
// Add and connect to peer a (lower scored)
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
@@ -861,25 +865,21 @@ func TestPeerManager_DialFailed_UnreservePeer(t *testing.T) {
require.Equal(t, a, dial)
require.NoError(t, peerManager.Dialed(a))

// Add b and start dialing it. This will claim a for upgrading.
// Add both higher scored peers b and c
added, err = peerManager.Add(b)
require.NoError(t, err)
require.True(t, added)
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, b, dial)

// Adding c and dialing it will fail, even though it could upgrade a and we
// have free upgrade slots, because a is the only connected peer that can be
// upgraded and b is already trying to upgrade it.
added, err = peerManager.Add(c)
added, err = peerManager.Add(c) // Add c before attempting upgrade
require.NoError(t, err)
require.True(t, added)

// Attempt to dial b for upgrade
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.Empty(t, dial)
require.Equal(t, b, dial)

// Failing b's dial will now make c available for dialing.
// When b's dial fails, the upgrade slot should be freed
// allowing c to attempt upgrade of the same peer (a)
require.NoError(t, peerManager.DialFailed(ctx, b))
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
@@ -963,11 +963,16 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
MaxConnected: 2,
MaxConnectedUpgrade: 1,
PeerScores: map[types.NodeID]p2p.PeerScore{c.NodeID: 11, d.NodeID: 11},
PeerScores: map[types.NodeID]p2p.PeerScore{
a.NodeID: p2p.DefaultMutableScore - 1, // Lower score for a
b.NodeID: p2p.DefaultMutableScore - 1, // Lower score for b
c.NodeID: p2p.DefaultMutableScore + 1, // Higher score for c to upgrade
d.NodeID: p2p.DefaultMutableScore + 1, // Higher score for d to upgrade
},
}, p2p.NopMetrics())
require.NoError(t, err)

// Dialing a and b is fine.
// Connect to lower scored peers a and b
added, err := peerManager.Add(a)
require.NoError(t, err)
require.True(t, added)
@@ -978,20 +983,24 @@ func TestPeerManager_Dialed_MaxConnectedUpgrade(t *testing.T) {
require.True(t, added)
require.NoError(t, peerManager.Dialed(b))

// Starting an upgrade of c should be fine.
// Add both higher scored peers c and d
added, err = peerManager.Add(c)
require.NoError(t, err)
require.True(t, added)
added, err = peerManager.Add(d)
require.NoError(t, err)
require.True(t, added)

// Start upgrade with c
dial, err := peerManager.TryDialNext()
require.NoError(t, err)
require.Equal(t, c, dial)
require.NoError(t, peerManager.Dialed(c))

// Trying to mark d dialed should fail, since there are no more upgrade
// slots and a/b haven't been evicted yet.
added, err = peerManager.Add(d)
// Try to dial d - should fail since we're at upgrade capacity
dial, err = peerManager.TryDialNext()
require.NoError(t, err)
require.True(t, added)
require.Zero(t, dial)
require.Error(t, peerManager.Dialed(d))
}

@@ -1013,7 +1022,7 @@ func TestPeerManager_Dialed_Upgrade(t *testing.T) {
peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
MaxConnected: 1,
MaxConnectedUpgrade: 2,
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11, c.NodeID: 11},
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1, c.NodeID: p2p.DefaultMutableScore + 1},
}, p2p.NopMetrics())
require.NoError(t, err)

@@ -1237,8 +1246,8 @@ func TestPeerManager_Accepted_MaxConnectedUpgrade(t *testing.T) {

peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: map[types.NodeID]p2p.PeerScore{
c.NodeID: 11,
d.NodeID: 12,
c.NodeID: p2p.DefaultMutableScore + 1,
d.NodeID: p2p.DefaultMutableScore + 2,
},
MaxConnected: 1,
MaxConnectedUpgrade: 1,
@@ -1285,8 +1294,8 @@ func TestPeerManager_Accepted_Upgrade(t *testing.T) {

peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: map[types.NodeID]p2p.PeerScore{
b.NodeID: 11,
c.NodeID: 11,
b.NodeID: p2p.DefaultMutableScore + 1,
c.NodeID: p2p.DefaultMutableScore + 1,
},
MaxConnected: 1,
MaxConnectedUpgrade: 2,
@@ -1328,8 +1337,8 @@ func TestPeerManager_Accepted_UpgradeDialing(t *testing.T) {

peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
PeerScores: map[types.NodeID]p2p.PeerScore{
b.NodeID: 11,
c.NodeID: 11,
b.NodeID: p2p.DefaultMutableScore + 1,
c.NodeID: p2p.DefaultMutableScore + 1,
},
MaxConnected: 1,
MaxConnectedUpgrade: 2,
@@ -1504,7 +1513,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeDialed(t *testing.T) {
peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
MaxConnected: 1,
MaxConnectedUpgrade: 1,
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11},
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1},
}, p2p.NopMetrics())
require.NoError(t, err)

@@ -1545,7 +1554,7 @@ func TestPeerManager_EvictNext_WakeOnUpgradeAccepted(t *testing.T) {
peerManager, err := p2p.NewPeerManager(log.NewNopLogger(), selfID, dbm.NewMemDB(), p2p.PeerManagerOptions{
MaxConnected: 1,
MaxConnectedUpgrade: 1,
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: 11},
PeerScores: map[types.NodeID]p2p.PeerScore{b.NodeID: p2p.DefaultMutableScore + 1},
}, p2p.NopMetrics())
require.NoError(t, err)

6 changes: 6 additions & 0 deletions internal/state/execution.go
@@ -332,13 +332,19 @@ func (blockExec *BlockExecutor) ApplyBlock(
defer commitSpan.End()
}
// Lock mempool, commit app state, update mempool.
commitStart := time.Now()
retainHeight, err := blockExec.Commit(ctx, state, block, fBlockRes.TxResults)
if err != nil {
return state, fmt.Errorf("commit failed for application: %w", err)
}
if commitSpan != nil {
commitSpan.End()
}
if time.Since(commitStart) > 1000*time.Millisecond {
blockExec.logger.Info("commit in blockExec",
"duration", time.Since(commitStart),
"height", block.Height)
}

// Update evpool with the latest state.
blockExec.evpool.Update(ctx, state, block.Evidence)
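
The Commit timing above follows a threshold-logging pattern: measure every call, but log only when it exceeds a budget (one second here) so the hot path stays quiet. A minimal sketch under those assumptions (logger and names illustrative):

package main

import (
	"log"
	"time"
)

// logIfSlow runs op and reports it only when it exceeds the budget,
// keeping the fast path free of log noise.
func logIfSlow(name string, budget time.Duration, op func() error) error {
	start := time.Now()
	err := op()
	if elapsed := time.Since(start); elapsed > budget {
		log.Printf("%s took %s (budget %s)", name, elapsed, budget)
	}
	return err
}

func main() {
	_ = logIfSlow("commit", time.Second, func() error {
		time.Sleep(1200 * time.Millisecond)
		return nil
	})
}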
