Skip to content

Commit

Permalink
tests: extend TestP2PRelay logging (algorand#6048)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Jul 1, 2024
1 parent 4ba009b commit 3bcfedb
Showing 1 changed file with 32 additions and 16 deletions.
48 changes: 32 additions & 16 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,13 +785,20 @@ func TestP2PHTTPHandler(t *testing.T) {
require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout)
}

// TestP2PRelay checks p2p nodes can properly relay messages:
// netA and netB are started with ForceFetchTransactions so it subscribes to the txn topic,
// both of them are connected and do not relay messages.
// Later, netB is forced to relay messages and netC is started with a listening address set
// so that it relays messages as well.
// The test checks messages from both netB and netC are received by netA.
func TestP2PRelay(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
cfg.ForceFetchTransactions = true
log := logging.TestingLog(t)
netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
log.Debugln("Starting netA")
netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)

err = netA.Start()
Expand All @@ -806,7 +813,8 @@ func TestP2PRelay(t *testing.T) {
multiAddrStr := addrsA[0].String()
phoneBookAddresses := []string{multiAddrStr}

netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses)
netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
err = netB.Start()
require.NoError(t, err)
Expand All @@ -826,8 +834,7 @@ func TestP2PRelay(t *testing.T) {
return netA.hasPeers() && netB.hasPeers()
}, 2*time.Second, 50*time.Millisecond)

makeCounterHandler := func(numExpected int) ([]TaggedMessageProcessor, *atomic.Uint32, chan struct{}) {
var numActual atomic.Uint32
makeCounterHandler := func(numExpected int, counter *atomic.Uint32, msgs *[][]byte) ([]TaggedMessageProcessor, chan struct{}) {
counterDone := make(chan struct{})
counterHandler := []TaggedMessageProcessor{
{
Expand All @@ -837,20 +844,24 @@ func TestP2PRelay(t *testing.T) {
ProcessorHandleFunc
}{
ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage {
return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil}
return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: msg.Data}
}),
ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage {
if count := numActual.Add(1); int(count) >= numExpected {
if msgs != nil {
*msgs = append(*msgs, msg.ValidatedMessage.([]byte))
}
if count := counter.Add(1); int(count) >= numExpected {
close(counterDone)
}
return OutgoingMessage{Action: Ignore}
}),
},
},
}
return counterHandler, &numActual, counterDone
return counterHandler, counterDone
}
counterHandler, _, counterDone := makeCounterHandler(1)
var counter atomic.Uint32
counterHandler, counterDone := makeCounterHandler(1, &counter, nil)
netA.RegisterProcessors(counterHandler)

// send 5 messages from both netB to netA
Expand All @@ -866,10 +877,11 @@ func TestP2PRelay(t *testing.T) {
case <-time.After(1 * time.Second):
}

// add netC with listening address set, and enable relaying on netB
// ensure all messages are received by netA
// add a netC with listening address set and enable relaying on netB
// ensure all messages from netB and netC are received by netA
cfg.NetAddress = "127.0.0.1:0"
netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses)
netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{})
require.NoError(t, err)
err = netC.Start()
require.NoError(t, err)
Expand All @@ -893,28 +905,32 @@ func TestP2PRelay(t *testing.T) {
}, 2*time.Second, 50*time.Millisecond)

const expectedMsgs = 10
counterHandler, count, counterDone := makeCounterHandler(expectedMsgs)
counter.Store(0)
var loggedMsgs [][]byte
counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs)
netA.ClearProcessors()
netA.RegisterProcessors(counterHandler)

for i := 0; i < expectedMsgs/2; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{5, 6, 7, byte(i)}, true, nil)
require.NoError(t, err)
err = netC.Relay(context.Background(), protocol.TxnTag, []byte{11, 12, 10 + byte(i), 14}, true, nil)
require.NoError(t, err)
}
// send some duplicate messages, they should be dropped
for i := 0; i < expectedMsgs/2; i++ {
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil)
err := netB.Relay(context.Background(), protocol.TxnTag, []byte{5, 6, 7, byte(i)}, true, nil)
require.NoError(t, err)
}

select {
case <-counterDone:
case <-time.After(2 * time.Second):
if c := count.Load(); c < expectedMsgs {
case <-time.After(3 * time.Second):
if c := counter.Load(); c < expectedMsgs {
t.Logf("Logged messages: %v", loggedMsgs)
require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, c)
} else if c > expectedMsgs {
t.Logf("Logged messages: %v", loggedMsgs)
require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, c)
}
}
Expand Down

0 comments on commit 3bcfedb

Please sign in to comment.