Skip to content

Commit

Permalink
Merge pull request #6917 from onflow/petera/fix-access-connect-test
Browse files Browse the repository at this point in the history
[Access] Fix access connection tests for latest version of testing library
  • Loading branch information
peterargue authored Jan 17, 2025
2 parents 48984c8 + e075836 commit a3c2cea
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 59 deletions.
67 changes: 20 additions & 47 deletions engine/access/rpc/connection/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestProxyAccessAPI(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -75,15 +75,15 @@ func TestProxyAccessAPI(t *testing.T) {
// make the call to the collection node
resp, err := client.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyExecutionAPI(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -124,15 +124,15 @@ func TestProxyExecutionAPI(t *testing.T) {
// make the call to the execution node
resp, err := client.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyAccessAPIConnectionReuse(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -186,15 +186,15 @@ func TestProxyAccessAPIConnectionReuse(t *testing.T) {
ctx := context.Background()
resp, err := accessAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

func TestProxyExecutionAPIConnectionReuse(t *testing.T) {
logger := unittest.Logger()
metrics := metrics.NewNoopCollector()

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -248,7 +248,7 @@ func TestProxyExecutionAPIConnectionReuse(t *testing.T) {
ctx := context.Background()
resp, err := executionAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

// TestExecutionNodeClientTimeout tests that the execution API client times out after the timeout duration
Expand All @@ -259,7 +259,7 @@ func TestExecutionNodeClientTimeout(t *testing.T) {
timeout := 10 * time.Millisecond

// create an execution node
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -316,7 +316,7 @@ func TestCollectionNodeClientTimeout(t *testing.T) {
timeout := 10 * time.Millisecond

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -371,31 +371,14 @@ func TestConnectionPoolFull(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn1, cn2, cn3 := new(collectionNode), new(collectionNode), new(collectionNode)
cn1, cn2, cn3 := newCollectionNode(t), newCollectionNode(t), newCollectionNode(t)
cn1.start(t)
cn2.start(t)
cn3.start(t)
defer cn1.stop(t)
defer cn2.stop(t)
defer cn3.stop(t)

expected := &access.PingResponse{}
cn1.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)
cn2.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)
cn3.handler.
On("Ping",
testifymock.Anything,
testifymock.AnythingOfType("*access.PingRequest")).
Return(expected, nil)

// create the factory
connectionFactory := new(ConnectionFactoryImpl)
// set the collection grpc port
Expand Down Expand Up @@ -467,7 +450,7 @@ func TestConnectionPoolStale(t *testing.T) {
metrics := metrics.NewNoopCollector()

// create a collection node
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -534,7 +517,7 @@ func TestConnectionPoolStale(t *testing.T) {
ctx = context.Background()
resp, err := accessAPIClient.Ping(ctx, req)
assert.NoError(t, err)
assert.Equal(t, resp, expected)
assert.IsType(t, expected, resp)
}

// TestExecutionNodeClientClosedGracefully tests the scenario where the execution node client is closed gracefully.
Expand All @@ -550,7 +533,7 @@ func TestExecutionNodeClientClosedGracefully(t *testing.T) {

// Add createExecNode function to recreate it each time for rapid test
createExecNode := func() (*executionNode, func()) {
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
return en, func() {
en.stop(t)
Expand Down Expand Up @@ -650,7 +633,7 @@ func TestEvictingCacheClients(t *testing.T) {
metrics := metrics.NewNoopCollector()

// Create a new collection node for testing
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand All @@ -674,10 +657,6 @@ func TestEvictingCacheClients(t *testing.T) {
func(context.Context, *access.PingRequest) error { return nil },
)

netReq := &access.GetNetworkParametersRequest{}
netResp := &access.GetNetworkParametersResponse{}
cn.handler.On("GetNetworkParameters", testifymock.Anything, netReq).Return(netResp, nil)

// Create the connection factory
connectionFactory := new(ConnectionFactoryImpl)
// Set the gRPC port
Expand Down Expand Up @@ -740,7 +719,7 @@ func TestEvictingCacheClients(t *testing.T) {
}, 100*time.Millisecond, 10*time.Millisecond, "client timed out closing connection")

// Call a gRPC method on the client, requests should be blocked since the connection is invalidated
resp, err := client.GetNetworkParameters(ctx, netReq)
resp, err := client.GetNetworkParameters(ctx, &access.GetNetworkParametersRequest{})
assert.Equal(t, status.Errorf(codes.Unavailable, "the connection to %s was closed", clientAddress), err)
assert.Nil(t, resp)

Expand All @@ -749,9 +728,7 @@ func TestEvictingCacheClients(t *testing.T) {

// Call a gRPC method on the client
_, err = client.Ping(ctx, pingReq)
// Check that Ping was called
cn.handler.AssertCalled(t, "Ping", testifymock.Anything, pingReq)
assert.NoError(t, err)
require.NoError(t, err)

// Wait for the client connection to change state from "Ready" to "Shutdown" as connection was closed.
require.Eventually(t, func() bool {
Expand All @@ -770,7 +747,7 @@ func TestConcurrentConnections(t *testing.T) {

// Add createExecNode function to recreate it each time for rapid test
createExecNode := func() (*executionNode, func()) {
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
return en, func() {
en.stop(t)
Expand Down Expand Up @@ -886,7 +863,7 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {
circuitBreakerRestoreTimeout := 1500 * time.Millisecond

// Create an execution node for testing.
en := new(executionNode)
en := newExecutionNode(t)
en.start(t)
defer en.stop(t)

Expand Down Expand Up @@ -934,8 +911,6 @@ func TestCircuitBreakerExecutionNode(t *testing.T) {

// Make the call to the execution node.
_, err = client.Ping(ctx, req)
en.handler.AssertCalled(t, "Ping", testifymock.Anything, req)

return time.Since(start), err
}

Expand Down Expand Up @@ -1005,7 +980,7 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {
circuitBreakerRestoreTimeout := 1500 * time.Millisecond

// Create a collection node for testing.
cn := new(collectionNode)
cn := newCollectionNode(t)
cn.start(t)
defer cn.stop(t)

Expand Down Expand Up @@ -1053,8 +1028,6 @@ func TestCircuitBreakerCollectionNode(t *testing.T) {

// Make the call to the collection node.
_, err = client.Ping(ctx, req)
cn.handler.AssertCalled(t, "Ping", testifymock.Anything, req)

return time.Since(start), err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func BenchmarkWithDeflateCompression(b *testing.B) {
// runBenchmark is a helper function that performs the benchmarking for different compressors.
func runBenchmark(b *testing.B, compressorName string) {
// create an execution node
en := new(executionNode)
en := newExecutionNode(b)
en.start(b)
defer en.stop(b)

Expand Down
38 changes: 27 additions & 11 deletions engine/access/rpc/connection/node_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,19 @@ type executionNode struct {
handler *mock.ExecutionAPIServer
}

func newExecutionNode(tb testing.TB) *executionNode {
return &executionNode{
handler: mock.NewExecutionAPIServer(tb),
}
}

func (en *executionNode) start(tb testing.TB) {
if en.handler == nil {
tb.Fatalf("executionNode must be initialized using newExecutionNode")
}

en.setupNode(tb)
handler := new(mock.ExecutionAPIServer)
execution.RegisterExecutionAPIServer(en.server, handler)
en.handler = handler
execution.RegisterExecutionAPIServer(en.server, en.handler)
en.node.start(tb)
}

Expand All @@ -81,14 +89,22 @@ type collectionNode struct {
handler *mock.AccessAPIServer
}

func (cn *collectionNode) start(t *testing.T) {
cn.setupNode(t)
handler := new(mock.AccessAPIServer)
access.RegisterAccessAPIServer(cn.server, handler)
cn.handler = handler
cn.node.start(t)
func newCollectionNode(tb testing.TB) *collectionNode {
return &collectionNode{
handler: mock.NewAccessAPIServer(tb),
}
}

func (cn *collectionNode) start(tb testing.TB) {
if cn.handler == nil {
tb.Fatalf("collectionNode must be initialized using newCollectionNode")
}

cn.setupNode(tb)
access.RegisterAccessAPIServer(cn.server, cn.handler)
cn.node.start(tb)
}

func (cn *collectionNode) stop(t *testing.T) {
cn.node.stop(t)
func (cn *collectionNode) stop(tb testing.TB) {
cn.node.stop(tb)
}

0 comments on commit a3c2cea

Please sign in to comment.