diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 29be40f192..5095396794 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -630,6 +630,19 @@ func (pool *TxPool) PendingSize() int { return count } +// IteratePending iterates over the [pool.pending] until [f] returns false. +func (pool *TxPool) IteratePending(f func(tx *types.Transaction) bool) { + pending := pool.Pending(true) + + for _, list := range pending { + for _, tx := range list { + if !f(tx) { + return + } + } + } +} + // Locals retrieves the accounts currently considered local by the pool. func (pool *TxPool) Locals() []common.Address { pool.mu.Lock() diff --git a/go.mod b/go.mod index ed7007a6f0..10ced6c0fd 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( github.com/VictoriaMetrics/fastcache v1.10.0 - github.com/ava-labs/avalanchego v1.10.9-rc.4 + github.com/ava-labs/avalanchego v1.10.10-rc.0 github.com/cespare/cp v0.1.0 github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 github.com/davecgh/go-spew v1.1.1 @@ -44,6 +44,7 @@ require ( golang.org/x/sys v0.8.0 golang.org/x/text v0.8.0 golang.org/x/time v0.0.0-20220922220347-f3bd1da661af + google.golang.org/protobuf v1.30.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -134,7 +135,6 @@ require ( gonum.org/v1/gonum v0.11.0 // indirect google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect google.golang.org/grpc v1.55.0 // indirect - google.golang.org/protobuf v1.30.0 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index 651ec8cf22..480048ea5a 100644 --- a/go.sum +++ b/go.sum @@ -51,12 +51,14 @@ github.com/VictoriaMetrics/fastcache v1.10.0 h1:5hDJnLsKLpnUEToub7ETuRu8RCkb40wo github.com/VictoriaMetrics/fastcache v1.10.0/go.mod h1:tjiYeEfYXCqacuvYw/7UoDIeJaNxq6132xHICNP77w8= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/ava-labs/avalanchego v1.10.9-rc.4 h1:vtavPfRiF6r1Zc6RV8/arEfVpe9GQsLWHbMfIWkHbMI= -github.com/ava-labs/avalanchego v1.10.9-rc.4/go.mod h1:vTBLl1zK36olfLRA7IUfdbvphWqlkuarIoXxvZTHZVw= +github.com/ava-labs/avalanchego v1.10.10-rc.0 h1:6VjkpwhAJ0tDNJK+UIUD8WIb5VelgH3w61mgk7JAkDQ= +github.com/ava-labs/avalanchego v1.10.10-rc.0/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -123,6 +125,7 @@ github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 h1:ytcWPaNPhNoG github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811/go.mod h1:Nb5lgvnQ2+oGlE/EyZy4+2/CxRh9KfvCXnag1vtpxVM= github.com/cockroachdb/redact v1.1.3 h1:AKZds10rFSIj7qADf0g46UixK8NNLwWTNdCIGS5wfSQ= github.com/cockroachdb/redact v1.1.3/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= +github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2/go.mod h1:8BT+cPK6xvFOcRlk0R8eg+OTkcqI6baNH4xAkpiYVvQ= github.com/codegangsta/inject v0.0.0-20150114235600-33e0aa1cb7c0/go.mod h1:4Zcjuz89kmFXt9morQgcfYZAYZ5n8WHjt81YYWIwtTM= github.com/consensys/bavard v0.1.13 h1:oLhMLOFGTLdlda/kma4VOJazblc7IM5y5QPd2A/YjhQ= github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH223nGHukI= @@ -209,6 +212,7 @@ github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxI github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -370,13 +374,16 @@ github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7Bd github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/kataras/golog v0.0.10/go.mod h1:yJ8YKCmyL+nWjERB90Qwn+bdyBZsaQwU3bTVFgkFIp8= github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYbq3UhfoFmE= @@ -407,6 +414,7 @@ github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4F github.com/labstack/echo/v4 v4.5.0/go.mod h1:czIriw4a0C1dFun+ObrXp7ok03xON0N1awStJ6ArI7Y= github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c= +github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= @@ -448,9 +456,11 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= @@ -1023,6 +1033,7 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/gossip/bloom.go b/gossip/bloom.go new file mode 100644 index 0000000000..c4f845d2f6 --- /dev/null +++ b/gossip/bloom.go @@ -0,0 +1,131 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "encoding/binary" + "hash" + "time" + + bloomfilter "github.com/holiman/bloomfilter/v2" + "golang.org/x/exp/rand" +) + +const hashLength = 32 + +var _ hash.Hash64 = (*hasher)(nil) + +type Hash [hashLength]byte + +// NewBloomFilter returns a new instance of a bloom filter with at most +// [maxExpectedElements] elements anticipated at any moment, and a collision +// rate of [falsePositiveProbability]. +func NewBloomFilter( + maxExpectedElements uint64, + falsePositiveProbability float64, +) (*BloomFilter, error) { + bloom, err := bloomfilter.NewOptimal( + maxExpectedElements, + falsePositiveProbability, + ) + if err != nil { + return nil, err + } + + bloomFilter := &BloomFilter{ + Bloom: bloom, + Salt: randomSalt(), + } + return bloomFilter, nil +} + +type BloomFilter struct { + Bloom *bloomfilter.Filter + // Salt is provided to eventually unblock collisions in Bloom. It's possible + // that conflicting Gossipable items collide in the bloom filter, so a salt + // is generated to eventually resolve collisions. + Salt Hash +} + +func (b *BloomFilter) Add(gossipable Gossipable) { + h := gossipable.GetHash() + salted := hasher{ + hash: h[:], + salt: b.Salt, + } + b.Bloom.Add(salted) +} + +func (b *BloomFilter) Has(gossipable Gossipable) bool { + h := gossipable.GetHash() + salted := hasher{ + hash: h[:], + salt: b.Salt, + } + return b.Bloom.Contains(salted) +} + +// ResetBloomFilterIfNeeded resets a bloom filter if it breaches a ratio of +// filled elements. Returns true if the bloom filter was reset. +func ResetBloomFilterIfNeeded( + bloomFilter *BloomFilter, + maxFilledRatio float64, +) bool { + if bloomFilter.Bloom.PreciseFilledRatio() < maxFilledRatio { + return false + } + + // it's not possible for this to error assuming that the original + // bloom filter's parameters were valid + fresh, _ := bloomfilter.New(bloomFilter.Bloom.M(), bloomFilter.Bloom.K()) + bloomFilter.Bloom = fresh + bloomFilter.Salt = randomSalt() + return true +} + +func randomSalt() Hash { + salt := Hash{} + r := rand.New(rand.NewSource(uint64(time.Now().Nanosecond()))) + _, _ = r.Read(salt[:]) + return salt +} + +var _ hash.Hash64 = (*hasher)(nil) + +type hasher struct { + hash []byte + salt Hash +} + +func (h hasher) Write(p []byte) (n int, err error) { + h.hash = append(h.hash, p...) + return len(p), nil +} + +func (h hasher) Sum(b []byte) []byte { + h.hash = append(h.hash, b...) + return h.hash +} + +func (h hasher) Reset() { + reset := Hash{} + h.hash = reset[:] +} + +func (h hasher) BlockSize() int { + return hashLength +} + +func (h hasher) Sum64() uint64 { + salted := Hash{} + for i := 0; i < len(h.hash) && i < hashLength; i++ { + salted[i] = h.hash[i] ^ h.salt[i] + } + + return binary.BigEndian.Uint64(salted[:]) +} + +func (h hasher) Size() int { + return len(h.hash) +} diff --git a/gossip/bloom_test.go b/gossip/bloom_test.go new file mode 100644 index 0000000000..7a17579932 --- /dev/null +++ b/gossip/bloom_test.go @@ -0,0 +1,64 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "testing" + + bloomfilter "github.com/holiman/bloomfilter/v2" + "github.com/stretchr/testify/require" +) + +func TestBloomFilterRefresh(t *testing.T) { + tests := []struct { + name string + refreshRatio float64 + add []*testTx + expected []*testTx + }{ + { + name: "no refresh", + refreshRatio: 1, + add: []*testTx{ + {hash: Hash{0}}, + }, + expected: []*testTx{ + {hash: Hash{0}}, + }, + }, + { + name: "refresh", + refreshRatio: 0.1, + add: []*testTx{ + {hash: Hash{0}}, + {hash: Hash{1}}, + }, + expected: []*testTx{ + {hash: Hash{1}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + b, err := bloomfilter.New(10, 1) + require.NoError(err) + bloom := BloomFilter{ + Bloom: b, + } + + for _, item := range tt.add { + _ = ResetBloomFilterIfNeeded(&bloom, tt.refreshRatio) + bloom.Add(item) + } + + require.Equal(uint64(len(tt.expected)), bloom.Bloom.N()) + + for _, expected := range tt.expected { + require.True(bloom.Has(expected)) + } + }) + } +} diff --git a/gossip/gossip.go b/gossip/gossip.go new file mode 100644 index 0000000000..ccd39a3fc2 --- /dev/null +++ b/gossip/gossip.go @@ -0,0 +1,141 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "context" + "time" + + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/golang/protobuf/proto" + "go.uber.org/zap" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + + "github.com/ava-labs/coreth/gossip/proto/pb" +) + +// GossipableAny exists to help create non-nil pointers to a concrete Gossipable +// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go +type GossipableAny[T any] interface { + *T + Gossipable +} + +type Config struct { + Frequency time.Duration + PollSize int +} + +func NewGossiper[T any, U GossipableAny[T]]( + config Config, + log logging.Logger, + set Set[U], + client *p2p.Client, +) *Gossiper[T, U] { + return &Gossiper[T, U]{ + config: config, + log: log, + set: set, + client: client, + } +} + +type Gossiper[T any, U GossipableAny[T]] struct { + config Config + log logging.Logger + set Set[U] + client *p2p.Client +} + +func (g *Gossiper[_, _]) Gossip(ctx context.Context) { + gossipTicker := time.NewTicker(g.config.Frequency) + defer gossipTicker.Stop() + + for { + select { + case <-gossipTicker.C: + if err := g.gossip(ctx); err != nil { + g.log.Warn("failed to gossip", zap.Error(err)) + } + case <-ctx.Done(): + g.log.Debug("shutting down gossip") + return + } + } +} + +func (g *Gossiper[_, _]) gossip(ctx context.Context) error { + bloom, salt, err := g.set.GetFilter() + if err != nil { + return err + } + + request := &pb.PullGossipRequest{ + Filter: bloom, + Salt: salt, + } + msgBytes, err := proto.Marshal(request) + if err != nil { + return err + } + + for i := 0; i < g.config.PollSize; i++ { + if err := g.client.AppRequestAny(ctx, msgBytes, g.handleResponse); err != nil { + return err + } + } + + return nil +} + +func (g *Gossiper[T, U]) handleResponse( + nodeID ids.NodeID, + responseBytes []byte, + err error, +) { + if err != nil { + g.log.Debug( + "failed gossip request", + zap.Stringer("nodeID", nodeID), + zap.Error(err), + ) + return + } + + response := &pb.PullGossipResponse{} + if err := proto.Unmarshal(responseBytes, response); err != nil { + g.log.Debug("failed to unmarshal gossip response", zap.Error(err)) + return + } + + for _, bytes := range response.Gossip { + gossipable := U(new(T)) + if err := gossipable.Unmarshal(bytes); err != nil { + g.log.Debug( + "failed to unmarshal gossip", + zap.Stringer("nodeID", nodeID), + zap.Error(err), + ) + continue + } + + hash := gossipable.GetHash() + g.log.Debug( + "received gossip", + zap.Stringer("nodeID", nodeID), + zap.Binary("hash", hash[:]), + ) + if err := g.set.Add(gossipable); err != nil { + g.log.Debug( + "failed to add gossip to the known set", + zap.Stringer("nodeID", nodeID), + zap.Binary("id", hash[:]), + zap.Error(err), + ) + continue + } + } +} diff --git a/gossip/gossip_test.go b/gossip/gossip_test.go new file mode 100644 index 0000000000..dc9cef3944 --- /dev/null +++ b/gossip/gossip_test.go @@ -0,0 +1,166 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" +) + +func TestGossiperShutdown(t *testing.T) { + config := Config{Frequency: time.Second} + gossiper := NewGossiper[testTx](config, logging.NoLog{}, nil, nil) + ctx, cancel := context.WithCancel(context.Background()) + + wg := &sync.WaitGroup{} + wg.Add(1) + + go func() { + gossiper.Gossip(ctx) + wg.Done() + }() + + cancel() + wg.Wait() +} + +func TestGossiperGossip(t *testing.T) { + tests := []struct { + name string + maxResponseSize int + requester []*testTx // what we have + responder []*testTx // what the peer we're requesting gossip from has + expectedPossibleValues []*testTx // possible values we can have + expectedLen int + }{ + { + name: "no gossip - no one knows anything", + }, + { + name: "no gossip - requester knows more than responder", + maxResponseSize: 1024, + requester: []*testTx{{hash: Hash{0}}}, + expectedPossibleValues: []*testTx{{hash: Hash{0}}}, + expectedLen: 1, + }, + { + name: "no gossip - requester knows everything responder knows", + maxResponseSize: 1024, + requester: []*testTx{{hash: Hash{0}}}, + responder: []*testTx{{hash: Hash{0}}}, + expectedPossibleValues: []*testTx{{hash: Hash{0}}}, + expectedLen: 1, + }, + { + name: "gossip - requester knows nothing", + maxResponseSize: 1024, + responder: []*testTx{{hash: Hash{0}}}, + expectedPossibleValues: []*testTx{{hash: Hash{0}}}, + expectedLen: 1, + }, + { + name: "gossip - requester knows less than responder", + maxResponseSize: 1024, + requester: []*testTx{{hash: Hash{0}}}, + responder: []*testTx{{hash: Hash{0}}, {hash: Hash{1}}}, + expectedPossibleValues: []*testTx{{hash: Hash{0}}, {hash: Hash{1}}}, + expectedLen: 2, + }, + { + name: "gossip - max response size exceeded", + maxResponseSize: 32, + responder: []*testTx{{hash: Hash{0}}, {hash: Hash{1}}}, + expectedPossibleValues: []*testTx{{hash: Hash{0}}, {hash: Hash{1}}}, + expectedLen: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + ctrl := gomock.NewController(t) + + responseSender := common.NewMockSender(ctrl) + responseRouter := p2p.NewRouter(logging.NoLog{}, responseSender) + responseBloom, err := NewBloomFilter(1000, 0.01) + require.NoError(err) + responseSet := testSet{ + set: set.Set[*testTx]{}, + bloom: responseBloom, + } + for _, item := range tt.responder { + require.NoError(responseSet.Add(item)) + } + peers := &p2p.Peers{} + require.NoError(peers.Connected(context.Background(), ids.EmptyNodeID, nil)) + + handler := NewHandler[*testTx](responseSet, tt.maxResponseSize) + _, err = responseRouter.RegisterAppProtocol(0x0, handler, peers) + require.NoError(err) + + requestSender := common.NewMockSender(ctrl) + requestRouter := p2p.NewRouter(logging.NoLog{}, requestSender) + + gossiped := make(chan struct{}) + requestSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, request []byte) { + go func() { + require.NoError(responseRouter.AppRequest(ctx, ids.EmptyNodeID, requestID, time.Time{}, request)) + }() + }).AnyTimes() + + responseSender.EXPECT(). + SendAppResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) { + require.NoError(requestRouter.AppResponse(ctx, nodeID, requestID, appResponseBytes)) + close(gossiped) + }).AnyTimes() + + bloom, err := NewBloomFilter(1000, 0.01) + require.NoError(err) + requestSet := testSet{ + set: set.Set[*testTx]{}, + bloom: bloom, + } + for _, item := range tt.requester { + require.NoError(requestSet.Add(item)) + } + + requestClient, err := requestRouter.RegisterAppProtocol(0x0, nil, peers) + require.NoError(err) + + config := Config{ + Frequency: 500 * time.Millisecond, + PollSize: 1, + } + gossiper := NewGossiper[testTx, *testTx](config, logging.NoLog{}, requestSet, requestClient) + received := set.Set[*testTx]{} + requestSet.onAdd = func(tx *testTx) { + received.Add(tx) + } + + require.NoError(gossiper.gossip(context.Background())) + <-gossiped + + require.Len(requestSet.set, tt.expectedLen) + require.Subset(tt.expectedPossibleValues, requestSet.set.List()) + + // we should not receive anything that we already had before we + // requested the gossip + for _, tx := range tt.requester { + require.NotContains(received, tx) + } + }) + } +} diff --git a/gossip/gossipable.go b/gossip/gossipable.go new file mode 100644 index 0000000000..24ee8d8543 --- /dev/null +++ b/gossip/gossipable.go @@ -0,0 +1,23 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +// Gossipable is an item that can be gossiped across the network +type Gossipable interface { + // GetHash represents the unique hash of this item + GetHash() Hash + Marshal() ([]byte, error) + Unmarshal(bytes []byte) error +} + +// Set holds a set of known Gossipable items +type Set[T Gossipable] interface { + // Add adds a Gossipable to the set + Add(gossipable T) error + // Get returns elements that match the provided filter function + Get(filter func(gossipable T) bool) []T + // GetFilter returns the byte representation of bloom filter and its + // corresponding salt. + GetFilter() (bloom []byte, salt []byte, err error) +} diff --git a/gossip/handler.go b/gossip/handler.go new file mode 100644 index 0000000000..bb1b096ba9 --- /dev/null +++ b/gossip/handler.go @@ -0,0 +1,85 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "context" + "errors" + "time" + + "github.com/golang/protobuf/proto" + bloomfilter "github.com/holiman/bloomfilter/v2" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + + "github.com/ava-labs/coreth/gossip/proto/pb" +) + +var ( + _ p2p.Handler = (*Handler[Gossipable])(nil) + + ErrInvalidHash = errors.New("invalid hash") +) + +func NewHandler[T Gossipable](set Set[T], maxResponseSize int) *Handler[T] { + return &Handler[T]{ + Handler: p2p.NoOpHandler{}, + set: set, + maxResponseSize: maxResponseSize, + } +} + +type Handler[T Gossipable] struct { + p2p.Handler + set Set[T] + maxResponseSize int +} + +func (h Handler[T]) AppRequest(_ context.Context, _ ids.NodeID, _ time.Time, requestBytes []byte) ([]byte, error) { + request := &pb.PullGossipRequest{} + if err := proto.Unmarshal(requestBytes, request); err != nil { + return nil, err + } + + if len(request.Salt) != hashLength { + return nil, ErrInvalidHash + } + filter := &BloomFilter{ + Bloom: &bloomfilter.Filter{}, + Salt: Hash{}, + } + if err := filter.Bloom.UnmarshalBinary(request.Filter); err != nil { + return nil, err + } + + copy(filter.Salt[:], request.Salt) + + // filter out what the requesting peer already knows about + unknown := h.set.Get(func(gossipable T) bool { + return !filter.Has(gossipable) + }) + + responseSize := 0 + gossipBytes := make([][]byte, 0, len(unknown)) + for _, gossipable := range unknown { + bytes, err := gossipable.Marshal() + if err != nil { + return nil, err + } + + responseSize += len(bytes) + if responseSize > h.maxResponseSize { + break + } + + gossipBytes = append(gossipBytes, bytes) + } + + response := &pb.PullGossipResponse{ + Gossip: gossipBytes, + } + + return proto.Marshal(response) +} diff --git a/gossip/proto/message.proto b/gossip/proto/message.proto new file mode 100644 index 0000000000..92aaac4e4b --- /dev/null +++ b/gossip/proto/message.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package gossip; + +option go_package="github.com/ava-labs/coreth/gossip/proto/pb"; + +message PullGossipRequest { + bytes filter = 1; + bytes salt = 2; +} + +message PullGossipResponse { + repeated bytes gossip = 1; +} diff --git a/gossip/test_gossip.go b/gossip/test_gossip.go new file mode 100644 index 0000000000..f153abf984 --- /dev/null +++ b/gossip/test_gossip.go @@ -0,0 +1,64 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "github.com/ava-labs/avalanchego/utils/set" +) + +var ( + _ Gossipable = (*testTx)(nil) + _ Set[*testTx] = (*testSet)(nil) +) + +type testTx struct { + hash Hash +} + +func (t *testTx) GetHash() Hash { + return t.hash +} + +func (t *testTx) Marshal() ([]byte, error) { + return t.hash[:], nil +} + +func (t *testTx) Unmarshal(bytes []byte) error { + t.hash = Hash{} + copy(t.hash[:], bytes[:]) + return nil +} + +type testSet struct { + set set.Set[*testTx] + bloom *BloomFilter + onAdd func(tx *testTx) +} + +func (t testSet) Add(gossipable *testTx) error { + t.set.Add(gossipable) + t.bloom.Add(gossipable) + if t.onAdd != nil { + t.onAdd(gossipable) + } + + return nil +} + +func (t testSet) Get(filter func(gossipable *testTx) bool) []*testTx { + result := make([]*testTx, 0) + for tx := range t.set { + if !filter(tx) { + continue + } + result = append(result, tx) + } + + return result +} + +func (t testSet) GetFilter() ([]byte, []byte, error) { + bloom, err := t.bloom.Bloom.MarshalBinary() + return bloom, t.bloom.Salt[:], err +} diff --git a/peer/network.go b/peer/network.go index 8c658a402f..22f9ace624 100644 --- a/peer/network.go +++ b/peer/network.go @@ -12,6 +12,8 @@ import ( "golang.org/x/sync/semaphore" + "github.com/ava-labs/avalanchego/network/p2p" + "github.com/ethereum/go-ethereum/log" "github.com/ava-labs/avalanchego/codec" @@ -87,23 +89,25 @@ type network struct { outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests - appSender common.AppSender // avalanchego AppSender for sending messages - codec codec.Manager // Codec used for parsing messages - crossChainCodec codec.Manager // Codec used for parsing cross chain messages - appRequestHandler message.RequestHandler // maps request type => handler - crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler - gossipHandler message.GossipHandler // maps gossip type => handler - peers *peerTracker // tracking of peers & bandwidth - appStats stats.RequestHandlerStats // Provide request handler metrics - crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics + router *p2p.Router + appSender common.AppSender // avalanchego AppSender for sending messages + codec codec.Manager // Codec used for parsing messages + crossChainCodec codec.Manager // Codec used for parsing cross chain messages + appRequestHandler message.RequestHandler // maps request type => handler + crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler + gossipHandler message.GossipHandler // maps gossip type => handler + peers *peerTracker // tracking of peers & bandwidth + appStats stats.RequestHandlerStats // Provide request handler metrics + crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics // Set to true when Shutdown is called, after which all operations on this // struct are no-ops. closed utils.Atomic[bool] } -func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network { +func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network { return &network{ + router: router, appSender: appSender, codec: codec, crossChainCodec: crossChainCodec, @@ -336,6 +340,13 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u var req message.Request if _, err := n.codec.Unmarshal(request, &req); err != nil { log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) + + // this might be a sdk request + if err := n.router.AppRequest(ctx, nodeID, requestID, deadline, request); err != nil { + log.Debug("failed to handle app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err) + return nil + } + return nil } @@ -366,7 +377,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u // Error returned by this function is expected to be treated as fatal by the engine // If [requestID] is not known, this function will emit a log and return a nil error. // If the response handler returns an error it is propagated as a fatal error. -func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { +func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error { n.lock.Lock() defer n.lock.Unlock() @@ -378,6 +389,11 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui handler, exists := n.markRequestFulfilled(requestID) if !exists { + // this might be a sdk response + if err := n.router.AppResponse(ctx, nodeID, requestID, response); err == nil { + return nil + } + // Should never happen since the engine should be managing outstanding requests log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response)) return nil @@ -395,7 +411,7 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui // - request times out before a response is provided // error returned by this function is expected to be treated as fatal by the engine // returns error only when the response handler returns an error -func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error { +func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { n.lock.Lock() defer n.lock.Unlock() @@ -407,9 +423,9 @@ func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, request handler, exists := n.markRequestFulfilled(requestID) if !exists { - // Should never happen since the engine should be managing outstanding requests - log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID) - return nil + // this must have been a sdk request + log.Debug("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID) + return n.router.AppRequestFailed(ctx, nodeID, requestID) } // We must release the slot diff --git a/peer/network_test.go b/peer/network_test.go index 3e1c32f492..d65a17bbc2 100644 --- a/peer/network_test.go +++ b/peer/network_test.go @@ -12,7 +12,9 @@ import ( "testing" "time" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" ethcommon "github.com/ethereum/go-ethereum/common" @@ -53,7 +55,7 @@ var ( func TestNetworkDoesNotConnectToItself(t *testing.T) { selfNodeID := ids.GenerateTestNodeID() - n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1) + n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), nil, nil, nil, selfNodeID, 1, 1) assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion)) assert.EqualValues(t, 0, n.Size()) } @@ -89,7 +91,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) nodeID := ids.GenerateTestNodeID() @@ -164,7 +166,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16) net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager}) client := NewNetworkClient(net) @@ -244,7 +246,7 @@ func TestAppRequestOnShutdown(t *testing.T) { codecManager := buildCodec(t, HelloRequest{}, HelloResponse{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) client := NewNetworkClient(net) nodeID := ids.GenerateTestNodeID() require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion)) @@ -293,7 +295,7 @@ func TestRequestMinVersion(t *testing.T) { } // passing nil as codec works because the net.AppRequest is never called - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16) client := NewNetworkClient(net) requestMessage := TestMessage{Message: "this is a request"} requestBytes, err := message.RequestToBytes(codecManager, requestMessage) @@ -356,7 +358,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) { processingDuration: 500 * time.Millisecond, } - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetRequestHandler(requestHandler) nodeID := ids.GenerateTestNodeID() @@ -396,7 +398,7 @@ func TestGossip(t *testing.T) { } gossipHandler := &testGossipHandler{} - clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(gossipHandler) assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion)) @@ -423,7 +425,7 @@ func TestHandleInvalidMessages(t *testing.T) { requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{}) @@ -462,7 +464,6 @@ func TestHandleInvalidMessages(t *testing.T) { assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse)) assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse)) assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse)) - assert.NoError(t, clientNetwork.AppRequestFailed(context.Background(), nodeID, requestID)) } func TestNetworkPropagatesRequestHandlerError(t *testing.T) { @@ -473,7 +474,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) { requestID := uint32(1) sender := testAppSender{} - clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{}) clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler @@ -513,7 +514,7 @@ func TestCrossChainAppRequest(t *testing.T) { }, } - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) client := NewNetworkClient(net) @@ -568,7 +569,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) { codecManager := buildCodec(t, TestMessage{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager}) client := NewNetworkClient(net) @@ -628,7 +629,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) { } codecManager := buildCodec(t, TestMessage{}) crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{}) - net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) + net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1) client := NewNetworkClient(net) exampleCrossChainRequest := ExampleCrossChainRequest{ diff --git a/plugin/evm/gossip_mempool.go b/plugin/evm/gossip_mempool.go new file mode 100644 index 0000000000..0ba204a49e --- /dev/null +++ b/plugin/evm/gossip_mempool.go @@ -0,0 +1,155 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/log" + + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/core/txpool" + "github.com/ava-labs/coreth/core/types" + "github.com/ava-labs/coreth/gossip" + "github.com/ava-labs/coreth/plugin/evm/message" +) + +var ( + _ gossip.Gossipable = (*GossipEthTx)(nil) + _ gossip.Gossipable = (*GossipAtomicTx)(nil) + _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) +) + +type GossipAtomicTx struct { + Tx *Tx `serialize:"true"` +} + +func (tx *GossipAtomicTx) GetHash() gossip.Hash { + id := tx.Tx.ID() + hash := gossip.Hash{} + copy(hash[:], id[:]) + + return hash +} + +func (tx *GossipAtomicTx) Marshal() ([]byte, error) { + return Codec.Marshal(message.Version, tx) +} + +func (tx *GossipAtomicTx) Unmarshal(bytes []byte) error { + _, err := Codec.Unmarshal(bytes, tx) + return err +} + +func NewGossipEthTxPool(mempool *txpool.TxPool) (*GossipEthTxPool, error) { + bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + if err != nil { + return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) + } + + return &GossipEthTxPool{ + mempool: mempool, + pendingTxs: make(chan core.NewTxsEvent), + bloom: bloom, + }, nil +} + +type GossipEthTxPool struct { + mempool *txpool.TxPool + pendingTxs chan core.NewTxsEvent + + bloom *gossip.BloomFilter + lock sync.RWMutex +} + +func (g *GossipEthTxPool) Subscribe(shutdownChan chan struct{}, shutdownWg *sync.WaitGroup) { + defer shutdownWg.Done() + g.mempool.SubscribeNewTxsEvent(g.pendingTxs) + + for { + select { + case <-shutdownChan: + log.Debug("shutting down subscription") + return + case pendingTxs := <-g.pendingTxs: + g.lock.Lock() + for _, pendingTx := range pendingTxs.Txs { + tx := &GossipEthTx{Tx: pendingTx} + g.bloom.Add(tx) + if gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipBloomMaxFilledRatio) { + log.Debug("resetting bloom filter", "reason", "reached max filled ratio") + + pending := g.mempool.Pending(false) + for _, pendingTxs := range pending { + for _, pendingTx := range pendingTxs { + g.bloom.Add(&GossipEthTx{Tx: pendingTx}) + } + } + } + } + g.lock.Unlock() + } + } +} + +// Add enqueues the transaction to the mempool. Subscribe should be called +// to receive an event if tx is actually added to the mempool or not. +func (g *GossipEthTxPool) Add(tx *GossipEthTx) error { + if err := g.mempool.AddRemotes([]*types.Transaction{tx.Tx})[0]; err != nil { + return err + } + + return nil +} + +func (g *GossipEthTxPool) Get(filter func(tx *GossipEthTx) bool) []*GossipEthTx { + limit := 1000 + resultSize := 0 + result := make([]*GossipEthTx, 0) + + g.mempool.IteratePending(func(tx *types.Transaction) bool { + resultSize += int(tx.Size()) + if resultSize > limit { + return false + } + + gossipTx := &GossipEthTx{ + Tx: tx, + } + result = append(result, gossipTx) + return true + }) + + return result +} + +func (g *GossipEthTxPool) GetFilter() ([]byte, []byte, error) { + g.lock.RLock() + defer g.lock.RUnlock() + + bloom, err := g.bloom.Bloom.MarshalBinary() + return bloom, g.bloom.Salt[:], err +} + +type GossipEthTx struct { + Tx *types.Transaction +} + +func (tx *GossipEthTx) GetHash() gossip.Hash { + txHash := tx.Tx.Hash() + hash := gossip.Hash{} + copy(hash[:], txHash[:]) + + return hash +} + +func (tx *GossipEthTx) Marshal() ([]byte, error) { + return tx.Tx.MarshalBinary() +} + +func (tx *GossipEthTx) Unmarshal(bytes []byte) error { + tx.Tx = &types.Transaction{} + return tx.Tx.UnmarshalBinary(bytes) +} diff --git a/plugin/evm/gossip_mempool_test.go b/plugin/evm/gossip_mempool_test.go new file mode 100644 index 0000000000..f5dbb3a697 --- /dev/null +++ b/plugin/evm/gossip_mempool_test.go @@ -0,0 +1,86 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/ava-labs/avalanchego/ids" +) + +func TestAtomicMempoolAddTx(t *testing.T) { + txs := []*GossipAtomicTx{ + { + Tx: &Tx{ + UnsignedAtomicTx: &TestUnsignedTx{ + IDV: ids.GenerateTestID(), + }, + }, + }, + { + Tx: &Tx{ + UnsignedAtomicTx: &TestUnsignedTx{ + IDV: ids.GenerateTestID(), + }, + }, + }, + } + + tests := []struct { + name string + add []*GossipAtomicTx + filter func(tx *GossipAtomicTx) bool + expected []*GossipAtomicTx + }{ + { + name: "empty", + }, + { + name: "filter matches nothing", + add: txs, + filter: func(*GossipAtomicTx) bool { + return false + }, + expected: nil, + }, + { + name: "filter matches all", + add: txs, + filter: func(*GossipAtomicTx) bool { + return true + }, + expected: txs, + }, + { + name: "filter matches subset", + add: txs, + filter: func(tx *GossipAtomicTx) bool { + return tx.Tx == txs[0].Tx + }, + expected: txs[:1], + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + m, err := NewMempool(ids.Empty, 10) + require.NoError(err) + + for _, add := range tt.add { + require.NoError(m.Add(add)) + } + + txs := m.Get(tt.filter) + require.Len(txs, len(tt.expected)) + + for _, expected := range tt.expected { + require.Contains(txs, expected) + } + }) + } +} diff --git a/plugin/evm/gossiper_atomic_gossiping_test.go b/plugin/evm/gossiper_atomic_gossiping_test.go index 73e3ce17b3..6ded11967b 100644 --- a/plugin/evm/gossiper_atomic_gossiping_test.go +++ b/plugin/evm/gossiper_atomic_gossiping_test.go @@ -11,8 +11,8 @@ import ( "time" "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/utils/set" - "github.com/stretchr/testify/assert" "github.com/ava-labs/coreth/plugin/evm/message" @@ -22,10 +22,11 @@ import ( func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) { assert := assert.New(t) - _, vm, _, sharedMemory, sender := GenesisVM(t, true, "", "", "") + _, vm, _, sharedMemory, sender := GenesisVM(t, false, "", "", "") defer func() { assert.NoError(vm.Shutdown(context.Background())) }() + assert.NoError(vm.Connected(context.Background(), ids.GenerateTestNodeID(), nil)) // Create conflicting transactions importTxs := createImportTxOptions(t, vm, sharedMemory) @@ -56,12 +57,15 @@ func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) { return nil } + assert.NoError(vm.SetState(context.Background(), snow.NormalOp)) + // Optimistically gossip raw tx assert.NoError(vm.issueTx(tx, true /*=local*/)) time.Sleep(500 * time.Millisecond) gossipedLock.Lock() assert.Equal(1, gossiped) gossipedLock.Unlock() + assert.True(vm.mempool.bloom.Has(&GossipAtomicTx{Tx: tx})) // Test hash on retry assert.NoError(vm.gossiper.GossipAtomicTxs([]*Tx{tx})) diff --git a/plugin/evm/mempool.go b/plugin/evm/mempool.go index 25b67298f4..aa1a036fd9 100644 --- a/plugin/evm/mempool.go +++ b/plugin/evm/mempool.go @@ -10,8 +10,10 @@ import ( "github.com/ava-labs/avalanchego/cache" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/coreth/metrics" "github.com/ethereum/go-ethereum/log" + + "github.com/ava-labs/coreth/gossip" + "github.com/ava-labs/coreth/metrics" ) const ( @@ -69,12 +71,19 @@ type Mempool struct { txHeap *txHeap // utxoSpenders maps utxoIDs to the transaction consuming them in the mempool utxoSpenders map[ids.ID]*Tx + // bloom is a bloom filter containing the txs in the mempool + bloom *gossip.BloomFilter metrics *mempoolMetrics } // NewMempool returns a Mempool with [maxSize] -func NewMempool(AVAXAssetID ids.ID, maxSize int) *Mempool { +func NewMempool(AVAXAssetID ids.ID, maxSize int) (*Mempool, error) { + bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + if err != nil { + return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) + } + return &Mempool{ AVAXAssetID: AVAXAssetID, issuedTxs: make(map[ids.ID]*Tx), @@ -84,8 +93,9 @@ func NewMempool(AVAXAssetID ids.ID, maxSize int) *Mempool { txHeap: newTxHeap(maxSize), maxSize: maxSize, utxoSpenders: make(map[ids.ID]*Tx), + bloom: bloom, metrics: newMempoolMetrics(), - } + }, nil } // Len returns the number of transactions in the mempool @@ -125,6 +135,10 @@ func (m *Mempool) atomicTxGasPrice(tx *Tx) (uint64, error) { return burned / gasUsed, nil } +func (m *Mempool) Add(tx *GossipAtomicTx) error { + return m.AddTx(tx.Tx) +} + // Add attempts to add [tx] to the mempool and returns an error if // it could not be addeed to the mempool. func (m *Mempool) AddTx(tx *Tx) error { @@ -266,9 +280,43 @@ func (m *Mempool) addTx(tx *Tx, force bool) error { // and CancelCurrentTx. m.newTxs = append(m.newTxs, tx) m.addPending() + + m.bloom.Add(&GossipAtomicTx{Tx: tx}) + if gossip.ResetBloomFilterIfNeeded(m.bloom, txGossipBloomMaxFilledRatio) { + log.Debug("resetting bloom filter", "reason", "reached max filled ratio") + + for _, pendingTx := range m.txHeap.minHeap.items { + m.bloom.Add(&GossipAtomicTx{Tx: pendingTx.tx}) + } + } + return nil } +func (m *Mempool) Get(filter func(tx *GossipAtomicTx) bool) []*GossipAtomicTx { + m.lock.RLock() + defer m.lock.RUnlock() + + gossipTxs := make([]*GossipAtomicTx, 0, len(m.txHeap.maxHeap.items)) + for _, item := range m.txHeap.maxHeap.items { + gossipTx := &GossipAtomicTx{Tx: item.tx} + if !filter(gossipTx) { + continue + } + gossipTxs = append(gossipTxs, gossipTx) + } + + return gossipTxs +} + +func (m *Mempool) GetFilter() ([]byte, []byte, error) { + m.lock.RLock() + defer m.lock.RUnlock() + + bloom, err := m.bloom.Bloom.MarshalBinary() + return bloom, m.bloom.Salt[:], err +} + // NextTx returns a transaction to be issued from the mempool. func (m *Mempool) NextTx() (*Tx, bool) { m.lock.Lock() diff --git a/plugin/evm/mempool_atomic_gossiping_test.go b/plugin/evm/mempool_atomic_gossiping_test.go index a0f82a8c01..a2e05e6286 100644 --- a/plugin/evm/mempool_atomic_gossiping_test.go +++ b/plugin/evm/mempool_atomic_gossiping_test.go @@ -108,13 +108,15 @@ func TestMempoolMaxMempoolSizeHandling(t *testing.T) { // shortcut to simulated almost filled mempool mempool.maxSize = 0 - assert.ErrorIs(mempool.AddTx(tx), errTooManyAtomicTx) + err := mempool.AddTx(tx) + assert.ErrorIs(err, errTooManyAtomicTx) assert.False(mempool.has(tx.ID())) // shortcut to simulated empty mempool mempool.maxSize = defaultMempoolSize - assert.NoError(mempool.AddTx(tx)) + err = mempool.AddTx(tx) + assert.NoError(err) assert.True(mempool.has(tx.ID())) } @@ -193,10 +195,12 @@ func TestMempoolPriorityDrop(t *testing.T) { mempool.maxSize = 1 tx1 := createImportTx(t, vm, ids.ID{1}, params.AvalancheAtomicTxFee) - assert.NoError(mempool.AddTx(tx1)) + err := mempool.AddTx(tx1) + assert.NoError(err) assert.True(mempool.has(tx1.ID())) tx2 := createImportTx(t, vm, ids.ID{2}, params.AvalancheAtomicTxFee) - assert.ErrorIs(mempool.AddTx(tx2), errInsufficientAtomicTxFee) + err = mempool.AddTx(tx2) + assert.ErrorIs(err, errInsufficientAtomicTxFee) assert.True(mempool.has(tx1.ID())) assert.False(mempool.has(tx2.ID())) tx3 := createImportTx(t, vm, ids.ID{3}, 2*params.AvalancheAtomicTxFee) diff --git a/plugin/evm/mempool_test.go b/plugin/evm/mempool_test.go new file mode 100644 index 0000000000..6d719a0ee2 --- /dev/null +++ b/plugin/evm/mempool_test.go @@ -0,0 +1,35 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "testing" + + "github.com/ava-labs/avalanchego/ids" + "github.com/stretchr/testify/require" +) + +func TestMempoolAddTx(t *testing.T) { + require := require.New(t) + m, err := NewMempool(ids.Empty, 5_000) + require.NoError(err) + + txs := make([]*GossipAtomicTx, 0) + for i := 0; i < 3_000; i++ { + tx := &GossipAtomicTx{ + Tx: &Tx{ + UnsignedAtomicTx: &TestUnsignedTx{ + IDV: ids.GenerateTestID(), + }, + }, + } + + txs = append(txs, tx) + require.NoError(m.Add(tx)) + } + + for _, tx := range txs { + require.True(m.bloom.Has(tx)) + } +} diff --git a/plugin/evm/tx_gossip_test.go b/plugin/evm/tx_gossip_test.go new file mode 100644 index 0000000000..be078dc0b2 --- /dev/null +++ b/plugin/evm/tx_gossip_test.go @@ -0,0 +1,251 @@ +// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package evm + +import ( + "context" + "math/big" + "sync" + "testing" + "time" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/network/p2p" + commonEng "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils" + "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" + "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/utils/set" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "google.golang.org/protobuf/proto" + + "github.com/ava-labs/coreth/core" + "github.com/ava-labs/coreth/core/types" + "github.com/ava-labs/coreth/gossip" + "github.com/ava-labs/coreth/gossip/proto/pb" + "github.com/ava-labs/coreth/params" +) + +func TestEthTxGossip(t *testing.T) { + require := require.New(t) + + // set up prefunded address + importAmount := uint64(1_000_000_000) + issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONLatest, "", "", map[ids.ShortID]uint64{ + testShortIDAddrs[0]: importAmount, + }) + defer func() { + require.NoError(vm.Shutdown(context.Background())) + }() + + importAccepted := make(chan core.NewTxPoolHeadEvent) + vm.txPool.SubscribeNewHeadEvent(importAccepted) + + importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]}) + require.NoError(err) + require.NoError(vm.issueTx(importTx, true)) + <-issuer + + blk, err := vm.BuildBlock(context.Background()) + require.NoError(err) + + require.NoError(blk.Verify(context.Background())) + require.NoError(vm.SetPreference(context.Background(), blk.ID())) + require.NoError(blk.Accept(context.Background())) + <-importAccepted + + // sender for the peer requesting gossip from [vm] + ctrl := gomock.NewController(t) + peerSender := commonEng.NewMockSender(ctrl) + router := p2p.NewRouter(logging.NoLog{}, peerSender) + + // we're only making client requests, so we don't need a server handler + client, err := router.RegisterAppProtocol(ethTxGossipProtocol, nil, nil) + require.NoError(err) + + emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + require.NoError(err) + emptyBloomFilterBytes, err := emptyBloomFilter.Bloom.MarshalBinary() + require.NoError(err) + request := &pb.PullGossipRequest{ + Filter: emptyBloomFilterBytes, + Salt: utils.RandomBytes(32), + } + + requestBytes, err := proto.Marshal(request) + require.NoError(err) + + wg := &sync.WaitGroup{} + + requestingNodeID := ids.GenerateTestNodeID() + peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) { + go func() { + require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) + }() + }).AnyTimes() + + sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { + go func() { + require.NoError(router.AppResponse(ctx, nodeID, requestID, appResponseBytes)) + }() + return nil + } + + // we only accept gossip requests from validators + mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) + require.True(ok) + mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { + return 0, nil + } + mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil + } + + // Ask the VM for any new transactions. We should get nothing at first. + wg.Add(1) + onResponse := func(nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &pb.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Empty(response.Gossip) + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() + + // Issue a tx to the VM + address := testEthAddrs[0] + key := testKeys[0].ToECDSA() + tx := types.NewTransaction(0, address, big.NewInt(10), 100_000, big.NewInt(params.LaunchMinGasPrice), nil) + signedTx, err := types.SignTx(tx, types.NewEIP155Signer(vm.chainID), key) + require.NoError(err) + + errs := vm.txPool.AddLocals([]*types.Transaction{signedTx}) + require.Len(errs, 1) + require.Nil(errs[0]) + + // wait so we aren't throttled by the vm + time.Sleep(throttlingPeriod / throttlingLimit) + + // Ask the VM for new transactions. We should get the newly issued tx. + wg.Add(1) + onResponse = func(nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &pb.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Len(response.Gossip, 1) + + gotTx := &GossipEthTx{} + require.NoError(gotTx.Unmarshal(response.Gossip[0])) + require.Equal(signedTx.Hash(), gotTx.Tx.Hash()) + + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() +} + +func TestAtomicTxGossip(t *testing.T) { + require := require.New(t) + + // set up prefunded address + importAmount := uint64(1_000_000_000) + issuer, vm, _, _, sender := GenesisVMWithUTXOs(t, true, genesisJSONApricotPhase0, "", "", map[ids.ShortID]uint64{ + testShortIDAddrs[0]: importAmount, + }) + + defer func() { + require.NoError(vm.Shutdown(context.Background())) + }() + + // sender for the peer requesting gossip from [vm] + ctrl := gomock.NewController(t) + peerSender := commonEng.NewMockSender(ctrl) + router := p2p.NewRouter(logging.NoLog{}, peerSender) + + // we're only making client requests, so we don't need a server handler + client, err := router.RegisterAppProtocol(atomicTxGossipProtocol, nil, nil) + require.NoError(err) + + emptyBloomFilter, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate) + require.NoError(err) + bloomBytes, err := emptyBloomFilter.Bloom.MarshalBinary() + require.NoError(err) + request := &pb.PullGossipRequest{ + Filter: bloomBytes, + Salt: emptyBloomFilter.Salt[:], + } + requestBytes, err := proto.Marshal(request) + require.NoError(err) + + requestingNodeID := ids.GenerateTestNodeID() + wg := &sync.WaitGroup{} + peerSender.EXPECT().SendAppRequest(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(func(ctx context.Context, nodeIDs set.Set[ids.NodeID], requestID uint32, appRequestBytes []byte) { + go func() { + require.NoError(vm.AppRequest(ctx, requestingNodeID, requestID, time.Time{}, appRequestBytes)) + }() + }).AnyTimes() + + sender.SendAppResponseF = func(ctx context.Context, nodeID ids.NodeID, requestID uint32, appResponseBytes []byte) error { + go func() { + require.NoError(router.AppResponse(ctx, nodeID, requestID, appResponseBytes)) + }() + return nil + } + + // we only accept gossip requests from validators + mockValidatorSet, ok := vm.ctx.ValidatorState.(*validators.TestState) + require.True(ok) + mockValidatorSet.GetCurrentHeightF = func(context.Context) (uint64, error) { + return 0, nil + } + mockValidatorSet.GetValidatorSetF = func(context.Context, uint64, ids.ID) (map[ids.NodeID]*validators.GetValidatorOutput, error) { + return map[ids.NodeID]*validators.GetValidatorOutput{requestingNodeID: nil}, nil + } + + // Ask the VM for any new transactions. We should get nothing at first. + wg.Add(1) + onResponse := func(nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &pb.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Empty(response.Gossip) + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() + + // issue a new tx to the vm + importTx, err := vm.newImportTx(vm.ctx.XChainID, testEthAddrs[0], initialBaseFee, []*secp256k1.PrivateKey{testKeys[0]}) + require.NoError(err) + + require.NoError(vm.issueTx(importTx, true /*=local*/)) + <-issuer + + // wait so we aren't throttled by the vm + time.Sleep(throttlingPeriod / throttlingLimit) + + // Ask the VM for new transactions. We should get the newly issued tx. + wg.Add(1) + onResponse = func(nodeID ids.NodeID, responseBytes []byte, err error) { + require.NoError(err) + + response := &pb.PullGossipResponse{} + require.NoError(proto.Unmarshal(responseBytes, response)) + require.Len(response.Gossip, 1) + + gotTx := &GossipAtomicTx{} + require.NoError(gotTx.Unmarshal(response.Gossip[0])) + require.Equal(importTx.InputUTXOs(), gotTx.Tx.InputUTXOs()) + + wg.Done() + } + require.NoError(client.AppRequest(context.Background(), set.Set[ids.NodeID]{vm.ctx.NodeID: struct{}{}}, requestBytes, onResponse)) + wg.Wait() +} diff --git a/plugin/evm/tx_test.go b/plugin/evm/tx_test.go index 3438f16296..769690d272 100644 --- a/plugin/evm/tx_test.go +++ b/plugin/evm/tx_test.go @@ -150,7 +150,9 @@ func executeTxTest(t *testing.T, test atomicTxTest) { // If this test simulates processing txs during bootstrapping (where some verification is skipped), // initialize the block building goroutines normally initialized in SetState(snow.NormalOps). // This ensures that the VM can build a block correctly during the test. - vm.initBlockBuilding() + if err := vm.initBlockBuilding(); err != nil { + t.Fatal(err) + } } if err := vm.issueTx(tx, true /*=local*/); err != nil { diff --git a/plugin/evm/vm.go b/plugin/evm/vm.go index a62245587d..c9ff043640 100644 --- a/plugin/evm/vm.go +++ b/plugin/evm/vm.go @@ -17,6 +17,7 @@ import ( "time" avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics" + "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/coreth/consensus/dummy" corethConstants "github.com/ava-labs/coreth/constants" @@ -28,6 +29,7 @@ import ( "github.com/ava-labs/coreth/eth" "github.com/ava-labs/coreth/eth/ethconfig" "github.com/ava-labs/coreth/ethdb" + "github.com/ava-labs/coreth/gossip" corethPrometheus "github.com/ava-labs/coreth/metrics/prometheus" "github.com/ava-labs/coreth/miner" "github.com/ava-labs/coreth/node" @@ -127,8 +129,26 @@ const ( bytesToIDCacheSize = 5 * units.MiB targetAtomicTxsSize = 40 * units.KiB + + // p2p app protocols + ethTxGossipProtocol = 0x0 + atomicTxGossipProtocol = 0x1 + + // gossip constants + txGossipMaxResponseSize = 20 * units.KiB + txGossipBloomMaxFilledRatio = 0.75 + txGossipBloomMaxItems = 8 * 1024 + txGossipBloomFalsePositiveRate = 0.01 + maxValidatorSetStaleness = time.Minute + throttlingPeriod = 10 * time.Second + throttlingLimit = 2 ) +var txGossipConfig = gossip.Config{ + Frequency: 10 * time.Second, + PollSize: 10, +} + // Define the API endpoints for the VM const ( avaxEndpoint = "/avax" @@ -209,7 +229,9 @@ func init() { // VM implements the snowman.ChainVM interface type VM struct { - ctx *snow.Context + ctx *snow.Context // TODO rename to snowCtx + backgroundCtx context.Context // TODO rename to ctx + cancel context.CancelFunc // *chain.State helps to implement the VM interface by wrapping blocks // with an efficient caching layer. *chain.State @@ -256,7 +278,9 @@ type VM struct { builder *blockBuilder - gossiper Gossiper + gossiper Gossiper + ethTxGossiper *gossip.Gossiper[GossipEthTx, *GossipEthTx] + atomicTxGossiper *gossip.Gossiper[GossipAtomicTx, *GossipAtomicTx] baseCodec codec.Registry codec codec.Manager @@ -276,6 +300,11 @@ type VM struct { client peer.NetworkClient networkCodec codec.Manager + validators *p2p.Validators + router *p2p.Router + ethTxGossipClient *p2p.Client + atomicTxGossipClient *p2p.Client + // Metrics multiGatherer avalanchegoMetrics.MultiGatherer @@ -313,7 +342,7 @@ func (vm *VM) GetActivationTime() time.Time { // Initialize implements the snowman.ChainVM interface func (vm *VM) Initialize( - _ context.Context, + ctx context.Context, chainCtx *snow.Context, dbManager manager.Manager, genesisBytes []byte, @@ -338,6 +367,9 @@ func (vm *VM) Initialize( deprecateMsg := vm.config.Deprecate() vm.ctx = chainCtx + ctx, cancel := context.WithCancel(ctx) + vm.backgroundCtx = ctx + vm.cancel = cancel // Create logger alias, err := vm.ctx.BCLookup.PrimaryAlias(vm.ctx.ChainID) @@ -499,15 +531,20 @@ func (vm *VM) Initialize( vm.codec = Codec // TODO: read size from settings - vm.mempool = NewMempool(chainCtx.AVAXAssetID, defaultMempoolSize) + vm.mempool, err = NewMempool(chainCtx.AVAXAssetID, defaultMempoolSize) + if err != nil { + return fmt.Errorf("failed to initialize mempool: %w", err) + } if err := vm.initializeMetrics(); err != nil { return err } // initialize peer network + vm.validators = p2p.NewValidators(vm.ctx.Log, vm.ctx.SubnetID, vm.ctx.ValidatorState, maxValidatorSetStaleness) + vm.router = p2p.NewRouter(vm.ctx.Log, appSender) vm.networkCodec = message.Codec - vm.Network = peer.NewNetwork(appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) + vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests) vm.client = peer.NewNetworkClient(vm.Network) if err := vm.initializeChain(lastAcceptedHash); err != nil { @@ -933,7 +970,9 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error { return vm.fx.Bootstrapping() case snow.NormalOp: // Initialize goroutines related to block building once we enter normal operation as there is no need to handle mempool gossip before this point. - vm.initBlockBuilding() + if err := vm.initBlockBuilding(); err != nil { + return fmt.Errorf("failed to initialize block building: %w", err) + } vm.bootstrapped = true return vm.fx.Bootstrapped() default: @@ -942,13 +981,64 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error { } // initBlockBuilding starts goroutines to manage block building -func (vm *VM) initBlockBuilding() { +func (vm *VM) initBlockBuilding() error { // NOTE: gossip network must be initialized first otherwise ETH tx gossip will not work. gossipStats := NewGossipStats() vm.gossiper = vm.createGossiper(gossipStats) vm.builder = vm.NewBlockBuilder(vm.toEngine) vm.builder.awaitSubmittedTxs() vm.Network.SetGossipHandler(NewGossipHandler(vm, gossipStats)) + + ethTxPool, err := NewGossipEthTxPool(vm.txPool) + if err != nil { + return err + } + vm.shutdownWg.Add(1) + go ethTxPool.Subscribe(vm.shutdownChan, &vm.shutdownWg) + + ethTxGossipHandler := &p2p.ValidatorHandler{ + ValidatorSet: vm.validators, + Handler: &p2p.ThrottlerHandler{ + Throttler: p2p.NewSlidingWindowThrottler(throttlingPeriod, throttlingLimit), + Handler: gossip.NewHandler[*GossipEthTx](ethTxPool, txGossipMaxResponseSize), + }, + } + ethTxGossipClient, err := vm.router.RegisterAppProtocol(ethTxGossipProtocol, ethTxGossipHandler, vm.validators) + if err != nil { + return err + } + vm.ethTxGossipClient = ethTxGossipClient + + atomicTxGossipHandler := &p2p.ValidatorHandler{ + ValidatorSet: vm.validators, + Handler: &p2p.ThrottlerHandler{ + Throttler: p2p.NewSlidingWindowThrottler(throttlingPeriod, throttlingLimit), + Handler: gossip.NewHandler[*GossipAtomicTx](vm.mempool, txGossipMaxResponseSize), + }, + } + atomicTxGossipClient, err := vm.router.RegisterAppProtocol(atomicTxGossipProtocol, atomicTxGossipHandler, vm.validators) + if err != nil { + return err + } + vm.atomicTxGossipClient = atomicTxGossipClient + + vm.ethTxGossiper = gossip.NewGossiper[GossipEthTx, *GossipEthTx]( + txGossipConfig, + vm.ctx.Log, + ethTxPool, + vm.ethTxGossipClient, + ) + go vm.ethTxGossiper.Gossip(vm.backgroundCtx) + + vm.atomicTxGossiper = gossip.NewGossiper[GossipAtomicTx, *GossipAtomicTx]( + txGossipConfig, + vm.ctx.Log, + vm.mempool, + vm.atomicTxGossipClient, + ) + go vm.atomicTxGossiper.Gossip(vm.backgroundCtx) + + return nil } // setAppRequestHandlers sets the request handlers for the VM to serve state sync @@ -986,6 +1076,7 @@ func (vm *VM) Shutdown(context.Context) error { if vm.ctx == nil { return nil } + vm.cancel() vm.Network.Shutdown() if err := vm.StateSyncClient.Shutdown(); err != nil { log.Error("error stopping state syncer", "err", err) @@ -1333,7 +1424,7 @@ func (vm *VM) issueTx(tx *Tx, local bool) error { } return err } - // NOTE: Gossiping of the issued [Tx] is handled in [AddTx] + return nil }