Introduce deduplication key #80

Merged · 6 commits · Oct 23, 2024
2 changes: 1 addition & 1 deletion cmd/nebula/cmd_crawl.go
@@ -211,7 +211,7 @@ func CrawlAction(c *cli.Context) error {
return fmt.Errorf("new database client: %w", err)
}
defer func() {
if err := dbc.Close(); err != nil && !errors.Is(err, sql.ErrConnDone) {
if err := dbc.Close(); err != nil && !errors.Is(err, sql.ErrConnDone) && !strings.Contains(err.Error(), "use of closed network connection") {
log.WithError(err).Warnln("Failed closing database handle")
}
}()
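The new condition matches on the error string because the failing close can surface a raw OS-level error rather than a wrapped sentinel (the hunk implies a corresponding `strings` import elsewhere in the file). A sketch of how the check could be factored out as it grows; treating `net.ErrClosed` as equivalent is an assumption about how the driver wraps errors:

```go
package main

import (
	"database/sql"
	"errors"
	"fmt"
	"net"
	"strings"
)

// isBenignCloseErr reports whether an error from closing the database
// handle can safely be ignored: the connection was already done, or the
// underlying socket was already closed.
func isBenignCloseErr(err error) bool {
	if err == nil {
		return true
	}
	if errors.Is(err, sql.ErrConnDone) || errors.Is(err, net.ErrClosed) {
		return true
	}
	// fallback for code paths that return the raw string instead of
	// wrapping net.ErrClosed
	return strings.Contains(err.Error(), "use of closed network connection")
}

func main() {
	fmt.Println(isBenignCloseErr(sql.ErrConnDone))                                 // true
	fmt.Println(isBenignCloseErr(errors.New("use of closed network connection"))) // true
	fmt.Println(isBenignCloseErr(errors.New("disk I/O error")))                    // false
}
```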
10 changes: 10 additions & 0 deletions core/core.go
@@ -23,6 +23,16 @@ type PeerInfo[T any] interface {
// peer info struct. The implementation of Merge may panic if the peer IDs
// don't match.
Merge(other T) T

// DeduplicationKey returns a unique string used for deduplication of crawl
// tasks. For example, in discv4 and discv5 we might want to crawl the same
// peer (as identified by its public key) multiple times when we find new
// ENRs for it. If the deduplication key was just the public key, we would
// only crawl it once. If we later find newer ENRs for the same peer with
// different network addresses, we would skip that peer. On the other hand,
// if the deduplication key was the entire ENR, we would crawl the same peer
// with different (potentially newer) connectivity information again.
DeduplicationKey() string
}

// A Driver is a data structure that provides the necessary implementations and
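To make the trade-off described in the new doc comment concrete, here is a self-contained sketch contrasting the two keying strategies (the `enrRecord` type and its fields are illustrative stand-ins, not Nebula's actual types):

```go
package main

import "fmt"

// enrRecord is a simplified stand-in for an Ethereum node record: the same
// public key can reappear with a higher sequence number and new addresses.
type enrRecord struct {
	PubKey string
	Seq    uint64
	Addr   string
}

type peerInfo struct{ rec enrRecord }

// keyByPubKey deduplicates on identity only: the peer is crawled at most
// once per run, even if a newer ENR with fresh addresses shows up later.
func (p peerInfo) keyByPubKey() string { return p.rec.PubKey }

// keyByENR folds the entire record into the key: a newer ENR for the same
// peer produces a different key, so the peer is crawled again with the
// updated connectivity information.
func (p peerInfo) keyByENR() string {
	return fmt.Sprintf("%s-%d-%s", p.rec.PubKey, p.rec.Seq, p.rec.Addr)
}

func main() {
	old := peerInfo{enrRecord{PubKey: "abc", Seq: 1, Addr: "1.2.3.4:30303"}}
	upd := peerInfo{enrRecord{PubKey: "abc", Seq: 2, Addr: "5.6.7.8:30303"}}

	fmt.Println(old.keyByPubKey() == upd.keyByPubKey()) // true  -> update would be skipped
	fmt.Println(old.keyByENR() == upd.keyByENR())       // false -> update is crawled again
}
```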
4 changes: 4 additions & 0 deletions core/core_test.go
@@ -27,6 +27,10 @@ type testPeerInfo struct {
addrs []ma.Multiaddr
}

func (p *testPeerInfo) DeduplicationKey() string {
return string(p.peerID)
}

var _ PeerInfo[*testPeerInfo] = (*testPeerInfo)(nil)

func (p *testPeerInfo) Addrs() []ma.Multiaddr {
31 changes: 19 additions & 12 deletions core/engine.go
@@ -279,11 +279,11 @@ func (e *Engine[I, R]) Run(ctx context.Context) (map[string]I, error) {
observeFn(e)
case innerPeerTasks <- peerTask:
// a worker was ready to accept a new task -> perform internal bookkeeping.
e.peerQueue.Drop(string(peerTask.ID()))
e.inflight[string(peerTask.ID())] = struct{}{}
e.peerQueue.Drop(peerTask.DeduplicationKey())
e.inflight[peerTask.DeduplicationKey()] = struct{}{}
case innerWriteTasks <- writeTask:
// a write worker was ready to accept a new task -> perform internal bookkeeping.
e.writeQueue.Drop(string(writeTask.PeerInfo().ID()))
e.writeQueue.Drop(writeTask.PeerInfo().DeduplicationKey())
case result, more := <-peerResults:
if !more {
// the peerResults queue was closed. This means all workers
@@ -366,19 +366,22 @@ func (e *Engine[I, R]) handlePeerResult(ctx context.Context, result Result[R]) {
// count the number of visits being made
e.telemetry.visitCount.Add(ctx, 1, metric.WithAttributes(attribute.Bool("success", result.Value.IsSuccess())))

// get hold of the deduplication key
key := wr.PeerInfo().DeduplicationKey()

// The operation for this peer is not inflight anymore -> delete it.
delete(e.inflight, string(wr.PeerInfo().ID()))
delete(e.inflight, key)

// Keep track that this peer was processed, so we don't do it again during
// this run. Unless we explicitly allow duplicate processing.
if !e.cfg.DuplicateProcessing {
e.processed[string(wr.PeerInfo().ID())] = struct{}{}
e.processed[key] = struct{}{}
logEntry = logEntry.WithField("processed", len(e.processed))
}

// Publish the processing result to the writer queue so that the data is
// saved to disk.
e.writeQueue.Push(string(wr.PeerInfo().ID()), wr, 0)
e.writeQueue.Push(key, wr, 0)

// let the handler work on the new peer result
newTasks := e.handler.HandlePeerResult(ctx, result)
@@ -413,21 +416,25 @@ func (e *Engine[I, R]) handleWriteResult(ctx context.Context, result Result[Writ
}

func (e *Engine[I, R]) enqueueTask(task I) {
mapKey := string(task.ID())
key := task.DeduplicationKey()

// Don't add this peer to the queue if we're currently querying it
if _, isInflight := e.inflight[mapKey]; isInflight {
if _, isInflight := e.inflight[key]; isInflight {
return
}

// Don't add the peer to the queue if we have already processed it
if _, processed := e.processed[mapKey]; processed {
// Note: we handle the DuplicateProcessing logic when Nebula runs
// in monitoring mode on the side where we populate this `processed`
// map. If we handled it here and still kept track of all processed
// peers, we would have a memory leak.
if _, processed := e.processed[key]; processed {
return
}

// Check if we have already queued this peer. If so, merge the new
// information with the already existing ones.
queuedTask, isQueued := e.peerQueue.Find(mapKey)
queuedTask, isQueued := e.peerQueue.Find(key)
if isQueued {
task = task.Merge(queuedTask)
}
@@ -445,9 +452,9 @@ func (e *Engine[I, R]) enqueueTask(task I) {
// If the peer was already queued we only update its priority. If the
// peer wasn't queued, we push it to the queue.
if isQueued {
e.peerQueue.Update(mapKey, task, priority)
e.peerQueue.Update(key, task, priority)
} else {
e.peerQueue.Push(mapKey, task, priority)
e.peerQueue.Push(key, task, priority)
}
}

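Taken together, the engine now keys all of its bookkeeping on `DeduplicationKey()` instead of the peer ID. A compilable toy version of the `enqueueTask` control flow (the `task` type and the map-backed queue are simplifications of the generic engine):

```go
package main

import "fmt"

// task is a stand-in for the engine's generic peer task; only the
// deduplication key matters for this sketch.
type task struct{ key string }

func (t task) DeduplicationKey() string { return t.key }

// engine mirrors the three places the key is consulted above: the inflight
// set, the processed set, and the queue of pending tasks.
type engine struct {
	inflight  map[string]struct{}
	processed map[string]struct{}
	queue     map[string]task
}

// enqueue follows the same order of checks as Engine.enqueueTask: skip if
// inflight, skip if processed, otherwise merge into or push onto the queue
// (collapsed here into a single map write).
func (e *engine) enqueue(t task) {
	key := t.DeduplicationKey()
	if _, ok := e.inflight[key]; ok {
		return
	}
	if _, ok := e.processed[key]; ok {
		return
	}
	e.queue[key] = t
}

func main() {
	e := &engine{
		inflight:  map[string]struct{}{"busy": {}},
		processed: map[string]struct{}{"done": {}},
		queue:     map[string]task{},
	}
	e.enqueue(task{key: "busy"}) // dropped: currently being crawled
	e.enqueue(task{key: "done"}) // dropped: already crawled this run
	e.enqueue(task{key: "new"})  // queued
	fmt.Println(len(e.queue))    // 1
}
```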
86 changes: 40 additions & 46 deletions discv4/crawler.go
@@ -9,15 +9,13 @@ import (
"math/rand"
"net/netip"
"strings"
"sync"
"time"

mapset "github.com/deckarep/golang-set/v2"
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discover/v4wire"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
log "github.com/sirupsen/logrus"

"github.com/dennis-tra/nebula-crawler/config"
@@ -102,7 +100,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
}

if c.cfg.KeepENR {
properties["enr"] = task.Node.String() // discV4Result.ENR.String() panics :/
properties["enr"] = task.enr // discV4Result.ENR.String() panics :/
}

// keep track of all unknown connection errors
@@ -172,6 +170,8 @@ func (c *Crawler) crawlDiscV4(ctx context.Context, pi PeerInfo) <-chan DiscV4Res
resultCh := make(chan DiscV4Result)

go func() {
defer close(resultCh)

// the final result struct
result := DiscV4Result{}

@@ -198,15 +198,15 @@

result.Strategy = determineStrategy(closestSet)

var remainingClosest map[peer.ID]PeerInfo
var remainingClosest map[string]PeerInfo
switch result.Strategy {
case crawlStrategySingleProbe:
remainingClosest = c.crawlRemainingBucketsConcurrently(pi.Node, pi.udpAddr, 1)
remainingClosest = c.crawlRemainingBucketsSequentially(pi.Node, pi.udpAddr, 1)
case crawlStrategyMultiProbe:
remainingClosest = c.crawlRemainingBucketsConcurrently(pi.Node, pi.udpAddr, 3)
remainingClosest = c.crawlRemainingBucketsSequentially(pi.Node, pi.udpAddr, 3)
case crawlStrategyRandomProbe:
probesPerBucket := int(1.3333 * discover.BucketSize / (float32(len(closestMap)) / float32(probes)))
remainingClosest = c.crawlRemainingBucketsConcurrently(pi.Node, pi.udpAddr, probesPerBucket)
remainingClosest = c.crawlRemainingBucketsSequentially(pi.Node, pi.udpAddr, probesPerBucket)
default:
panic("unexpected strategy: " + string(result.Strategy))
}
@@ -222,7 +222,7 @@

result.RoutingTable = &core.RoutingTable[PeerInfo]{
PeerID: pi.ID(),
Neighbors: []PeerInfo{},
Neighbors: make([]PeerInfo, 0, len(closestMap)),
ErrorBits: uint16(0),
Error: err,
}
@@ -241,18 +241,16 @@ func (c *Crawler) crawlDiscV4(ctx context.Context, pi PeerInfo) <-chan DiscV4Res
case resultCh <- result:
case <-ctx.Done():
}

close(resultCh)
}()

return resultCh
}
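Note the moved `close(resultCh)`: deferring it at the top of the goroutine makes the close unconditional, so every exit path, including any early `return` added later, still unblocks the reader. The pattern in isolation:

```go
package main

import (
	"context"
	"fmt"
)

// produce owns the result channel and closes it exactly once on every exit
// path, because the close is deferred as soon as the goroutine starts.
func produce(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)

		// arbitrary work with early returns is safe here

		select {
		case out <- 42:
		case <-ctx.Done(): // give up if the consumer went away
		}
	}()
	return out
}

func main() {
	for v := range produce(context.Background()) {
		fmt.Println(v) // 42; the range then ends because the channel closes
	}
}
```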

func (c *Crawler) probeBucket0(pi PeerInfo, probes int, returnedENR bool) (map[peer.ID]PeerInfo, []mapset.Set[peer.ID], time.Time, error) {
func (c *Crawler) probeBucket0(pi PeerInfo, probes int, returnedENR bool) (map[string]PeerInfo, []mapset.Set[string], time.Time, error) {
var (
respondedAt time.Time
closestMap = make(map[peer.ID]PeerInfo)
closestSets []mapset.Set[peer.ID]
closestMap = make(map[string]PeerInfo)
closestSets []mapset.Set[string]
errs []error
)

@@ -290,7 +288,7 @@ func (c *Crawler) probeBucket0(pi PeerInfo, probes int, returnedENR bool) (map[p
continue
}

closestMap[pi.ID()] = pi
closestMap[pi.DeduplicationKey()] = pi
}

closestSets = append(closestSets, mapset.NewThreadUnsafeSetFromMapKeys(closestMap))
@@ -311,15 +309,15 @@ const (
crawlStrategyRandomProbe CrawlStrategy = "random-probe"
)

func determineStrategy(sets []mapset.Set[peer.ID]) CrawlStrategy {
func determineStrategy(sets []mapset.Set[string]) CrawlStrategy {
// Calculate the average difference between two responses. If the response
// sizes are always 16, one new peer will result in a symmetric difference
// of cardinality 2. One peer in the first set that's not in the second and one
// peer in the second that's not in the first set. We consider that it's the
// happy path if the average symmetric difference is less than 2.
avgSymDiff := float32(0)
diffCount := float32(0)
allNodes := mapset.NewThreadUnsafeSet[peer.ID]()
allNodes := mapset.NewThreadUnsafeSet[string]()
for i := 0; i < len(sets); i++ {
allNodes = allNodes.Union(sets[i])
for j := i + 1; j < len(sets); j++ {
@@ -339,47 +337,43 @@ func determineStrategy(sets []mapset.Set[string]) CrawlStrategy {
}
}
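The heuristic in `determineStrategy` reduces to a single number. A dependency-free sketch of that computation, using plain slices instead of `mapset.Set[string]`:

```go
package main

import "fmt"

// toSet converts a FINDNODE response (a slice of node keys) into a set.
func toSet(ids []string) map[string]struct{} {
	m := make(map[string]struct{}, len(ids))
	for _, id := range ids {
		m[id] = struct{}{}
	}
	return m
}

// avgSymmetricDifference computes the quantity the strategy choice is based
// on: for each pair of responses, the number of node keys that appear in
// exactly one of the two, averaged over all pairs.
func avgSymmetricDifference(responses [][]string) float32 {
	var sum, pairs float32
	for i := 0; i < len(responses); i++ {
		a := toSet(responses[i])
		for j := i + 1; j < len(responses); j++ {
			b := toSet(responses[j])
			diff := 0
			for id := range a {
				if _, ok := b[id]; !ok {
					diff++
				}
			}
			for id := range b {
				if _, ok := a[id]; !ok {
					diff++
				}
			}
			sum += float32(diff)
			pairs++
		}
	}
	if pairs == 0 {
		return 0
	}
	return sum / pairs
}

func main() {
	// two responses differing by exactly one peer -> symmetric difference 2,
	// right at the boundary of the happy-path check (< 2)
	fmt.Println(avgSymmetricDifference([][]string{
		{"a", "b", "c"},
		{"a", "b", "d"},
	}))
}
```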

func (c *Crawler) crawlRemainingBucketsConcurrently(node *enode.Node, udpAddr netip.AddrPort, probesPerBucket int) map[peer.ID]PeerInfo {
var wg sync.WaitGroup
func (c *Crawler) crawlRemainingBucketsSequentially(node *enode.Node, udpAddr netip.AddrPort, probesPerBucket int) map[string]PeerInfo {
timeouts := 0
allNeighbors := map[string]PeerInfo{}

allNeighborsMu := sync.Mutex{}
allNeighbors := map[peer.ID]PeerInfo{}
OUTER:
for i := 1; i < 15; i++ { // although there are 17 buckets, GenRandomPublicKey only supports the first 16
for j := 0; j < probesPerBucket; j++ {
wg.Add(1)

go func() {
defer wg.Done()
// first, we generate a random key that falls into bucket 0
targetKey, err := GenRandomPublicKey(node.ID(), i)
if err != nil {
log.WithError(err).WithField("nodeID", node.ID().String()).Warnf("Failed generating random key for bucket %d", i)
break
}

// first, we generate a random key that falls into bucket 0
targetKey, err := GenRandomPublicKey(node.ID(), i)
if err != nil {
log.WithError(err).WithField("nodeID", node.ID().String()).Warnf("Failed generating random key for bucket %d", i)
return
// second, we do the Find node request
closest, err := c.listener.FindNode(node.ID(), udpAddr, targetKey)
if errors.Is(err, discover.ErrTimeout) {
timeouts += 1
if timeouts > 3 {
break OUTER
}
continue
} else if err != nil {
break OUTER
}

// second, we do the Find node request
closest, err := c.listener.FindNode(node.ID(), udpAddr, targetKey)
for _, c := range closest {
pi, err := NewPeerInfo(c)
if err != nil {
return
}

// third, update our neighbors map
allNeighborsMu.Lock()
defer allNeighborsMu.Unlock()

for _, c := range closest {
pi, err := NewPeerInfo(c)
if err != nil {
log.WithError(err).Warnln("Failed parsing ethereum node neighbor")
continue
}
allNeighbors[pi.ID()] = pi
log.WithError(err).Warnln("Failed parsing ethereum node neighbor")
continue
}
}()
allNeighbors[pi.DeduplicationKey()] = pi
}
}
}
wg.Wait()

return allNeighbors
}
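Besides removing the `sync.WaitGroup` and mutex, probing buckets sequentially lets the crawler give up early on unresponsive peers, which the goroutine-per-probe version could not do cheaply. The abort logic in isolation (a sketch; the budget of three tolerated timeouts mirrors the code above):

```go
package main

import (
	"errors"
	"fmt"
)

var errTimeout = errors.New("timeout")

// probeBuckets tolerates a small budget of timeouts; exhausting the budget,
// or hitting any other error, abandons all remaining buckets at once via
// the labeled break.
func probeBuckets(buckets, probesPerBucket int, probe func(bucket int) error) int {
	succeeded := 0
	timeouts := 0
OUTER:
	for i := 1; i < buckets; i++ {
		for j := 0; j < probesPerBucket; j++ {
			err := probe(i)
			if errors.Is(err, errTimeout) {
				timeouts++
				if timeouts > 3 {
					break OUTER // unresponsive peer: stop probing entirely
				}
				continue
			} else if err != nil {
				break OUTER // hard error: further probes are pointless
			}
			succeeded++
		}
	}
	return succeeded
}

func main() {
	calls := 0
	ok := probeBuckets(15, 1, func(bucket int) error {
		calls++
		return errTimeout // a peer that never answers
	})
	fmt.Println(ok, calls) // 0 4 -> gave up after the fourth timeout
}
```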
37 changes: 29 additions & 8 deletions discv4/driver_crawler.go
@@ -1,9 +1,7 @@
package discv4

import (
"crypto/ecdsa"
"crypto/elliptic"
crand "crypto/rand"
"fmt"
"math"
"net"
@@ -38,6 +36,7 @@ type PeerInfo struct {
peerID peer.ID
maddrs []ma.Multiaddr
udpAddr netip.AddrPort
enr string
}

var _ core.PeerInfo[PeerInfo] = (*PeerInfo)(nil)
@@ -96,6 +95,7 @@ func NewPeerInfo(node *enode.Node) (PeerInfo, error) {
peerID: peerID,
maddrs: maddrs,
udpAddr: udpAddr,
enr: node.String(),
}

return pi, nil
@@ -114,6 +114,13 @@ func (p PeerInfo) Merge(other PeerInfo) PeerInfo {
return p
}

func (p PeerInfo) DeduplicationKey() string {
// previously this was: p.Node.String() but a CPU profile revealed that the
// process of encoding the public key takes a lot of CPU cycles. Especially
// because we're calling DeduplicationKey very often!
return p.enr
}
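The fix is to pay the encoding cost once in `NewPeerInfo` (the new `enr` field) and have `DeduplicationKey` return the cached string. The pattern in miniature, with an illustrative stand-in for the expensive encoder:

```go
package main

import (
	"fmt"
	"strings"
)

// expensiveKey stands in for enode.Node.String(), i.e. a non-trivial
// encoding step we do not want to repeat on every queue operation.
func expensiveKey(parts []string) string {
	return strings.Join(parts, "|") // imagine RLP encoding + base64 here
}

type peerInfo struct {
	parts []string
	key   string // computed once at construction time
}

func newPeerInfo(parts []string) peerInfo {
	return peerInfo{parts: parts, key: expensiveKey(parts)}
}

// DeduplicationKey is called on every enqueue, drop, and find, so it must
// be cheap: return the precomputed value instead of re-encoding.
func (p peerInfo) DeduplicationKey() string { return p.key }

func main() {
	p := newPeerInfo([]string{"pubkey", "ip", "port"})
	fmt.Println(p.DeduplicationKey()) // pubkey|ip|port
}
```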

type CrawlDriverConfig struct {
Version string
TrackNeighbors bool
@@ -239,16 +246,19 @@ var logOnce sync.Once

func (d *CrawlDriver) NewWorker() (core.Worker[PeerInfo, core.CrawlResult[PeerInfo]], error) {
// If I'm not using the below elliptic curve, some Ethereum clients will reject communication
priv, err := ecdsa.GenerateKey(ethcrypto.S256(), crand.Reader)
priv, err := ethcrypto.GenerateKey()
if err != nil {
return nil, fmt.Errorf("new ethereum ecdsa key: %w", err)
}

ethNode := enode.NewLocalNode(d.peerstore, priv)
laddr := &net.UDPAddr{
IP: net.ParseIP("0.0.0.0"),
Port: 0,
}

conn, err := net.ListenUDP("udp", nil)
conn, err := net.ListenUDP("udp4", laddr)
if err != nil {
return nil, fmt.Errorf("listen on udp port: %w", err)
return nil, fmt.Errorf("listen on udp4 port: %w", err)
}

if err = conn.SetReadBuffer(d.cfg.UDPBufferSize); err != nil {
@@ -271,9 +281,20 @@

log.Debugln("Listening on UDP port ", conn.LocalAddr().String(), " for Ethereum discovery")

ethNode := enode.NewLocalNode(d.peerstore, priv)
udpAddr := conn.LocalAddr().(*net.UDPAddr)
if udpAddr.IP.IsUnspecified() {
ethNode.SetFallbackIP(net.ParseIP("127.0.0.1"))
} else {
ethNode.SetFallbackIP(udpAddr.IP)
}
ethNode.SetFallbackUDP(udpAddr.Port)

discvxCfg := discover.Config{
PrivateKey: priv,
Unhandled: d.unhandledChan,
PrivateKey: priv,
Unhandled: d.unhandledChan,
NoFindnodeLivenessCheck: true,
RefreshInterval: 100 * time.Hour, // turn off
}
listener, err := discover.ListenV4(conn, ethNode, discvxCfg)
if err != nil {