Skip to content

Commit

Permalink
Fix failing mempool tests
Browse files Browse the repository at this point in the history
  • Loading branch information
zivkovicmilos committed Nov 16, 2024
1 parent 7c59c13 commit a179026
Showing 1 changed file with 59 additions and 60 deletions.
119 changes: 59 additions & 60 deletions tm2/pkg/bft/mempool/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package mempool
import (
"context"
"fmt"
"log/slog"
"net"
"os"
"sync"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/gnolang/gno/tm2/pkg/log"
"github.com/gnolang/gno/tm2/pkg/p2p/conn"
"github.com/gnolang/gno/tm2/pkg/p2p/events"
p2pTypes "github.com/gnolang/gno/tm2/pkg/p2p/types"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/gnolang/gno/tm2/pkg/bft/proxy"
"github.com/gnolang/gno/tm2/pkg/bft/types"
"github.com/gnolang/gno/tm2/pkg/errors"
"github.com/gnolang/gno/tm2/pkg/log"
"github.com/gnolang/gno/tm2/pkg/p2p"
p2pcfg "github.com/gnolang/gno/tm2/pkg/p2p/config"
"github.com/gnolang/gno/tm2/pkg/testutils"
Expand Down Expand Up @@ -74,12 +72,14 @@ func makeAndConnectReactors(t *testing.T, mconfig *memcfg.MempoolConfig, pconfig
}

// "Simulate" the networking layer
MakeConnectedSwitches(t, pconfig, n, options)
makeConnectedSwitches(t, pconfig, n, options)

return reactors
}

func MakeConnectedSwitches(
// makeConnectedSwitches creates a cluster of peers, with the given options.
// Used to simulate the networking layer for the specific module
func makeConnectedSwitches(
t *testing.T,
cfg *p2pcfg.P2PConfig,
n int,
Expand All @@ -91,9 +91,6 @@ func MakeConnectedSwitches(
sws = make([]*p2p.MultiplexSwitch, 0, n)
ts = make([]*p2p.MultiplexTransport, 0, n)
addrs = make([]*p2pTypes.NetAddress, 0, n)

// TODO remove
lgs = make([]*slog.Logger, 0, n)
)

// Generate the switches
Expand All @@ -120,23 +117,20 @@ func MakeConnectedSwitches(
Network: "testing",
Software: "p2ptest",
Version: "v1.2.3-rc.0-deadbeef",
Channels: []byte{0x01},
Channels: []byte{MempoolChannel},
Moniker: fmt.Sprintf("node-%d", i),
Other: p2pTypes.NodeInfoOther{
TxIndex: "off",
RPCAddress: fmt.Sprintf("127.0.0.1:%d", 0),
},
}

// TODO remove
lg := slog.New(slog.NewTextHandler(os.Stdout, nil))

// Create the multiplex transport
multiplexTransport := p2p.NewMultiplexTransport(
info,
*key,
conn.MConfigFromP2P(cfg),
lg,
log.NewNoopLogger(),
)

// Start the transport
Expand All @@ -147,12 +141,9 @@ func MakeConnectedSwitches(
})

dialAddr := multiplexTransport.NetAddress()
addrs = append(addrs, &dialAddr)

addrs = append(addrs, &dialAddr)
ts = append(ts, multiplexTransport)

// TODO remove
lgs = append(lgs, lg)
}

ctx, cancelFn := context.WithTimeout(context.Background(), 10*time.Second)
Expand All @@ -164,16 +155,9 @@ func MakeConnectedSwitches(
// Make sure the switches connect to each other.
// Set up event listeners to make sure
// the setup method blocks until switches are connected
// Create the multiplex switch
newOpts := []p2p.SwitchOption{
p2p.WithPersistentPeers(addrs),
}
opts[i] = append(opts[i], p2p.WithPersistentPeers(addrs))

newOpts = append(newOpts, opts[i]...)

multiplexSwitch := p2p.NewMultiplexSwitch(ts[i], newOpts...)

multiplexSwitch.SetLogger(lgs[i].With("sw", i)) // TODO remove
multiplexSwitch := p2p.NewMultiplexSwitch(ts[i], opts[i]...)

ch, unsubFn := multiplexSwitch.Subscribe(func(event events.Event) bool {
return event.Type() == events.PeerConnected
Expand Down Expand Up @@ -215,50 +199,70 @@ func MakeConnectedSwitches(

require.NoError(t, g.Wait())

fmt.Printf("\n\nDONE\n\n")

return sws
}

func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) {
func waitForTxsOnReactors(
t *testing.T,
txs types.Txs,
reactors []*Reactor,
) {
t.Helper()

// wait for the txs in all mempools
wg := new(sync.WaitGroup)
ctx, cancelFn := context.WithTimeout(context.Background(), 30*time.Second)
defer cancelFn()

// Wait for the txs to propagate in all mempools
var wg sync.WaitGroup

for i, reactor := range reactors {
wg.Add(1)

go func(r *Reactor, reactorIndex int) {
defer wg.Done()
waitForTxsOnReactor(t, txs, r, reactorIndex)

reapedTxs := waitForTxsOnReactor(t, ctx, len(txs), r)

for i, tx := range txs {
assert.Equalf(t, tx, reapedTxs[i],
fmt.Sprintf(
"txs at index %d on reactor %d don't match: %v vs %v",
i, reactorIndex,
tx,
reapedTxs[i],
),
)
}
}(reactor, i)
}

done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()

timer := time.After(timeout)
select {
case <-timer:
t.Fatal("Timed out waiting for txs")
case <-done:
}
wg.Wait()
}

func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) {
func waitForTxsOnReactor(
t *testing.T,
ctx context.Context,
expectedLength int,
reactor *Reactor,
) types.Txs {
t.Helper()

mempool := reactor.mempool
for mempool.Size() < len(txs) {
time.Sleep(time.Millisecond * 100)
}
var (
mempool = reactor.mempool
ticker = time.NewTicker(100 * time.Millisecond)
)

for {
select {
case <-ctx.Done():
t.Fatal("timed out waiting for txs")
case <-ticker.C:
if mempool.Size() < expectedLength {
continue
}

reapedTxs := mempool.ReapMaxTxs(len(txs))
for i, tx := range txs {
assert.Equalf(t, tx, reapedTxs[i],
"txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])
return mempool.ReapMaxTxs(expectedLength)
}
}
}

Expand All @@ -270,11 +274,6 @@ func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) {
assert.Zero(t, reactor.mempool.Size())
}

const (
numTxs = 1000
timeout = 120 * time.Second // ridiculously high because CircleCI is slow
)

func TestReactorBroadcastTxMessage(t *testing.T) {
t.Parallel()

Expand All @@ -297,7 +296,7 @@ func TestReactorBroadcastTxMessage(t *testing.T) {

// send a bunch of txs to the first reactor's mempool
// and wait for them all to be received in the others
txs := checkTxs(t, reactors[0].mempool, numTxs, UnknownPeerID, true)
txs := checkTxs(t, reactors[0].mempool, 1000, UnknownPeerID, true)
waitForTxsOnReactors(t, txs, reactors)
}

Expand All @@ -316,7 +315,7 @@ func TestReactorNoBroadcastToSender(t *testing.T) {

// send a bunch of txs to the first reactor's mempool, claiming it came from peer
// ensure peer gets no txs
checkTxs(t, reactors[0].mempool, numTxs, 1, true)
checkTxs(t, reactors[0].mempool, 1000, 1, true)
ensureNoTxs(t, reactors[1], 100*time.Millisecond)
}

Expand Down

0 comments on commit a179026

Please sign in to comment.