Commit

WIP
dennis-tra committed Oct 23, 2024
1 parent 6309311 commit ef3206d
Showing 4 changed files with 74 additions and 74 deletions.
2 changes: 1 addition & 1 deletion cmd/nebula/cmd_crawl.go
@@ -211,7 +211,7 @@ func CrawlAction(c *cli.Context) error {
return fmt.Errorf("new database client: %w", err)
}
defer func() {
- if err := dbc.Close(); err != nil && !errors.Is(err, sql.ErrConnDone) {
+ if err := dbc.Close(); err != nil && !errors.Is(err, sql.ErrConnDone) && !strings.Contains(err.Error(), "use of closed network connection") {
log.WithError(err).Warnln("Failed closing database handle")
}
}()
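The hunk above widens the filter on errors returned when closing the database handle: besides sql.ErrConnDone, a close error whose text contains "use of closed network connection" is now ignored as well. A minimal, runnable sketch of that filtering pattern; the isBenignCloseError helper below is illustrative and not part of the repository:

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
	"strings"
)

// isBenignCloseError reports whether an error returned by Close can safely be
// ignored: the underlying connection is already gone, so there is nothing
// left to clean up.
func isBenignCloseError(err error) bool {
	return errors.Is(err, sql.ErrConnDone) ||
		strings.Contains(err.Error(), "use of closed network connection")
}

func main() {
	// The two error shapes the deferred Close handler now tolerates,
	// plus one that should still be logged.
	fmt.Println(isBenignCloseError(sql.ErrConnDone))                                 // true
	fmt.Println(isBenignCloseError(errors.New("use of closed network connection"))) // true
	fmt.Println(isBenignCloseError(errors.New("disk I/O error")))                    // false
}
```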
99 changes: 53 additions & 46 deletions discv4/crawler.go
@@ -8,6 +8,7 @@ import (
"fmt"
"math/rand"
"net/netip"
"strings"
"sync"
"time"

@@ -100,7 +101,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
}

if c.cfg.KeepENR {
properties["enr"] = task.Node.String() // discV4Result.ENR.String() panics :/
properties["enr"] = task.enr // discV4Result.ENR.String() panics :/
}

// keep track of all unknown connection errors
@@ -383,7 +384,10 @@ func (c *Crawler) crawlRemainingBucketsConcurrently(node *enode.Node, udpAddr ne
}

func (c *Crawler) crawlRemainingBucketsSequentially(node *enode.Node, udpAddr netip.AddrPort, probesPerBucket int) map[string]PeerInfo {
+ timeouts := 0
allNeighbors := map[string]PeerInfo{}
+
+ OUTER:
for i := 1; i < 15; i++ { // although there are 17 buckets, GenRandomPublicKey only supports the first 16
for j := 0; j < probesPerBucket; j++ {

@@ -397,10 +401,13 @@ func (c *Crawler) crawlRemainingBucketsSequentially(node *enode.Node, udpAddr ne
// second, we do the Find node request
closest, err := c.listener.FindNode(node.ID(), udpAddr, targetKey)
if errors.Is(err, discover.ErrTimeout) {
- break
+ timeouts += 1
+ if timeouts > 3 {
+ break OUTER
+ }
+ continue
} else if err != nil {
log.WithError(err).WithField("nodeID", node.ID().String()).Warnf("Failed query node bucket %d", i)
- break
+ break OUTER
}

for _, c := range closest {
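The crawlRemainingBucketsSequentially hunk above replaces "stop at the first timeout" with a small timeout budget: up to three timeouts are tolerated, and once the budget is exhausted a labeled break leaves both the bucket loop and the probe loop at once. A self-contained sketch of that control flow, with probe() as a stand-in for the FindNode round trip:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
)

var errTimeout = errors.New("timeout") // stand-in for discover.ErrTimeout

// probe is a stand-in for the FindNode request; it times out randomly.
func probe() error {
	if rand.Intn(3) == 0 {
		return errTimeout
	}
	return nil
}

func main() {
	timeouts := 0

OUTER:
	for bucket := 1; bucket < 15; bucket++ {
		for j := 0; j < 3; j++ {
			if err := probe(); errors.Is(err, errTimeout) {
				// Tolerate a few timeouts, then give up on the whole node:
				// the labeled break leaves both loops, not just the inner one.
				timeouts += 1
				if timeouts > 3 {
					break OUTER
				}
				continue
			}
			fmt.Printf("bucket %d probe %d ok\n", bucket, j)
		}
	}
	fmt.Println("total timeouts:", timeouts)
}
```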
@@ -435,49 +442,49 @@ func (c *Crawler) crawlDevp2p(ctx context.Context, pi PeerInfo) <-chan Devp2pRes
result := Devp2pResult{}

result.ConnectStartTime = time.Now()
- // conn, err := c.client.Connect(ctx, pi)
+ conn, err := c.client.Connect(ctx, pi)
result.ConnectEndTime = time.Now()
- // result.ConnectError = err
-
- //if result.ConnectError == nil {
- //
- // // start another go routine to cancel the entire operation if it
- // // times out. The context will be cancelled when this function
- // // returns or the timeout is reached. In both cases, we close the
- // // connection to the remote peer which will trigger that the call
- // // to Identify below will return (if the context is canceled because
- // // of a timeout and not function return).
- // timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
- // defer cancel()
- // go func() {
- // <-timeoutCtx.Done()
- // // Free connection resources
- // if err := conn.Close(); err != nil && !strings.Contains(err.Error(), errUseOfClosedNetworkConnectionStr) {
- // log.WithError(err).WithField("remoteID", pi.ID().ShortString()).Warnln("Could not close connection to peer")
- // }
- // }()
- //
- // resp, status, err := conn.Identify()
- // if err != nil && resp == nil && status == nil {
- // result.ConnectError = err
- // }
- // result.IdentifyEndTime = time.Now()
- // result.Status = status
- //
- // if resp != nil {
- // result.Agent = resp.Name
- // protocols := make([]string, len(resp.Caps))
- // for i, c := range resp.Caps {
- // protocols[i] = "/" + c.String()
- // }
- // result.Protocols = protocols
- // }
- //}
- //
- //// if there was a connection error, parse it to a known one
- //if result.ConnectError != nil {
- // result.ConnectErrorStr = db.NetError(result.ConnectError)
- //}
+ result.ConnectError = err
+
+ if result.ConnectError == nil {
+
+ // start another go routine to cancel the entire operation if it
+ // times out. The context will be cancelled when this function
+ // returns or the timeout is reached. In both cases, we close the
+ // connection to the remote peer which will trigger that the call
+ // to Identify below will return (if the context is canceled because
+ // of a timeout and not function return).
+ timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
+ defer cancel()
+ go func() {
+ <-timeoutCtx.Done()
+ // Free connection resources
+ if err := conn.Close(); err != nil && !strings.Contains(err.Error(), errUseOfClosedNetworkConnectionStr) {
+ log.WithError(err).WithField("remoteID", pi.ID().ShortString()).Warnln("Could not close connection to peer")
+ }
+ }()
+
+ resp, status, err := conn.Identify()
+ if err != nil && resp == nil && status == nil {
+ result.ConnectError = err
+ }
+ result.IdentifyEndTime = time.Now()
+ result.Status = status
+
+ if resp != nil {
+ result.Agent = resp.Name
+ protocols := make([]string, len(resp.Caps))
+ for i, c := range resp.Caps {
+ protocols[i] = "/" + c.String()
+ }
+ result.Protocols = protocols
+ }
+ }
+
+ // if there was a connection error, parse it to a known one
+ if result.ConnectError != nil {
+ result.ConnectErrorStr = db.NetError(result.ConnectError)
+ }

// send the result back and close channel
select {
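The largest hunk above re-enables the previously commented-out devp2p dial: the crawler connects, starts a watchdog goroutine that closes the connection once a timeout context fires, and then runs the blocking Identify exchange, which returns as soon as the peer answers or the connection is torn down. A runnable sketch of that watchdog pattern over plain TCP, with identify() standing in for the devp2p handshake:

```go
package main

import (
	"context"
	"fmt"
	"net"
	"time"
)

// identify stands in for the blocking devp2p handshake: it waits on the
// connection until the peer answers or the connection is closed.
func identify(conn net.Conn) error {
	buf := make([]byte, 1)
	_, err := conn.Read(buf) // returns once the watchdog calls conn.Close()
	return err
}

func main() {
	// A listener that accepts but never writes, so identify would block forever.
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	go func() {
		c, err := ln.Accept()
		if err != nil {
			return
		}
		defer c.Close()
		select {} // hold the connection open without sending anything
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		panic(err)
	}

	// Watchdog: when the timeout context fires (or main returns and cancel
	// runs), close the connection so the blocked identify call is released.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	go func() {
		<-ctx.Done()
		conn.Close()
	}()

	start := time.Now()
	err = identify(conn)
	fmt.Printf("identify returned after %v: %v\n", time.Since(start).Round(time.Millisecond), err)
}
```

The error printed at the end contains "use of closed network connection", the same string this commit filters in cmd_crawl.go and that the old commented-out code matched via errUseOfClosedNetworkConnectionStr.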
45 changes: 19 additions & 26 deletions discv4/driver_crawler.go
@@ -1,9 +1,7 @@
package discv4

import (
"crypto/ecdsa"
"crypto/elliptic"
crand "crypto/rand"
"fmt"
"math"
"net"
@@ -38,6 +36,7 @@ type PeerInfo struct {
peerID peer.ID
maddrs []ma.Multiaddr
udpAddr netip.AddrPort
+ enr string
}

var _ core.PeerInfo[PeerInfo] = (*PeerInfo)(nil)
Expand Down Expand Up @@ -96,6 +95,7 @@ func NewPeerInfo(node *enode.Node) (PeerInfo, error) {
peerID: peerID,
maddrs: maddrs,
udpAddr: udpAddr,
+ enr: node.String(),
}

return pi, nil
@@ -115,7 +115,10 @@ func (p PeerInfo) Merge(other PeerInfo) PeerInfo {
}

func (p PeerInfo) DeduplicationKey() string {
- return p.Node.String()
+ // previously this was: p.Node.String() but a CPU profile revealed that the
+ // process of encoding the public key takes a lot of CPU cycles. Especially
+ // because we're calling DeduplicationKey very often!
+ return p.enr
}

type CrawlDriverConfig struct {
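The DeduplicationKey hunk above swaps the per-call p.Node.String() for an enr string that NewPeerInfo computes once, because re-encoding the public key on every call showed up in a CPU profile. A small sketch of that "compute the key once in the constructor" pattern; the hashing below is only a stand-in for the costly ENR encoding:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// peerInfo caches its deduplication key at construction time so that hot
// paths can read a plain string field instead of re-encoding the key.
type peerInfo struct {
	pubKey []byte
	dedup  string // computed once in newPeerInfo
}

func newPeerInfo(pubKey []byte) peerInfo {
	sum := sha256.Sum256(pubKey) // stand-in for the costly ENR/key encoding
	return peerInfo{
		pubKey: pubKey,
		dedup:  hex.EncodeToString(sum[:]),
	}
}

// DeduplicationKey is called very often by the crawl engine, so it must be cheap.
func (p peerInfo) DeduplicationKey() string {
	return p.dedup
}

func main() {
	p := newPeerInfo([]byte("example public key bytes"))
	fmt.Println(p.DeduplicationKey())
}
```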
@@ -243,31 +246,19 @@ var logOnce sync.Once

func (d *CrawlDriver) NewWorker() (core.Worker[PeerInfo, core.CrawlResult[PeerInfo]], error) {
// If I'm not using the below elliptic curve, some Ethereum clients will reject communication
- priv, err := ecdsa.GenerateKey(ethcrypto.S256(), crand.Reader)
+ priv, err := ethcrypto.GenerateKey()
if err != nil {
return nil, fmt.Errorf("new ethereum ecdsa key: %w", err)
}
- //ip := net.ParseIP("0.0.0.0")
- //if ip == nil {
- // return nil, fmt.Errorf("failed to parse IP address")
- //}
- //
- //conn, err := net.ListenUDP("udp4", &net.UDPAddr{
- // IP: ip,
- // Port: 0,
- //})
- //if err != nil {
- // return nil, fmt.Errorf("listen on udp port: %w", err)
- //}

- socket, err := net.ListenPacket("udp4", "0.0.0.0:0")
- if err != nil {
- return nil, fmt.Errorf("listen on udp port: %w", err)
+ laddr := &net.UDPAddr{
+ IP: net.ParseIP("0.0.0.0"),
+ Port: 0,
}

- conn, ok := socket.(*net.UDPConn)
- if !ok {
- return nil, fmt.Errorf("failed to cast socket to UDPConn")
+ conn, err := net.ListenUDP("udp4", laddr)
+ if err != nil {
+ return nil, fmt.Errorf("listen on udp4 port: %w", err)
}

if err = conn.SetReadBuffer(d.cfg.UDPBufferSize); err != nil {
@@ -293,15 +284,17 @@ func (d *CrawlDriver) NewWorker() (core.Worker[PeerInfo, core.CrawlResult[PeerIn
ethNode := enode.NewLocalNode(d.peerstore, priv)
udpAddr := conn.LocalAddr().(*net.UDPAddr)
if udpAddr.IP.IsUnspecified() {
- ethNode.SetFallbackIP(net.IP{127, 0, 0, 1})
+ ethNode.SetFallbackIP(net.ParseIP("127.0.0.1"))
} else {
ethNode.SetFallbackIP(udpAddr.IP)
}
ethNode.SetFallbackUDP(udpAddr.Port)

discvxCfg := discover.Config{
- PrivateKey: priv,
- Unhandled: d.unhandledChan,
+ PrivateKey:              priv,
+ Unhandled:               d.unhandledChan,
+ NoFindnodeLivenessCheck: true,
+ RefreshInterval:         100 * time.Hour, // turn off
}
listener, err := discover.ListenV4(conn, ethNode, discvxCfg)
if err != nil {
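Two swaps in NewWorker are worth spelling out. go-ethereum's ethcrypto.GenerateKey() wraps ecdsa.GenerateKey with the secp256k1 S256 curve, so it should be equivalent to the removed explicit call; and net.ListenUDP returns a *net.UDPConn directly, which removes the type assertion the net.ListenPacket variant needed before calling SetReadBuffer. A hedged sketch of both, assuming the go-ethereum crypto package is available:

```go
package main

import (
	"crypto/ecdsa"
	crand "crypto/rand"
	"fmt"
	"net"

	ethcrypto "github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// Key generation: both calls yield a secp256k1 (S256) private key;
	// ethcrypto.GenerateKey is simply the shorter spelling.
	oldStyle, err := ecdsa.GenerateKey(ethcrypto.S256(), crand.Reader)
	if err != nil {
		panic(err)
	}
	newStyle, err := ethcrypto.GenerateKey()
	if err != nil {
		panic(err)
	}
	fmt.Println(oldStyle.Curve == newStyle.Curve) // true: both use S256

	// UDP socket: net.ListenUDP returns a *net.UDPConn directly, so no type
	// assertion is needed before calling UDP-specific methods such as
	// SetReadBuffer.
	laddr := &net.UDPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0}
	conn, err := net.ListenUDP("udp4", laddr)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	if err := conn.SetReadBuffer(1 << 20); err != nil {
		fmt.Println("set read buffer:", err)
	}
	fmt.Println("listening on", conn.LocalAddr())
}
```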
2 changes: 1 addition & 1 deletion libp2p/driver_crawler.go
@@ -61,7 +61,7 @@ func (p PeerInfo) Merge(other PeerInfo) PeerInfo {
}

func (p PeerInfo) DeduplicationKey() string {
- return string(p.AddrInfo.ID)
+ return p.AddrInfo.ID.String()
}

type CrawlDriverConfig struct {
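For the libp2p driver, the deduplication key changes from string(p.AddrInfo.ID), the raw multihash bytes, to p.AddrInfo.ID.String(), the base58 text form; both identify the same peer, just under different encodings. A small sketch of the difference, assuming the go-libp2p core packages:

```go
package main

import (
	"crypto/rand"
	"fmt"

	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	// Derive a peer ID from a fresh key, then compare the two key forms.
	priv, _, err := crypto.GenerateEd25519Key(rand.Reader)
	if err != nil {
		panic(err)
	}
	pid, err := peer.IDFromPrivateKey(priv)
	if err != nil {
		panic(err)
	}

	raw := string(pid)   // raw multihash bytes, not printable
	text := pid.String() // base58-encoded text form, e.g. "12D3KooW..."

	fmt.Printf("raw bytes: %q\n", raw)
	fmt.Printf("text form: %s\n", text)
	fmt.Println("same value, different encodings:", peer.ID(raw) == pid)
}
```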
