Skip to content

Commit

Permalink
identify: Fix IdentifyWait when Connected events happen out of order (l…
Browse files Browse the repository at this point in the history
…ibp2p#2173)

* Add test case

* Id works even if Connected notifications happen out of order

* Add IsClosed shim

* Call basichost `.Start()` for all tests
  • Loading branch information
MarcoPolo authored Mar 7, 2023
1 parent 7effad7 commit 5330570
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 8 deletions.
4 changes: 4 additions & 0 deletions core/network/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type Conn interface {

// GetStreams returns all open streams over this conn.
GetStreams() []Stream

// IsClosed returns whether a connection is fully closed, so it can
// be garbage collected.
IsClosed() bool
}

// ConnectionState holds information about the connection.
Expand Down
4 changes: 4 additions & 0 deletions p2p/host/basic/basic_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ func TestHostSimple(t *testing.T) {
h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()
h1.Start()
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h2.Close()
h2.Start()

h2pi := h2.Peerstore().PeerInfo(h2.ID())
require.NoError(t, h1.Connect(ctx, h2pi))
Expand Down Expand Up @@ -447,8 +449,10 @@ func TestNewDialOld(t *testing.T) {
func TestNewStreamResolve(t *testing.T) {
h1, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h1.Start()
h2, err := NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h2.Start()

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand Down
1 change: 1 addition & 0 deletions p2p/host/routed/routed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestRoutedHostConnectToObsoleteAddresses(t *testing.T) {
h1, err := basic.NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()
h1.Start()

h2, err := basic.NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions p2p/net/connmgr/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,7 @@ func (m mockConn) LocalMultiaddr() ma.Multiaddr { panic
func (m mockConn) RemoteMultiaddr() ma.Multiaddr { panic("implement me") }
func (m mockConn) Stat() network.ConnStats { return m.stats }
func (m mockConn) ID() string { panic("implement me") }
func (m mockConn) IsClosed() bool { panic("implement me") }
func (m mockConn) NewStream(ctx context.Context) (network.Stream, error) { panic("implement me") }
func (m mockConn) GetStreams() []network.Stream { panic("implement me") }
func (m mockConn) Scope() network.ConnScope { panic("implement me") }
Expand Down
7 changes: 7 additions & 0 deletions p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ type conn struct {

closeOnce sync.Once

isClosed atomic.Bool

sync.RWMutex
}

Expand All @@ -67,12 +69,17 @@ func newConn(ln, rn *peernet, l *link, dir network.Direction) *conn {
return c
}

func (c *conn) IsClosed() bool {
return c.isClosed.Load()
}

func (c *conn) ID() string {
return strconv.FormatInt(c.id, 10)
}

func (c *conn) Close() error {
c.closeOnce.Do(func() {
c.isClosed.Store(true)
go c.rconn.Close()
c.teardown()
})
Expand Down
4 changes: 4 additions & 0 deletions p2p/net/swarm/swarm_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type Conn struct {

var _ network.Conn = &Conn{}

func (c *Conn) IsClosed() bool {
return c.conn.IsClosed()
}

func (c *Conn) ID() string {
// format: <first 10 chars of peer id>-<global conn ordinal>
return fmt.Sprintf("%s-%d", c.RemotePeer().Pretty()[0:10], c.id)
Expand Down
29 changes: 21 additions & 8 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,17 @@ func (ids *idService) IdentifyWait(c network.Conn) <-chan struct{} {
defer ids.connsMu.Unlock()

e, found := ids.conns[c]
if !found { // No entry found. Connection was most likely closed (and removed from this map) recently.
log.Debugw("connection not found in identify service", "peer", c.RemotePeer())
ch := make(chan struct{})
close(ch)
return ch
if !found {
// No entry found. We may have gotten an out of order notification. Check it we should have this conn (because we're still connected)
// We hold the ids.connsMu lock so this is safe since a disconnect event will be processed later if we are connected.
if c.IsClosed() {
log.Debugw("connection not found in identify service", "peer", c.RemotePeer())
ch := make(chan struct{})
close(ch)
return ch
} else {
ids.addConnWithLock(c)
}
}

if e.IdentifyWaitChan != nil {
Expand Down Expand Up @@ -863,6 +869,15 @@ func (ids *idService) consumeObservedAddress(observed []byte, c network.Conn) {
ids.observedAddrs.Record(c, maddr)
}

// addConnWithLock assuems caller holds the connsMu lock
func (ids *idService) addConnWithLock(c network.Conn) {
_, found := ids.conns[c]
if !found {
<-ids.setupCompleted
ids.conns[c] = entry{}
}
}

func signedPeerRecordFromMessage(msg *pb.Identify) (*record.Envelope, error) {
if msg.SignedPeerRecord == nil || len(msg.SignedPeerRecord) == 0 {
return nil, nil
Expand All @@ -883,10 +898,8 @@ func (nn *netNotifiee) Connected(_ network.Network, c network.Conn) {
// The swarm implementation guarantees this.
ids := nn.IDService()

<-ids.setupCompleted

ids.connsMu.Lock()
ids.conns[c] = entry{}
ids.addConnWithLock(c)
ids.connsMu.Unlock()

nn.IDService().IdentifyWait(c)
Expand Down
42 changes: 42 additions & 0 deletions p2p/protocol/identify/id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package identify_test

import (
"context"
"errors"
"fmt"
"sync"
"testing"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/record"
coretest "github.com/libp2p/go-libp2p/core/test"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
blhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
Expand Down Expand Up @@ -894,6 +896,46 @@ func TestIncomingIDStreamsTimeout(t *testing.T) {
}
}

func TestOutOfOrderConnectedNotifs(t *testing.T) {
h1, err := libp2p.New(libp2p.NoListenAddrs)
require.NoError(t, err)
h2, err := libp2p.New(libp2p.ListenAddrs(ma.StringCast("/ip4/127.0.0.1/udp/0/quic-v1")))
require.NoError(t, err)

doneCh := make(chan struct{})
errCh := make(chan error)

// This callback may be called before identify's Connnected callback completes. If it does, the IdentifyWait should still finish successfully.
h1.Network().Notify(&network.NotifyBundle{
ConnectedF: func(n network.Network, c network.Conn) {
bh1 := h1.(*basichost.BasicHost)
idChan := bh1.IDService().IdentifyWait(c)
go func() {
<-idChan
protos, err := bh1.Peerstore().GetProtocols(h2.ID())
if err != nil {
errCh <- err
}
if len(protos) == 0 {
errCh <- errors.New("no protocols found. Identify did not complete")
}

close(doneCh)
}()
},
})

h1.Connect(context.Background(), peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()})

select {
case <-doneCh:
case err := <-errCh:
t.Fatalf("err: %v", err)
case <-time.After(3 * time.Second):
t.Fatalf("identify wait never completed")
}
}

func waitForAddrInStream(t *testing.T, s <-chan ma.Multiaddr, expected ma.Multiaddr, timeout time.Duration, failMsg string) {
t.Helper()
for {
Expand Down
2 changes: 2 additions & 0 deletions p2p/protocol/ping/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ func TestPing(t *testing.T) {
h1, err := bhost.NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h1.Close()
h1.Start()
h2, err := bhost.NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
defer h2.Close()
h2.Start()

err = h1.Connect(ctx, peer.AddrInfo{
ID: h2.ID(),
Expand Down
2 changes: 2 additions & 0 deletions p2p/test/backpressure/backpressure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ func TestStBackpressureStreamWrite(t *testing.T) {

h1, err := bhost.NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h1.Start()
h2, err := bhost.NewHost(swarmt.GenSwarm(t), nil)
require.NoError(t, err)
h2.Start()

// setup sender handler on 2
h2.SetStreamHandler(protocol.TestingID, func(s network.Stream) {
Expand Down
1 change: 1 addition & 0 deletions p2p/test/reconnects/reconnect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestReconnect5(t *testing.T) {
h, err := bhost.NewHost(swarmt.GenSwarm(t, swarmOpt), nil)
require.NoError(t, err)
defer h.Close()
h.Start()
hosts = append(hosts, h)
h.SetStreamHandler(protocol.TestingID, EchoStreamHandler)
}
Expand Down

0 comments on commit 5330570

Please sign in to comment.