Skip to content

Commit

Permalink
metrics wrapper: Improve tests coverage (#1392)
Browse files Browse the repository at this point in the history
* metrics wrapper: Improve tests coverage
  • Loading branch information
3vilhamster authored Nov 6, 2024
1 parent c4a8e17 commit fa329fd
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 116 deletions.
13 changes: 9 additions & 4 deletions internal/common/metrics/service_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
scopeNameListClosedWorkflowExecutions = CadenceMetricsPrefix + "ListClosedWorkflowExecutions"
scopeNameListOpenWorkflowExecutions = CadenceMetricsPrefix + "ListOpenWorkflowExecutions"
scopeNameListWorkflowExecutions = CadenceMetricsPrefix + "ListWorkflowExecutions"
scopeNameListArchivedWorkflowExecutions = CadenceMetricsPrefix + "ListArchviedExecutions"
scopeNameListArchivedWorkflowExecutions = CadenceMetricsPrefix + "ListArchivedWorkflowExecutions"
scopeNameScanWorkflowExecutions = CadenceMetricsPrefix + "ScanWorkflowExecutions"
scopeNameCountWorkflowExecutions = CadenceMetricsPrefix + "CountWorkflowExecutions"
scopeNamePollForActivityTask = CadenceMetricsPrefix + "PollForActivityTask"
Expand Down Expand Up @@ -90,6 +90,8 @@ const (
scopeNameListTaskListPartitions = CadenceMetricsPrefix + "ListTaskListPartitions"
scopeNameGetClusterInfo = CadenceMetricsPrefix + "GetClusterInfo"
scopeRefreshWorkflowTasks = CadenceMetricsPrefix + "RefreshWorkflowTasks"
scopeNameGetTaskListsByDomain = CadenceMetricsPrefix + "GetTaskListsByDomain"
scopeRestartWorkflowExecution = CadenceMetricsPrefix + "RestartWorkflowExecution"
)

// NewWorkflowServiceWrapper creates a new wrapper to WorkflowService that will emit metrics for each service call.
Expand Down Expand Up @@ -414,8 +416,11 @@ func (w *workflowServiceMetricsWrapper) GetClusterInfo(ctx context.Context, opts
return result, err
}

func (w *workflowServiceMetricsWrapper) GetTaskListsByDomain(ctx context.Context, Request *shared.GetTaskListsByDomainRequest, opts ...yarpc.CallOption) (*shared.GetTaskListsByDomainResponse, error) {
panic("implement me")
func (w *workflowServiceMetricsWrapper) GetTaskListsByDomain(ctx context.Context, request *shared.GetTaskListsByDomainRequest, opts ...yarpc.CallOption) (*shared.GetTaskListsByDomainResponse, error) {
scope := w.getOperationScope(scopeNameGetTaskListsByDomain)
result, err := w.service.GetTaskListsByDomain(ctx, request, opts...)
scope.handleError(err)
return result, err
}

func (w *workflowServiceMetricsWrapper) RefreshWorkflowTasks(ctx context.Context, request *shared.RefreshWorkflowTasksRequest, opts ...yarpc.CallOption) error {
Expand All @@ -426,7 +431,7 @@ func (w *workflowServiceMetricsWrapper) RefreshWorkflowTasks(ctx context.Context
}

func (w *workflowServiceMetricsWrapper) RestartWorkflowExecution(ctx context.Context, request *shared.RestartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.RestartWorkflowExecutionResponse, error) {
scope := w.getOperationScope(scopeRefreshWorkflowTasks)
scope := w.getOperationScope(scopeRestartWorkflowExecution)
resp, err := w.service.RestartWorkflowExecution(ctx, request, opts...)
scope.handleError(err)
return resp, err
Expand Down
209 changes: 97 additions & 112 deletions internal/common/metrics/service_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"github.com/uber/tchannel-go/thrift"
"go.uber.org/yarpc"

"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
"go.uber.org/cadence/.gen/go/cadence/workflowservicetest"
s "go.uber.org/cadence/.gen/go/shared"
)

Expand All @@ -60,144 +63,124 @@ var (
)

type testCase struct {
serviceMethod string
callArgs []interface{}
mockReturns []interface{}
serviceMethod string
callArgs []interface{}
}

type errCase struct {
err error
expectedCounters []string
}

func Test_Wrapper(t *testing.T) {
ctx, _ := thrift.NewContext(time.Minute)
tests := []testCase{
// one case for each service call
{"DeprecateDomain", []interface{}{ctx, &s.DeprecateDomainRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"DescribeDomain", []interface{}{ctx, &s.DescribeDomainRequest{}}, []interface{}{&s.DescribeDomainResponse{}, nil}, []string{CadenceRequest}},
{"GetWorkflowExecutionHistory", []interface{}{ctx, &s.GetWorkflowExecutionHistoryRequest{}}, []interface{}{&s.GetWorkflowExecutionHistoryResponse{}, nil}, []string{CadenceRequest}},
{"ListClosedWorkflowExecutions", []interface{}{ctx, &s.ListClosedWorkflowExecutionsRequest{}}, []interface{}{&s.ListClosedWorkflowExecutionsResponse{}, nil}, []string{CadenceRequest}},
{"ListOpenWorkflowExecutions", []interface{}{ctx, &s.ListOpenWorkflowExecutionsRequest{}}, []interface{}{&s.ListOpenWorkflowExecutionsResponse{}, nil}, []string{CadenceRequest}},
{"PollForActivityTask", []interface{}{ctx, &s.PollForActivityTaskRequest{}}, []interface{}{&s.PollForActivityTaskResponse{}, nil}, []string{CadenceRequest}},
{"PollForDecisionTask", []interface{}{ctx, &s.PollForDecisionTaskRequest{}}, []interface{}{&s.PollForDecisionTaskResponse{}, nil}, []string{CadenceRequest}},
{"RecordActivityTaskHeartbeat", []interface{}{ctx, &s.RecordActivityTaskHeartbeatRequest{}}, []interface{}{&s.RecordActivityTaskHeartbeatResponse{}, nil}, []string{CadenceRequest}},
{"RegisterDomain", []interface{}{ctx, &s.RegisterDomainRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RequestCancelWorkflowExecution", []interface{}{ctx, &s.RequestCancelWorkflowExecutionRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RespondActivityTaskCanceled", []interface{}{ctx, &s.RespondActivityTaskCanceledRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RespondActivityTaskCompleted", []interface{}{ctx, &s.RespondActivityTaskCompletedRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RespondActivityTaskFailed", []interface{}{ctx, &s.RespondActivityTaskFailedRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RespondActivityTaskCanceledByID", []interface{}{ctx, &s.RespondActivityTaskCanceledByIDRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RespondActivityTaskCompletedByID", []interface{}{ctx, &s.RespondActivityTaskCompletedByIDRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RespondActivityTaskFailedByID", []interface{}{ctx, &s.RespondActivityTaskFailedByIDRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"RespondDecisionTaskCompleted", []interface{}{ctx, &s.RespondDecisionTaskCompletedRequest{}}, []interface{}{nil, nil}, []string{CadenceRequest}},
{"SignalWorkflowExecution", []interface{}{ctx, &s.SignalWorkflowExecutionRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"SignalWithStartWorkflowExecution", []interface{}{ctx, &s.SignalWithStartWorkflowExecutionRequest{}}, []interface{}{&s.StartWorkflowExecutionResponse{}, nil}, []string{CadenceRequest}},
{"SignalWithStartWorkflowExecutionAsync", []interface{}{ctx, &s.SignalWithStartWorkflowExecutionAsyncRequest{}}, []interface{}{&s.SignalWithStartWorkflowExecutionAsyncResponse{}, nil}, []string{CadenceRequest}},
{"StartWorkflowExecution", []interface{}{ctx, &s.StartWorkflowExecutionRequest{}}, []interface{}{&s.StartWorkflowExecutionResponse{}, nil}, []string{CadenceRequest}},
{"StartWorkflowExecutionAsync", []interface{}{ctx, &s.StartWorkflowExecutionAsyncRequest{}}, []interface{}{&s.StartWorkflowExecutionAsyncResponse{}, nil}, []string{CadenceRequest}},
{"TerminateWorkflowExecution", []interface{}{ctx, &s.TerminateWorkflowExecutionRequest{}}, []interface{}{nil}, []string{CadenceRequest}},
{"ResetWorkflowExecution", []interface{}{ctx, &s.ResetWorkflowExecutionRequest{}}, []interface{}{&s.ResetWorkflowExecutionResponse{}, nil}, []string{CadenceRequest}},
{"UpdateDomain", []interface{}{ctx, &s.UpdateDomainRequest{}}, []interface{}{&s.UpdateDomainResponse{}, nil}, []string{CadenceRequest}},
// one case of invalid request
{"PollForActivityTask", []interface{}{ctx, &s.PollForActivityTaskRequest{}}, []interface{}{nil, &s.EntityNotExistsError{}}, []string{CadenceRequest, CadenceInvalidRequest}},
// one case of server error
{"PollForActivityTask", []interface{}{ctx, &s.PollForActivityTaskRequest{}}, []interface{}{nil, &s.InternalServiceError{}}, []string{CadenceRequest, CadenceError}},
{"QueryWorkflow", []interface{}{ctx, &s.QueryWorkflowRequest{}}, []interface{}{nil, &s.InternalServiceError{}}, []string{CadenceRequest, CadenceError}},
{"RespondQueryTaskCompleted", []interface{}{ctx, &s.RespondQueryTaskCompletedRequest{}}, []interface{}{&s.InternalServiceError{}}, []string{CadenceRequest, CadenceError}},

typeWrapper := reflect.TypeOf(&workflowServiceMetricsWrapper{})
tests := make([]testCase, 0, typeWrapper.NumMethod())
for numMethod := 0; numMethod < typeWrapper.NumMethod(); numMethod++ {
method := typeWrapper.Method(numMethod)
inputs := make([]interface{}, 0, method.Type.NumIn()-1)
inputs = append(inputs, ctx)

for i := 1; i < method.Type.NumIn(); i++ {
if method.Type.In(i).Kind() == reflect.Ptr {
inputs = append(inputs, reflect.New(method.Type.In(i).Elem()).Interface())
}
}

tests = append(tests, testCase{
serviceMethod: method.Name,
callArgs: inputs,
})
}

// run each test twice - once with the regular scope, once with a sanitized metrics scope
for _, test := range tests {
runTest(t, test, newService, assertMetrics, fmt.Sprintf("%v_normal", test.serviceMethod))
runTest(t, test, newPromService, assertPromMetrics, fmt.Sprintf("%v_prom_sanitized", test.serviceMethod))
for _, errCase := range []struct {
err error
expectedCounters []string
}{
{
err: nil,
expectedCounters: []string{CadenceRequest},
},
{
err: &s.EntityNotExistsError{},
expectedCounters: []string{CadenceRequest, CadenceInvalidRequest},
},
{
err: &s.InternalServiceError{},
expectedCounters: []string{CadenceRequest, CadenceError},
},
} {
runTest(t, test, errCase, newService, assertMetrics, fmt.Sprintf("%v_errcase_%v_normal", test.serviceMethod, errCase.err))
runTest(t, test, errCase, newPromService, assertPromMetrics, fmt.Sprintf("%v_errcase_%v_prom_sanitized", test.serviceMethod, errCase.err))
}

}
}

func runTest(
t *testing.T,
test testCase,
errCase errCase,
serviceFunc func(*testing.T) (*workflowservicetest.MockClient, workflowserviceclient.Interface, io.Closer, *CapturingStatsReporter),
validationFunc func(*testing.T, *CapturingStatsReporter, string, []string),
name string,
) {
t.Run(name, func(t *testing.T) {
t.Parallel()
// gomock mutates the returns slice, which leads to different test values between the two runs.
// copy the slice until gomock fixes it: https://github.com/golang/mock/issues/353
returns := append(make([]interface{}, 0, len(test.mockReturns)), test.mockReturns...)

mockService, wrapperService, closer, reporter := serviceFunc(t)
switch test.serviceMethod {
case "DeprecateDomain":
mockService.EXPECT().DeprecateDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "DescribeDomain":
mockService.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "GetWorkflowExecutionHistory":
mockService.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "ListClosedWorkflowExecutions":
mockService.EXPECT().ListClosedWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "ListOpenWorkflowExecutions":
mockService.EXPECT().ListOpenWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "PollForActivityTask":
mockService.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "PollForDecisionTask":
mockService.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RecordActivityTaskHeartbeat":
mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RecordActivityTaskHeartbeatByID":
mockService.EXPECT().RecordActivityTaskHeartbeatByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RegisterDomain":
mockService.EXPECT().RegisterDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RequestCancelWorkflowExecution":
mockService.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondActivityTaskCanceled":
mockService.EXPECT().RespondActivityTaskCanceled(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondActivityTaskCompleted":
mockService.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondActivityTaskFailed":
mockService.EXPECT().RespondActivityTaskFailed(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondActivityTaskCanceledByID":
mockService.EXPECT().RespondActivityTaskCanceledByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondActivityTaskCompletedByID":
mockService.EXPECT().RespondActivityTaskCompletedByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondActivityTaskFailedByID":
mockService.EXPECT().RespondActivityTaskFailedByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondDecisionTaskCompleted":
mockService.EXPECT().RespondDecisionTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "SignalWorkflowExecution":
mockService.EXPECT().SignalWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "SignalWithStartWorkflowExecution":
mockService.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "SignalWithStartWorkflowExecutionAsync":
mockService.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "StartWorkflowExecution":
mockService.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "StartWorkflowExecutionAsync":
mockService.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "TerminateWorkflowExecution":
mockService.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "ResetWorkflowExecution":
mockService.EXPECT().ResetWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "UpdateDomain":
mockService.EXPECT().UpdateDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "QueryWorkflow":
mockService.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
case "RespondQueryTaskCompleted":
mockService.EXPECT().RespondQueryTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...)
mockServiceVal := reflect.ValueOf(mockService)
method, exists := mockServiceVal.Type().MethodByName(test.serviceMethod)
require.True(t, exists, "method %s does not exists", test.serviceMethod)

expecterVals := mockServiceVal.MethodByName("EXPECT").Call(nil)
expectedMethod := expecterVals[0].MethodByName(test.serviceMethod)
mockInputs := make([]reflect.Value, 0, expectedMethod.Type().NumIn())
for inCounter := 0; inCounter < expectedMethod.Type().NumIn(); inCounter++ {
mockInputs = append(mockInputs, reflect.ValueOf(gomock.Any()))
}
callVals := expectedMethod.Call(mockInputs)

returnVals := make([]reflect.Value, 0, method.Type.NumOut())
for i := 0; i < method.Type.NumOut()-1; i++ {
output := method.Type.Out(i)
if errCase.err == nil {
returnVals = append(returnVals, reflect.New(output).Elem())
} else {
returnVals = append(returnVals, reflect.Zero(output))
}
}

if errCase.err != nil {
returnVals = append(returnVals, reflect.ValueOf(errCase.err))
} else {
returnVals = append(returnVals, nilError)
}

callVals[0].MethodByName("Return").Call(returnVals)

callOption := yarpc.CallOption{}
inputs := make([]reflect.Value, len(test.callArgs))
for i, arg := range test.callArgs {
inputs[i] = reflect.ValueOf(arg)
}
inputs = append(inputs, reflect.ValueOf(callOption))
method := reflect.ValueOf(wrapperService).MethodByName(test.serviceMethod)
method.Call(inputs)
actualMethod := reflect.ValueOf(wrapperService).MethodByName(test.serviceMethod)
methodReturnVals := actualMethod.Call(inputs)
err := methodReturnVals[len(methodReturnVals)-1].Interface()
if errCase.err != nil {
assert.ErrorIs(t, err.(error), errCase.err)
} else {
assert.Nil(t, err, "error must be nil")
}
require.NoError(t, closer.Close())
validationFunc(t, reporter, test.serviceMethod, test.expectedCounters)
validationFunc(t, reporter, test.serviceMethod, errCase.expectedCounters)
})
}

func assertMetrics(t *testing.T, reporter *CapturingStatsReporter, methodName string, counterNames []string) {
require.Equal(t, len(counterNames), len(reporter.counts))
assert.Equal(t, len(counterNames), len(reporter.counts), "expected %v counters, got %v", counterNames, reporter.counts)
for _, name := range counterNames {
counterName := CadenceMetricsPrefix + methodName + "." + name
find := false
Expand All @@ -208,14 +191,14 @@ func assertMetrics(t *testing.T, reporter *CapturingStatsReporter, methodName st
break
}
}
require.True(t, find)
assert.True(t, find, "counter %v not found in counters %v", counterName, reporter.counts)
}
require.Equal(t, 1, len(reporter.timers))
require.Equal(t, CadenceMetricsPrefix+methodName+"."+CadenceLatency, reporter.timers[0].name)
assert.Equal(t, 1, len(reporter.timers), "expected 1 timer, got %v", len(reporter.timers))
assert.Equal(t, CadenceMetricsPrefix+methodName+"."+CadenceLatency, reporter.timers[0].name, "expected timer %v, got %v", CadenceMetricsPrefix+methodName+"."+CadenceLatency, reporter.timers[0].name)
}

func assertPromMetrics(t *testing.T, reporter *CapturingStatsReporter, methodName string, counterNames []string) {
require.Equal(t, len(counterNames), len(reporter.counts))
assert.Equal(t, len(counterNames), len(reporter.counts), "expected %v counters, got %v", counterNames, reporter.counts)
for _, name := range counterNames {
counterName := makePromCompatible(CadenceMetricsPrefix + methodName + "." + name)
find := false
Expand All @@ -226,11 +209,11 @@ func assertPromMetrics(t *testing.T, reporter *CapturingStatsReporter, methodNam
break
}
}
require.True(t, find)
assert.True(t, find, "counter %v not found in counters %v", counterName, reporter.counts)
}
require.Equal(t, 1, len(reporter.timers))
assert.Equal(t, 1, len(reporter.timers), "expected 1 timer, got %v", len(reporter.timers))
expected := makePromCompatible(CadenceMetricsPrefix + methodName + "." + CadenceLatency)
require.Equal(t, expected, reporter.timers[0].name)
assert.Equal(t, expected, reporter.timers[0].name, "expected timer %v, got %v", expected, reporter.timers[0].name)
}

func makePromCompatible(name string) string {
Expand Down Expand Up @@ -276,3 +259,5 @@ func newPromScope(isReplay *bool) (tally.Scope, io.Closer, *CapturingStatsReport
scope, closer := tally.NewRootScope(opts, time.Second)
return WrapScope(isReplay, scope, &realClock{}), closer, reporter
}

var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())

0 comments on commit fa329fd

Please sign in to comment.