Skip to content

Commit

Permalink
fix: don't wait for the identify exchange to complete
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Oct 28, 2024
1 parent 138591b commit 81e0a94
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 67 deletions.
3 changes: 2 additions & 1 deletion libp2p/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package libp2p
import (
"context"
"encoding/json"
"strings"
"time"

"github.com/benbjohnson/clock"
Expand Down Expand Up @@ -119,7 +120,7 @@ func mergeResults(r *core.CrawlResult[PeerInfo], p2pRes P2PResult, apiRes APIRes

// treat ErrConnectionClosedImmediately as no error because we were able
// to connect
if p2pRes.ConnClosedImmediately {
if p2pRes.CrawlError != nil && strings.Contains(p2pRes.CrawlError.Error(), "connection failed") {
properties["direct_close"] = true
}

Expand Down
88 changes: 22 additions & 66 deletions libp2p/crawler_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
Expand Down Expand Up @@ -53,9 +54,6 @@ type P2PResult struct {
// e.g., they could miss quic-v1 addresses if the reporting peer doesn't
// know about that protocol.
ListenAddrs []ma.Multiaddr

// If the connection was closed immediately
ConnClosedImmediately bool
}

func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
Expand All @@ -67,31 +65,29 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
}

result.ConnectStartTime = time.Now()
result.ConnectError = c.connect(ctx, pi.AddrInfo) // use filtered addr list
conn, err := c.connect(ctx, pi.AddrInfo) // use filtered addr list
result.ConnectEndTime = time.Now()
result.ConnectError = err

// If we could successfully connect to the peer we actually crawl it.
if result.ConnectError == nil {

// check if we're actually connected
if c.host.Network().Connectedness(pi.ID()) == network.NotConnected {
// this is a weird behavior I was obesrving. Libp2p reports a
// successful connection establishment but isn't connected right
// after the call returned. This is not a big problem at this
// point because drainBuckets will open the connection again.
// This works more often than not but is still weird. At least
// keep track of this issue - just in case.
result.ConnClosedImmediately = true
}

// Fetch all neighbors
result.RoutingTable, result.CrawlError = c.drainBuckets(ctx, pi.AddrInfo)
if result.CrawlError != nil {
result.CrawlErrorStr = db.NetError(result.CrawlError)
}

// wait for the Identify exchange to complete (no-op if already done)
c.identifyWait(ctx, pi.AddrInfo)
// the internal timeout is set to 30 s. When crawling we only allow 5s.
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

select {
case <-timeoutCtx.Done():
// identification timed out.
case <-c.host.IDService().IdentifyWait(conn):
// identification may have succeeded.
}

// Extract information from peer store
ps := c.host.Peerstore()
Expand Down Expand Up @@ -136,9 +132,9 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
}

// connect establishes a connection to the given peer. It also handles metric capturing.
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn, error) {
if len(pi.Addrs) == 0 {
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
return nil, fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
}

// init an exponential backoff
Expand All @@ -160,13 +156,15 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
})
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())

c.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
err := c.host.Connect(timeoutCtx, pi)
conn, err := c.host.Network().DialPeer(timeoutCtx, pi.ID)
cancel()

// yay, it worked! Or has it? The caller checks the connectedness again.
if err == nil {
return nil
return conn, nil
}

switch true {
Expand All @@ -186,18 +184,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// We already have too many open connections over a relay. Try again!
default:
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

sleepDur := bo.NextBackOff()
if sleepDur == backoff.Stop {
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-time.After(sleepDur):
retry += 1
continue
Expand Down Expand Up @@ -302,45 +300,3 @@ func (c *Crawler) drainBucket(ctx context.Context, rt *kbucket.RoutingTable, pid

return nil, fmt.Errorf("getting closest peer with CPL %d: %w", bucket, err)
}

// identifyWait waits until any connection to a peer passed the Identify
// exchange successfully or all identification attempts have failed.
// The call to IdentifyWait returns immediately if the connection was
// identified in the past. We detect a successful identification if an
// AgentVersion is stored in the peer store
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var wg sync.WaitGroup
for _, conn := range c.host.Network().ConnsToPeer(pi.ID) {
conn := conn

wg.Add(1)
go func() {
defer wg.Done()

select {
case <-timeoutCtx.Done():
case <-c.host.IDService().IdentifyWait(conn):

// check if identification was successful by looking for
// the AgentVersion key. If it exists, we cancel the
// identification of the remaining connections.
if c.isIdentified(pi.ID) {
cancel()
return
}
}
}()
}

wg.Wait()
}

// isIdentified returns true if the given peer.ID was successfully identified.
// Just because IdentifyWait returns doesn't mean the peer was identified.
func (c *Crawler) isIdentified(pid peer.ID) bool {
agent, err := c.host.Peerstore().Get(pid, "AgentVersion")
return err == nil && agent.(string) != ""
}

0 comments on commit 81e0a94

Please sign in to comment.