Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

transport integration tests: add tests for resource manager #2285

Merged
merged 9 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions core/network/mocks/mock_resource_scope_span.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/network/mocks/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ package mocknetwork
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_stream_management_scope.go github.com/libp2p/go-libp2p/core/network StreamManagementScope"
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_peer_scope.go github.com/libp2p/go-libp2p/core/network PeerScope"
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_protocol_scope.go github.com/libp2p/go-libp2p/core/network ProtocolScope"
//go:generate sh -c "go run github.com/golang/mock/mockgen -package mocknetwork -destination mock_resource_scope_span.go github.com/libp2p/go-libp2p/core/network ResourceScopeSpan"
2 changes: 0 additions & 2 deletions p2p/host/basic/basic_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,6 @@ func (h *BasicHost) EventBus() event.Bus {
func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
h.Mux().AddHandler(pid, func(p protocol.ID, rwc io.ReadWriteCloser) error {
is := rwc.(network.Stream)
is.SetProtocol(p)
handler(is)
return nil
})
Expand All @@ -605,7 +604,6 @@ func (h *BasicHost) SetStreamHandler(pid protocol.ID, handler network.StreamHand
func (h *BasicHost) SetStreamHandlerMatch(pid protocol.ID, m func(protocol.ID) bool, handler network.StreamHandler) {
h.Mux().AddHandlerWithFunc(pid, m, func(p protocol.ID, rwc io.ReadWriteCloser) error {
is := rwc.(network.Stream)
is.SetProtocol(p)
handler(is)
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *Swarm) close() {

for l := range listeners {
go func(l transport.Listener) {
if err := l.Close(); err != nil {
if err := l.Close(); err != nil && err != transport.ErrListenerClosed {
log.Errorf("error when shutting down listener: %s", err)
}
}(l)
Expand Down
141 changes: 141 additions & 0 deletions p2p/test/transport/rcmgr_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package transport_integration

import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

gomock "github.com/golang/mock/gomock"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
mocknetwork "github.com/libp2p/go-libp2p/core/network/mocks"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/stretchr/testify/require"
)

func TestResourceManagerIsUsed(t *testing.T) {
for _, tc := range transportsToTest {
t.Run(tc.Name, func(t *testing.T) {
for _, testDialer := range []bool{true, false} {
t.Run(tc.Name+fmt.Sprintf(" test_dialer=%v", testDialer), func(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to capture the loop variable here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so because t.Run blocks on f https://pkg.go.dev/[email protected]#T.Run. Would not be safe if we did t.Parallel in here. But I don't think we would. It would be a bit annoying to have f be returned from a func to get the closured variables in there. So I'm leaning towards leaving it, unless you prefer to change it.


var reservedMemory, releasedMemory atomic.Int32
defer func() {
require.Equal(t, reservedMemory.Load(), releasedMemory.Load())
MarcoPolo marked this conversation as resolved.
Show resolved Hide resolved
require.NotEqual(t, 0, reservedMemory.Load())
}()

ctrl := gomock.NewController(t)
defer ctrl.Finish()
rcmgr := mocknetwork.NewMockResourceManager(ctrl)
rcmgr.EXPECT().Close()

var listener, dialer host.Host
var expectedPeer peer.ID
var expectedDir network.Direction
var expectedAddr interface{}
if testDialer {
listener = tc.HostGenerator(t, TransportTestCaseOpts{NoRcmgr: true})
dialer = tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, ResourceManager: rcmgr})
expectedPeer = listener.ID()
expectedDir = network.DirOutbound
expectedAddr = listener.Addrs()[0]
} else {
listener = tc.HostGenerator(t, TransportTestCaseOpts{ResourceManager: rcmgr})
dialer = tc.HostGenerator(t, TransportTestCaseOpts{NoListen: true, NoRcmgr: true})
expectedPeer = dialer.ID()
expectedDir = network.DirInbound
expectedAddr = gomock.Any()
}

expectFd := true
if strings.Contains(tc.Name, "QUIC") || strings.Contains(tc.Name, "WebTransport") {
expectFd = false
}

peerScope := mocknetwork.NewMockPeerScope(ctrl)
peerScope.EXPECT().ReserveMemory(gomock.Any(), gomock.Any()).AnyTimes().Do(func(amount int, pri uint8) {
reservedMemory.Add(int32(amount))
})
peerScope.EXPECT().ReleaseMemory(gomock.Any()).AnyTimes().Do(func(amount int) {
releasedMemory.Add(int32(amount))
})
peerScope.EXPECT().BeginSpan().AnyTimes().DoAndReturn(func() (network.ResourceScopeSpan, error) {
s := mocknetwork.NewMockResourceScopeSpan(ctrl)
s.EXPECT().BeginSpan().AnyTimes().Return(mocknetwork.NewMockResourceScopeSpan(ctrl), nil)
// No need to track these memory reservations since we assert that Done is called
s.EXPECT().ReserveMemory(gomock.Any(), gomock.Any())
s.EXPECT().Done()
return s, nil
})
var calledSetPeer atomic.Bool

connScope := mocknetwork.NewMockConnManagementScope(ctrl)
connScope.EXPECT().SetPeer(expectedPeer).Do(func(peer.ID) {
calledSetPeer.Store(true)
})
connScope.EXPECT().PeerScope().AnyTimes().DoAndReturn(func() network.PeerScope {
if calledSetPeer.Load() {
return peerScope
}
return nil
})
connScope.EXPECT().Done()

var allStreamsDone sync.WaitGroup

rcmgr.EXPECT().OpenConnection(expectedDir, expectFd, expectedAddr).Return(connScope, nil)
rcmgr.EXPECT().OpenStream(expectedPeer, gomock.Any()).AnyTimes().DoAndReturn(func(id peer.ID, dir network.Direction) (network.StreamManagementScope, error) {
allStreamsDone.Add(1)
streamScope := mocknetwork.NewMockStreamManagementScope(ctrl)
// No need to track these memory reservations since we assert that Done is called
streamScope.EXPECT().ReserveMemory(gomock.Any(), gomock.Any()).AnyTimes()
streamScope.EXPECT().ReleaseMemory(gomock.Any()).AnyTimes()
streamScope.EXPECT().BeginSpan().AnyTimes().DoAndReturn(func() (network.ResourceScopeSpan, error) {
s := mocknetwork.NewMockResourceScopeSpan(ctrl)
s.EXPECT().BeginSpan().AnyTimes().Return(mocknetwork.NewMockResourceScopeSpan(ctrl), nil)
s.EXPECT().Done()
return s, nil
})

streamScope.EXPECT().SetService(gomock.Any()).MaxTimes(1)
streamScope.EXPECT().SetProtocol(gomock.Any())

streamScope.EXPECT().Done().Do(func() {
allStreamsDone.Done()
})
return streamScope, nil
})

require.NoError(t, dialer.Connect(context.Background(), peer.AddrInfo{
ID: listener.ID(),
Addrs: listener.Addrs(),
}))
// Wait for any in progress identifies to finish.
// We shouldn't have to do this, but basic host currently
// always does an identify.
<-dialer.(interface{ IDService() identify.IDService }).IDService().IdentifyWait(dialer.Network().ConnsToPeer(listener.ID())[0])
<-listener.(interface{ IDService() identify.IDService }).IDService().IdentifyWait(listener.Network().ConnsToPeer(dialer.ID())[0])
<-ping.Ping(context.Background(), dialer, listener.ID())
err := dialer.Network().ClosePeer(listener.ID())
require.NoError(t, err)

// Wait a bit for any pending .Adds before we call .Wait to avoid a data race.
// This shouldn't be necessary since it should be impossible
// for an OpenStream to happen *after* a ClosePeer, however
// in practice it does and leads to test flakiness.
time.Sleep(10 * time.Millisecond)
allStreamsDone.Wait()
dialer.Close()
listener.Close()
})
}
})
}
}
11 changes: 8 additions & 3 deletions p2p/test/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type TransportTestCase struct {
}

type TransportTestCaseOpts struct {
NoListen bool
NoRcmgr bool
ConnGater connmgr.ConnectionGater
NoListen bool
NoRcmgr bool
ConnGater connmgr.ConnectionGater
ResourceManager network.ResourceManager
}

func transformOpts(opts TransportTestCaseOpts) []config.Option {
Expand All @@ -45,6 +46,10 @@ func transformOpts(opts TransportTestCaseOpts) []config.Option {
if opts.ConnGater != nil {
libp2pOpts = append(libp2pOpts, libp2p.ConnectionGater(opts.ConnGater))
}

if opts.ResourceManager != nil {
libp2pOpts = append(libp2pOpts, libp2p.ResourceManager(opts.ResourceManager))
}
return libp2pOpts
}

Expand Down
4 changes: 4 additions & 0 deletions p2p/transport/websocket/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"net/http"
"strings"

"github.com/libp2p/go-libp2p/core/transport"

Expand Down Expand Up @@ -136,6 +137,9 @@ func (l *listener) Close() error {
l.server.Close()
err := l.nl.Close()
<-l.closed
if strings.Contains(err.Error(), "use of closed network connection") {
return transport.ErrListenerClosed
}
return err
}

Expand Down
5 changes: 3 additions & 2 deletions p2p/transport/webtransport/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ type conn struct {
transport *transport
session *webtransport.Session

scope network.ConnScope
scope network.ConnManagementScope
}

var _ tpt.CapableConn = &conn{}

func newConn(tr *transport, sess *webtransport.Session, sconn *connSecurityMultiaddrs, scope network.ConnScope) *conn {
func newConn(tr *transport, sess *webtransport.Session, sconn *connSecurityMultiaddrs, scope network.ConnManagementScope) *conn {
return &conn{
connSecurityMultiaddrs: sconn,
transport: tr,
Expand Down Expand Up @@ -68,6 +68,7 @@ func (c *conn) allowWindowIncrease(size uint64) bool {
// It must be called even if the peer closed the connection in order for
// garbage collection to properly work in this package.
func (c *conn) Close() error {
c.scope.Done()
c.transport.removeConn(c.session)
return c.session.CloseWithError(0, "")
}
Expand Down