Skip to content

Commit

Permalink
fix flakiness in TestCreateStream_SinglePairwiseConnection
Browse files Browse the repository at this point in the history
  • Loading branch information
peterargue committed Jan 31, 2025
1 parent 94e4897 commit 638abea
Showing 1 changed file with 39 additions and 20 deletions.
59 changes: 39 additions & 20 deletions network/p2p/node/libp2pNode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,30 +265,31 @@ func TestCreateStream_SinglePairwiseConnection(t *testing.T) {
p2ptest.StartNodes(t, signalerCtx, nodes)
defer p2ptest.StopNodes(t, nodes, cancel)

ctxWithTimeout, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
done := make(chan struct{})
numOfStreamsPerNode := 100 // create large number of streamChan per node per connection to ensure the resource manager does not cause starvation of resources
expectedTotalNumOfStreams := 600

// create a number of streamChan concurrently between each node
streamChan := make(chan network.Stream, expectedTotalNumOfStreams)

go createConcurrentStreams(t, ctxWithTimeout, nodes, ids, numOfStreamsPerNode, streamChan, done)
unittest.RequireCloseBefore(t, done, 5*time.Second, "could not create streamChan on time")
require.Len(t,
streamChan,
expectedTotalNumOfStreams,
fmt.Sprintf("expected %d total number of streamChan created got %d", expectedTotalNumOfStreams, len(streamChan)))
createConcurrentStreams(t, ctx, nodes, ids, numOfStreamsPerNode, streamChan, expectedTotalNumOfStreams)

// ensure only a single connection exists between all nodes
ensureSinglePairwiseConnection(t, nodes)
close(streamChan)
}

// createStreams will attempt to create n number of streams concurrently between each combination of node pairs.
func createConcurrentStreams(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode, ids flow.IdentityList, n int, streams chan network.Stream, done chan struct{}) {
defer close(done)
func createConcurrentStreams(t *testing.T, ctx context.Context, nodes []p2p.LibP2PNode, ids flow.IdentityList, n int, streams chan network.Stream, expectedTotalNumOfStreams int) {
ctx, cancel := context.WithCancel(ctx)
// cancel called below to shutdown all streams

streamHandler := func(stream network.Stream) error {
streams <- stream

// wait for the done signal to close the stream
<-ctx.Done()
return nil
}

var wg sync.WaitGroup
for _, this := range nodes {
for i, other := range nodes {
Expand All @@ -304,13 +305,7 @@ func createConcurrentStreams(t *testing.T, ctx context.Context, nodes []p2p.LibP
wg.Add(1)
go func(sender p2p.LibP2PNode) {
defer wg.Done()
err := sender.OpenAndWriteOnStream(ctx, pInfo.ID, t.Name(), func(stream network.Stream) error {
streams <- stream

// wait for the done signal to close the stream
<-ctx.Done()
return nil
})
err := sender.OpenAndWriteOnStream(ctx, pInfo.ID, t.Name(), streamHandler)
require.NoError(t, err)
}(this)
}
Expand All @@ -319,7 +314,31 @@ func createConcurrentStreams(t *testing.T, ctx context.Context, nodes []p2p.LibP
// in 2 connections 1 created by each node, this happens because we are calling CreateStream concurrently.
time.Sleep(500 * time.Millisecond)
}
unittest.RequireReturnsBefore(t, wg.Wait, 3*time.Second, "could not create streams on time")

// pause until all streams are created
unittest.RequireReturnsBefore(t, func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(10 * time.Millisecond):
}

if len(streams) >= expectedTotalNumOfStreams {
return
}
}
}, 3*time.Second, "could not create streams on time")

require.Len(t,
streams,
expectedTotalNumOfStreams,
fmt.Sprintf("expected %d total number of streamChan created got %d", expectedTotalNumOfStreams, len(streams)))

// cancel the context to trigger streams to shutdown
cancel()

unittest.RequireReturnsBefore(t, wg.Wait, 1*time.Second, "could not shutdown streams on time")
}

// ensureSinglePairwiseConnection ensure each node in the list has exactly one connection to every other node in the list.
Expand Down

0 comments on commit 638abea

Please sign in to comment.