diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 3548dbd1cb..7c94be98e4 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -785,13 +785,20 @@ func TestP2PHTTPHandler(t *testing.T) { require.ErrorIs(t, err, limitcaller.ErrConnectionQueueingTimeout) } +// TestP2PRelay checks p2p nodes can properly relay messages: +// netA and netB are started with ForceFetchTransactions so it subscribes to the txn topic, +// both of them are connected and do not relay messages. +// Later, netB is forced to relay messages and netC is started with a listening address set +// so that it relays messages as well. +// The test checks messages from both netB and netC are received by netA. func TestP2PRelay(t *testing.T) { partitiontest.PartitionTest(t) cfg := config.GetDefaultLocal() cfg.ForceFetchTransactions = true log := logging.TestingLog(t) - netA, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) + log.Debugln("Starting netA") + netA, err := NewP2PNetwork(log.With("net", "netA"), cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) err = netA.Start() @@ -806,7 +813,8 @@ func TestP2PRelay(t *testing.T) { multiAddrStr := addrsA[0].String() phoneBookAddresses := []string{multiAddrStr} - netB, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses) + netB, err := NewP2PNetwork(log.With("net", "netB"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) err = netB.Start() require.NoError(t, err) @@ -826,8 +834,7 @@ func TestP2PRelay(t *testing.T) { return netA.hasPeers() && netB.hasPeers() }, 2*time.Second, 50*time.Millisecond) - makeCounterHandler := func(numExpected int) ([]TaggedMessageProcessor, *atomic.Uint32, chan struct{}) { - var numActual atomic.Uint32 + makeCounterHandler := func(numExpected int, counter *atomic.Uint32, msgs *[][]byte) ([]TaggedMessageProcessor, chan struct{}) { counterDone := make(chan struct{}) counterHandler := []TaggedMessageProcessor{ { @@ -837,10 +844,13 @@ func TestP2PRelay(t *testing.T) { ProcessorHandleFunc }{ ProcessorValidateFunc(func(msg IncomingMessage) ValidatedMessage { - return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: nil} + return ValidatedMessage{Action: Accept, Tag: msg.Tag, ValidatedMessage: msg.Data} }), ProcessorHandleFunc(func(msg ValidatedMessage) OutgoingMessage { - if count := numActual.Add(1); int(count) >= numExpected { + if msgs != nil { + *msgs = append(*msgs, msg.ValidatedMessage.([]byte)) + } + if count := counter.Add(1); int(count) >= numExpected { close(counterDone) } return OutgoingMessage{Action: Ignore} @@ -848,9 +858,10 @@ func TestP2PRelay(t *testing.T) { }, }, } - return counterHandler, &numActual, counterDone + return counterHandler, counterDone } - counterHandler, _, counterDone := makeCounterHandler(1) + var counter atomic.Uint32 + counterHandler, counterDone := makeCounterHandler(1, &counter, nil) netA.RegisterProcessors(counterHandler) // send 5 messages from both netB to netA @@ -866,10 +877,11 @@ func TestP2PRelay(t *testing.T) { case <-time.After(1 * time.Second): } - // add netC with listening address set, and enable relaying on netB - // ensure all messages are received by netA + // add a netC with listening address set and enable relaying on netB + // ensure all messages from netB and netC are received by netA cfg.NetAddress = "127.0.0.1:0" - netC, err := NewP2PNetwork(log, cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) + log.Debugf("Starting netB with phonebook addresses %v", phoneBookAddresses) + netC, err := NewP2PNetwork(log.With("net", "netC"), cfg, "", phoneBookAddresses, genesisID, config.Devtestnet, &nopeNodeInfo{}) require.NoError(t, err) err = netC.Start() require.NoError(t, err) @@ -893,28 +905,32 @@ func TestP2PRelay(t *testing.T) { }, 2*time.Second, 50*time.Millisecond) const expectedMsgs = 10 - counterHandler, count, counterDone := makeCounterHandler(expectedMsgs) + counter.Store(0) + var loggedMsgs [][]byte + counterHandler, counterDone = makeCounterHandler(expectedMsgs, &counter, &loggedMsgs) netA.ClearProcessors() netA.RegisterProcessors(counterHandler) for i := 0; i < expectedMsgs/2; i++ { - err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil) + err := netB.Relay(context.Background(), protocol.TxnTag, []byte{5, 6, 7, byte(i)}, true, nil) require.NoError(t, err) err = netC.Relay(context.Background(), protocol.TxnTag, []byte{11, 12, 10 + byte(i), 14}, true, nil) require.NoError(t, err) } // send some duplicate messages, they should be dropped for i := 0; i < expectedMsgs/2; i++ { - err := netB.Relay(context.Background(), protocol.TxnTag, []byte{1, 2, 3, byte(i)}, true, nil) + err := netB.Relay(context.Background(), protocol.TxnTag, []byte{5, 6, 7, byte(i)}, true, nil) require.NoError(t, err) } select { case <-counterDone: - case <-time.After(2 * time.Second): - if c := count.Load(); c < expectedMsgs { + case <-time.After(3 * time.Second): + if c := counter.Load(); c < expectedMsgs { + t.Logf("Logged messages: %v", loggedMsgs) require.Failf(t, "One or more messages failed to reach destination network", "%d > %d", expectedMsgs, c) } else if c > expectedMsgs { + t.Logf("Logged messages: %v", loggedMsgs) require.Failf(t, "One or more messages that were expected to be dropped, reached destination network", "%d < %d", expectedMsgs, c) } }