feat(shrex/peer-manager): add metrics to peer manager (#1924)
## Overview
Introduce metrics to the shrex peer manager. Peers handed out by `Peer()` are now tagged with their source (shrexsub pool or discovered full nodes) together with pool size and wait time, `DoneFunc` results are observed per source, blacklisting is recorded with a reason, and shrexsub validation outcomes are observed by wrapping the validator. The `result` type becomes a string enum and `ResultSuccess` is renamed to `ResultNoop`. Metrics are enabled through the new `share.WithPeerManagerMetrics` option wired into the nodebuilder's `WithMetrics`.

Resolves #1780
walldiss authored Apr 26, 2023
1 parent b10c447 commit 2d5b385
Showing 8 changed files with 368 additions and 55 deletions.
2 changes: 1 addition & 1 deletion das/metrics.go
@@ -131,7 +131,7 @@ func (d *DASer) InitMetrics() error {
)

if err != nil {
return fmt.Errorf("regestering metrics callback: %w", err)
return fmt.Errorf("registering metrics callback: %w", err)
}

return nil
2 changes: 2 additions & 0 deletions nodebuilder/settings.go
@@ -20,6 +20,7 @@ import (
modheader "github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/state"
)

@@ -77,6 +78,7 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti
opts = fx.Options(
baseComponents,
fx.Invoke(das.WithMetrics),
fx.Invoke(share.WithPeerManagerMetrics),
// add more monitoring here
)
case node.Bridge:
11 changes: 11 additions & 0 deletions nodebuilder/share/opts.go
@@ -0,0 +1,11 @@
package share

import (
"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

// WithPeerManagerMetrics is a utility function that is expected to be
// "invoked" by the fx lifecycle.
func WithPeerManagerMetrics(m *peers.Manager) error {
return m.WithMetrics()
}
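
The `Manager.WithMetrics` method that this hook calls lives in the new `share/p2p/peers/metrics.go`, which is not rendered in this excerpt. As orientation only, a constructor of this shape typically registers OpenTelemetry instruments and attaches them to the manager. The sketch below uses the current OpenTelemetry Go API and invented instrument names, so treat everything beyond the method name as an assumption rather than the actual implementation:

```go
// Hypothetical sketch of what (*peers.Manager).WithMetrics could look like;
// the real implementation is in share/p2p/peers/metrics.go and may differ
// (including the OpenTelemetry API version it targets).
package peers

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("shrex_peer_manager") // meter name assumed

// metrics bundles the instruments used by the Manager observers.
type metrics struct {
	getPeer           metric.Int64Counter // peers served by Peer(), by source
	doneResults       metric.Int64Counter // DoneFunc results, by source and result
	blacklistedPeers  metric.Int64Counter // peers blacklisted, by reason
	validationResults metric.Int64Counter // shrexsub validation outcomes
}

// WithMetrics enables metrics collection on the Manager.
func (m *Manager) WithMetrics() error {
	getPeer, err := meter.Int64Counter("shrex_pm_get_peer",
		metric.WithDescription("peers returned by Peer, labelled by source"))
	if err != nil {
		return err
	}
	doneResults, err := meter.Int64Counter("shrex_pm_done_result",
		metric.WithDescription("DoneFunc results, labelled by source and result"))
	if err != nil {
		return err
	}
	blacklisted, err := meter.Int64Counter("shrex_pm_blacklisted_peers",
		metric.WithDescription("peers blacklisted, labelled by reason"))
	if err != nil {
		return err
	}
	validation, err := meter.Int64Counter("shrex_pm_validation_result",
		metric.WithDescription("shrexsub validation outcomes"))
	if err != nil {
		return err
	}

	m.metrics = &metrics{
		getPeer:           getPeer,
		doneResults:       doneResults,
		blacklistedPeers:  blacklisted,
		validationResults: validation,
	}
	return nil
}
```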
2 changes: 1 addition & 1 deletion share/getters/shrex.go
@@ -138,7 +138,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
cancel()
switch {
case getErr == nil:
setStatus(peers.ResultSuccess)
setStatus(peers.ResultNoop)
return nd, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
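For orientation, the getter follows an acquire-then-report pattern against the peer manager: ask for a peer for the data hash, perform the request, then report the outcome through the returned `DoneFunc`. A minimal sketch of that pattern, where `requestShares` is a hypothetical stand-in for the real shrex call:

```go
// Sketch (not part of the commit) of how a caller uses Manager.Peer and the
// DoneFunc it returns. requestShares is a hypothetical stand-in for the real
// shrex request performed by the getter.
package example

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"

	"github.com/celestiaorg/celestia-node/share"
	"github.com/celestiaorg/celestia-node/share/p2p/peers"
)

func fetch(ctx context.Context, m *peers.Manager, hash share.DataHash) error {
	// Ask the manager for a peer that is expected to hold the data.
	peerID, setStatus, err := m.Peer(ctx, hash)
	if err != nil {
		return err
	}

	if err := requestShares(ctx, peerID, hash); err != nil {
		// A failed request puts the peer on cooldown so it is skipped for a while.
		setStatus(peers.ResultCooldownPeer)
		return err
	}

	// A plain success needs no extra bookkeeping and is reported as ResultNoop.
	setStatus(peers.ResultNoop)
	return nil
}

// requestShares is hypothetical; the real getter performs a shrex request here.
func requestShares(context.Context, peer.ID, share.DataHash) error { return nil }
```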
117 changes: 65 additions & 52 deletions share/p2p/peers/manager.go
@@ -24,18 +24,20 @@ import (
)

const (
// ResultSuccess indicates operation was successful and no extra action is required
ResultSuccess result = iota
// ResultNoop indicates operation was successful and no extra action is required
ResultNoop result = "result_noop"
// ResultSynced will save the status of pool as "synced" and will remove peers from it
ResultSynced
ResultSynced = "result_synced"
// ResultCooldownPeer will put returned peer on cooldown, meaning it won't be available by Peer
// method for some time
ResultCooldownPeer
ResultCooldownPeer = "result_cooldown_peer"
// ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from
// any p2p communication in future by libp2p Gater
ResultBlacklistPeer
ResultBlacklistPeer = "result_blacklist_peer"
)

type result string

var log = logging.Logger("shrex/peer-manager")

// Manager keeps track of peers coming from shrex.Sub and from discovery
@@ -61,6 +63,8 @@ type Manager struct {
// hashes that are not in the chain
blacklistedHashes map[string]bool

metrics *metrics

cancel context.CancelFunc
done chan struct{}
}
@@ -69,8 +73,6 @@ type Manager struct {
// peer from Peer method
type DoneFunc func(result)

type result int

type syncPool struct {
*pool

@@ -134,7 +136,8 @@ func (m *Manager) Start(startCtx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel

err := m.shrexSub.AddValidator(m.Validate)
validatorFn := m.metrics.validationObserver(m.Validate)
err := m.shrexSub.AddValidator(validatorFn)
if err != nil {
return fmt.Errorf("registering validator: %w", err)
}
@@ -174,72 +177,75 @@ func (m *Manager) Stop(ctx context.Context) error {
func (m *Manager) Peer(
ctx context.Context, datahash share.DataHash,
) (peer.ID, DoneFunc, error) {
logger := log.With("hash", datahash.String())
p := m.validatedPool(datahash.String())

// first, check if a peer is available for the given datahash
peerID, ok := p.tryGet()
if ok {
// some pools could still have blacklisted peers in storage
if m.isBlacklistedPeer(peerID) {
logger.Debugw("removing blacklisted peer from pool",
log.Debugw("removing blacklisted peer from pool",
"peer", peerID.String())
p.remove(peerID)
return m.Peer(ctx, datahash)
}
logger.Debugw("get peer from shrexsub pool",
"peer", peerID.String(),
"pool_size", p.size())
return peerID, m.doneFunc(datahash, peerID, false), nil
return m.newPeer(datahash, peerID, sourceShrexSub, p.len(), 0)
}

// if no peer for datahash is currently available, try to use full node
// obtained from discovery
peerID, ok = m.fullNodes.tryGet()
if ok {
logger.Debugw("got peer from full nodes pool",
"peer", peerID.String(),
"pool_size", m.fullNodes.size())
return peerID, m.doneFunc(datahash, peerID, true), nil
return m.newPeer(datahash, peerID, sourceFullNodes, m.fullNodes.len(), 0)
}

// no peers are available right now, wait for the first one
start := time.Now()
select {
case peerID = <-p.next(ctx):
logger.Debugw("got peer from shrexSub pool after wait",
"peer", peerID.String(),
"pool_size", p.size(),
"after (s)", time.Since(start))
return peerID, m.doneFunc(datahash, peerID, false), nil
return m.newPeer(datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
case peerID = <-m.fullNodes.next(ctx):
logger.Debugw("got peer from full nodes pool after wait",
"peer", peerID.String(),
"pool_size", m.fullNodes.size(),
"after (s)", time.Since(start))
return peerID, m.doneFunc(datahash, peerID, true), nil
return m.newPeer(datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
case <-ctx.Done():
return "", nil, ctx.Err()
}
}

func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, fromFull bool) DoneFunc {
func (m *Manager) newPeer(
datahash share.DataHash,
peerID peer.ID,
source peerSource,
poolSize int,
waitTime time.Duration) (peer.ID, DoneFunc, error) {
log.Debugw("got peer",
"hash", datahash.String(),
"peer", peerID.String(),
"source", source,
"pool_size", poolSize,
"wait (s)", waitTime)
m.metrics.observeGetPeer(source, poolSize, waitTime)
return peerID, m.doneFunc(datahash, peerID, source), nil
}

func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerSource) DoneFunc {
return func(result result) {
log.Debugw("set peer status",
"peer", peerID,
log.Debugw("set peer result",
"hash", datahash.String(),
"peer", peerID,
"source", source,
"result", result)
m.metrics.observeDoneResult(source, result)
switch result {
case ResultSuccess:
case ResultNoop:
case ResultSynced:
m.getOrCreatePool(datahash.String()).markSynced()
m.markPoolAsSynced(datahash.String())
case ResultCooldownPeer:
m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
if fromFull {
if source == sourceFullNodes {
m.fullNodes.putOnCooldown(peerID)
}
case ResultBlacklistPeer:
m.blacklistPeers(peerID)
m.blacklistPeers(reasonMisbehave, peerID)
}
}
}
@@ -309,6 +315,16 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
return pubsub.ValidationIgnore
}

func (m *Manager) validatedPool(datahash string) *syncPool {
p := m.getOrCreatePool(datahash)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated",
"hash", datahash,
"after (s)", time.Since(p.createdAt))
}
return p
}

func (m *Manager) getOrCreatePool(datahash string) *syncPool {
m.lock.Lock()
defer m.lock.Unlock()
@@ -325,8 +341,11 @@ func (m *Manager) getOrCreatePool(datahash string) *syncPool {
return p
}

func (m *Manager) blacklistPeers(peerIDs ...peer.ID) {
log.Debugw("blacklisting peers", "peers", peerIDs)
func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID) {
log.Debugw("blacklisting peers",
"peers", peerIDs,
"reason", reason)
m.metrics.observeBlacklistPeers(reason, len(peerIDs))

if !m.params.EnableBlackListing {
return
@@ -356,14 +375,6 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
return m.blacklistedHashes[hash.String()]
}

func (m *Manager) validatedPool(hashStr string) *syncPool {
p := m.getOrCreatePool(hashStr)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "hash", hashStr)
}
return p
}

func (m *Manager) GC(ctx context.Context) {
ticker := time.NewTicker(m.params.GcInterval)
defer ticker.Stop()
@@ -378,7 +389,7 @@ func (m *Manager) GC(ctx context.Context) {

blacklist = m.cleanUp()
if len(blacklist) > 0 {
m.blacklistPeers(blacklist...)
m.blacklistPeers(reasonInvalidHash, blacklist...)
}
}
}
@@ -420,13 +431,15 @@ func (m *Manager) cleanUp() []peer.ID {
return blacklist
}

func (p *syncPool) markSynced() {
p.isSynced.Store(true)
old := (*unsafe.Pointer)(unsafe.Pointer(&p.pool))
// release pointer to old pool to free up memory
atomic.StorePointer(old, unsafe.Pointer(newPool(time.Second)))
func (m *Manager) markPoolAsSynced(datahash string) {
p := m.getOrCreatePool(datahash)
if p.isSynced.CompareAndSwap(false, true) {
p.isSynced.Store(true)
old := (*unsafe.Pointer)(unsafe.Pointer(&p.pool))
// release pointer to old pool to free up memory
atomic.StorePointer(old, unsafe.Pointer(newPool(time.Second)))
}
}

func (p *syncPool) add(peers ...peer.ID) {
if !p.isSynced.Load() {
p.pool.add(peers...)
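
The hunks above call several helpers defined in the new `share/p2p/peers/metrics.go`, which is not rendered here: the `peerSource` and `blacklistPeerReason` labels and the `observe*`/`validationObserver` methods. The sketch below shows one plausible, nil-receiver-safe shape for them, continuing the hypothetical `metrics` struct sketched earlier; every body, label value, and signature beyond what the diff itself shows is an assumption:

```go
// Hypothetical continuation of the metrics sketch above. Only the call sites
// and constant names are taken from the diff; bodies and label values are assumed.
package peers

import (
	"context"
	"time"

	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/peer"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"

	"github.com/celestiaorg/celestia-node/share/p2p/shrexsub"
)

// peerSource tells where a peer handed out by Peer() came from.
type peerSource string

const (
	sourceShrexSub  peerSource = "shrexsub"   // label value assumed
	sourceFullNodes peerSource = "full_nodes" // label value assumed
)

// blacklistPeerReason labels why peers were blacklisted.
type blacklistPeerReason string

const (
	reasonInvalidHash blacklistPeerReason = "invalid_hash" // label value assumed
	reasonMisbehave   blacklistPeerReason = "misbehave"    // label value assumed
)

// All observers are nil-receiver safe, so Manager code can call them
// unconditionally whether or not WithMetrics was invoked.
func (m *metrics) observeGetPeer(source peerSource, poolSize int, waitTime time.Duration) {
	if m == nil {
		return
	}
	m.getPeer.Add(context.Background(), 1,
		metric.WithAttributes(attribute.String("source", string(source))))
	// poolSize and waitTime would feed gauges/histograms in a fuller version.
	_, _ = poolSize, waitTime
}

func (m *metrics) observeDoneResult(source peerSource, r result) {
	if m == nil {
		return
	}
	m.doneResults.Add(context.Background(), 1,
		metric.WithAttributes(
			attribute.String("source", string(source)),
			attribute.String("result", string(r))))
}

func (m *metrics) observeBlacklistPeers(reason blacklistPeerReason, amount int) {
	if m == nil {
		return
	}
	m.blacklistedPeers.Add(context.Background(), int64(amount),
		metric.WithAttributes(attribute.String("reason", string(reason))))
}

// validationObserver wraps a shrexsub validator and records its outcome.
// It assumes shrexsub exposes a ValidatorFn type matching Manager.Validate.
func (m *metrics) validationObserver(validator shrexsub.ValidatorFn) shrexsub.ValidatorFn {
	if m == nil {
		return validator
	}
	return func(ctx context.Context, id peer.ID, n shrexsub.Notification) pubsub.ValidationResult {
		res := validator(ctx, id, n)
		m.validationResults.Add(ctx, 1,
			metric.WithAttributes(attribute.Int("result", int(res))))
		return res
	}
}
```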
