From d4ff4578bbdcc5283c8a6535d4300906fd712c49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Milo=C5=A1=20=C5=BDivkovi=C4=87?= Date: Sun, 17 Nov 2024 11:16:26 +0900 Subject: [PATCH] Fixup module tests --- tm2/pkg/bft/blockchain/reactor_test.go | 111 ++++++++--- tm2/pkg/bft/consensus/reactor_test.go | 70 ++++--- tm2/pkg/bft/consensus/state_test.go | 4 +- tm2/pkg/bft/mempool/reactor_test.go | 143 ++------------ tm2/pkg/bft/node/node.go | 9 +- tm2/pkg/internal/p2p/p2p.go | 249 +++++++++++++++++++++++++ tm2/pkg/p2p/types/netaddress.go | 31 +-- tm2/pkg/p2p/types/node_info.go | 32 ++-- tm2/pkg/p2p/types/node_info_test.go | 18 +- 9 files changed, 445 insertions(+), 222 deletions(-) create mode 100644 tm2/pkg/internal/p2p/p2p.go diff --git a/tm2/pkg/bft/blockchain/reactor_test.go b/tm2/pkg/bft/blockchain/reactor_test.go index c2fd66712db..1bc2df59055 100644 --- a/tm2/pkg/bft/blockchain/reactor_test.go +++ b/tm2/pkg/bft/blockchain/reactor_test.go @@ -1,14 +1,13 @@ package blockchain import ( + "context" "log/slog" "os" "sort" "testing" "time" - "github.com/stretchr/testify/assert" - abci "github.com/gnolang/gno/tm2/pkg/bft/abci/types" "github.com/gnolang/gno/tm2/pkg/bft/appconn" cfg "github.com/gnolang/gno/tm2/pkg/bft/config" @@ -20,9 +19,12 @@ import ( tmtime "github.com/gnolang/gno/tm2/pkg/bft/types/time" "github.com/gnolang/gno/tm2/pkg/db/memdb" "github.com/gnolang/gno/tm2/pkg/errors" + p2pTesting "github.com/gnolang/gno/tm2/pkg/internal/p2p" "github.com/gnolang/gno/tm2/pkg/log" "github.com/gnolang/gno/tm2/pkg/p2p" + p2pTypes "github.com/gnolang/gno/tm2/pkg/p2p/types" "github.com/gnolang/gno/tm2/pkg/testutils" + "github.com/stretchr/testify/assert" ) var config *cfg.Config @@ -125,15 +127,35 @@ func TestNoBlockResponse(t *testing.T) { maxBlockHeight := int64(65) - reactorPairs := make([]BlockchainReactorPair, 2) + var ( + reactorPairs = make([]BlockchainReactorPair, 2) + options = make(map[int][]p2p.SwitchOption) + ) - reactorPairs[0] = newBlockchainReactor(log.NewTestingLogger(t), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.NewTestingLogger(t), genDoc, privVals, 0) + for i := range reactorPairs { + height := int64(0) + if i == 0 { + height = maxBlockHeight + } - p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.MultiplexSwitch) *p2p.MultiplexSwitch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s - }, p2p.Connect2Switches) + reactorPairs[i] = newBlockchainReactor(log.NewTestingLogger(t), genDoc, privVals, height) + + options[i] = []p2p.SwitchOption{ + p2p.WithReactor("BLOCKCHAIN", reactorPairs[i].reactor), + } + } + + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFn() + + testingCfg := p2pTesting.TestingConfig{ + Count: 2, + P2PCfg: config.P2P, + SwitchOptions: options, + Channels: []byte{BlockchainChannel}, + } + + p2pTesting.MakeConnectedPeers(t, ctx, testingCfg) defer func() { for _, r := range reactorPairs { @@ -194,17 +216,35 @@ func TestFlappyBadBlockStopsPeer(t *testing.T) { otherChain.app.Stop() }() - reactorPairs := make([]BlockchainReactorPair, 4) + var ( + reactorPairs = make([]BlockchainReactorPair, 4) + options = make(map[int][]p2p.SwitchOption) + ) + + for i := range reactorPairs { + height := int64(0) + if i == 0 { + height = maxBlockHeight + } + + reactorPairs[i] = newBlockchainReactor(log.NewNoopLogger(), genDoc, privVals, height) - reactorPairs[0] = newBlockchainReactor(log.NewNoopLogger(), genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(log.NewNoopLogger(), genDoc, privVals, 0) - reactorPairs[2] = newBlockchainReactor(log.NewNoopLogger(), genDoc, privVals, 0) - reactorPairs[3] = newBlockchainReactor(log.NewNoopLogger(), genDoc, privVals, 0) + options[i] = []p2p.SwitchOption{ + p2p.WithReactor("BLOCKCHAIN", reactorPairs[i].reactor), + } + } - switches := p2p.MakeConnectedSwitches(config.P2P, 4, func(i int, s *p2p.MultiplexSwitch) *p2p.MultiplexSwitch { - s.AddReactor("BLOCKCHAIN", reactorPairs[i].reactor) - return s - }, p2p.Connect2Switches) + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFn() + + testingCfg := p2pTesting.TestingConfig{ + Count: 4, + P2PCfg: config.P2P, + SwitchOptions: options, + Channels: []byte{BlockchainChannel}, + } + + switches, transports := p2pTesting.MakeConnectedPeers(t, ctx, testingCfg) defer func() { for _, r := range reactorPairs { @@ -222,7 +262,7 @@ func TestFlappyBadBlockStopsPeer(t *testing.T) { } // at this time, reactors[0-3] is the newest - assert.Equal(t, 3, reactorPairs[1].reactor.Switch.Peers().Size()) + assert.Equal(t, 3, len(reactorPairs[1].reactor.Switch.Peers().List())) // mark reactorPairs[3] is an invalid peer reactorPairs[3].reactor.store = otherChain.reactor.store @@ -230,24 +270,41 @@ func TestFlappyBadBlockStopsPeer(t *testing.T) { lastReactorPair := newBlockchainReactor(log.NewNoopLogger(), genDoc, privVals, 0) reactorPairs = append(reactorPairs, lastReactorPair) - switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.MultiplexSwitch) *p2p.MultiplexSwitch { - s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].reactor) - return s - }, p2p.Connect2Switches)...) + persistentPeers := make([]*p2pTypes.NetAddress, 0, len(transports)) - for i := 0; i < len(reactorPairs)-1; i++ { - p2p.Connect2Switches(switches, i, len(reactorPairs)-1) + for _, tr := range transports { + addr := tr.NetAddress() + persistentPeers = append(persistentPeers, &addr) } + for i, opt := range options { + opt = append(opt, p2p.WithPersistentPeers(persistentPeers)) + + options[i] = opt + } + + ctx, cancelFn = context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFn() + + testingCfg = p2pTesting.TestingConfig{ + Count: 1, + P2PCfg: config.P2P, + SwitchOptions: options, + Channels: []byte{BlockchainChannel}, + } + + sw, _ := p2pTesting.MakeConnectedPeers(t, ctx, testingCfg) + switches = append(switches, sw...) + for { - if lastReactorPair.reactor.pool.IsCaughtUp() || lastReactorPair.reactor.Switch.Peers().Size() == 0 { + if lastReactorPair.reactor.pool.IsCaughtUp() || len(lastReactorPair.reactor.Switch.Peers().List()) == 0 { break } time.Sleep(1 * time.Second) } - assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs)-1) + assert.True(t, len(lastReactorPair.reactor.Switch.Peers().List()) < len(reactorPairs)-1) } func TestBcBlockRequestMessageValidateBasic(t *testing.T) { diff --git a/tm2/pkg/bft/consensus/reactor_test.go b/tm2/pkg/bft/consensus/reactor_test.go index a9fb1968212..c864eb36360 100644 --- a/tm2/pkg/bft/consensus/reactor_test.go +++ b/tm2/pkg/bft/consensus/reactor_test.go @@ -1,14 +1,13 @@ package consensus import ( + "context" "fmt" "log/slog" "sync" "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/gnolang/gno/tm2/pkg/amino" "github.com/gnolang/gno/tm2/pkg/bft/abci/example/kvstore" cfg "github.com/gnolang/gno/tm2/pkg/bft/config" @@ -18,27 +17,39 @@ import ( "github.com/gnolang/gno/tm2/pkg/bitarray" "github.com/gnolang/gno/tm2/pkg/crypto/tmhash" "github.com/gnolang/gno/tm2/pkg/events" + p2pTesting "github.com/gnolang/gno/tm2/pkg/internal/p2p" "github.com/gnolang/gno/tm2/pkg/log" osm "github.com/gnolang/gno/tm2/pkg/os" "github.com/gnolang/gno/tm2/pkg/p2p" - "github.com/gnolang/gno/tm2/pkg/p2p/mock" "github.com/gnolang/gno/tm2/pkg/testutils" + "github.com/stretchr/testify/assert" ) // ---------------------------------------------- // in-process testnets -func startConsensusNet(css []*ConsensusState, n int) ([]*ConsensusReactor, []<-chan events.Event, []events.EventSwitch, []*p2p.MultiplexSwitch) { +func startConsensusNet( + t *testing.T, + css []*ConsensusState, + n int, +) ([]*ConsensusReactor, []<-chan events.Event, []events.EventSwitch, []*p2p.MultiplexSwitch) { + t.Helper() + reactors := make([]*ConsensusReactor, n) blocksSubs := make([]<-chan events.Event, 0) eventSwitches := make([]events.EventSwitch, n) p2pSwitches := ([]*p2p.MultiplexSwitch)(nil) + options := make(map[int][]p2p.SwitchOption) for i := 0; i < n; i++ { /*logger, err := tmflags.ParseLogLevel("consensus:info,*:error", logger, "info") if err != nil { t.Fatal(err)}*/ reactors[i] = NewConsensusReactor(css[i], true) // so we dont start the consensus states reactors[i].SetLogger(css[i].Logger) + options[i] = []p2p.SwitchOption{ + p2p.WithReactor("CONSENSUS", reactors[i]), + } + // evsw is already started with the cs eventSwitches[i] = css[i].evsw reactors[i].SetEventSwitch(eventSwitches[i]) @@ -51,11 +62,22 @@ func startConsensusNet(css []*ConsensusState, n int) ([]*ConsensusReactor, []<-c } } // make connected switches and start all reactors - p2pSwitches = p2p.MakeConnectedSwitches(config.P2P, n, func(i int, s *p2p.MultiplexSwitch) *p2p.MultiplexSwitch { - s.AddReactor("CONSENSUS", reactors[i]) - s.SetLogger(reactors[i].conS.Logger.With("module", "p2p")) - return s - }, p2p.Connect2Switches) + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFn() + + testingCfg := p2pTesting.TestingConfig{ + P2PCfg: config.P2P, + Count: n, + SwitchOptions: options, + Channels: []byte{ + StateChannel, + DataChannel, + VoteChannel, + VoteSetBitsChannel, + }, + } + + p2pSwitches, _ = p2pTesting.MakeConnectedPeers(t, ctx, testingCfg) // now that everyone is connected, start the state machines // If we started the state machines before everyone was connected, @@ -68,11 +90,15 @@ func startConsensusNet(css []*ConsensusState, n int) ([]*ConsensusReactor, []<-c return reactors, blocksSubs, eventSwitches, p2pSwitches } -func stopConsensusNet(logger *slog.Logger, reactors []*ConsensusReactor, eventSwitches []events.EventSwitch, p2pSwitches []*p2p.MultiplexSwitch) { +func stopConsensusNet( + logger *slog.Logger, + reactors []*ConsensusReactor, + eventSwitches []events.EventSwitch, + p2pSwitches []*p2p.MultiplexSwitch, +) { logger.Info("stopConsensusNet", "n", len(reactors)) - for i, r := range reactors { + for i, _ := range reactors { logger.Info("stopConsensusNet: Stopping ConsensusReactor", "i", i) - r.Switch.Stop() } for i, b := range eventSwitches { logger.Info("stopConsensusNet: Stopping evsw", "i", i) @@ -92,7 +118,7 @@ func TestReactorBasic(t *testing.T) { N := 4 css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) defer cleanup() - reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(css, N) + reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(t, css, N) defer stopConsensusNet(log.NewTestingLogger(t), reactors, eventSwitches, p2pSwitches) // wait till everyone makes the first new block timeoutWaitGroup(t, N, func(j int) { @@ -112,7 +138,7 @@ func TestReactorCreatesBlockWhenEmptyBlocksFalse(t *testing.T) { c.Consensus.CreateEmptyBlocks = false }) defer cleanup() - reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(css, N) + reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(t, css, N) defer stopConsensusNet(log.NewTestingLogger(t), reactors, eventSwitches, p2pSwitches) // send a tx @@ -132,12 +158,12 @@ func TestReactorReceiveDoesNotPanicIfAddPeerHasntBeenCalledYet(t *testing.T) { N := 1 css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) defer cleanup() - reactors, _, eventSwitches, p2pSwitches := startConsensusNet(css, N) + reactors, _, eventSwitches, p2pSwitches := startConsensusNet(t, css, N) defer stopConsensusNet(log.NewTestingLogger(t), reactors, eventSwitches, p2pSwitches) var ( reactor = reactors[0] - peer = mock.NewPeer(nil) + peer = p2pTesting.NewPeer(t) msg = amino.MustMarshalAny(&HasVoteMessage{Height: 1, Round: 1, Index: 1, Type: types.PrevoteType}) ) @@ -156,12 +182,12 @@ func TestReactorReceivePanicsIfInitPeerHasntBeenCalledYet(t *testing.T) { N := 1 css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) defer cleanup() - reactors, _, eventSwitches, p2pSwitches := startConsensusNet(css, N) + reactors, _, eventSwitches, p2pSwitches := startConsensusNet(t, css, N) defer stopConsensusNet(log.NewTestingLogger(t), reactors, eventSwitches, p2pSwitches) var ( reactor = reactors[0] - peer = mock.NewPeer(nil) + peer = p2pTesting.NewPeer(t) msg = amino.MustMarshalAny(&HasVoteMessage{Height: 1, Round: 1, Index: 1, Type: types.PrevoteType}) ) @@ -182,7 +208,7 @@ func TestFlappyReactorRecordsVotesAndBlockParts(t *testing.T) { N := 4 css, cleanup := randConsensusNet(N, "consensus_reactor_test", newMockTickerFunc(true), newCounter) defer cleanup() - reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(css, N) + reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(t, css, N) defer stopConsensusNet(log.NewTestingLogger(t), reactors, eventSwitches, p2pSwitches) // wait till everyone makes the first new block @@ -210,7 +236,7 @@ func TestReactorVotingPowerChange(t *testing.T) { css, cleanup := randConsensusNet(nVals, "consensus_voting_power_changes_test", newMockTickerFunc(true), newPersistentKVStore) defer cleanup() - reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(css, nVals) + reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(t, css, nVals) defer stopConsensusNet(logger, reactors, eventSwitches, p2pSwitches) // map of active validators @@ -276,7 +302,7 @@ func TestReactorValidatorSetChanges(t *testing.T) { logger := log.NewTestingLogger(t) - reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(css, nPeers) + reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(t, css, nPeers) defer stopConsensusNet(logger, reactors, eventSwitches, p2pSwitches) // map of active validators @@ -375,7 +401,7 @@ func TestReactorWithTimeoutCommit(t *testing.T) { css[i].config.SkipTimeoutCommit = false } - reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(css, N-1) + reactors, blocksSubs, eventSwitches, p2pSwitches := startConsensusNet(t, css, N-1) defer stopConsensusNet(log.NewTestingLogger(t), reactors, eventSwitches, p2pSwitches) // wait till everyone makes the first new block diff --git a/tm2/pkg/bft/consensus/state_test.go b/tm2/pkg/bft/consensus/state_test.go index 201cf8906b3..8c340d12eae 100644 --- a/tm2/pkg/bft/consensus/state_test.go +++ b/tm2/pkg/bft/consensus/state_test.go @@ -1733,7 +1733,7 @@ func TestStateOutputsBlockPartsStats(t *testing.T) { // create dummy peer cs, _ := randConsensusState(1) - peer := p2pmock.NewPeer(nil) + peer := p2pmock.Peer{} // 1) new block part parts := types.NewPartSetFromData(random.RandBytes(100), 10) @@ -1777,7 +1777,7 @@ func TestStateOutputVoteStats(t *testing.T) { cs, vss := randConsensusState(2) // create dummy peer - peer := p2pmock.NewPeer(nil) + peer := &p2pmock.Peer{} vote := signVote(vss[1], types.PrecommitType, []byte("test"), types.PartSetHeader{}) diff --git a/tm2/pkg/bft/mempool/reactor_test.go b/tm2/pkg/bft/mempool/reactor_test.go index 6f84c7d7951..2d20fb252e2 100644 --- a/tm2/pkg/bft/mempool/reactor_test.go +++ b/tm2/pkg/bft/mempool/reactor_test.go @@ -3,29 +3,23 @@ package mempool import ( "context" "fmt" - "net" "sync" "testing" "time" "github.com/fortytw2/leaktest" - "github.com/gnolang/gno/tm2/pkg/log" - "github.com/gnolang/gno/tm2/pkg/p2p/conn" - "github.com/gnolang/gno/tm2/pkg/p2p/events" - p2pTypes "github.com/gnolang/gno/tm2/pkg/p2p/types" - "github.com/gnolang/gno/tm2/pkg/versionset" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" - "github.com/gnolang/gno/tm2/pkg/bft/abci/example/kvstore" memcfg "github.com/gnolang/gno/tm2/pkg/bft/mempool/config" "github.com/gnolang/gno/tm2/pkg/bft/proxy" "github.com/gnolang/gno/tm2/pkg/bft/types" "github.com/gnolang/gno/tm2/pkg/errors" + p2pTesting "github.com/gnolang/gno/tm2/pkg/internal/p2p" + "github.com/gnolang/gno/tm2/pkg/log" "github.com/gnolang/gno/tm2/pkg/p2p" p2pcfg "github.com/gnolang/gno/tm2/pkg/p2p/config" + p2pTypes "github.com/gnolang/gno/tm2/pkg/p2p/types" "github.com/gnolang/gno/tm2/pkg/testutils" + "github.com/stretchr/testify/assert" ) // testP2PConfig returns a configuration for testing the peer-to-peer layer @@ -72,134 +66,19 @@ func makeAndConnectReactors(t *testing.T, mconfig *memcfg.MempoolConfig, pconfig } // "Simulate" the networking layer - makeConnectedSwitches(t, pconfig, n, options) - - return reactors -} - -// makeConnectedSwitches creates a cluster of peers, with the given options. -// Used to simulate the networking layer for the specific module -func makeConnectedSwitches( - t *testing.T, - cfg *p2pcfg.P2PConfig, - n int, - opts map[int][]p2p.SwitchOption, -) []*p2p.MultiplexSwitch { - t.Helper() - - var ( - sws = make([]*p2p.MultiplexSwitch, 0, n) - ts = make([]*p2p.MultiplexTransport, 0, n) - addrs = make([]*p2pTypes.NetAddress, 0, n) - ) - - // Generate the switches - for i := range n { - var ( - key = p2pTypes.GenerateNodeKey() - tcpAddr = &net.TCPAddr{ - IP: net.ParseIP("127.0.0.1"), - Port: 0, // random port - } - ) - - addr, err := p2pTypes.NewNetAddress(key.ID(), tcpAddr) - require.NoError(t, err) - - info := p2pTypes.NodeInfo{ - VersionSet: versionset.VersionSet{ - versionset.VersionInfo{ - Name: "p2p", - Version: "v0.0.0", - }, - }, - NetAddress: addr, - Network: "testing", - Software: "p2ptest", - Version: "v1.2.3-rc.0-deadbeef", - Channels: []byte{MempoolChannel}, - Moniker: fmt.Sprintf("node-%d", i), - Other: p2pTypes.NodeInfoOther{ - TxIndex: "off", - RPCAddress: fmt.Sprintf("127.0.0.1:%d", 0), - }, - } - - // Create the multiplex transport - multiplexTransport := p2p.NewMultiplexTransport( - info, - *key, - conn.MConfigFromP2P(cfg), - log.NewNoopLogger(), - ) - - // Start the transport - require.NoError(t, multiplexTransport.Listen(*info.NetAddress)) - - t.Cleanup(func() { - assert.NoError(t, multiplexTransport.Close()) - }) - - dialAddr := multiplexTransport.NetAddress() - - addrs = append(addrs, &dialAddr) - ts = append(ts, multiplexTransport) - } - ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) defer cancelFn() - g, _ := errgroup.WithContext(ctx) - - for i := range n { - // Make sure the switches connect to each other. - // Set up event listeners to make sure - // the setup method blocks until switches are connected - opts[i] = append(opts[i], p2p.WithPersistentPeers(addrs)) - - multiplexSwitch := p2p.NewMultiplexSwitch(ts[i], opts[i]...) - - ch, unsubFn := multiplexSwitch.Subscribe(func(event events.Event) bool { - return event.Type() == events.PeerConnected - }) - - // Start the switch - require.NoError(t, multiplexSwitch.Start()) - - sws = append(sws, multiplexSwitch) - - g.Go(func() error { - defer func() { - unsubFn() - }() - - timer := time.NewTimer(5 * time.Second) - defer timer.Stop() - - connectedPeers := make(map[p2pTypes.ID]struct{}) - - for { - select { - case evRaw := <-ch: - ev := evRaw.(events.PeerConnectedEvent) - - connectedPeers[ev.PeerID] = struct{}{} - - if len(connectedPeers) == n-1 { - return nil - } - case <-timer.C: - return errors.New("timed out waiting for peers to connect") - } - } - }) - - sws[i].DialPeers(addrs...) + cfg := p2pTesting.TestingConfig{ + Count: n, + P2PCfg: pconfig, + SwitchOptions: options, + Channels: []byte{MempoolChannel}, } - require.NoError(t, g.Wait()) + p2pTesting.MakeConnectedPeers(t, ctx, cfg) - return sws + return reactors } func waitForTxsOnReactors( diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index 8ca3d60fc04..aa22250e522 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -12,6 +12,8 @@ import ( "sync" "time" + goErrors "errors" + "github.com/gnolang/gno/tm2/pkg/bft/appconn" "github.com/gnolang/gno/tm2/pkg/bft/state/eventstore/file" "github.com/gnolang/gno/tm2/pkg/p2p/conn" @@ -853,8 +855,13 @@ func makeNodeInfo( } nodeInfo.NetAddress = addr + // Validate the node info err = nodeInfo.Validate() - return nodeInfo, err + if !goErrors.Is(err, p2pTypes.ErrUnspecifiedIP) { + return p2pTypes.NodeInfo{}, fmt.Errorf("unable to validate node info, %w", err) + } + + return nodeInfo, nil } // ------------------------------------------------------------------------------ diff --git a/tm2/pkg/internal/p2p/p2p.go b/tm2/pkg/internal/p2p/p2p.go new file mode 100644 index 00000000000..ed3f2f0f70e --- /dev/null +++ b/tm2/pkg/internal/p2p/p2p.go @@ -0,0 +1,249 @@ +// Package p2p contains testing code that is moved over, and adapted from p2p/test_utils.go. +// This isn't a good way to simulate the networking layer in TM2 modules. +// It actually isn't a good way to simulate the networking layer, in anything. +// +// Code is carried over to keep the testing code of p2p-dependent modules happy +// and "working". We should delete this entire package the second TM2 module unit tests don't +// need to rely on a live p2p cluster to pass. +package p2p + +import ( + "context" + "crypto/rand" + "errors" + "fmt" + "net" + "testing" + "time" + + "github.com/gnolang/gno/tm2/pkg/log" + "github.com/gnolang/gno/tm2/pkg/p2p" + p2pcfg "github.com/gnolang/gno/tm2/pkg/p2p/config" + "github.com/gnolang/gno/tm2/pkg/p2p/conn" + "github.com/gnolang/gno/tm2/pkg/p2p/events" + p2pTypes "github.com/gnolang/gno/tm2/pkg/p2p/types" + "github.com/gnolang/gno/tm2/pkg/service" + "github.com/gnolang/gno/tm2/pkg/versionset" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// TestingConfig is the P2P cluster testing config +type TestingConfig struct { + P2PCfg *p2pcfg.P2PConfig // the common p2p configuration + Count int // the size of the cluster + SwitchOptions map[int][]p2p.SwitchOption // multiplex switch options + Channels []byte // the common p2p peer multiplex channels +} + +// MakeConnectedPeers creates a cluster of peers, with the given options. +// Used to simulate the networking layer for a TM2 module +func MakeConnectedPeers( + t *testing.T, + ctx context.Context, + cfg TestingConfig, +) ([]*p2p.MultiplexSwitch, []*p2p.MultiplexTransport) { + t.Helper() + + // Initialize collections for switches, transports, and addresses. + var ( + sws = make([]*p2p.MultiplexSwitch, 0, cfg.Count) + ts = make([]*p2p.MultiplexTransport, 0, cfg.Count) + addrs = make([]*p2pTypes.NetAddress, 0, cfg.Count) + ) + + createTransport := func(index int) *p2p.MultiplexTransport { + // Generate a fresh key + key := p2pTypes.GenerateNodeKey() + + addr, err := p2pTypes.NewNetAddress( + key.ID(), + &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 0, // random free port + }, + ) + require.NoError(t, err) + + info := p2pTypes.NodeInfo{ + VersionSet: versionset.VersionSet{ + versionset.VersionInfo{Name: "p2p", Version: "v0.0.0"}, + }, + NetAddress: addr, + Network: "testing", + Software: "p2ptest", + Version: "v1.2.3-rc.0-deadbeef", + Channels: cfg.Channels, + Moniker: fmt.Sprintf("node-%d", index), + Other: p2pTypes.NodeInfoOther{ + TxIndex: "off", + RPCAddress: fmt.Sprintf("127.0.0.1:%d", 0), + }, + } + + transport := p2p.NewMultiplexTransport( + info, + *key, + conn.MConfigFromP2P(cfg.P2PCfg), + log.NewNoopLogger(), + ) + + require.NoError(t, transport.Listen(*info.NetAddress)) + t.Cleanup(func() { assert.NoError(t, transport.Close()) }) + + return transport + } + + // Create transports and gather addresses + for i := 0; i < cfg.Count; i++ { + transport := createTransport(i) + addr := transport.NetAddress() + + addrs = append(addrs, &addr) + ts = append(ts, transport) + } + + // Connect switches and ensure all peers are connected + connectPeers := func(switchIndex int) error { + multiplexSwitch := p2p.NewMultiplexSwitch( + ts[switchIndex], + cfg.SwitchOptions[switchIndex]..., + ) + + ch, unsubFn := multiplexSwitch.Subscribe(func(event events.Event) bool { + return event.Type() == events.PeerConnected + }) + defer unsubFn() + + // Start the switch + require.NoError(t, multiplexSwitch.Start()) + + // Save it + sws = append(sws, multiplexSwitch) + + if cfg.Count == 1 { + // No peers to dial, switch is alone + return nil + } + + // Async dial the other peers + multiplexSwitch.DialPeers(addrs...) + + // Set up an exit timer + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + connectedPeers := make(map[p2pTypes.ID]struct{}) + + for { + select { + case evRaw := <-ch: + ev := evRaw.(events.PeerConnectedEvent) + + connectedPeers[ev.PeerID] = struct{}{} + + if len(connectedPeers) == cfg.Count-1 { + return nil + } + case <-timer.C: + return errors.New("timed out waiting for peer switches to connect") + } + } + } + + g, _ := errgroup.WithContext(ctx) + for i := 0; i < cfg.Count; i++ { + g.Go(func() error { return connectPeers(i) }) + } + + require.NoError(t, g.Wait()) + + return sws, ts +} + +// createRoutableAddr generates a valid, routable NetAddress for the given node ID using a secure random IP +func createRoutableAddr(t *testing.T, id p2pTypes.ID) *p2pTypes.NetAddress { + generateIP := func() string { + ip := make([]byte, 4) + + _, err := rand.Read(ip) + require.NoError(t, err) + + return fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3]) + } + + for { + addrStr := fmt.Sprintf("%s@%s:26656", id, generateIP()) + + netAddr, err := p2pTypes.NewNetAddressFromString(addrStr) + require.NoError(t, err) + + if netAddr.Routable() { + return netAddr + } + } +} + +// Peer is a live peer, utilized for testing purposes +type Peer struct { + *service.BaseService + ip net.IP + id p2pTypes.ID + addr *p2pTypes.NetAddress + kv map[string]any + + Outbound, Persistent, Private bool +} + +// NewPeer creates and starts a new mock peer. +// It generates a new routable address for the peer +func NewPeer(t *testing.T) *Peer { + t.Helper() + + var ( + nodeKey = p2pTypes.GenerateNodeKey() + netAddr = createRoutableAddr(t, nodeKey.ID()) + ) + + mp := &Peer{ + ip: netAddr.IP, + id: nodeKey.ID(), + addr: netAddr, + kv: make(map[string]interface{}), + } + + mp.BaseService = service.NewBaseService(nil, "MockPeer", mp) + + require.NoError(t, mp.Start()) + + return mp +} + +func (mp *Peer) FlushStop() { mp.Stop() } +func (mp *Peer) TrySend(_ byte, _ []byte) bool { return true } +func (mp *Peer) Send(_ byte, _ []byte) bool { return true } +func (mp *Peer) NodeInfo() p2pTypes.NodeInfo { + return p2pTypes.NodeInfo{ + NetAddress: mp.addr, + } +} +func (mp *Peer) Status() conn.ConnectionStatus { return conn.ConnectionStatus{} } +func (mp *Peer) ID() p2pTypes.ID { return mp.id } +func (mp *Peer) IsOutbound() bool { return mp.Outbound } +func (mp *Peer) IsPersistent() bool { return mp.Persistent } +func (mp *Peer) IsPrivate() bool { return mp.Private } +func (mp *Peer) Get(key string) interface{} { + if value, ok := mp.kv[key]; ok { + return value + } + return nil +} + +func (mp *Peer) Set(key string, value interface{}) { + mp.kv[key] = value +} +func (mp *Peer) RemoteIP() net.IP { return mp.ip } +func (mp *Peer) SocketAddr() *p2pTypes.NetAddress { return mp.addr } +func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } +func (mp *Peer) CloseConn() error { return nil } diff --git a/tm2/pkg/p2p/types/netaddress.go b/tm2/pkg/p2p/types/netaddress.go index 746a287067b..25dbc8def0b 100644 --- a/tm2/pkg/p2p/types/netaddress.go +++ b/tm2/pkg/p2p/types/netaddress.go @@ -21,12 +21,12 @@ const ( ) var ( - errInvalidTCPAddress = errors.New("invalid TCP address") - errUnsetIPAddress = errors.New("unset IP address") - errInvalidIP = errors.New("invalid IP address") - errUnspecifiedIP = errors.New("unspecified IP address") - errInvalidNetAddress = errors.New("invalid net address") - errEmptyHost = errors.New("empty host address") + ErrInvalidTCPAddress = errors.New("invalid TCP address") + ErrUnsetIPAddress = errors.New("unset IP address") + ErrInvalidIP = errors.New("invalid IP address") + ErrUnspecifiedIP = errors.New("unspecified IP address") + ErrInvalidNetAddress = errors.New("invalid net address") + ErrEmptyHost = errors.New("empty host address") ) // NetAddress defines information about a peer on the network @@ -53,7 +53,7 @@ func NewNetAddress(id ID, addr net.Addr) (*NetAddress, error) { // Make sure the address is valid tcpAddr, ok := addr.(*net.TCPAddr) if !ok { - return nil, errInvalidTCPAddress + return nil, ErrInvalidTCPAddress } // Validate the ID @@ -82,7 +82,7 @@ func NewNetAddressFromString(idaddr string) (*NetAddress, error) { ) if len(spl) != 2 { - return nil, errInvalidNetAddress + return nil, ErrInvalidNetAddress } var ( @@ -102,7 +102,7 @@ func NewNetAddressFromString(idaddr string) (*NetAddress, error) { } if host == "" { - return nil, errEmptyHost + return nil, ErrEmptyHost } ip := net.ParseIP(host) @@ -258,17 +258,22 @@ func (na *NetAddress) Validate() error { // Make sure the IP is set if na.IP == nil { - return errUnsetIPAddress + return ErrUnsetIPAddress } // Make sure the IP is valid if len(na.IP) != 4 && len(na.IP) != 16 { - return errInvalidIP + return ErrInvalidIP } // Check if the IP is unspecified - if na.IP.IsUnspecified() || na.RFC3849() || na.IP.Equal(net.IPv4bcast) { - return errUnspecifiedIP + if na.IP.IsUnspecified() { + return ErrUnspecifiedIP + } + + // Check if the IP conforms to standards, or is a broadcast + if na.RFC3849() || na.IP.Equal(net.IPv4bcast) { + return ErrInvalidIP } return nil diff --git a/tm2/pkg/p2p/types/node_info.go b/tm2/pkg/p2p/types/node_info.go index c511eab6545..b3881e90fe3 100644 --- a/tm2/pkg/p2p/types/node_info.go +++ b/tm2/pkg/p2p/types/node_info.go @@ -14,14 +14,14 @@ const ( ) var ( - errInvalidNetworkAddress = errors.New("invalid node network address") - errInvalidVersion = errors.New("invalid node version") - errInvalidMoniker = errors.New("invalid node moniker") - errInvalidRPCAddress = errors.New("invalid node RPC address") - errExcessiveChannels = errors.New("excessive node channels") - errDuplicateChannels = errors.New("duplicate node channels") - errIncompatibleNetworks = errors.New("incompatible networks") - errNoCommonChannels = errors.New("no common channels") + ErrInvalidNetworkAddress = errors.New("invalid node network address") + ErrInvalidVersion = errors.New("invalid node version") + ErrInvalidMoniker = errors.New("invalid node moniker") + ErrInvalidRPCAddress = errors.New("invalid node RPC address") + ErrExcessiveChannels = errors.New("excessive node channels") + ErrDuplicateChannels = errors.New("duplicate node channels") + ErrIncompatibleNetworks = errors.New("incompatible networks") + ErrNoCommonChannels = errors.New("no common channels") ) // NodeInfo is the basic node information exchanged @@ -59,7 +59,7 @@ type NodeInfoOther struct { func (info NodeInfo) Validate() error { // Validate the network address if info.NetAddress == nil { - return errInvalidNetworkAddress + return ErrInvalidNetworkAddress } if err := info.NetAddress.Validate(); err != nil { @@ -70,18 +70,18 @@ func (info NodeInfo) Validate() error { if len(info.Version) > 0 && (!strings.IsASCIIText(info.Version) || strings.ASCIITrim(info.Version) == "") { - return errInvalidVersion + return ErrInvalidVersion } // Validate Channels - ensure max and check for duplicates. if len(info.Channels) > maxNumChannels { - return errExcessiveChannels + return ErrExcessiveChannels } channelMap := make(map[byte]struct{}, len(info.Channels)) for _, ch := range info.Channels { if _, ok := channelMap[ch]; ok { - return errDuplicateChannels + return ErrDuplicateChannels } // Mark the channel as present @@ -90,13 +90,13 @@ func (info NodeInfo) Validate() error { // Validate Moniker. if !strings.IsASCIIText(info.Moniker) || strings.ASCIITrim(info.Moniker) == "" { - return errInvalidMoniker + return ErrInvalidMoniker } // XXX: Should we be more strict about address formats? rpcAddr := info.Other.RPCAddress if len(rpcAddr) > 0 && (!strings.IsASCIIText(rpcAddr) || strings.ASCIITrim(rpcAddr) == "") { - return errInvalidRPCAddress + return ErrInvalidRPCAddress } return nil @@ -118,7 +118,7 @@ func (info NodeInfo) CompatibleWith(other NodeInfo) error { // Make sure nodes are on the same network if info.Network != other.Network { - return errIncompatibleNetworks + return ErrIncompatibleNetworks } // Make sure there is at least 1 channel in common @@ -138,7 +138,7 @@ func (info NodeInfo) CompatibleWith(other NodeInfo) error { } if !commonFound { - return errNoCommonChannels + return ErrNoCommonChannels } return nil diff --git a/tm2/pkg/p2p/types/node_info_test.go b/tm2/pkg/p2p/types/node_info_test.go index d44c72bb20b..aa1e93a4aa6 100644 --- a/tm2/pkg/p2p/types/node_info_test.go +++ b/tm2/pkg/p2p/types/node_info_test.go @@ -35,7 +35,7 @@ func TestNodeInfo_Validate(t *testing.T) { { "unset net address", nil, - errInvalidNetworkAddress, + ErrInvalidNetworkAddress, }, { "zero net address ID", @@ -50,7 +50,7 @@ func TestNodeInfo_Validate(t *testing.T) { ID: GenerateNodeKey().ID(), IP: net.IP([]byte{0x00}), }, - errInvalidIP, + ErrInvalidIP, }, } @@ -97,7 +97,7 @@ func TestNodeInfo_Validate(t *testing.T) { Version: testCase.version, } - assert.ErrorIs(t, info.Validate(), errInvalidVersion) + assert.ErrorIs(t, info.Validate(), ErrInvalidVersion) }) } }) @@ -136,7 +136,7 @@ func TestNodeInfo_Validate(t *testing.T) { Moniker: testCase.moniker, } - assert.ErrorIs(t, info.Validate(), errInvalidMoniker) + assert.ErrorIs(t, info.Validate(), ErrInvalidMoniker) }) } }) @@ -174,7 +174,7 @@ func TestNodeInfo_Validate(t *testing.T) { }, } - assert.ErrorIs(t, info.Validate(), errInvalidRPCAddress) + assert.ErrorIs(t, info.Validate(), ErrInvalidRPCAddress) }) } }) @@ -190,7 +190,7 @@ func TestNodeInfo_Validate(t *testing.T) { { "too many channels", make([]byte, maxNumChannels+1), - errExcessiveChannels, + ErrExcessiveChannels, }, { "duplicate channels", @@ -199,7 +199,7 @@ func TestNodeInfo_Validate(t *testing.T) { byte(20), byte(10), }, - errDuplicateChannels, + ErrDuplicateChannels, }, } @@ -293,7 +293,7 @@ func TestNodeInfo_CompatibleWith(t *testing.T) { } ) - assert.ErrorIs(t, infoTwo.CompatibleWith(*infoOne), errIncompatibleNetworks) + assert.ErrorIs(t, infoTwo.CompatibleWith(*infoOne), ErrIncompatibleNetworks) }) t.Run("no common channels", func(t *testing.T) { @@ -327,7 +327,7 @@ func TestNodeInfo_CompatibleWith(t *testing.T) { } ) - assert.ErrorIs(t, infoTwo.CompatibleWith(*infoOne), errNoCommonChannels) + assert.ErrorIs(t, infoTwo.CompatibleWith(*infoOne), ErrNoCommonChannels) }) t.Run("fully compatible node infos", func(t *testing.T) {