Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bloom filter gossip #266

Closed
wants to merge 81 commits into from
Closed
Show file tree
Hide file tree
Changes from 80 commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
76b6dcc
bloom filter gossip
joshua-kim Jun 6, 2023
2df7199
nit
joshua-kim Jul 27, 2023
7b138e3
nit
joshua-kim Jul 31, 2023
74c6d87
nits
joshua-kim Jul 31, 2023
88c2d8e
nits
joshua-kim Aug 1, 2023
2d0aee3
nits
joshua-kim Aug 1, 2023
5926bca
nit
joshua-kim Aug 1, 2023
bc5224b
nit
joshua-kim Aug 1, 2023
642d03c
nit
joshua-kim Aug 1, 2023
9fc69eb
nit
joshua-kim Aug 1, 2023
da10671
public tag
joshua-kim Aug 1, 2023
bd2cb74
remove double pointer
joshua-kim Aug 1, 2023
7a91014
nits
joshua-kim Aug 1, 2023
d31e12d
nit
joshua-kim Aug 1, 2023
e73eb43
nit
joshua-kim Aug 1, 2023
46126c7
nit
joshua-kim Aug 1, 2023
3c45c3f
Update gossip/test_gossip.go
joshua-kim Aug 2, 2023
9720e3c
nit
joshua-kim Aug 1, 2023
020c43f
nit
joshua-kim Aug 3, 2023
4b3ee0a
nit
joshua-kim Aug 4, 2023
f5ee846
nit
joshua-kim Aug 4, 2023
f925c43
nit
joshua-kim Aug 4, 2023
dd8bb71
changes
darioush Aug 7, 2023
16507bd
merge master
darioush Aug 7, 2023
c401a7e
revert
darioush Aug 7, 2023
5aaff37
use consistent version
darioush Aug 7, 2023
fb5b1ac
bump go version
darioush Aug 7, 2023
236e426
fix
darioush Aug 7, 2023
152d450
Merge pull request #295 from ava-labs/bloom-filter-gossip-review
joshua-kim Aug 7, 2023
66bbcac
nit
joshua-kim Aug 8, 2023
8a50023
more unit tests
joshua-kim Aug 8, 2023
a360c28
nit
joshua-kim Aug 8, 2023
088f747
nit
joshua-kim Aug 8, 2023
90f0301
nit
joshua-kim Aug 8, 2023
1fe4b83
more unit tests
joshua-kim Aug 8, 2023
ef521d7
nits
joshua-kim Aug 8, 2023
dc471cc
more unit tests
joshua-kim Aug 8, 2023
45871c6
nits
joshua-kim Aug 9, 2023
a222039
fix ut
joshua-kim Aug 9, 2023
3d1a15e
nit
joshua-kim Aug 9, 2023
74496f1
fix fickle test
joshua-kim Aug 11, 2023
48964aa
remove useless goroutine
joshua-kim Aug 14, 2023
ff9a842
fix bloom params
joshua-kim Aug 15, 2023
4fc0855
fix salt
joshua-kim Aug 15, 2023
1f8a298
nits
joshua-kim Aug 15, 2023
9e98721
increase gossip frequency
joshua-kim Aug 15, 2023
369bd96
nit
joshua-kim Aug 15, 2023
b74f3f0
fix test flake
joshua-kim Aug 15, 2023
957ee8e
nit
joshua-kim Aug 15, 2023
4446f4f
nit
joshua-kim Aug 15, 2023
a4d1c9f
nit
joshua-kim Aug 15, 2023
b06bc7e
nit
joshua-kim Aug 15, 2023
2de3042
comment
joshua-kim Aug 16, 2023
a45b07e
nit
joshua-kim Aug 22, 2023
cc64cdd
drop messages from non-validators
joshua-kim Aug 22, 2023
1b94dc4
nit
joshua-kim Aug 22, 2023
1363616
nit
joshua-kim Aug 28, 2023
9d6675c
Squashed commit of the following:
joshua-kim Aug 28, 2023
732f124
nit
joshua-kim Aug 29, 2023
580b9f0
nit
joshua-kim Aug 30, 2023
9c069ed
nit
joshua-kim Aug 30, 2023
d8aab62
Squashed commit of the following:
joshua-kim Aug 30, 2023
30a9b95
Merge branch 'master' into bloom-filter-gossip
joshua-kim Aug 30, 2023
601cc4d
fix unit tests
joshua-kim Aug 30, 2023
4edcf60
clean diff
joshua-kim Aug 30, 2023
dc3a6d0
clean diff
joshua-kim Aug 30, 2023
a738369
clean diff
joshua-kim Aug 30, 2023
d1485a8
update bloom filter constants
joshua-kim Aug 30, 2023
b1492e5
use proto
joshua-kim Aug 30, 2023
b749986
use 32 byte hash
joshua-kim Aug 30, 2023
0cf3989
Update gossip/gossip.go
joshua-kim Aug 31, 2023
b405041
Update plugin/evm/gossip_mempool.go
joshua-kim Aug 31, 2023
43be5e7
nits
joshua-kim Aug 31, 2023
f79dbc0
update go.mod
joshua-kim Aug 31, 2023
74a65f0
Update peer/network.go
joshua-kim Aug 31, 2023
4031484
use avalanchego logger
joshua-kim Aug 31, 2023
f06e037
fix flakey test
joshua-kim Aug 31, 2023
c031805
fix throttling in unit tests
joshua-kim Sep 1, 2023
f112839
fix throttling in unit tests
joshua-kim Sep 1, 2023
f00b4d5
address race comments
joshua-kim Sep 1, 2023
cfedffe
go get pb
joshua-kim Sep 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,19 @@ func (pool *TxPool) PendingSize() int {
return count
}

// IteratePending iterates over the [pool.pending] until [f] returns false.
func (pool *TxPool) IteratePending(f func(tx *types.Transaction) bool) {
pending := pool.Pending(true)

for _, list := range pending {
for _, tx := range list {
if !f(tx) {
return
}
}
}
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.19

require (
github.com/VictoriaMetrics/fastcache v1.10.0
github.com/ava-labs/avalanchego v1.10.9-rc.4
github.com/ava-labs/avalanchego v1.10.10-rc.0
github.com/cespare/cp v0.1.0
github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811
github.com/davecgh/go-spew v1.1.1
Expand Down Expand Up @@ -44,6 +44,7 @@ require (
golang.org/x/sys v0.8.0
golang.org/x/text v0.8.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
google.golang.org/protobuf v1.30.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

Expand Down Expand Up @@ -134,7 +135,6 @@ require (
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.10.9-rc.4 h1:vtavPfRiF6r1Zc6RV8/arEfVpe9GQsLWHbMfIWkHbMI=
github.com/ava-labs/avalanchego v1.10.9-rc.4/go.mod h1:vTBLl1zK36olfLRA7IUfdbvphWqlkuarIoXxvZTHZVw=
github.com/ava-labs/avalanchego v1.10.10-rc.0 h1:6VjkpwhAJ0tDNJK+UIUD8WIb5VelgH3w61mgk7JAkDQ=
github.com/ava-labs/avalanchego v1.10.10-rc.0/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down
131 changes: 131 additions & 0 deletions gossip/bloom.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"encoding/binary"
"hash"
"time"

bloomfilter "github.com/holiman/bloomfilter/v2"
"golang.org/x/exp/rand"
)

const hashLength = 32

var _ hash.Hash64 = (*hasher)(nil)

type Hash [hashLength]byte

// NewBloomFilter returns a new instance of a bloom filter with at most
// [maxExpectedElements] elements anticipated at any moment, and a collision
// rate of [falsePositiveProbability].
func NewBloomFilter(
maxExpectedElements uint64,
falsePositiveProbability float64,
) (*BloomFilter, error) {
bloom, err := bloomfilter.NewOptimal(
maxExpectedElements,
falsePositiveProbability,
)
if err != nil {
return nil, err
}

bloomFilter := &BloomFilter{
Bloom: bloom,
Salt: randomSalt(),
}
return bloomFilter, nil
}

type BloomFilter struct {
Bloom *bloomfilter.Filter
// Salt is provided to eventually unblock collisions in Bloom. It's possible
// that conflicting Gossipable items collide in the bloom filter, so a salt
// is generated to eventually resolve collisions.
Salt Hash
}

func (b *BloomFilter) Add(gossipable Gossipable) {
h := gossipable.GetHash()
salted := hasher{
hash: h[:],
salt: b.Salt,
}
b.Bloom.Add(salted)
}

func (b *BloomFilter) Has(gossipable Gossipable) bool {
h := gossipable.GetHash()
salted := hasher{
hash: h[:],
salt: b.Salt,
}
return b.Bloom.Contains(salted)
}

// ResetBloomFilterIfNeeded resets a bloom filter if it breaches a ratio of
// filled elements. Returns true if the bloom filter was reset.
func ResetBloomFilterIfNeeded(
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
bloomFilter *BloomFilter,
maxFilledRatio float64,
) bool {
if bloomFilter.Bloom.PreciseFilledRatio() < maxFilledRatio {
return false
}

// it's not possible for this to error assuming that the original
// bloom filter's parameters were valid
fresh, _ := bloomfilter.New(bloomFilter.Bloom.M(), bloomFilter.Bloom.K())
bloomFilter.Bloom = fresh
bloomFilter.Salt = randomSalt()
return true
}

func randomSalt() Hash {
salt := Hash{}
r := rand.New(rand.NewSource(uint64(time.Now().Nanosecond())))
_, _ = r.Read(salt[:])
return salt
}

var _ hash.Hash64 = (*hasher)(nil)

type hasher struct {
hash []byte
salt Hash
}

func (h hasher) Write(p []byte) (n int, err error) {
h.hash = append(h.hash, p...)
return len(p), nil
}

func (h hasher) Sum(b []byte) []byte {
h.hash = append(h.hash, b...)
return h.hash
}

func (h hasher) Reset() {
reset := Hash{}
h.hash = reset[:]
}

func (h hasher) BlockSize() int {
return hashLength
}

func (h hasher) Sum64() uint64 {
salted := Hash{}
for i := 0; i < len(h.hash) && i < hashLength; i++ {
salted[i] = h.hash[i] ^ h.salt[i]
}

return binary.BigEndian.Uint64(salted[:])
}
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

func (h hasher) Size() int {
return len(h.hash)
}
64 changes: 64 additions & 0 deletions gossip/bloom_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip

import (
"testing"

bloomfilter "github.com/holiman/bloomfilter/v2"
"github.com/stretchr/testify/require"
)

func TestBloomFilterRefresh(t *testing.T) {
tests := []struct {
name string
refreshRatio float64
add []*testTx
expected []*testTx
}{
{
name: "no refresh",
refreshRatio: 1,
add: []*testTx{
{hash: Hash{0}},
},
expected: []*testTx{
{hash: Hash{0}},
},
},
{
name: "refresh",
refreshRatio: 0.1,
add: []*testTx{
{hash: Hash{0}},
{hash: Hash{1}},
},
expected: []*testTx{
{hash: Hash{1}},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
b, err := bloomfilter.New(10, 1)
require.NoError(err)
bloom := BloomFilter{
Bloom: b,
}

for _, item := range tt.add {
_ = ResetBloomFilterIfNeeded(&bloom, tt.refreshRatio)
bloom.Add(item)
}

require.Equal(uint64(len(tt.expected)), bloom.Bloom.N())

for _, expected := range tt.expected {
require.True(bloom.Has(expected))
}
})
}
}
141 changes: 141 additions & 0 deletions gossip/gossip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossip
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

import (
"context"
"time"

"github.com/ava-labs/avalanchego/utils/logging"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"

"github.com/ava-labs/coreth/gossip/proto/pb"
)

// GossipableAny exists to help create non-nil pointers to a concrete Gossipable
// ref: https://stackoverflow.com/questions/69573113/how-can-i-instantiate-a-non-nil-pointer-of-type-argument-with-generic-go
type GossipableAny[T any] interface {
*T
Gossipable
}

type Config struct {
Frequency time.Duration
PollSize int
}

func NewGossiper[T any, U GossipableAny[T]](
config Config,
log logging.Logger,
set Set[U],
client *p2p.Client,
) *Gossiper[T, U] {
return &Gossiper[T, U]{
config: config,
log: log,
set: set,
client: client,
}
}

type Gossiper[T any, U GossipableAny[T]] struct {
config Config
log logging.Logger
set Set[U]
client *p2p.Client
}

func (g *Gossiper[_, _]) Gossip(ctx context.Context) {
gossipTicker := time.NewTicker(g.config.Frequency)
defer gossipTicker.Stop()

for {
select {
case <-gossipTicker.C:
if err := g.gossip(ctx); err != nil {
g.log.Warn("failed to gossip", zap.Error(err))
}
case <-ctx.Done():
g.log.Debug("shutting down gossip")
return
}
}
}

func (g *Gossiper[_, _]) gossip(ctx context.Context) error {
bloom, salt, err := g.set.GetFilter()
if err != nil {
return err
}

request := &pb.PullGossipRequest{
Filter: bloom,
Salt: salt,
}
msgBytes, err := proto.Marshal(request)
if err != nil {
return err
}

for i := 0; i < g.config.PollSize; i++ {
if err := g.client.AppRequestAny(ctx, msgBytes, g.handleResponse); err != nil {
return err
}
}

return nil
}

func (g *Gossiper[T, U]) handleResponse(
nodeID ids.NodeID,
responseBytes []byte,
err error,
) {
if err != nil {
g.log.Debug(
"failed gossip request",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
)
return
}

response := &pb.PullGossipResponse{}
if err := proto.Unmarshal(responseBytes, response); err != nil {
g.log.Debug("failed to unmarshal gossip response", zap.Error(err))
return
}

for _, bytes := range response.Gossip {
gossipable := U(new(T))
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
if err := gossipable.Unmarshal(bytes); err != nil {
g.log.Debug(
"failed to unmarshal gossip",
zap.Stringer("nodeID", nodeID),
zap.Error(err),
)
continue
}

hash := gossipable.GetHash()
g.log.Debug(
"received gossip",
zap.Stringer("nodeID", nodeID),
zap.Binary("hash", hash[:]),
)
if err := g.set.Add(gossipable); err != nil {
g.log.Debug(
"failed to add gossip to the known set",
zap.Stringer("nodeID", nodeID),
zap.Binary("id", hash[:]),
zap.Error(err),
)
continue
}
}
}
Loading