diff --git a/internal/common/metrics/service_wrapper.go b/internal/common/metrics/service_wrapper.go index a5d38933b..5a663598a 100644 --- a/internal/common/metrics/service_wrapper.go +++ b/internal/common/metrics/service_wrapper.go @@ -56,7 +56,7 @@ const ( scopeNameListClosedWorkflowExecutions = CadenceMetricsPrefix + "ListClosedWorkflowExecutions" scopeNameListOpenWorkflowExecutions = CadenceMetricsPrefix + "ListOpenWorkflowExecutions" scopeNameListWorkflowExecutions = CadenceMetricsPrefix + "ListWorkflowExecutions" - scopeNameListArchivedWorkflowExecutions = CadenceMetricsPrefix + "ListArchviedExecutions" + scopeNameListArchivedWorkflowExecutions = CadenceMetricsPrefix + "ListArchivedWorkflowExecutions" scopeNameScanWorkflowExecutions = CadenceMetricsPrefix + "ScanWorkflowExecutions" scopeNameCountWorkflowExecutions = CadenceMetricsPrefix + "CountWorkflowExecutions" scopeNamePollForActivityTask = CadenceMetricsPrefix + "PollForActivityTask" @@ -90,6 +90,8 @@ const ( scopeNameListTaskListPartitions = CadenceMetricsPrefix + "ListTaskListPartitions" scopeNameGetClusterInfo = CadenceMetricsPrefix + "GetClusterInfo" scopeRefreshWorkflowTasks = CadenceMetricsPrefix + "RefreshWorkflowTasks" + scopeNameGetTaskListsByDomain = CadenceMetricsPrefix + "GetTaskListsByDomain" + scopeRestartWorkflowExecution = CadenceMetricsPrefix + "RestartWorkflowExecution" ) // NewWorkflowServiceWrapper creates a new wrapper to WorkflowService that will emit metrics for each service call. @@ -414,8 +416,11 @@ func (w *workflowServiceMetricsWrapper) GetClusterInfo(ctx context.Context, opts return result, err } -func (w *workflowServiceMetricsWrapper) GetTaskListsByDomain(ctx context.Context, Request *shared.GetTaskListsByDomainRequest, opts ...yarpc.CallOption) (*shared.GetTaskListsByDomainResponse, error) { - panic("implement me") +func (w *workflowServiceMetricsWrapper) GetTaskListsByDomain(ctx context.Context, request *shared.GetTaskListsByDomainRequest, opts ...yarpc.CallOption) (*shared.GetTaskListsByDomainResponse, error) { + scope := w.getOperationScope(scopeNameGetTaskListsByDomain) + result, err := w.service.GetTaskListsByDomain(ctx, request, opts...) + scope.handleError(err) + return result, err } func (w *workflowServiceMetricsWrapper) RefreshWorkflowTasks(ctx context.Context, request *shared.RefreshWorkflowTasksRequest, opts ...yarpc.CallOption) error { @@ -426,7 +431,7 @@ func (w *workflowServiceMetricsWrapper) RefreshWorkflowTasks(ctx context.Context } func (w *workflowServiceMetricsWrapper) RestartWorkflowExecution(ctx context.Context, request *shared.RestartWorkflowExecutionRequest, opts ...yarpc.CallOption) (*shared.RestartWorkflowExecutionResponse, error) { - scope := w.getOperationScope(scopeRefreshWorkflowTasks) + scope := w.getOperationScope(scopeRestartWorkflowExecution) resp, err := w.service.RestartWorkflowExecution(ctx, request, opts...) scope.handleError(err) return resp, err diff --git a/internal/common/metrics/service_wrapper_test.go b/internal/common/metrics/service_wrapper_test.go index 6cbc2d4b1..0ec81c6e5 100644 --- a/internal/common/metrics/service_wrapper_test.go +++ b/internal/common/metrics/service_wrapper_test.go @@ -28,6 +28,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + + "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/uber-go/tally" @@ -35,7 +39,6 @@ import ( "go.uber.org/yarpc" "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" - "go.uber.org/cadence/.gen/go/cadence/workflowservicetest" s "go.uber.org/cadence/.gen/go/shared" ) @@ -60,144 +63,124 @@ var ( ) type testCase struct { - serviceMethod string - callArgs []interface{} - mockReturns []interface{} + serviceMethod string + callArgs []interface{} +} + +type errCase struct { + err error expectedCounters []string } func Test_Wrapper(t *testing.T) { ctx, _ := thrift.NewContext(time.Minute) - tests := []testCase{ - // one case for each service call - {"DeprecateDomain", []interface{}{ctx, &s.DeprecateDomainRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"DescribeDomain", []interface{}{ctx, &s.DescribeDomainRequest{}}, []interface{}{&s.DescribeDomainResponse{}, nil}, []string{CadenceRequest}}, - {"GetWorkflowExecutionHistory", []interface{}{ctx, &s.GetWorkflowExecutionHistoryRequest{}}, []interface{}{&s.GetWorkflowExecutionHistoryResponse{}, nil}, []string{CadenceRequest}}, - {"ListClosedWorkflowExecutions", []interface{}{ctx, &s.ListClosedWorkflowExecutionsRequest{}}, []interface{}{&s.ListClosedWorkflowExecutionsResponse{}, nil}, []string{CadenceRequest}}, - {"ListOpenWorkflowExecutions", []interface{}{ctx, &s.ListOpenWorkflowExecutionsRequest{}}, []interface{}{&s.ListOpenWorkflowExecutionsResponse{}, nil}, []string{CadenceRequest}}, - {"PollForActivityTask", []interface{}{ctx, &s.PollForActivityTaskRequest{}}, []interface{}{&s.PollForActivityTaskResponse{}, nil}, []string{CadenceRequest}}, - {"PollForDecisionTask", []interface{}{ctx, &s.PollForDecisionTaskRequest{}}, []interface{}{&s.PollForDecisionTaskResponse{}, nil}, []string{CadenceRequest}}, - {"RecordActivityTaskHeartbeat", []interface{}{ctx, &s.RecordActivityTaskHeartbeatRequest{}}, []interface{}{&s.RecordActivityTaskHeartbeatResponse{}, nil}, []string{CadenceRequest}}, - {"RegisterDomain", []interface{}{ctx, &s.RegisterDomainRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RequestCancelWorkflowExecution", []interface{}{ctx, &s.RequestCancelWorkflowExecutionRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RespondActivityTaskCanceled", []interface{}{ctx, &s.RespondActivityTaskCanceledRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RespondActivityTaskCompleted", []interface{}{ctx, &s.RespondActivityTaskCompletedRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RespondActivityTaskFailed", []interface{}{ctx, &s.RespondActivityTaskFailedRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RespondActivityTaskCanceledByID", []interface{}{ctx, &s.RespondActivityTaskCanceledByIDRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RespondActivityTaskCompletedByID", []interface{}{ctx, &s.RespondActivityTaskCompletedByIDRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RespondActivityTaskFailedByID", []interface{}{ctx, &s.RespondActivityTaskFailedByIDRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"RespondDecisionTaskCompleted", []interface{}{ctx, &s.RespondDecisionTaskCompletedRequest{}}, []interface{}{nil, nil}, []string{CadenceRequest}}, - {"SignalWorkflowExecution", []interface{}{ctx, &s.SignalWorkflowExecutionRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"SignalWithStartWorkflowExecution", []interface{}{ctx, &s.SignalWithStartWorkflowExecutionRequest{}}, []interface{}{&s.StartWorkflowExecutionResponse{}, nil}, []string{CadenceRequest}}, - {"SignalWithStartWorkflowExecutionAsync", []interface{}{ctx, &s.SignalWithStartWorkflowExecutionAsyncRequest{}}, []interface{}{&s.SignalWithStartWorkflowExecutionAsyncResponse{}, nil}, []string{CadenceRequest}}, - {"StartWorkflowExecution", []interface{}{ctx, &s.StartWorkflowExecutionRequest{}}, []interface{}{&s.StartWorkflowExecutionResponse{}, nil}, []string{CadenceRequest}}, - {"StartWorkflowExecutionAsync", []interface{}{ctx, &s.StartWorkflowExecutionAsyncRequest{}}, []interface{}{&s.StartWorkflowExecutionAsyncResponse{}, nil}, []string{CadenceRequest}}, - {"TerminateWorkflowExecution", []interface{}{ctx, &s.TerminateWorkflowExecutionRequest{}}, []interface{}{nil}, []string{CadenceRequest}}, - {"ResetWorkflowExecution", []interface{}{ctx, &s.ResetWorkflowExecutionRequest{}}, []interface{}{&s.ResetWorkflowExecutionResponse{}, nil}, []string{CadenceRequest}}, - {"UpdateDomain", []interface{}{ctx, &s.UpdateDomainRequest{}}, []interface{}{&s.UpdateDomainResponse{}, nil}, []string{CadenceRequest}}, - // one case of invalid request - {"PollForActivityTask", []interface{}{ctx, &s.PollForActivityTaskRequest{}}, []interface{}{nil, &s.EntityNotExistsError{}}, []string{CadenceRequest, CadenceInvalidRequest}}, - // one case of server error - {"PollForActivityTask", []interface{}{ctx, &s.PollForActivityTaskRequest{}}, []interface{}{nil, &s.InternalServiceError{}}, []string{CadenceRequest, CadenceError}}, - {"QueryWorkflow", []interface{}{ctx, &s.QueryWorkflowRequest{}}, []interface{}{nil, &s.InternalServiceError{}}, []string{CadenceRequest, CadenceError}}, - {"RespondQueryTaskCompleted", []interface{}{ctx, &s.RespondQueryTaskCompletedRequest{}}, []interface{}{&s.InternalServiceError{}}, []string{CadenceRequest, CadenceError}}, + + typeWrapper := reflect.TypeOf(&workflowServiceMetricsWrapper{}) + tests := make([]testCase, 0, typeWrapper.NumMethod()) + for numMethod := 0; numMethod < typeWrapper.NumMethod(); numMethod++ { + method := typeWrapper.Method(numMethod) + inputs := make([]interface{}, 0, method.Type.NumIn()-1) + inputs = append(inputs, ctx) + + for i := 1; i < method.Type.NumIn(); i++ { + if method.Type.In(i).Kind() == reflect.Ptr { + inputs = append(inputs, reflect.New(method.Type.In(i).Elem()).Interface()) + } + } + + tests = append(tests, testCase{ + serviceMethod: method.Name, + callArgs: inputs, + }) } // run each test twice - once with the regular scope, once with a sanitized metrics scope for _, test := range tests { - runTest(t, test, newService, assertMetrics, fmt.Sprintf("%v_normal", test.serviceMethod)) - runTest(t, test, newPromService, assertPromMetrics, fmt.Sprintf("%v_prom_sanitized", test.serviceMethod)) + for _, errCase := range []struct { + err error + expectedCounters []string + }{ + { + err: nil, + expectedCounters: []string{CadenceRequest}, + }, + { + err: &s.EntityNotExistsError{}, + expectedCounters: []string{CadenceRequest, CadenceInvalidRequest}, + }, + { + err: &s.InternalServiceError{}, + expectedCounters: []string{CadenceRequest, CadenceError}, + }, + } { + runTest(t, test, errCase, newService, assertMetrics, fmt.Sprintf("%v_errcase_%v_normal", test.serviceMethod, errCase.err)) + runTest(t, test, errCase, newPromService, assertPromMetrics, fmt.Sprintf("%v_errcase_%v_prom_sanitized", test.serviceMethod, errCase.err)) + } + } } func runTest( t *testing.T, test testCase, + errCase errCase, serviceFunc func(*testing.T) (*workflowservicetest.MockClient, workflowserviceclient.Interface, io.Closer, *CapturingStatsReporter), validationFunc func(*testing.T, *CapturingStatsReporter, string, []string), name string, ) { t.Run(name, func(t *testing.T) { - t.Parallel() - // gomock mutates the returns slice, which leads to different test values between the two runs. - // copy the slice until gomock fixes it: https://github.com/golang/mock/issues/353 - returns := append(make([]interface{}, 0, len(test.mockReturns)), test.mockReturns...) - mockService, wrapperService, closer, reporter := serviceFunc(t) - switch test.serviceMethod { - case "DeprecateDomain": - mockService.EXPECT().DeprecateDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "DescribeDomain": - mockService.EXPECT().DescribeDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "GetWorkflowExecutionHistory": - mockService.EXPECT().GetWorkflowExecutionHistory(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "ListClosedWorkflowExecutions": - mockService.EXPECT().ListClosedWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "ListOpenWorkflowExecutions": - mockService.EXPECT().ListOpenWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "PollForActivityTask": - mockService.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "PollForDecisionTask": - mockService.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RecordActivityTaskHeartbeat": - mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RecordActivityTaskHeartbeatByID": - mockService.EXPECT().RecordActivityTaskHeartbeatByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RegisterDomain": - mockService.EXPECT().RegisterDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RequestCancelWorkflowExecution": - mockService.EXPECT().RequestCancelWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondActivityTaskCanceled": - mockService.EXPECT().RespondActivityTaskCanceled(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondActivityTaskCompleted": - mockService.EXPECT().RespondActivityTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondActivityTaskFailed": - mockService.EXPECT().RespondActivityTaskFailed(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondActivityTaskCanceledByID": - mockService.EXPECT().RespondActivityTaskCanceledByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondActivityTaskCompletedByID": - mockService.EXPECT().RespondActivityTaskCompletedByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondActivityTaskFailedByID": - mockService.EXPECT().RespondActivityTaskFailedByID(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondDecisionTaskCompleted": - mockService.EXPECT().RespondDecisionTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "SignalWorkflowExecution": - mockService.EXPECT().SignalWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "SignalWithStartWorkflowExecution": - mockService.EXPECT().SignalWithStartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "SignalWithStartWorkflowExecutionAsync": - mockService.EXPECT().SignalWithStartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "StartWorkflowExecution": - mockService.EXPECT().StartWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "StartWorkflowExecutionAsync": - mockService.EXPECT().StartWorkflowExecutionAsync(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "TerminateWorkflowExecution": - mockService.EXPECT().TerminateWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "ResetWorkflowExecution": - mockService.EXPECT().ResetWorkflowExecution(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "UpdateDomain": - mockService.EXPECT().UpdateDomain(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "QueryWorkflow": - mockService.EXPECT().QueryWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) - case "RespondQueryTaskCompleted": - mockService.EXPECT().RespondQueryTaskCompleted(gomock.Any(), gomock.Any(), gomock.Any()).Return(returns...) + mockServiceVal := reflect.ValueOf(mockService) + method, exists := mockServiceVal.Type().MethodByName(test.serviceMethod) + require.True(t, exists, "method %s does not exists", test.serviceMethod) + + expecterVals := mockServiceVal.MethodByName("EXPECT").Call(nil) + expectedMethod := expecterVals[0].MethodByName(test.serviceMethod) + mockInputs := make([]reflect.Value, 0, expectedMethod.Type().NumIn()) + for inCounter := 0; inCounter < expectedMethod.Type().NumIn(); inCounter++ { + mockInputs = append(mockInputs, reflect.ValueOf(gomock.Any())) + } + callVals := expectedMethod.Call(mockInputs) + + returnVals := make([]reflect.Value, 0, method.Type.NumOut()) + for i := 0; i < method.Type.NumOut()-1; i++ { + output := method.Type.Out(i) + if errCase.err == nil { + returnVals = append(returnVals, reflect.New(output).Elem()) + } else { + returnVals = append(returnVals, reflect.Zero(output)) + } } + if errCase.err != nil { + returnVals = append(returnVals, reflect.ValueOf(errCase.err)) + } else { + returnVals = append(returnVals, nilError) + } + + callVals[0].MethodByName("Return").Call(returnVals) + callOption := yarpc.CallOption{} inputs := make([]reflect.Value, len(test.callArgs)) for i, arg := range test.callArgs { inputs[i] = reflect.ValueOf(arg) } inputs = append(inputs, reflect.ValueOf(callOption)) - method := reflect.ValueOf(wrapperService).MethodByName(test.serviceMethod) - method.Call(inputs) + actualMethod := reflect.ValueOf(wrapperService).MethodByName(test.serviceMethod) + methodReturnVals := actualMethod.Call(inputs) + err := methodReturnVals[len(methodReturnVals)-1].Interface() + if errCase.err != nil { + assert.ErrorIs(t, err.(error), errCase.err) + } else { + assert.Nil(t, err, "error must be nil") + } require.NoError(t, closer.Close()) - validationFunc(t, reporter, test.serviceMethod, test.expectedCounters) + validationFunc(t, reporter, test.serviceMethod, errCase.expectedCounters) }) } func assertMetrics(t *testing.T, reporter *CapturingStatsReporter, methodName string, counterNames []string) { - require.Equal(t, len(counterNames), len(reporter.counts)) + assert.Equal(t, len(counterNames), len(reporter.counts), "expected %v counters, got %v", counterNames, reporter.counts) for _, name := range counterNames { counterName := CadenceMetricsPrefix + methodName + "." + name find := false @@ -208,14 +191,14 @@ func assertMetrics(t *testing.T, reporter *CapturingStatsReporter, methodName st break } } - require.True(t, find) + assert.True(t, find, "counter %v not found in counters %v", counterName, reporter.counts) } - require.Equal(t, 1, len(reporter.timers)) - require.Equal(t, CadenceMetricsPrefix+methodName+"."+CadenceLatency, reporter.timers[0].name) + assert.Equal(t, 1, len(reporter.timers), "expected 1 timer, got %v", len(reporter.timers)) + assert.Equal(t, CadenceMetricsPrefix+methodName+"."+CadenceLatency, reporter.timers[0].name, "expected timer %v, got %v", CadenceMetricsPrefix+methodName+"."+CadenceLatency, reporter.timers[0].name) } func assertPromMetrics(t *testing.T, reporter *CapturingStatsReporter, methodName string, counterNames []string) { - require.Equal(t, len(counterNames), len(reporter.counts)) + assert.Equal(t, len(counterNames), len(reporter.counts), "expected %v counters, got %v", counterNames, reporter.counts) for _, name := range counterNames { counterName := makePromCompatible(CadenceMetricsPrefix + methodName + "." + name) find := false @@ -226,11 +209,11 @@ func assertPromMetrics(t *testing.T, reporter *CapturingStatsReporter, methodNam break } } - require.True(t, find) + assert.True(t, find, "counter %v not found in counters %v", counterName, reporter.counts) } - require.Equal(t, 1, len(reporter.timers)) + assert.Equal(t, 1, len(reporter.timers), "expected 1 timer, got %v", len(reporter.timers)) expected := makePromCompatible(CadenceMetricsPrefix + methodName + "." + CadenceLatency) - require.Equal(t, expected, reporter.timers[0].name) + assert.Equal(t, expected, reporter.timers[0].name, "expected timer %v, got %v", expected, reporter.timers[0].name) } func makePromCompatible(name string) string { @@ -276,3 +259,5 @@ func newPromScope(isReplay *bool) (tally.Scope, io.Closer, *CapturingStatsReport scope, closer := tally.NewRootScope(opts, time.Second) return WrapScope(isReplay, scope, &realClock{}), closer, reporter } + +var nilError = reflect.Zero(reflect.TypeOf((*error)(nil)).Elem())