Skip to content

Commit

Permalink
Add P2P SDK Pull Gossip (#318)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim authored Sep 12, 2023
1 parent 0af3265 commit 0fd55ab
Show file tree
Hide file tree
Showing 11 changed files with 767 additions and 16 deletions.
15 changes: 15 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,21 @@ func (pool *TxPool) PendingSize() int {
return count
}

// IteratePending iterates over [pool.pending] until [f] returns false.
// The caller must not modify [tx].
func (pool *TxPool) IteratePending(f func(tx *types.Transaction) bool) {
pool.mu.RLock()
defer pool.mu.RUnlock()

for _, list := range pool.pending {
for _, tx := range list.txs.items {
if !f(tx) {
return
}
}
}
}

// Locals retrieves the accounts currently considered local by the pool.
func (pool *TxPool) Locals() []common.Address {
pool.mu.Lock()
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,14 @@ require (
github.com/tyler-smith/go-bip39 v1.1.0
github.com/urfave/cli/v2 v2.17.2-0.20221006022127-8f469abc00aa
go.uber.org/goleak v1.2.1
go.uber.org/mock v0.2.0
golang.org/x/crypto v0.1.0
golang.org/x/exp v0.0.0-20230206171751-46f607a40771
golang.org/x/sync v0.1.0
golang.org/x/sys v0.8.0
golang.org/x/text v0.8.0
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af
google.golang.org/protobuf v1.30.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

Expand Down Expand Up @@ -126,15 +128,13 @@ require (
go.opentelemetry.io/otel/trace v1.11.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/mock v0.2.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/term v0.7.0 // indirect
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 // indirect
google.golang.org/grpc v1.55.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/ava-labs/avalanchego v1.10.9-rc.4 h1:vtavPfRiF6r1Zc6RV8/arEfVpe9GQsLWHbMfIWkHbMI=
github.com/ava-labs/avalanchego v1.10.9-rc.4/go.mod h1:vTBLl1zK36olfLRA7IUfdbvphWqlkuarIoXxvZTHZVw=
github.com/ava-labs/avalanchego v1.10.9 h1:qxhp3YoD2Wm/iIKP6Wb1isbkUPWmIrJxWgivDoL0obM=
github.com/ava-labs/avalanchego v1.10.9/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw=
github.com/ava-labs/avalanchego v1.10.10-rc.1 h1:dPJISEWqL3tdUShe6RuB8CFuXl3rsH8617sXbLBjkIE=
github.com/ava-labs/avalanchego v1.10.10-rc.1/go.mod h1:C8R5uiltpc8MQ62ixxgODR+15mesWF0aAw3H+Qrl9Iw=
github.com/ava-labs/avalanchego v1.10.10-rc.2 h1:nlHc1JwKb5TEc9oqPU2exvOpazhxr11N2ym/LzYxv4k=
github.com/ava-labs/avalanchego v1.10.10-rc.2/go.mod h1:BN97sZppDSvIMIfEjrLTjdPTFkGLkb0ISJHEcoxMMNk=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
Expand Down
137 changes: 137 additions & 0 deletions plugin/evm/gossip_mempool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"context"
"fmt"
"sync"

"github.com/ava-labs/avalanchego/ids"
"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/network/p2p/gossip"

"github.com/ava-labs/coreth/core"
"github.com/ava-labs/coreth/core/txpool"
"github.com/ava-labs/coreth/core/types"
)

var (
_ gossip.Gossipable = (*GossipEthTx)(nil)
_ gossip.Gossipable = (*GossipAtomicTx)(nil)
_ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil)
)

type GossipAtomicTx struct {
Tx *Tx
}

func (tx *GossipAtomicTx) GetID() ids.ID {
return tx.Tx.ID()
}

func (tx *GossipAtomicTx) Marshal() ([]byte, error) {
return tx.Tx.SignedBytes(), nil
}

func (tx *GossipAtomicTx) Unmarshal(bytes []byte) error {
atomicTx, err := ExtractAtomicTx(bytes, Codec)
tx.Tx = atomicTx

return err
}

func NewGossipEthTxPool(mempool *txpool.TxPool) (*GossipEthTxPool, error) {
bloom, err := gossip.NewBloomFilter(txGossipBloomMaxItems, txGossipBloomFalsePositiveRate)
if err != nil {
return nil, fmt.Errorf("failed to initialize bloom filter: %w", err)
}

return &GossipEthTxPool{
mempool: mempool,
pendingTxs: make(chan core.NewTxsEvent),
bloom: bloom,
}, nil
}

type GossipEthTxPool struct {
mempool *txpool.TxPool
pendingTxs chan core.NewTxsEvent

bloom *gossip.BloomFilter
lock sync.RWMutex
}

func (g *GossipEthTxPool) Subscribe(ctx context.Context) {
g.mempool.SubscribeNewTxsEvent(g.pendingTxs)

for {
select {
case <-ctx.Done():
log.Debug("shutting down subscription")
return
case pendingTxs := <-g.pendingTxs:
g.lock.Lock()
for _, pendingTx := range pendingTxs.Txs {
tx := &GossipEthTx{Tx: pendingTx}
g.bloom.Add(tx)
reset, err := gossip.ResetBloomFilterIfNeeded(g.bloom, txGossipMaxFalsePositiveRate)
if err != nil {
log.Error("failed to reset bloom filter", "err", err)
continue
}

if reset {
log.Debug("resetting bloom filter", "reason", "reached max filled ratio")

g.mempool.IteratePending(func(tx *types.Transaction) bool {
g.bloom.Add(&GossipEthTx{Tx: pendingTx})
return true
})
}
}
g.lock.Unlock()
}
}
}

// Add enqueues the transaction to the mempool. Subscribe should be called
// to receive an event if tx is actually added to the mempool or not.
func (g *GossipEthTxPool) Add(tx *GossipEthTx) error {
return g.mempool.AddRemotes([]*types.Transaction{tx.Tx})[0]
}

func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) {
g.mempool.IteratePending(func(tx *types.Transaction) bool {
return f(&GossipEthTx{Tx: tx})
})
}

func (g *GossipEthTxPool) GetFilter() ([]byte, []byte, error) {
g.lock.RLock()
defer g.lock.RUnlock()

bloom, err := g.bloom.Bloom.MarshalBinary()
salt := g.bloom.Salt

return bloom, salt[:], err
}

type GossipEthTx struct {
Tx *types.Transaction
}

func (tx *GossipEthTx) GetID() ids.ID {
return ids.ID(tx.Tx.Hash())
}

func (tx *GossipEthTx) Marshal() ([]byte, error) {
return tx.Tx.MarshalBinary()
}

func (tx *GossipEthTx) Unmarshal(bytes []byte) error {
tx.Tx = &types.Transaction{}
return tx.Tx.UnmarshalBinary(bytes)
}
119 changes: 119 additions & 0 deletions plugin/evm/gossip_mempool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package evm

import (
"testing"

"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/vms/components/verify"
"github.com/stretchr/testify/require"

"github.com/ava-labs/avalanchego/ids"
)

func TestGossipAtomicTxMarshal(t *testing.T) {
require := require.New(t)

expected := &GossipAtomicTx{
Tx: &Tx{
UnsignedAtomicTx: &UnsignedImportTx{},
Creds: []verify.Verifiable{},
},
}

key0 := testKeys[0]
require.NoError(expected.Tx.Sign(Codec, [][]*secp256k1.PrivateKey{{key0}}))

bytes, err := expected.Marshal()
require.NoError(err)

actual := &GossipAtomicTx{}
require.NoError(actual.Unmarshal(bytes))

require.NoError(err)
require.Equal(expected.GetID(), actual.GetID())
}

func TestAtomicMempoolIterate(t *testing.T) {
txs := []*GossipAtomicTx{
{
Tx: &Tx{
UnsignedAtomicTx: &TestUnsignedTx{
IDV: ids.GenerateTestID(),
},
},
},
{
Tx: &Tx{
UnsignedAtomicTx: &TestUnsignedTx{
IDV: ids.GenerateTestID(),
},
},
},
}

tests := []struct {
name string
add []*GossipAtomicTx
f func(tx *GossipAtomicTx) bool
possibleValues []*GossipAtomicTx
expectedLen int
}{
{
name: "func matches nothing",
add: txs,
f: func(*GossipAtomicTx) bool {
return false
},
possibleValues: nil,
},
{
name: "func matches all",
add: txs,
f: func(*GossipAtomicTx) bool {
return true
},
possibleValues: txs,
expectedLen: 2,
},
{
name: "func matches subset",
add: txs,
f: func(tx *GossipAtomicTx) bool {
return tx.Tx == txs[0].Tx
},
possibleValues: txs,
expectedLen: 1,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require := require.New(t)
m, err := NewMempool(ids.Empty, 10)
require.NoError(err)

for _, add := range tt.add {
require.NoError(m.Add(add))
}

matches := make([]*GossipAtomicTx, 0)
f := func(tx *GossipAtomicTx) bool {
match := tt.f(tx)

if match {
matches = append(matches, tx)
}

return match
}

m.Iterate(f)

require.Len(matches, tt.expectedLen)
require.Subset(tt.possibleValues, matches)
})
}
}
8 changes: 6 additions & 2 deletions plugin/evm/gossiper_atomic_gossiping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"time"

"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow"
"github.com/ava-labs/avalanchego/utils/set"

"github.com/stretchr/testify/assert"

"github.com/ava-labs/coreth/plugin/evm/message"
Expand All @@ -22,10 +22,11 @@ import (
func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) {
assert := assert.New(t)

_, vm, _, sharedMemory, sender := GenesisVM(t, true, "", "", "")
_, vm, _, sharedMemory, sender := GenesisVM(t, false, "", "", "")
defer func() {
assert.NoError(vm.Shutdown(context.Background()))
}()
assert.NoError(vm.Connected(context.Background(), ids.GenerateTestNodeID(), nil))

// Create conflicting transactions
importTxs := createImportTxOptions(t, vm, sharedMemory)
Expand Down Expand Up @@ -56,12 +57,15 @@ func TestMempoolAtmTxsIssueTxAndGossiping(t *testing.T) {
return nil
}

assert.NoError(vm.SetState(context.Background(), snow.NormalOp))

// Optimistically gossip raw tx
assert.NoError(vm.issueTx(tx, true /*=local*/))
time.Sleep(500 * time.Millisecond)
gossipedLock.Lock()
assert.Equal(1, gossiped)
gossipedLock.Unlock()
assert.True(vm.mempool.bloom.Has(&GossipAtomicTx{Tx: tx}))

// Test hash on retry
assert.NoError(vm.gossiper.GossipAtomicTxs([]*Tx{tx}))
Expand Down
Loading

0 comments on commit 0fd55ab

Please sign in to comment.