fix: don't wait for the identify exchange to complete before draining buckets #81

Merged · merged 1 commit on Oct 28, 2024
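
In short: the libp2p crawl now dials the peer, drains its buckets immediately, and only then waits (briefly) for the Identify exchange, instead of blocking on Identify up front. A minimal sketch of that flow, condensed from the diff below; the helper name and the drainBuckets callback are illustrative, not identifiers from the repo:

```go
package sketch

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/network"
	basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
)

// crawlThenIdentify shows the reordering: drain the remote peer's buckets as
// soon as the dial succeeds, and only afterwards wait (briefly) for the
// Identify exchange on that same connection.
func crawlThenIdentify(ctx context.Context, h *basichost.BasicHost, conn network.Conn, drainBuckets func(context.Context) error) error {
	// 1. crawl first; a completed Identify exchange is no longer a prerequisite
	if err := drainBuckets(ctx); err != nil {
		return err
	}

	// 2. then wait for Identify to complete (a no-op if it already did),
	//    capped at 5 seconds instead of the identify service's internal timeout
	timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	select {
	case <-timeoutCtx.Done():
		// identification timed out; peer-store data may be incomplete
	case <-h.IDService().IdentifyWait(conn):
		// identification may have succeeded; the caller reads the peer store
	}
	return nil
}
```
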
2 changes: 2 additions & 0 deletions db/errors.go
@@ -29,6 +29,7 @@ var KnownErrors = map[string]string{
"max dial attempts exceeded": models.NetErrorMaxDialAttemptsExceeded,
"host is down": models.NetErrorHostIsDown,
"stream reset": models.NetErrorStreamReset,
"stream closed": models.NetErrorStreamReset,
Collaborator:

should we create a new models.NetErrorStreamClosed entry for this error?
(I'm not 100% sure if it just means the same as the "stream reset" one)

Owner Author:


yeah, I'm lazy and didn't want to add a migration just for this error code 🙈

It's not exactly the same as a stream reset but for analysis purposes I'd say we can treat it equally
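
For context, a minimal, self-contained sketch of how a substring-precedence lookup like this one is typically applied; the helper name classify and the string codes are illustrative, not the repo's actual identifiers:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// Illustrative only -- not the repo's actual helper. It shows why the chosen
// mapping is cheap: "stream closed" reuses the existing stream-reset code, so
// no schema migration is needed for the new error string.
var knownErrors = map[string]string{
	"stream reset":  "net_error_stream_reset",
	"stream closed": "net_error_stream_reset", // same code, per this PR
}

var knownErrorsPrecedence = []string{
	"stream reset",
	"stream closed",
}

// classify returns the first known error code whose substring appears in err.
func classify(err error) string {
	for _, substr := range knownErrorsPrecedence {
		if strings.Contains(err.Error(), substr) {
			return knownErrors[substr]
		}
	}
	return "unknown"
}

func main() {
	fmt.Println(classify(errors.New("write: stream closed"))) // net_error_stream_reset
	fmt.Println(classify(errors.New("connection refused")))   // unknown
}
```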

"failed to negotiate security protocol: EOF": models.NetErrorNegotiateSecurityProtocol, // connect retry logic in discv5 relies on the ": EOF" suffix.
"failed to negotiate stream multiplexer": models.NetErrorNegotiateStreamMultiplexer,
"resource limit exceeded": models.NetErrorResourceLimitExceeded,
@@ -91,6 +92,7 @@ var knownErrorsPrecedence = []string{
"max dial attempts exceeded",
"host is down",
"stream reset",
"stream closed",
"failed to negotiate security protocol: EOF",
"failed to negotiate stream multiplexer",
"resource limit exceeded",
111 changes: 27 additions & 84 deletions discv5/crawler.go
@@ -6,13 +6,14 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
@@ -200,47 +201,28 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
Addrs: sanitizedAddrs,
}

var conn network.Conn
result.ConnectStartTime = time.Now()
result.ConnectError = c.connect(ctx, addrInfo) // use filtered addr list
conn, result.ConnectError = c.connect(ctx, addrInfo) // use filtered addr list
result.ConnectEndTime = time.Now()

// If we could successfully connect to the peer we actually crawl it.
if result.ConnectError == nil {

conns := c.host.Network().ConnsToPeer(pi.ID())

// check if we're connected
if len(conns) == 0 {
// this is a weird behavior I was observing. Libp2p reports a
// successful connection establishment but isn't connected right
// after the call returned. This is not a big problem at this
// point because fetchNeighbors will open the connection again. This
// works more often than not but is still weird. At least keep track
// of these cases.
result.ConnClosedImmediately = true

// try it again one more time
if !c.isIdentified(addrInfo.ID) {
_ = c.connect(ctx, addrInfo)
}
} else if len(conns) == 1 {
// handle happy-path separately
result.Transport = conns[0].ConnState().Transport
} else {
transports := map[string]struct{}{}
for _, conn := range conns {
transports[conn.ConnState().Transport] = struct{}{}
}
// keep track of the transport of the open connection
result.Transport = conn.ConnState().Transport

if len(transports) == 1 {
result.Transport = conns[0].ConnState().Transport
} else if len(transports) != 0 {
result.Transport = "multi"
}
}
// wait for the Identify exchange to complete (no-op if already done)
// the internal timeout is set to 30 s. When crawling we only allow 5s.
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

// wait for the Identify exchange to complete
c.identifyWait(ctx, addrInfo)
select {
case <-timeoutCtx.Done():
// identification timed out.
case <-c.host.IDService().IdentifyWait(conn):
// identification may have succeeded.
}

// Extract information from peer store
ps := c.host.Peerstore()
@@ -285,9 +267,9 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
}

// connect establishes a connection to the given peer. It also handles metric capturing.
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn, error) {
if len(pi.Addrs) == 0 {
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
return nil, fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
}

// init an exponential backoff
@@ -307,15 +289,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
})
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())

// save addresses into the peer store temporarily
c.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
err := c.host.Connect(timeoutCtx, pi)
conn, err := c.host.Network().DialPeer(timeoutCtx, pi.ID)
cancel()

if err == nil {
return nil
return conn, nil
}

switch true {
switch {
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionRefused]):
// Might be transient because the remote doesn't want us to connect. Try again!
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionGated]):
Expand All @@ -332,18 +317,18 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// We already have too many open connections over a relay. Try again!
default:
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

sleepDur := bo.NextBackOff()
if sleepDur == backoff.Stop {
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-time.After(sleepDur):
retry += 1
continue
@@ -410,48 +395,6 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) ([]ma.Multiaddr, bool) {
return maddrs, false
}

// identifyWait waits until any connection to a peer passed the Identify
// exchange successfully or all identification attempts have failed.
// The call to IdentifyWait returns immediately if the connection was
// identified in the past. We detect a successful identification if an
// AgentVersion is stored in the peer store
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: parameterize
defer cancel()

var wg sync.WaitGroup
for _, conn := range c.host.Network().ConnsToPeer(pi.ID) {
conn := conn

wg.Add(1)
go func() {
defer wg.Done()

select {
case <-timeoutCtx.Done():
case <-c.host.IDService().IdentifyWait(conn):

// check if identification was successful by looking for
// the AgentVersion key. If it exists, we cancel the
// identification of the remaining connections.
if c.isIdentified(pi.ID) {
cancel()
return
}
}
}()
}

wg.Wait()
}

// isIdentified returns true if the given peer.ID was successfully identified.
// Just because IdentifyWait returns doesn't mean the peer was identified.
func (c *Crawler) isIdentified(pid peer.ID) bool {
agent, err := c.host.Peerstore().Get(pid, "AgentVersion")
return err == nil && agent.(string) != ""
}

type DiscV5Result struct {
// The time we received the first successful response
RespondedAt *time.Time
3 changes: 2 additions & 1 deletion discv5/driver_crawler.go
@@ -297,8 +297,9 @@ func (d *CrawlDriver) Close() {

func newLibp2pHost(version string) (host.Host, error) {
// Configure the resource manager to not limit anything
var noSubnetLimit []rcmgr.ConnLimitPerSubnet
limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
rm, err := rcmgr.NewResourceManager(limiter)
rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithLimitPerSubnet(noSubnetLimit, noSubnetLimit))
if err != nil {
return nil, fmt.Errorf("new resource manager: %w", err)
}
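
For readers unfamiliar with the resource-manager API, a minimal sketch of how this unrestricted resource manager could be wired into host construction; only the rcmgr lines mirror the diff, while the libp2p.New option list is an assumption about the surrounding newLibp2pHost code:

```go
package sketch

import (
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/host"
	rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
)

// newUnlimitedHost sketches how the resource manager from the diff plugs into
// host construction. Empty per-subnet limit slices disable the default
// per-subnet connection caps, matching the "don't limit anything" intent.
func newUnlimitedHost(version string) (host.Host, error) {
	var noSubnetLimit []rcmgr.ConnLimitPerSubnet
	limiter := rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits)
	rm, err := rcmgr.NewResourceManager(limiter, rcmgr.WithLimitPerSubnet(noSubnetLimit, noSubnetLimit))
	if err != nil {
		return nil, fmt.Errorf("new resource manager: %w", err)
	}

	return libp2p.New(
		libp2p.UserAgent(version),
		libp2p.ResourceManager(rm),
	)
}
```
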
3 changes: 2 additions & 1 deletion libp2p/crawler.go
@@ -3,6 +3,7 @@ package libp2p
import (
"context"
"encoding/json"
"strings"
"time"

"github.com/benbjohnson/clock"
@@ -119,7 +120,7 @@ func mergeResults(r *core.CrawlResult[PeerInfo], p2pRes P2PResult, apiRes APIRes

// treat ErrConnectionClosedImmediately as no error because we were able
// to connect
if p2pRes.ConnClosedImmediately {
if p2pRes.CrawlError != nil && strings.Contains(p2pRes.CrawlError.Error(), "connection failed") {
properties["direct_close"] = true
}

90 changes: 27 additions & 63 deletions libp2p/crawler_p2p.go
@@ -11,6 +11,7 @@ import (
kbucket "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
ma "github.com/multiformats/go-multiaddr"
log "github.com/sirupsen/logrus"
"go.uber.org/atomic"
@@ -54,8 +55,8 @@ type P2PResult struct {
// know about that protocol.
ListenAddrs []ma.Multiaddr

// If the connection was closed immediately
ConnClosedImmediately bool
// The transport of the successful connection
Transport string
}

func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
@@ -66,23 +67,16 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
RoutingTable: &core.RoutingTable[PeerInfo]{PeerID: pi.ID()},
}

var conn network.Conn
result.ConnectStartTime = time.Now()
result.ConnectError = c.connect(ctx, pi.AddrInfo) // use filtered addr list
conn, result.ConnectError = c.connect(ctx, pi.AddrInfo) // use filtered addr list
result.ConnectEndTime = time.Now()

// If we could successfully connect to the peer we actually crawl it.
if result.ConnectError == nil {

// check if we're actually connected
if c.host.Network().Connectedness(pi.ID()) == network.NotConnected {
// this is a weird behavior I was observing. Libp2p reports a
// successful connection establishment but isn't connected right
// after the call returned. This is not a big problem at this
// point because drainBuckets will open the connection again.
// This works more often than not but is still weird. At least
// keep track of this issue - just in case.
result.ConnClosedImmediately = true
}
// keep track of the transport of the open connection
result.Transport = conn.ConnState().Transport

// Fetch all neighbors
result.RoutingTable, result.CrawlError = c.drainBuckets(ctx, pi.AddrInfo)
Expand All @@ -91,7 +85,16 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
}

// wait for the Identify exchange to complete (no-op if already done)
c.identifyWait(ctx, pi.AddrInfo)
// the internal timeout is set to 30 s. When crawling we only allow 5s.
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

select {
case <-timeoutCtx.Done():
// identification timed out.
case <-c.host.IDService().IdentifyWait(conn):
// identification may have succeeded.
}

// Extract information from peer store
ps := c.host.Peerstore()
@@ -136,9 +139,9 @@ func (c *Crawler) crawlP2P(ctx context.Context, pi PeerInfo) <-chan P2PResult {
}

// connect establishes a connection to the given peer. It also handles metric capturing.
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) (network.Conn, error) {
if len(pi.Addrs) == 0 {
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
return nil, fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
}

// init an exponential backoff
@@ -160,13 +163,16 @@ })
})
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())

// save addresses into the peer store temporarily
c.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.TempAddrTTL)

timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
err := c.host.Connect(timeoutCtx, pi)
conn, err := c.host.Network().DialPeer(timeoutCtx, pi.ID)
cancel()

// yay, it worked! Or has it? The caller checks the connectedness again.
if err == nil {
return nil
return conn, nil
}

switch true {
@@ -186,18 +192,18 @@
// We already have too many open connections over a relay. Try again!
default:
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

sleepDur := bo.NextBackOff()
if sleepDur == backoff.Stop {
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
return err
return nil, err
}

select {
case <-ctx.Done():
return ctx.Err()
return nil, ctx.Err()
case <-time.After(sleepDur):
retry += 1
continue
@@ -302,45 +308,3 @@ func (c *Crawler) drainBucket(ctx context.Context, rt *kbucket.RoutingTable, pid

return nil, fmt.Errorf("getting closest peer with CPL %d: %w", bucket, err)
}

// identifyWait waits until any connection to a peer passed the Identify
// exchange successfully or all identification attempts have failed.
// The call to IdentifyWait returns immediately if the connection was
// identified in the past. We detect a successful identification if an
// AgentVersion is stored in the peer store
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

var wg sync.WaitGroup
for _, conn := range c.host.Network().ConnsToPeer(pi.ID) {
conn := conn

wg.Add(1)
go func() {
defer wg.Done()

select {
case <-timeoutCtx.Done():
case <-c.host.IDService().IdentifyWait(conn):

// check if identification was successful by looking for
// the AgentVersion key. If it exists, we cancel the
// identification of the remaining connections.
if c.isIdentified(pi.ID) {
cancel()
return
}
}
}()
}

wg.Wait()
}

// isIdentified returns true if the given peer.ID was successfully identified.
// Just because IdentifyWait returns doesn't mean the peer was identified.
func (c *Crawler) isIdentified(pid peer.ID) bool {
agent, err := c.host.Peerstore().Get(pid, "AgentVersion")
return err == nil && agent.(string) != ""
}