Skip to content

Commit

Permalink
Add p2p.Network component (#2283)
Browse files Browse the repository at this point in the history
Signed-off-by: Joshua Kim <[email protected]>
Co-authored-by: Stephen Buttolph <[email protected]>
  • Loading branch information
joshua-kim and StephenButtolph authored Nov 30, 2023
1 parent 0ab2046 commit de3b16c
Show file tree
Hide file tree
Showing 12 changed files with 871 additions and 633 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/Microsoft/go-winio v0.5.2
github.com/NYTimes/gziphandler v1.1.1
github.com/ava-labs/coreth v0.12.9-rc.5
github.com/ava-labs/coreth v0.12.9-rc.7
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/coreth v0.12.9-rc.5 h1:xYBgNm1uOPfUdUNm8+fS8ellHnEd4qfFNb6uZHo9tqI=
github.com/ava-labs/coreth v0.12.9-rc.5/go.mod h1:rECKQfGFDeodrwGPlJSvFUJDbVr30jSMIVjQLi6pNX4=
github.com/ava-labs/coreth v0.12.9-rc.7 h1:AlCmXnrJwo0NxlEXQHysQgRQSCA14PZW6iyJmeVYB34=
github.com/ava-labs/coreth v0.12.9-rc.7/go.mod h1:yrf2vEah4Fgj6sJ4UpHewo4DLolwdpf2bJuLRT80PGw=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34 h1:mg9Uw6oZFJKytJxgxnl3uxZOs/SB8CVHg6Io4Tf99Zc=
github.com/ava-labs/ledger-avalanche/go v0.0.0-20231102202641-ae2ebdaeac34/go.mod h1:pJxaT9bUgeRNVmNRgtCHb7sFDIRKy7CzTQVi8gGNT6g=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
7 changes: 3 additions & 4 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ type CrossChainAppResponseCallback func(
type Client struct {
handlerID uint64
handlerPrefix []byte
router *Router
router *router
sender common.AppSender
// nodeSampler is used to select nodes to route AppRequestAny to
nodeSampler NodeSampler
options *clientOptions
}

// AppRequestAny issues an AppRequest to an arbitrary node decided by Client.
Expand All @@ -56,7 +55,7 @@ func (c *Client) AppRequestAny(
appRequestBytes []byte,
onResponse AppResponseCallback,
) error {
sampled := c.nodeSampler.Sample(ctx, 1)
sampled := c.options.nodeSampler.Sample(ctx, 1)
if len(sampled) != 1 {
return ErrNoPeers
}
Expand Down
42 changes: 19 additions & 23 deletions network/p2p/gossip/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (

"github.com/stretchr/testify/require"

"go.uber.org/mock/gomock"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
Expand Down Expand Up @@ -117,10 +115,9 @@ func TestGossiperGossip(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
ctrl := gomock.NewController(t)

responseSender := common.NewMockSender(ctrl)
responseRouter := p2p.NewRouter(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
responseSender := &common.SenderTest{}
responseNetwork := p2p.NewNetwork(logging.NoLog{}, responseSender, prometheus.NewRegistry(), "")
responseBloom, err := NewBloomFilter(1000, 0.01)
require.NoError(err)
responseSet := testSet{
Expand All @@ -130,31 +127,30 @@ func TestGossiperGossip(t *testing.T) {
for _, item := range tt.responder {
require.NoError(responseSet.Add(item))
}
peers := &p2p.Peers{}
require.NoError(peers.Connected(context.Background(), ids.EmptyNodeID, nil))

handler, err := NewHandler[*testTx](responseSet, tt.config, prometheus.NewRegistry())
require.NoError(err)
_, err = responseRouter.RegisterAppProtocol(0x0, handler, peers)
_, err = responseNetwork.NewAppProtocol(0x0, handler)
require.NoError(err)

requestSender := common.NewMockSender(ctrl)
requestRouter := p2p.NewRouter(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")

gossiped := make(chan struct{})
requestSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) {
requestSender := &common.SenderTest{
SendAppRequestF: func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) error {
go func() {
require.NoError(responseRouter.AppRequest(ctx, ids.EmptyNodeID, requestID, time.Time{}, request))
require.NoError(responseNetwork.AppRequest(ctx, ids.EmptyNodeID, requestID, time.Time{}, request))
}()
}).AnyTimes()
return nil
},
}

responseSender.EXPECT().
SendAppResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) {
require.NoError(requestRouter.AppResponse(ctx, nodeID, requestID, appResponseBytes))
close(gossiped)
}).AnyTimes()
requestNetwork := p2p.NewNetwork(logging.NoLog{}, requestSender, prometheus.NewRegistry(), "")
require.NoError(requestNetwork.Connected(context.Background(), ids.EmptyNodeID, nil))

gossiped := make(chan struct{})
responseSender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error {
require.NoError(requestNetwork.AppResponse(ctx, nodeID, requestID, appResponseBytes))
close(gossiped)
return nil
}

bloom, err := NewBloomFilter(1000, 0.01)
require.NoError(err)
Expand All @@ -166,7 +162,7 @@ func TestGossiperGossip(t *testing.T) {
require.NoError(requestSet.Add(item))
}

requestClient, err := requestRouter.RegisterAppProtocol(0x0, nil, peers)
requestClient, err := requestNetwork.NewAppProtocol(0x0, nil)
require.NoError(err)

config := Config{
Expand Down
188 changes: 188 additions & 0 deletions network/p2p/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package p2p

import (
"context"
"encoding/binary"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/version"
)

var (
_ validators.Connector = (*Network)(nil)
_ common.AppHandler = (*Network)(nil)
_ NodeSampler = (*peerSampler)(nil)
)

// ClientOption configures Client
type ClientOption interface {
apply(options *clientOptions)
}

type clientOptionFunc func(options *clientOptions)

func (o clientOptionFunc) apply(options *clientOptions) {
o(options)
}

// WithValidatorSampling configures Client.AppRequestAny to sample validators
func WithValidatorSampling(validators *Validators) ClientOption {
return clientOptionFunc(func(options *clientOptions) {
options.nodeSampler = validators
})
}

// clientOptions holds client-configurable values
type clientOptions struct {
// nodeSampler is used to select nodes to route Client.AppRequestAny to
nodeSampler NodeSampler
}

// NewNetwork returns an instance of Network
func NewNetwork(
log logging.Logger,
sender common.AppSender,
metrics prometheus.Registerer,
namespace string,
) *Network {
return &Network{
Peers: &Peers{},
log: log,
sender: sender,
metrics: metrics,
namespace: namespace,
router: newRouter(log, sender, metrics, namespace),
}
}

// Network exposes networking state and supports building p2p application
// protocols
type Network struct {
Peers *Peers

log logging.Logger
sender common.AppSender
metrics prometheus.Registerer
namespace string

router *router
}

func (n *Network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID uint32, deadline time.Time, request []byte) error {
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

func (n *Network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

func (n *Network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

func (n *Network) AppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error {
return n.router.AppGossip(ctx, nodeID, msg)
}

func (n *Network) CrossChainAppRequest(ctx context.Context, chainID ids.ID, requestID uint32, deadline time.Time, request []byte) error {
return n.router.CrossChainAppRequest(ctx, chainID, requestID, deadline, request)
}

func (n *Network) CrossChainAppResponse(ctx context.Context, chainID ids.ID, requestID uint32, response []byte) error {
return n.router.CrossChainAppResponse(ctx, chainID, requestID, response)
}

func (n *Network) CrossChainAppRequestFailed(ctx context.Context, chainID ids.ID, requestID uint32) error {
return n.router.CrossChainAppRequestFailed(ctx, chainID, requestID)
}

func (n *Network) Connected(_ context.Context, nodeID ids.NodeID, _ *version.Application) error {
n.Peers.add(nodeID)
return nil
}

func (n *Network) Disconnected(_ context.Context, nodeID ids.NodeID) error {
n.Peers.remove(nodeID)
return nil
}

// NewAppProtocol reserves an identifier for an application protocol handler and
// returns a Client that can be used to send messages for the corresponding
// protocol.
func (n *Network) NewAppProtocol(handlerID uint64, handler Handler, options ...ClientOption) (*Client, error) {
if err := n.router.addHandler(handlerID, handler); err != nil {
return nil, err
}

client := &Client{
handlerID: handlerID,
handlerPrefix: binary.AppendUvarint(nil, handlerID),
sender: n.sender,
router: n.router,
options: &clientOptions{
nodeSampler: &peerSampler{
peers: n.Peers,
},
},
}

for _, option := range options {
option.apply(client.options)
}

return client, nil
}

// Peers contains metadata about the current set of connected peers
type Peers struct {
lock sync.RWMutex
set set.SampleableSet[ids.NodeID]
}

func (p *Peers) add(nodeID ids.NodeID) {
p.lock.Lock()
defer p.lock.Unlock()

p.set.Add(nodeID)
}

func (p *Peers) remove(nodeID ids.NodeID) {
p.lock.Lock()
defer p.lock.Unlock()

p.set.Remove(nodeID)
}

func (p *Peers) has(nodeID ids.NodeID) bool {
p.lock.RLock()
defer p.lock.RUnlock()

return p.set.Contains(nodeID)
}

// Sample returns a pseudo-random sample of up to limit Peers
func (p *Peers) Sample(limit int) []ids.NodeID {
p.lock.RLock()
defer p.lock.RUnlock()

return p.set.Sample(limit)
}

type peerSampler struct {
peers *Peers
}

func (p peerSampler) Sample(_ context.Context, limit int) []ids.NodeID {
return p.peers.Sample(limit)
}
Loading

0 comments on commit de3b16c

Please sign in to comment.