Skip to content

Commit

Permalink
Add a "transient" network connectivity state
Browse files Browse the repository at this point in the history
Previously, we'd consider "transiently" connected peers to be connected.
This meant:

1. We wouldn't fire a second event when transitioning to "really
connected". The only option for users was to listen on the old-style
per-connection notifications.
2. "Connectedness" checks would be a little too eager to treat a peer as
connected.

For 99% of users, "transient" peers should be treated as disconnected.
So while it's technically a breaking change to split-out "transient"
connectivity into a separate state, I expect it's more likely to fix
bugs than anything.

Unfortunately, this change _did_ require several changes to go-libp2p
itself because go-libp2p _does_ care about transient connections:

1. We want to keep peerstore information for transient peers.
2. We may sometimes want to treat peers as "connected" in the host.
3. Identify still needs to run over transient connections.

fixes #2692
  • Loading branch information
Stebalien committed Jan 29, 2024
1 parent 6aa701a commit 11515f0
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 24 deletions.
5 changes: 4 additions & 1 deletion core/network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,13 @@ const (
// CannotConnect means recently attempted connecting but failed to connect.
// (should signal "made effort, failed")
CannotConnect

// Transient means we have a transient connection to the peer, but aren't fully connected.
Transient
)

func (c Connectedness) String() string {
str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect"}
str := [...]string{"NotConnected", "Connected", "CanConnect", "CannotConnect", "Transient"}
if c < 0 || int(c) >= len(str) {
return unrecognized
}
Expand Down
4 changes: 3 additions & 1 deletion p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,10 @@ func (h *BasicHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
h.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

forceDirect, _ := network.GetForceDirectDial(ctx)
canUseTransient, _ := network.GetUseTransient(ctx)
if !forceDirect {
if h.Network().Connectedness(pi.ID) == network.Connected {
connectedness := rh.Network().Connectedness(pi.ID)

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go 1.20.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go 1.20.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go 1.21.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go 1.21.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go 1.20.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go 1.20.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go 1.21.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go 1.21.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go 1.20.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go 1.20.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go 1.21.x)

undefined: rh

Check failure on line 728 in p2p/host/basic/basic_host.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go 1.21.x)

undefined: rh
if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) {
return nil
}
}
Expand Down
13 changes: 7 additions & 6 deletions p2p/host/pstoremanager/pstoremanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,15 +103,16 @@ func (m *PeerstoreManager) background(ctx context.Context, sub event.Subscriptio
ev := e.(event.EvtPeerConnectednessChanged)
p := ev.Peer
switch ev.Connectedness {
case network.NotConnected:
case network.Connected, network.Transient:
// If we reconnect to the peer before we've cleared the information,
// keep it. This is an optimization to keep the disconnected map
// small. We still need to check that a peer is actually
// disconnected before removing it from the peer store.
delete(disconnected, p)
default:
if _, ok := disconnected[p]; !ok {
disconnected[p] = time.Now()
}
case network.Connected:
// If we reconnect to the peer before we've cleared the information, keep it.
// This is an optimization to keep the disconnected map small.
// We still need to check that a peer is actually disconnected before removing it from the peer store.
delete(disconnected, p)
}
case <-ticker.C:
now := time.Now()
Expand Down
4 changes: 3 additions & 1 deletion p2p/host/routed/routed.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ func Wrap(h host.Host, r Routing) *RoutedHost {
func (rh *RoutedHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
// first, check if we're already connected unless force direct dial.
forceDirect, _ := network.GetForceDirectDial(ctx)
canUseTransient, _ := network.GetUseTransient(ctx)
if !forceDirect {
if rh.Network().Connectedness(pi.ID) == network.Connected {
connectedness := rh.Network().Connectedness(pi.ID)
if connectedness == network.Connected || (canUseTransient && connectedness == network.Transient) {
return nil
}
}
Expand Down
42 changes: 34 additions & 8 deletions p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
}
stat.Direction = dir
stat.Opened = time.Now()
isTransient := stat.Transient

// Wrap and register the connection.
c := &Conn{
Expand Down Expand Up @@ -383,8 +384,9 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
return nil, ErrSwarmClosed
}

oldState := s.connectednessUnlocked(p)

c.streams.m = make(map[*Stream]struct{})
isFirstConnection := len(s.conns.m[p]) == 0
s.conns.m[p] = append(s.conns.m[p], c)

// Add two swarm refs:
Expand All @@ -397,8 +399,12 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,
c.notifyLk.Lock()
s.conns.Unlock()

// Notify goroutines waiting for a direct connection
if !c.Stat().Transient {
newState := network.Transient
if !isTransient {
newState = network.Connected

// Notify goroutines waiting for a direct connection
//
// Go routines interested in waiting for direct connection first acquire this lock
// and then acquire s.conns.RLock. Do not acquire this lock before conns.Unlock to
// prevent deadlock.
Expand All @@ -412,10 +418,10 @@ func (s *Swarm) addConn(tc transport.CapableConn, dir network.Direction) (*Conn,

// Emit event after releasing `s.conns` lock so that a consumer can still
// use swarm methods that need the `s.conns` lock.
if isFirstConnection {
if oldState != newState {
s.emitter.Emit(event.EvtPeerConnectednessChanged{
Peer: p,
Connectedness: network.Connected,
Connectedness: newState,
})
}

Expand Down Expand Up @@ -646,10 +652,30 @@ func isDirectConn(c *Conn) bool {
// To check if we have an open connection, use `s.Connectedness(p) ==
// network.Connected`.
func (s *Swarm) Connectedness(p peer.ID) network.Connectedness {
if s.bestConnToPeer(p) != nil {
return network.Connected
s.conns.RLock()
defer s.conns.RUnlock()

s.connectednessUnlocked(p)
}

Check failure on line 659 in p2p/net/swarm/swarm.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go 1.20.x)

missing return

Check failure on line 659 in p2p/net/swarm/swarm.go

View workflow job for this annotation

GitHub Actions / go-test / ubuntu (go 1.21.x)

missing return

Check failure on line 659 in p2p/net/swarm/swarm.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go 1.20.x)

missing return

Check failure on line 659 in p2p/net/swarm/swarm.go

View workflow job for this annotation

GitHub Actions / go-test / windows (go 1.21.x)

missing return

Check failure on line 659 in p2p/net/swarm/swarm.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go 1.20.x)

missing return

Check failure on line 659 in p2p/net/swarm/swarm.go

View workflow job for this annotation

GitHub Actions / go-test / macos (go 1.21.x)

missing return

func (s *Swarm) connectednessUnlocked(p peer.ID) network.Connectedness {
var haveTransient bool
for _, c := range s.conns.m[p] {
if c.conn.IsClosed() {
// We *will* garbage collect this soon anyways.
continue
}
if c.Stat().Transient {
haveTransient = true
} else {
return network.Connected
}
}
if haveTransient {
return network.Transient
} else {
return network.NotConnected
}
return network.NotConnected
}

// Conns returns a slice of all connections.
Expand Down
17 changes: 10 additions & 7 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -746,7 +746,8 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
// Taking the lock ensures that we don't concurrently process a disconnect.
ids.addrMu.Lock()
ttl := peerstore.RecentlyConnectedAddrTTL
if ids.Host.Network().Connectedness(p) == network.Connected {
switch ids.Host.Network().Connectedness(p) {
case network.Transient, network.Connected:
ttl = peerstore.ConnectedAddrTTL
}

Expand Down Expand Up @@ -975,13 +976,15 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {
delete(ids.conns, c)
ids.connsMu.Unlock()

if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected {
// Last disconnect.
// Undo the setting of addresses to peer.ConnectedAddrTTL we did
ids.addrMu.Lock()
defer ids.addrMu.Unlock()
ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
switch ids.Host.Network().Connectedness(c.RemotePeer()) {
case network.Connected, network.Transient:
return
}
// Last disconnect.
// Undo the setting of addresses to peer.ConnectedAddrTTL we did
ids.addrMu.Lock()
defer ids.addrMu.Unlock()
ids.Host.Peerstore().UpdateAddrs(c.RemotePeer(), peerstore.ConnectedAddrTTL, peerstore.RecentlyConnectedAddrTTL)
}

func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
Expand Down

0 comments on commit 11515f0

Please sign in to comment.