diff --git a/gno.land/cmd/gnoland/secrets_get.go b/gno.land/cmd/gnoland/secrets_get.go index 64eb48f7d27..0a0a714f6ee 100644 --- a/gno.land/cmd/gnoland/secrets_get.go +++ b/gno.land/cmd/gnoland/secrets_get.go @@ -12,7 +12,6 @@ import ( "github.com/gnolang/gno/tm2/pkg/bft/privval" "github.com/gnolang/gno/tm2/pkg/commands" osm "github.com/gnolang/gno/tm2/pkg/os" - "github.com/gnolang/gno/tm2/pkg/p2p" "github.com/gnolang/gno/tm2/pkg/p2p/types" ) @@ -200,7 +199,7 @@ func readNodeID(path string) (*nodeIDInfo, error) { // constructP2PAddress constructs the P2P address other nodes can use // to connect directly -func constructP2PAddress(nodeID p2p.ID, listenAddress string) string { +func constructP2PAddress(nodeID types.ID, listenAddress string) string { var ( address string parts = strings.SplitN(listenAddress, "://", 2) diff --git a/tm2/pkg/bft/mempool/reactor.go b/tm2/pkg/bft/mempool/reactor.go index 634d92277c2..acb4e351f3f 100644 --- a/tm2/pkg/bft/mempool/reactor.go +++ b/tm2/pkg/bft/mempool/reactor.go @@ -47,12 +47,12 @@ type mempoolIDs struct { // Reserve searches for the next unused ID and assigns it to the // peer. -func (ids *mempoolIDs) ReserveForPeer(peer p2p.Peer) { +func (ids *mempoolIDs) ReserveForPeer(id p2pTypes.ID) { ids.mtx.Lock() defer ids.mtx.Unlock() curID := ids.nextPeerID() - ids.peerMap[peer.ID()] = curID + ids.peerMap[id] = curID ids.activeIDs[curID] = struct{}{} } @@ -74,23 +74,23 @@ func (ids *mempoolIDs) nextPeerID() uint16 { } // Reclaim returns the ID reserved for the peer back to unused pool. -func (ids *mempoolIDs) Reclaim(peer p2p.Peer) { +func (ids *mempoolIDs) Reclaim(id p2pTypes.ID) { ids.mtx.Lock() defer ids.mtx.Unlock() - removedID, ok := ids.peerMap[peer.ID()] + removedID, ok := ids.peerMap[id] if ok { delete(ids.activeIDs, removedID) - delete(ids.peerMap, peer.ID()) + delete(ids.peerMap, id) } } // GetForPeer returns an ID reserved for the peer. -func (ids *mempoolIDs) GetForPeer(peer p2p.Peer) uint16 { +func (ids *mempoolIDs) GetForPeer(id p2pTypes.ID) uint16 { ids.mtx.RLock() defer ids.mtx.RUnlock() - return ids.peerMap[peer.ID()] + return ids.peerMap[id] } func newMempoolIDs() *mempoolIDs { @@ -140,13 +140,13 @@ func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { // AddPeer implements Reactor. // It starts a broadcast routine ensuring all txs are forwarded to the given peer. func (memR *Reactor) AddPeer(peer p2p.Peer) { - memR.ids.ReserveForPeer(peer) + memR.ids.ReserveForPeer(peer.ID()) go memR.broadcastTxRoutine(peer) } // RemovePeer implements Reactor. func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { - memR.ids.Reclaim(peer) + memR.ids.Reclaim(peer.ID()) // broadcast routine checks if peer is gone and returns } @@ -163,7 +163,7 @@ func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { switch msg := msg.(type) { case *TxMessage: - peerID := memR.ids.GetForPeer(src) + peerID := memR.ids.GetForPeer(src.ID()) err := memR.mempool.CheckTxWithInfo(msg.Tx, nil, TxInfo{SenderID: peerID}) if err != nil { memR.Logger.Info("Could not check tx", "tx", txID(msg.Tx), "err", err) @@ -185,7 +185,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) { return } - peerID := memR.ids.GetForPeer(peer) + peerID := memR.ids.GetForPeer(peer.ID()) var next *clist.CElement for { // In case of both next.NextWaitChan() and peer.Quit() are variable at the same time diff --git a/tm2/pkg/bft/mempool/reactor_test.go b/tm2/pkg/bft/mempool/reactor_test.go index 19403d09b76..d877af3e9cc 100644 --- a/tm2/pkg/bft/mempool/reactor_test.go +++ b/tm2/pkg/bft/mempool/reactor_test.go @@ -1,13 +1,23 @@ package mempool import ( + "context" + "fmt" + "log/slog" "net" + "os" "sync" "testing" "time" "github.com/fortytw2/leaktest" + "github.com/gnolang/gno/tm2/pkg/p2p/conn" + "github.com/gnolang/gno/tm2/pkg/p2p/events" + p2pTypes "github.com/gnolang/gno/tm2/pkg/p2p/types" + "github.com/gnolang/gno/tm2/pkg/versionset" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" "github.com/gnolang/gno/tm2/pkg/bft/abci/example/kvstore" memcfg "github.com/gnolang/gno/tm2/pkg/bft/mempool/config" @@ -17,7 +27,6 @@ import ( "github.com/gnolang/gno/tm2/pkg/log" "github.com/gnolang/gno/tm2/pkg/p2p" p2pcfg "github.com/gnolang/gno/tm2/pkg/p2p/config" - "github.com/gnolang/gno/tm2/pkg/p2p/mock" "github.com/gnolang/gno/tm2/pkg/testutils" ) @@ -39,26 +48,178 @@ func (ps peerState) GetHeight() int64 { } // connect N mempool reactors through N switches -func makeAndConnectReactors(mconfig *memcfg.MempoolConfig, pconfig *p2pcfg.P2PConfig, n int) []*Reactor { - reactors := make([]*Reactor, n) - logger := log.NewNoopLogger() +func makeAndConnectReactors(t *testing.T, mconfig *memcfg.MempoolConfig, pconfig *p2pcfg.P2PConfig, n int) []*Reactor { + t.Helper() + + var ( + reactors = make([]*Reactor, n) + logger = log.NewNoopLogger() + options = make(map[int][]p2p.SwitchOption) + ) + for i := 0; i < n; i++ { app := kvstore.NewKVStoreApplication() cc := proxy.NewLocalClientCreator(app) mempool, cleanup := newMempoolWithApp(cc) defer cleanup() - reactors[i] = NewReactor(mconfig, mempool) // so we dont start the consensus states - reactors[i].SetLogger(logger.With("validator", i)) + reactor := NewReactor(mconfig, mempool) // so we dont start the consensus states + reactor.SetLogger(logger.With("validator", i)) + + options[i] = []p2p.SwitchOption{ + p2p.WithReactor("MEMPOOL", reactor), + } + + reactors[i] = reactor } - p2p.MakeConnectedSwitches(pconfig, n, func(i int, s *p2p.MultiplexSwitch) *p2p.MultiplexSwitch { - s.AddReactor("MEMPOOL", reactors[i]) - return s - }, p2p.Connect2Switches) + // "Simulate" the networking layer + MakeConnectedSwitches(t, pconfig, n, options) + return reactors } +func MakeConnectedSwitches( + t *testing.T, + cfg *p2pcfg.P2PConfig, + n int, + opts map[int][]p2p.SwitchOption, +) []*p2p.MultiplexSwitch { + t.Helper() + + var ( + sws = make([]*p2p.MultiplexSwitch, 0, n) + ts = make([]*p2p.MultiplexTransport, 0, n) + addrs = make([]*p2pTypes.NetAddress, 0, n) + + // TODO remove + lgs = make([]*slog.Logger, 0, n) + ) + + // Generate the switches + for i := range n { + var ( + key = p2pTypes.GenerateNodeKey() + tcpAddr = &net.TCPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: 0, // random port + } + ) + + addr, err := p2pTypes.NewNetAddress(key.ID(), tcpAddr) + require.NoError(t, err) + + info := p2pTypes.NodeInfo{ + VersionSet: versionset.VersionSet{ + versionset.VersionInfo{ + Name: "p2p", + Version: "v0.0.0", + }, + }, + NetAddress: addr, + Network: "testing", + Software: "p2ptest", + Version: "v1.2.3-rc.0-deadbeef", + Channels: []byte{0x01}, + Moniker: fmt.Sprintf("node-%d", i), + Other: p2pTypes.NodeInfoOther{ + TxIndex: "off", + RPCAddress: fmt.Sprintf("127.0.0.1:%d", 0), + }, + } + + // TODO remove + lg := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + // Create the multiplex transport + multiplexTransport := p2p.NewMultiplexTransport( + info, + *key, + conn.MConfigFromP2P(cfg), + lg, + ) + + // Start the transport + require.NoError(t, multiplexTransport.Listen(*info.NetAddress)) + + t.Cleanup(func() { + assert.NoError(t, multiplexTransport.Close()) + }) + + dialAddr := multiplexTransport.NetAddress() + addrs = append(addrs, &dialAddr) + + ts = append(ts, multiplexTransport) + + // TODO remove + lgs = append(lgs, lg) + } + + ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelFn() + + g, _ := errgroup.WithContext(ctx) + + for i := range n { + // Make sure the switches connect to each other. + // Set up event listeners to make sure + // the setup method blocks until switches are connected + // Create the multiplex switch + newOpts := []p2p.SwitchOption{ + p2p.WithPersistentPeers(addrs), + } + + newOpts = append(newOpts, opts[i]...) + + multiplexSwitch := p2p.NewMultiplexSwitch(ts[i], newOpts...) + + multiplexSwitch.SetLogger(lgs[i].With("sw", i)) // TODO remove + + ch, unsubFn := multiplexSwitch.Subscribe(func(event events.Event) bool { + return event.Type() == events.PeerConnected + }) + + // Start the switch + require.NoError(t, multiplexSwitch.Start()) + + sws = append(sws, multiplexSwitch) + + g.Go(func() error { + defer func() { + unsubFn() + }() + + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + connectedPeers := make(map[p2pTypes.ID]struct{}) + + for { + select { + case evRaw := <-ch: + ev := evRaw.(events.PeerConnectedEvent) + + connectedPeers[ev.PeerID] = struct{}{} + + if len(connectedPeers) == n-1 { + return nil + } + case <-timer.C: + return errors.New("timed out waiting for peers to connect") + } + } + }) + + sws[i].DialPeers(addrs...) + } + + require.NoError(t, g.Wait()) + + fmt.Printf("\n\nDONE\n\n") + + return sws +} + func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) { t.Helper() @@ -120,14 +281,16 @@ func TestReactorBroadcastTxMessage(t *testing.T) { mconfig := memcfg.TestMempoolConfig() pconfig := testP2PConfig() const N = 4 - reactors := makeAndConnectReactors(mconfig, pconfig, N) - defer func() { + reactors := makeAndConnectReactors(t, mconfig, pconfig, N) + t.Cleanup(func() { for _, r := range reactors { - r.Stop() + assert.NoError(t, r.Stop()) } - }() + }) + for _, r := range reactors { for _, peer := range r.Switch.Peers().List() { + fmt.Printf("Setting peer %s\n", peer.ID()) peer.Set(types.PeerStateKey, peerState{1}) } } @@ -144,7 +307,7 @@ func TestReactorNoBroadcastToSender(t *testing.T) { mconfig := memcfg.TestMempoolConfig() pconfig := testP2PConfig() const N = 2 - reactors := makeAndConnectReactors(mconfig, pconfig, N) + reactors := makeAndConnectReactors(t, mconfig, pconfig, N) defer func() { for _, r := range reactors { r.Stop() @@ -169,7 +332,7 @@ func TestFlappyBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) { mconfig := memcfg.TestMempoolConfig() pconfig := testP2PConfig() const N = 2 - reactors := makeAndConnectReactors(mconfig, pconfig, N) + reactors := makeAndConnectReactors(t, mconfig, pconfig, N) defer func() { for _, r := range reactors { r.Stop() @@ -197,7 +360,7 @@ func TestFlappyBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) { mconfig := memcfg.TestMempoolConfig() pconfig := testP2PConfig() const N = 2 - reactors := makeAndConnectReactors(mconfig, pconfig, N) + reactors := makeAndConnectReactors(t, mconfig, pconfig, N) // stop reactors for _, r := range reactors { @@ -214,15 +377,15 @@ func TestMempoolIDsBasic(t *testing.T) { ids := newMempoolIDs() - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) + id := p2pTypes.GenerateNodeKey().ID() - ids.ReserveForPeer(peer) - assert.EqualValues(t, 1, ids.GetForPeer(peer)) - ids.Reclaim(peer) + ids.ReserveForPeer(id) + assert.EqualValues(t, 1, ids.GetForPeer(id)) + ids.Reclaim(id) - ids.ReserveForPeer(peer) - assert.EqualValues(t, 2, ids.GetForPeer(peer)) - ids.Reclaim(peer) + ids.ReserveForPeer(id) + assert.EqualValues(t, 2, ids.GetForPeer(id)) + ids.Reclaim(id) } func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { @@ -236,12 +399,13 @@ func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) { ids := newMempoolIDs() for i := 0; i < maxActiveIDs-1; i++ { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) + id := p2pTypes.GenerateNodeKey().ID() + ids.ReserveForPeer(id) } assert.Panics(t, func() { - peer := mock.NewPeer(net.IP{127, 0, 0, 1}) - ids.ReserveForPeer(peer) + id := p2pTypes.GenerateNodeKey().ID() + + ids.ReserveForPeer(id) }) } diff --git a/tm2/pkg/bft/node/node.go b/tm2/pkg/bft/node/node.go index dd0c541e20b..8ca3d60fc04 100644 --- a/tm2/pkg/bft/node/node.go +++ b/tm2/pkg/bft/node/node.go @@ -132,7 +132,7 @@ type Node struct { privValidator types.PrivValidator // local node's validator key // network - transport *p2p.Transport + transport *p2p.MultiplexTransport sw *p2p.MultiplexSwitch // p2p connections nodeInfo p2pTypes.NodeInfo nodeKey *p2pTypes.NodeKey // our node privkey @@ -458,7 +458,7 @@ func NewNode(config *cfg.Config, p2pLogger.Error("invalid private peer ID", "err", err) } - sw := p2p.NewSwitch( + sw := p2p.NewMultiplexSwitch( transport, p2p.WithReactor("MEMPOOL", mempoolReactor), p2p.WithReactor("BLOCKCHAIN", bcReactor), @@ -601,7 +601,9 @@ func (n *Node) OnStop() { } // now stop the reactors - n.sw.Stop() + if err := n.sw.Stop(); err != nil { + n.Logger.Error("unable to gracefully close switch", "err", err) + } // stop mempool WAL if n.config.Mempool.WalEnabled() { diff --git a/tm2/pkg/bft/rpc/client/local.go b/tm2/pkg/bft/rpc/client/local.go index 59c4216a468..4bc724e7d70 100644 --- a/tm2/pkg/bft/rpc/client/local.go +++ b/tm2/pkg/bft/rpc/client/local.go @@ -106,14 +106,6 @@ func (c *Local) Health() (*ctypes.ResultHealth, error) { return core.Health(c.ctx) } -func (c *Local) DialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) { - return core.UnsafeDialSeeds(c.ctx, seeds) -} - -func (c *Local) DialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { - return core.UnsafeDialPeers(c.ctx, peers, persistent) -} - func (c *Local) BlockchainInfo(minHeight, maxHeight int64) (*ctypes.ResultBlockchainInfo, error) { return core.BlockchainInfo(c.ctx, minHeight, maxHeight) } diff --git a/tm2/pkg/bft/rpc/core/net.go b/tm2/pkg/bft/rpc/core/net.go index 975d5ed822f..f8839b7d91f 100644 --- a/tm2/pkg/bft/rpc/core/net.go +++ b/tm2/pkg/bft/rpc/core/net.go @@ -3,7 +3,6 @@ package core import ( ctypes "github.com/gnolang/gno/tm2/pkg/bft/rpc/core/types" rpctypes "github.com/gnolang/gno/tm2/pkg/bft/rpc/lib/types" - "github.com/gnolang/gno/tm2/pkg/errors" ) // Get network info. @@ -154,10 +153,14 @@ import ( // } // // ``` -func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { - out, in, _ := p2pPeers.NumPeers() +func NetInfo(_ *rpctypes.Context) (*ctypes.ResultNetInfo, error) { + var ( + set = p2pPeers.Peers() + out, in = set.NumOutbound(), set.NumInbound() + ) + peers := make([]ctypes.Peer, 0, out+in) - for _, peer := range p2pPeers.Peers().List() { + for _, peer := range set.List() { nodeInfo := peer.NodeInfo() peers = append(peers, ctypes.Peer{ NodeInfo: nodeInfo, @@ -166,9 +169,7 @@ func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { RemoteIP: peer.RemoteIP().String(), }) } - // TODO: Should we include PersistentPeers and Seeds in here? - // PRO: useful info - // CON: privacy + return &ctypes.ResultNetInfo{ Listening: p2pTransport.IsListening(), Listeners: p2pTransport.Listeners(), @@ -177,33 +178,6 @@ func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { }, nil } -func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialSeeds, error) { - if len(seeds) == 0 { - return &ctypes.ResultDialSeeds{}, errors.New("No seeds provided") - } - logger.Info("DialSeeds", "seeds", seeds) - if err := p2pPeers.DialPeersAsync(seeds); err != nil { - return &ctypes.ResultDialSeeds{}, err - } - return &ctypes.ResultDialSeeds{Log: "Dialing seeds in progress. See /net_info for details"}, nil -} - -func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent bool) (*ctypes.ResultDialPeers, error) { - if len(peers) == 0 { - return &ctypes.ResultDialPeers{}, errors.New("No peers provided") - } - logger.Info("DialPeers", "peers", peers, "persistent", persistent) - if persistent { - if err := p2pPeers.AddPersistentPeers(peers); err != nil { - return &ctypes.ResultDialPeers{}, err - } - } - if err := p2pPeers.DialPeersAsync(peers); err != nil { - return &ctypes.ResultDialPeers{}, err - } - return &ctypes.ResultDialPeers{Log: "Dialing peers in progress. See /net_info for details"}, nil -} - // Get genesis file. // // ```shell diff --git a/tm2/pkg/bft/rpc/core/net_test.go b/tm2/pkg/bft/rpc/core/net_test.go deleted file mode 100644 index 163312e2ed9..00000000000 --- a/tm2/pkg/bft/rpc/core/net_test.go +++ /dev/null @@ -1,77 +0,0 @@ -package core - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - rpctypes "github.com/gnolang/gno/tm2/pkg/bft/rpc/lib/types" - "github.com/gnolang/gno/tm2/pkg/log" - "github.com/gnolang/gno/tm2/pkg/p2p" - p2pcfg "github.com/gnolang/gno/tm2/pkg/p2p/config" -) - -func TestUnsafeDialSeeds(t *testing.T) { - t.Parallel() - - sw := p2p.MakeSwitch(p2pcfg.DefaultP2PConfig(), 1, "testing", "123.123.123", - func(n int, sw *p2p.MultiplexSwitch) *p2p.MultiplexSwitch { return sw }) - err := sw.Start() - require.NoError(t, err) - defer sw.Stop() - - logger = log.NewNoopLogger() - p2pPeers = sw - - testCases := []struct { - seeds []string - isErr bool - }{ - {[]string{}, true}, - {[]string{"g1m6kmam774klwlh4dhmhaatd7al02m0h0jwnyc6@127.0.0.1:41198"}, false}, - {[]string{"127.0.0.1:41198"}, true}, - } - - for _, tc := range testCases { - res, err := UnsafeDialSeeds(&rpctypes.Context{}, tc.seeds) - if tc.isErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, res) - } - } -} - -func TestUnsafeDialPeers(t *testing.T) { - t.Parallel() - - sw := p2p.MakeSwitch(p2pcfg.DefaultP2PConfig(), 1, "testing", "123.123.123", - func(n int, sw *p2p.MultiplexSwitch) *p2p.MultiplexSwitch { return sw }) - err := sw.Start() - require.NoError(t, err) - defer sw.Stop() - - logger = log.NewNoopLogger() - p2pPeers = sw - - testCases := []struct { - peers []string - isErr bool - }{ - {[]string{}, true}, - {[]string{"g1m6kmam774klwlh4dhmhaatd7al02m0h0jwnyc6@127.0.0.1:41198"}, false}, - {[]string{"127.0.0.1:41198"}, true}, - } - - for _, tc := range testCases { - res, err := UnsafeDialPeers(&rpctypes.Context{}, tc.peers, false) - if tc.isErr { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.NotNil(t, res) - } - } -} diff --git a/tm2/pkg/bft/rpc/core/pipe.go b/tm2/pkg/bft/rpc/core/pipe.go index d5cdf2c268c..085fc35da55 100644 --- a/tm2/pkg/bft/rpc/core/pipe.go +++ b/tm2/pkg/bft/rpc/core/pipe.go @@ -43,9 +43,6 @@ type transport interface { } type peers interface { - AddPersistentPeers([]string) error - DialPeersAsync([]string) error - NumPeers() (outbound, inbound, dialig int) Peers() p2p.PeerSet } diff --git a/tm2/pkg/bft/rpc/core/routes.go b/tm2/pkg/bft/rpc/core/routes.go index 8d210f67985..76217a7cbd9 100644 --- a/tm2/pkg/bft/rpc/core/routes.go +++ b/tm2/pkg/bft/rpc/core/routes.go @@ -36,8 +36,6 @@ var Routes = map[string]*rpc.RPCFunc{ func AddUnsafeRoutes() { // control API - Routes["dial_seeds"] = rpc.NewRPCFunc(UnsafeDialSeeds, "seeds") - Routes["dial_peers"] = rpc.NewRPCFunc(UnsafeDialPeers, "peers,persistent") Routes["unsafe_flush_mempool"] = rpc.NewRPCFunc(UnsafeFlushMempool, "") // profiler API diff --git a/tm2/pkg/p2p/discovery/mock_test.go b/tm2/pkg/p2p/discovery/mock_test.go index b4a21827ce7..03919a673d3 100644 --- a/tm2/pkg/p2p/discovery/mock_test.go +++ b/tm2/pkg/p2p/discovery/mock_test.go @@ -4,6 +4,7 @@ import ( "net" "github.com/gnolang/gno/tm2/pkg/p2p" + "github.com/gnolang/gno/tm2/pkg/p2p/events" "github.com/gnolang/gno/tm2/pkg/p2p/types" ) @@ -12,6 +13,7 @@ type ( peersDelegate func() p2p.PeerSet stopPeerForErrorDelegate func(p2p.Peer, error) dialPeersDelegate func(...*types.NetAddress) + subscribeDelegate func(events.EventFilter) (<-chan events.Event, func()) ) type mockSwitch struct { @@ -19,6 +21,7 @@ type mockSwitch struct { peersFn peersDelegate stopPeerForErrorFn stopPeerForErrorDelegate dialPeersFn dialPeersDelegate + subscribeFn subscribeDelegate } func (m *mockSwitch) Broadcast(chID byte, data []byte) { @@ -47,6 +50,14 @@ func (m *mockSwitch) DialPeers(peerAddrs ...*types.NetAddress) { } } +func (m *mockSwitch) Subscribe(filter events.EventFilter) (<-chan events.Event, func()) { + if m.subscribeFn != nil { + m.subscribeFn(filter) + } + + return nil, func() {} +} + type ( addDelegate func(p2p.Peer) removeDelegate func(types.ID) bool @@ -91,14 +102,6 @@ func (m *mockPeerSet) Has(key types.ID) bool { return false } -func (m *mockPeerSet) HasIP(ip net.IP) bool { - if m.hasIPFn != nil { - return m.hasIPFn(ip) - } - - return false -} - func (m *mockPeerSet) Get(key types.ID) p2p.Peer { if m.getFn != nil { return m.getFn(key) diff --git a/tm2/pkg/p2p/events/doc.go b/tm2/pkg/p2p/events/doc.go new file mode 100644 index 00000000000..a624102379e --- /dev/null +++ b/tm2/pkg/p2p/events/doc.go @@ -0,0 +1,3 @@ +// Package events contains a simple p2p event system implementation, that simplifies asynchronous event flows in the +// p2p module. The event subscriptions allow for event filtering, which eases the load on the event notification flow. +package events diff --git a/tm2/pkg/p2p/events/events.go b/tm2/pkg/p2p/events/events.go new file mode 100644 index 00000000000..bf76e27d91e --- /dev/null +++ b/tm2/pkg/p2p/events/events.go @@ -0,0 +1,104 @@ +package events + +import ( + "sync" + + "github.com/rs/xid" +) + +// EventFilter is the filter function used to +// filter incoming p2p events. A false flag will +// consider the event as irrelevant +type EventFilter func(Event) bool + +// Events is the p2p event switch +type Events struct { + subs subscriptions + subscriptionsMux sync.RWMutex +} + +// New creates a new event subscription manager +func New() *Events { + return &Events{ + subs: make(subscriptions), + } +} + +// Subscribe registers a new filtered event listener +func (es *Events) Subscribe(filterFn EventFilter) (<-chan Event, func()) { + es.subscriptionsMux.Lock() + defer es.subscriptionsMux.Unlock() + + // Create a new subscription + id, ch := es.subs.add(filterFn) + + // Create the unsubscribe callback + unsubscribeFn := func() { + es.subscriptionsMux.Lock() + defer es.subscriptionsMux.Unlock() + + es.subs.remove(id) + } + + return ch, unsubscribeFn +} + +// Notify notifies all subscribers of an incoming event [BLOCKING] +func (es *Events) Notify(event Event) { + es.subscriptionsMux.RLock() + defer es.subscriptionsMux.RUnlock() + + es.subs.notify(event) +} + +type ( + // subscriptions holds the corresponding subscription information + subscriptions map[string]subscription // subscription ID -> subscription + + // subscription wraps the subscription notification channel, + // and the event filter + subscription struct { + ch chan Event + filterFn EventFilter + } +) + +// add adds a new subscription to the subscription map. +// Returns the subscription ID, and update channel +func (s *subscriptions) add(filterFn EventFilter) (string, chan Event) { + var ( + id = xid.New().String() + ch = make(chan Event, 1) + ) + + (*s)[id] = subscription{ + ch: ch, + filterFn: filterFn, + } + + return id, ch +} + +// remove removes the given subscription +func (s *subscriptions) remove(id string) { + if sub, exists := (*s)[id]; exists { + // Close the notification channel + close(sub.ch) + } + + // Delete the subscription + delete(*s, id) +} + +// notify notifies all subscription listeners, +// if their filters pass +func (s *subscriptions) notify(event Event) { + // Notify the listeners + for _, sub := range *s { + if !sub.filterFn(event) { + continue + } + + sub.ch <- event + } +} diff --git a/tm2/pkg/p2p/events/events_test.go b/tm2/pkg/p2p/events/events_test.go new file mode 100644 index 00000000000..a0feafceddb --- /dev/null +++ b/tm2/pkg/p2p/events/events_test.go @@ -0,0 +1,94 @@ +package events + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/gnolang/gno/tm2/pkg/p2p/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// generateEvents generates p2p events +func generateEvents(count int) []Event { + events := make([]Event, 0, count) + + for i := range count { + var event Event + + if i%2 == 0 { + event = PeerConnectedEvent{ + PeerID: types.ID(fmt.Sprintf("peer-%d", i)), + } + } else { + event = PeerDisconnectedEvent{ + PeerID: types.ID(fmt.Sprintf("peer-%d", i)), + } + } + + events = append(events, event) + } + + return events +} + +func TestEvents_Subscribe(t *testing.T) { + t.Parallel() + + var ( + capturedEvents []Event + + events = generateEvents(10) + subFn = func(e Event) bool { + return e.Type() == PeerDisconnected + } + ) + + // Create the events manager + e := New() + + // Subscribe to events + ch, unsubFn := e.Subscribe(subFn) + defer unsubFn() + + // Listen for the events + var wg sync.WaitGroup + + wg.Add(1) + + go func() { + defer wg.Done() + + timeout := time.After(5 * time.Second) + + for { + select { + case ev := <-ch: + capturedEvents = append(capturedEvents, ev) + + if len(capturedEvents) == len(events)/2 { + return + } + case <-timeout: + return + } + } + }() + + // Send out the events + for _, ev := range events { + e.Notify(ev) + } + + wg.Wait() + + // Make sure the events were captured + // and filtered properly + require.Len(t, capturedEvents, len(events)/2) + + for _, ev := range capturedEvents { + assert.Equal(t, ev.Type(), PeerDisconnected) + } +} diff --git a/tm2/pkg/p2p/events/types.go b/tm2/pkg/p2p/events/types.go new file mode 100644 index 00000000000..cbaac1816ff --- /dev/null +++ b/tm2/pkg/p2p/events/types.go @@ -0,0 +1,39 @@ +package events + +import ( + "net" + + "github.com/gnolang/gno/tm2/pkg/p2p/types" +) + +type EventType string + +const ( + PeerConnected EventType = "PeerConnected" // emitted when a fresh peer connects + PeerDisconnected EventType = "PeerDisconnected" // emitted when a peer disconnects +) + +// Event is a generic p2p event +type Event interface { + // Type returns the type information for the event + Type() EventType +} + +type PeerConnectedEvent struct { + PeerID types.ID // the ID of the peer + Address net.Addr // the remote address of the peer +} + +func (p PeerConnectedEvent) Type() EventType { + return PeerConnected +} + +type PeerDisconnectedEvent struct { + PeerID types.ID // the ID of the peer + Address net.Addr // the remote address of the peer + Reason error // the disconnect reason, if any +} + +func (p PeerDisconnectedEvent) Type() EventType { + return PeerDisconnected +} diff --git a/tm2/pkg/p2p/mock_test.go b/tm2/pkg/p2p/mock_test.go index 25fbb800e69..d870e3f8133 100644 --- a/tm2/pkg/p2p/mock_test.go +++ b/tm2/pkg/p2p/mock_test.go @@ -98,14 +98,6 @@ func (m *mockSet) Has(key types.ID) bool { return false } -func (m *mockSet) HasIP(ip net.IP) bool { - if m.hasIPFn != nil { - return m.hasIPFn(ip) - } - - return false -} - func (m *mockSet) Get(key types.ID) Peer { if m.getFn != nil { return m.getFn(key) diff --git a/tm2/pkg/p2p/set.go b/tm2/pkg/p2p/set.go index a54ad949657..9905fe0ac66 100644 --- a/tm2/pkg/p2p/set.go +++ b/tm2/pkg/p2p/set.go @@ -1,7 +1,6 @@ package p2p import ( - "net" "sync" "github.com/gnolang/gno/tm2/pkg/p2p/types" @@ -51,22 +50,7 @@ func (s *set) Has(peerKey types.ID) bool { return exists } -// HasIP returns true if the set contains the peer referred to by this IP -// address, otherwise false. -func (s *set) HasIP(peerIP net.IP) bool { - s.mux.RLock() - defer s.mux.RUnlock() - - for _, p := range s.peers { - if p.(Peer).RemoteIP().Equal(peerIP) { - return true - } - } - - return false -} - -// Get looks up a peer by the provtypes.IDed peerKey. Returns nil if peer is not +// Get looks up a peer by the peer ID. Returns nil if peer is not // found. func (s *set) Get(key types.ID) Peer { s.mux.RLock() diff --git a/tm2/pkg/p2p/set_test.go b/tm2/pkg/p2p/set_test.go index fd0428d5af0..ced35538a9b 100644 --- a/tm2/pkg/p2p/set_test.go +++ b/tm2/pkg/p2p/set_test.go @@ -1,7 +1,6 @@ package p2p import ( - "net" "sort" "testing" @@ -63,53 +62,6 @@ func TestSet_Remove(t *testing.T) { } } -func TestSet_HasIP(t *testing.T) { - t.Parallel() - - t.Run("present peer with IP", func(t *testing.T) { - t.Parallel() - - var ( - peers = mock.GeneratePeers(t, 100) - ip = net.ParseIP("0.0.0.0") - - s = newSet() - ) - - // Make sure at least one peer has the set IP - peers[len(peers)/2].RemoteIPFn = func() net.IP { - return ip - } - - // Add the peers - for _, peer := range peers { - s.Add(peer) - } - - // Make sure the peer is present - assert.True(t, s.HasIP(ip)) - }) - - t.Run("missing peer with IP", func(t *testing.T) { - t.Parallel() - - var ( - peers = mock.GeneratePeers(t, 100) - ip = net.ParseIP("0.0.0.0") - - s = newSet() - ) - - // Add the peers - for _, peer := range peers { - s.Add(peer) - } - - // Make sure the peer is not present - assert.False(t, s.HasIP(ip)) - }) -} - func TestSet_Get(t *testing.T) { t.Parallel() diff --git a/tm2/pkg/p2p/switch.go b/tm2/pkg/p2p/switch.go index 1ec18437b4a..2cc16b0122e 100644 --- a/tm2/pkg/p2p/switch.go +++ b/tm2/pkg/p2p/switch.go @@ -9,6 +9,7 @@ import ( "github.com/gnolang/gno/tm2/pkg/p2p/config" "github.com/gnolang/gno/tm2/pkg/p2p/conn" "github.com/gnolang/gno/tm2/pkg/p2p/dial" + "github.com/gnolang/gno/tm2/pkg/p2p/events" "github.com/gnolang/gno/tm2/pkg/p2p/types" "github.com/gnolang/gno/tm2/pkg/service" "github.com/gnolang/gno/tm2/pkg/telemetry" @@ -66,10 +67,11 @@ type MultiplexSwitch struct { transport Transport dialQueue *dial.Queue + events *events.Events } -// NewSwitch creates a new MultiplexSwitch with the given config. -func NewSwitch( +// NewMultiplexSwitch creates a new MultiplexSwitch with the given config. +func NewMultiplexSwitch( transport Transport, opts ...SwitchOption, ) *MultiplexSwitch { @@ -80,6 +82,7 @@ func NewSwitch( peers: newSet(), transport: transport, dialQueue: dial.NewQueue(), + events: events.New(), maxInboundPeers: defaultCfg.MaxNumInboundPeers, maxOutboundPeers: defaultCfg.MaxNumOutboundPeers, } @@ -110,6 +113,12 @@ func NewSwitch( return sw } +// Subscribe registers to live events happening on the p2p Switch. +// Returns the notification channel, along with an unsubscribe method +func (sw *MultiplexSwitch) Subscribe(filterFn events.EventFilter) (<-chan events.Event, func()) { + return sw.events.Subscribe(filterFn) +} + // --------------------------------------------------------------------- // Service start/stop @@ -239,6 +248,12 @@ func (sw *MultiplexSwitch) stopAndRemovePeer(peer Peer, err error) { // RemovePeer is finished. // https://github.com/tendermint/classic/issues/3338 sw.peers.Remove(peer.ID()) + + sw.events.Notify(events.PeerDisconnectedEvent{ + Address: peer.RemoteAddr(), + PeerID: peer.ID(), + Reason: err, + }) } // --------------------------------------------------------------------- @@ -278,7 +293,8 @@ func (sw *MultiplexSwitch) runDialLoop(ctx context.Context) { peerAddr := item.Address // Check if the peer is already connected - if sw.Peers().Has(peerAddr.ID) || sw.Peers().HasIP(peerAddr.IP) { + ps := sw.Peers() + if ps.Has(peerAddr.ID) { sw.Logger.Warn( "ignoring dial request for existing peer", "id", peerAddr.ID, @@ -329,7 +345,7 @@ func (sw *MultiplexSwitch) runDialLoop(ctx context.Context) { // runRedialLoop starts the persistent peer redial loop func (sw *MultiplexSwitch) runRedialLoop(ctx context.Context) { - ticker := time.NewTicker(time.Second * 10) + ticker := time.NewTicker(time.Second * 1) // TODO make option defer ticker.Stop() // redialFn goes through the persistent peer list @@ -511,6 +527,11 @@ func (sw *MultiplexSwitch) addPeer(p Peer) error { sw.Logger.Info("Added peer", "peer", p) + sw.events.Notify(events.PeerConnectedEvent{ + Address: p.RemoteAddr(), + PeerID: p.ID(), + }) + return nil } diff --git a/tm2/pkg/p2p/switch_test.go b/tm2/pkg/p2p/switch_test.go index 992c547eab4..d2d8cf06b30 100644 --- a/tm2/pkg/p2p/switch_test.go +++ b/tm2/pkg/p2p/switch_test.go @@ -30,7 +30,7 @@ func TestMultiplexSwitch_Options(t *testing.T) { } ) - sw := NewSwitch(nil, WithReactor(name, mockReactor)) + sw := NewMultiplexSwitch(nil, WithReactor(name, mockReactor)) assert.Equal(t, mockReactor, sw.reactors[name]) }) @@ -40,7 +40,7 @@ func TestMultiplexSwitch_Options(t *testing.T) { peers := generateNetAddr(t, 10) - sw := NewSwitch(nil, WithPersistentPeers(peers)) + sw := NewMultiplexSwitch(nil, WithPersistentPeers(peers)) for _, p := range peers { assert.True(t, sw.isPersistentPeer(p.ID)) @@ -59,7 +59,7 @@ func TestMultiplexSwitch_Options(t *testing.T) { ids = append(ids, p.ID) } - sw := NewSwitch(nil, WithPrivatePeers(ids)) + sw := NewMultiplexSwitch(nil, WithPrivatePeers(ids)) for _, p := range peers { assert.True(t, sw.isPrivatePeer(p.ID)) @@ -71,7 +71,7 @@ func TestMultiplexSwitch_Options(t *testing.T) { maxInbound := uint64(500) - sw := NewSwitch(nil, WithMaxInboundPeers(maxInbound)) + sw := NewMultiplexSwitch(nil, WithMaxInboundPeers(maxInbound)) assert.Equal(t, maxInbound, sw.maxInboundPeers) }) @@ -81,7 +81,7 @@ func TestMultiplexSwitch_Options(t *testing.T) { maxOutbound := uint64(500) - sw := NewSwitch(nil, WithMaxOutboundPeers(maxOutbound)) + sw := NewMultiplexSwitch(nil, WithMaxOutboundPeers(maxOutbound)) assert.Equal(t, maxOutbound, sw.maxOutboundPeers) }) @@ -103,7 +103,7 @@ func TestMultiplexSwitch_Broadcast(t *testing.T) { } peers = mock.GeneratePeers(t, 10) - sw = NewSwitch(mockTransport) + sw = NewMultiplexSwitch(mockTransport) ) require.NoError(t, sw.OnStart()) @@ -139,7 +139,7 @@ func TestMultiplexSwitch_Peers(t *testing.T) { var ( peers = mock.GeneratePeers(t, 10) - sw = NewSwitch(nil) + sw = NewMultiplexSwitch(nil) ) // Create a new peer set @@ -178,7 +178,7 @@ func TestMultiplexSwitch_StopPeer(t *testing.T) { }, } - sw = NewSwitch(mockTransport) + sw = NewMultiplexSwitch(mockTransport) ) // Create a new peer set @@ -208,7 +208,7 @@ func TestMultiplexSwitch_StopPeer(t *testing.T) { }, } - sw = NewSwitch(mockTransport) + sw = NewMultiplexSwitch(mockTransport) ) // Make sure the peer is persistent @@ -281,7 +281,7 @@ func TestMultiplexSwitch_DialLoop(t *testing.T) { }, } - sw = NewSwitch(mockTransport) + sw = NewMultiplexSwitch(mockTransport) ) sw.peers = mockSet @@ -344,7 +344,7 @@ func TestMultiplexSwitch_DialLoop(t *testing.T) { }, } - sw = NewSwitch(mockTransport) + sw = NewMultiplexSwitch(mockTransport) ) sw.peers = mockSet @@ -399,7 +399,7 @@ func TestMultiplexSwitch_DialLoop(t *testing.T) { }, } - sw = NewSwitch(mockTransport) + sw = NewMultiplexSwitch(mockTransport) ) // Prepare the dial queue @@ -461,7 +461,7 @@ func TestMultiplexSwitch_AcceptLoop(t *testing.T) { }, } - sw = NewSwitch( + sw = NewMultiplexSwitch( mockTransport, WithMaxInboundPeers(maxInbound), ) @@ -517,7 +517,7 @@ func TestMultiplexSwitch_AcceptLoop(t *testing.T) { }, } - sw = NewSwitch( + sw = NewMultiplexSwitch( mockTransport, WithMaxInboundPeers(maxInbound), ) @@ -583,7 +583,7 @@ func TestMultiplexSwitch_RedialLoop(t *testing.T) { } // Create the switch - sw := NewSwitch( + sw := NewMultiplexSwitch( nil, WithPersistentPeers(addrs), ) @@ -649,7 +649,7 @@ func TestMultiplexSwitch_RedialLoop(t *testing.T) { } // Create the switch - sw := NewSwitch( + sw := NewMultiplexSwitch( mockTransport, WithPersistentPeers(addrs), ) @@ -731,7 +731,7 @@ func TestMultiplexSwitch_DialPeers(t *testing.T) { } } - sw := NewSwitch(mockTransport) + sw := NewMultiplexSwitch(mockTransport) // Dial the peers sw.DialPeers(p.NodeInfo().NetAddress) @@ -763,7 +763,7 @@ func TestMultiplexSwitch_DialPeers(t *testing.T) { } ) - sw := NewSwitch( + sw := NewMultiplexSwitch( mockTransport, WithMaxOutboundPeers(maxOutbound), ) @@ -803,7 +803,7 @@ func TestMultiplexSwitch_DialPeers(t *testing.T) { } ) - sw := NewSwitch( + sw := NewMultiplexSwitch( mockTransport, WithMaxOutboundPeers(10), ) diff --git a/tm2/pkg/p2p/types.go b/tm2/pkg/p2p/types.go index fe6bccec072..418c5095347 100644 --- a/tm2/pkg/p2p/types.go +++ b/tm2/pkg/p2p/types.go @@ -5,6 +5,7 @@ import ( "net" "github.com/gnolang/gno/tm2/pkg/p2p/conn" + "github.com/gnolang/gno/tm2/pkg/p2p/events" "github.com/gnolang/gno/tm2/pkg/p2p/types" "github.com/gnolang/gno/tm2/pkg/service" ) @@ -46,7 +47,6 @@ type PeerSet interface { Add(peer Peer) Remove(key types.ID) bool Has(key types.ID) bool - HasIP(ip net.IP) bool Get(key types.ID) Peer List() []Peer @@ -81,6 +81,9 @@ type Switch interface { // Peers returns the latest peer set Peers() PeerSet + // Subscribe subscribes to active switch events + Subscribe(filterFn events.EventFilter) (<-chan events.Event, func()) + // StopPeerForError stops the peer with the given reason StopPeerForError(peer Peer, err error)