From 3b93639bdce29b49ae12d5f90633a9637927ffaf Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 16 Jan 2025 14:42:37 -0800 Subject: [PATCH 1/3] populate task for empty polls as well --- internal/internal_poller_autoscaler.go | 4 +- internal/internal_task_handlers.go | 2 +- internal/internal_task_pollers.go | 6 +- internal/internal_task_pollers_test.go | 115 +++++++++++++++++++++++++ internal/internal_utils.go | 8 ++ 5 files changed, 129 insertions(+), 6 deletions(-) diff --git a/internal/internal_poller_autoscaler.go b/internal/internal_poller_autoscaler.go index 2dc81e7ba..139a6efd3 100644 --- a/internal/internal_poller_autoscaler.go +++ b/internal/internal_poller_autoscaler.go @@ -174,9 +174,9 @@ func (m *pollerUsageEstimator) CollectUsage(data interface{}) error { func isTaskEmpty(task interface{}) (bool, error) { switch t := task.(type) { case *workflowTask: - return t == nil || t.task == nil, nil + return t == nil || isEmptyDecisionTask(t.task), nil case *activityTask: - return t == nil || t.task == nil, nil + return t == nil || isEmptyActivityTask(t.task), nil case *localActivityTask: return t == nil || t.workflowTask == nil, nil default: diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index a1b206f26..d4407e407 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -744,7 +744,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask( workflowTask *workflowTask, heartbeatFunc decisionHeartbeatFunc, ) (completeRequest interface{}, errRet error) { - if workflowTask == nil || workflowTask.task == nil { + if workflowTask == nil || isEmptyDecisionTask(workflowTask.task) { return nil, errors.New("nil workflow task provided") } task := workflowTask.task diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index a69b8765d..e2e32e49a 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -848,7 +848,7 @@ func (wtp *workflowTaskPoller) poll(ctx context.Context) (interface{}, error) { if response == nil || len(response.TaskToken) == 0 { wtp.metricsScope.Counter(metrics.DecisionPollNoTaskCounter).Inc(1) wtp.updateBacklog(request.TaskList.GetKind(), 0) - return &workflowTask{}, nil + return &workflowTask{task: response}, nil } wtp.updateBacklog(request.TaskList.GetKind(), response.GetBacklogCountHint()) @@ -1095,7 +1095,7 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (*s.PollForActivityTask } if response == nil || len(response.TaskToken) == 0 { atp.metricsScope.Counter(metrics.ActivityPollNoTaskCounter).Inc(1) - return nil, startTime, nil + return response, startTime, nil } return response, startTime, err @@ -1116,7 +1116,7 @@ func (atp *activityTaskPoller) pollWithMetrics(ctx context.Context, return nil, err } if response == nil || len(response.TaskToken) == 0 { - return &activityTask{}, nil + return &activityTask{task: response}, nil } workflowType := response.WorkflowType.GetName() diff --git a/internal/internal_task_pollers_test.go b/internal/internal_task_pollers_test.go index ed0a4e779..041553e12 100644 --- a/internal/internal_task_pollers_test.go +++ b/internal/internal_task_pollers_test.go @@ -61,6 +61,104 @@ func Test_newWorkflowTaskPoller(t *testing.T) { }) } +func TestWorkflowTaskPoller(t *testing.T) { + t.Run("PollTask", func(t *testing.T) { + task := &s.PollForDecisionTaskResponse{ + TaskToken: []byte("some value"), + AutoConfigHint: &s.AutoConfigHint{ + common.PtrOf(true), + common.PtrOf(int64(1000)), + }, + } + emptyTask := &s.PollForDecisionTaskResponse{ + TaskToken: nil, + AutoConfigHint: &s.AutoConfigHint{ + common.PtrOf(true), + common.PtrOf(int64(1000)), + }, + } + for _, tt := range []struct { + name string + response *s.PollForDecisionTaskResponse + expected *workflowTask + }{ + { + "success with task", + task, + &workflowTask{ + task: task, + }, + }, + { + "success with empty task", + emptyTask, + &workflowTask{ + task: emptyTask, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + poller, client, _, _ := buildWorkflowTaskPoller(t) + client.EXPECT().PollForDecisionTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.response, nil) + result, err := poller.PollTask() + assert.NoError(t, err) + resultTask, ok := result.(*workflowTask) + assert.True(t, ok) + assert.Equal(t, tt.expected.task, resultTask.task) + }) + } + }) +} + +func TestActivityTaskPoller(t *testing.T) { + t.Run("PollTask", func(t *testing.T) { + task := &s.PollForActivityTaskResponse{ + TaskToken: []byte("some value"), + AutoConfigHint: &s.AutoConfigHint{ + common.PtrOf(true), + common.PtrOf(int64(1000)), + }, + } + emptyTask := &s.PollForActivityTaskResponse{ + TaskToken: nil, + AutoConfigHint: &s.AutoConfigHint{ + common.PtrOf(true), + common.PtrOf(int64(1000)), + }, + } + for _, tt := range []struct { + name string + response *s.PollForActivityTaskResponse + expected *activityTask + }{ + { + "success with task", + task, + &activityTask{ + task: task, + }, + }, + { + "success with empty task", + emptyTask, + &activityTask{ + task: emptyTask, + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + poller, client := buildActivityTaskPoller(t) + client.EXPECT().PollForActivityTask(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.response, nil) + result, err := poller.PollTask() + assert.NoError(t, err) + resultTask, ok := result.(*activityTask) + assert.True(t, ok) + assert.Equal(t, tt.expected.task, resultTask.task) + }) + } + }) +} + func TestLocalActivityPanic(t *testing.T) { // regression: panics in local activities should not terminate the process s := WorkflowTestSuite{logger: testlogger.NewZap(t)} @@ -213,3 +311,20 @@ func buildWorkflowTaskPoller(t *testing.T) (*workflowTaskPoller, *workflowservic featureFlags: FeatureFlags{}, }, mockService, taskHandler, lda } + +func buildActivityTaskPoller(t *testing.T) (*activityTaskPoller, *workflowservicetest.MockClient) { + ctrl := gomock.NewController(t) + mockService := workflowservicetest.NewMockClient(ctrl) + return &activityTaskPoller{ + basePoller: basePoller{ + shutdownC: make(<-chan struct{}), + }, + domain: _testDomainName, + taskListName: _testTaskList, + identity: _testIdentity, + service: mockService, + metricsScope: &metrics.TaggedScope{Scope: tally.NewTestScope("test", nil)}, + logger: testlogger.NewZap(t), + featureFlags: FeatureFlags{}, + }, mockService +} diff --git a/internal/internal_utils.go b/internal/internal_utils.go index f3a3695aa..8bb6a5a6f 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -492,3 +492,11 @@ func getLengthOfStringPointer(s *string) int { } return len(*s) } + +func isEmptyDecisionTask(r *s.PollForDecisionTaskResponse) bool { + return r == nil || len(r.TaskToken) == 0 +} + +func isEmptyActivityTask(r *s.PollForActivityTaskResponse) bool { + return r == nil || len(r.TaskToken) == 0 +} From 5f75c34a33d65b7fc545041c37b8b5083718ebde Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 16 Jan 2025 15:28:36 -0800 Subject: [PATCH 2/3] fix test case and revert task handler change --- internal/internal_poller_autoscaler_test.go | 4 ++-- internal/internal_task_handlers.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/internal_poller_autoscaler_test.go b/internal/internal_poller_autoscaler_test.go index 4a441b642..53cfb790d 100644 --- a/internal/internal_poller_autoscaler_test.go +++ b/internal/internal_poller_autoscaler_test.go @@ -278,10 +278,10 @@ type unrelatedPolledTask struct{} func generateRandomPollResults(noTaskPoll, taskPoll, unrelated int) <-chan interface{} { var result []interface{} for i := 0; i < noTaskPoll; i++ { - result = append(result, &activityTask{}) + result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}}) } for i := 0; i < taskPoll; i++ { - result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{}}) + result = append(result, &activityTask{task: &s.PollForActivityTaskResponse{TaskToken: []byte("some value")}}) } for i := 0; i < unrelated; i++ { result = append(result, &unrelatedPolledTask{}) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index d4407e407..a1b206f26 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -744,7 +744,7 @@ func (wth *workflowTaskHandlerImpl) ProcessWorkflowTask( workflowTask *workflowTask, heartbeatFunc decisionHeartbeatFunc, ) (completeRequest interface{}, errRet error) { - if workflowTask == nil || isEmptyDecisionTask(workflowTask.task) { + if workflowTask == nil || workflowTask.task == nil { return nil, errors.New("nil workflow task provided") } task := workflowTask.task From f17dce39c362e7aa28e33d3eb1577f131e827687 Mon Sep 17 00:00:00 2001 From: Shijie Sheng Date: Thu, 16 Jan 2025 15:32:19 -0800 Subject: [PATCH 3/3] simply change --- internal/internal_poller_autoscaler.go | 4 ++-- internal/internal_utils.go | 8 -------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/internal/internal_poller_autoscaler.go b/internal/internal_poller_autoscaler.go index 139a6efd3..c8dfb299a 100644 --- a/internal/internal_poller_autoscaler.go +++ b/internal/internal_poller_autoscaler.go @@ -174,9 +174,9 @@ func (m *pollerUsageEstimator) CollectUsage(data interface{}) error { func isTaskEmpty(task interface{}) (bool, error) { switch t := task.(type) { case *workflowTask: - return t == nil || isEmptyDecisionTask(t.task), nil + return t == nil || t.task == nil || len(t.task.TaskToken) == 0, nil case *activityTask: - return t == nil || isEmptyActivityTask(t.task), nil + return t == nil || t.task == nil || len(t.task.TaskToken) == 0, nil case *localActivityTask: return t == nil || t.workflowTask == nil, nil default: diff --git a/internal/internal_utils.go b/internal/internal_utils.go index 8bb6a5a6f..f3a3695aa 100644 --- a/internal/internal_utils.go +++ b/internal/internal_utils.go @@ -492,11 +492,3 @@ func getLengthOfStringPointer(s *string) int { } return len(*s) } - -func isEmptyDecisionTask(r *s.PollForDecisionTaskResponse) bool { - return r == nil || len(r.TaskToken) == 0 -} - -func isEmptyActivityTask(r *s.PollForActivityTaskResponse) bool { - return r == nil || len(r.TaskToken) == 0 -}