diff --git a/common/persistence/dataManagerInterfaces.go b/common/persistence/dataManagerInterfaces.go index ef41ef2d3ed..6a37c8240ec 100644 --- a/common/persistence/dataManagerInterfaces.go +++ b/common/persistence/dataManagerInterfaces.go @@ -594,7 +594,6 @@ type ( TargetDomainID string TargetWorkflowID string TargetRunID string - InitiatedID int64 Version int64 } diff --git a/common/persistence/nosql/nosqlExecutionStoreUtil.go b/common/persistence/nosql/nosqlExecutionStoreUtil.go index 66d2c35fc24..e3714d163c9 100644 --- a/common/persistence/nosql/nosqlExecutionStoreUtil.go +++ b/common/persistence/nosql/nosqlExecutionStoreUtil.go @@ -363,7 +363,6 @@ func (d *nosqlExecutionStore) prepareCrossClusterTasksForWorkflowTxn( if targetRunID == "" { targetRunID = p.CrossClusterTaskDefaultTargetRunID } - scheduleID = task.(*p.CrossClusterRecordChildExecutionCompletedTask).InitiatedID case p.CrossClusterTaskTypeApplyParentClosePolicy: targetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster @@ -495,7 +494,6 @@ func (d *nosqlExecutionStore) prepareTransferTasksForWorkflowTxn( if targetRunID == "" { targetRunID = p.TransferTaskTransferTargetRunID } - scheduleID = task.(*p.RecordChildExecutionCompletedTask).InitiatedID case p.TransferTaskTypeApplyParentClosePolicy: targetDomainIDs = task.(*p.ApplyParentClosePolicyTask).TargetDomainIDs diff --git a/common/persistence/persistence-tests/executionManagerTest.go b/common/persistence/persistence-tests/executionManagerTest.go index 598528fa4fa..c38cc9a87cd 100644 --- a/common/persistence/persistence-tests/executionManagerTest.go +++ b/common/persistence/persistence-tests/executionManagerTest.go @@ -2429,7 +2429,6 @@ func (s *ExecutionManagerSuite) validateCrossClusterTasks( s.Equal(task.TargetDomainID, loadedTaskInfo[index].TargetDomainID) s.Equal(task.TargetWorkflowID, loadedTaskInfo[index].TargetWorkflowID) s.Equal(task.TargetRunID, loadedTaskInfo[index].TargetRunID) - s.Equal(task.InitiatedID, loadedTaskInfo[index].ScheduleID) case *p.CrossClusterApplyParentClosePolicyTask: s.Equal(task.TargetDomainIDs, loadedTaskInfo[index].GetTargetDomainIDs()) default: @@ -2493,7 +2492,7 @@ func (s *ExecutionManagerSuite) TestTransferTasksComplete() { &p.SignalExecutionTask{now, currentTransferID + 10005, targetDomainID, targetWorkflowID, targetRunID, true, scheduleID, 555}, &p.StartChildExecutionTask{now, currentTransferID + 10006, targetDomainID, targetWorkflowID, scheduleID, 666}, &p.RecordWorkflowClosedTask{now, currentTransferID + 10007, 777}, - &p.RecordChildExecutionCompletedTask{now, currentTransferID + 10008, targetDomainID, targetWorkflowID, targetRunID, scheduleID, 888}, + &p.RecordChildExecutionCompletedTask{now, currentTransferID + 10008, targetDomainID, targetWorkflowID, targetRunID, 888}, &p.ApplyParentClosePolicyTask{now, currentTransferID + 10009, map[string]struct{}{targetDomainID: {}}, 999}, } versionHistory := p.NewVersionHistory([]byte{}, []*p.VersionHistoryItem{ diff --git a/common/persistence/sql/sqlExecutionStoreUtil.go b/common/persistence/sql/sqlExecutionStoreUtil.go index 893da224af3..5f12631ded8 100644 --- a/common/persistence/sql/sqlExecutionStoreUtil.go +++ b/common/persistence/sql/sqlExecutionStoreUtil.go @@ -817,7 +817,6 @@ func createCrossClusterTasks( if targetRunID := task.(*p.CrossClusterRecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" { info.TargetRunID = serialization.MustParseUUID(targetRunID) } - info.ScheduleID = task.(*p.CrossClusterRecordChildExecutionCompletedTask).InitiatedID case p.CrossClusterTaskTypeApplyParentClosePolicy: crossClusterTasksRows[i].TargetCluster = task.(*p.CrossClusterApplyParentClosePolicyTask).TargetCluster @@ -932,7 +931,6 @@ func createTransferTasks( if targetRunID := task.(*p.RecordChildExecutionCompletedTask).TargetRunID; targetRunID != "" { info.TargetRunID = serialization.MustParseUUID(targetRunID) } - info.ScheduleID = task.(*p.RecordChildExecutionCompletedTask).InitiatedID case p.TransferTaskTypeApplyParentClosePolicy: for targetDomainID := range task.(*p.ApplyParentClosePolicyTask).TargetDomainIDs { diff --git a/service/history/execution/mutable_state_task_generator.go b/service/history/execution/mutable_state_task_generator.go index 313d591f8e4..2d9aaa2c436 100644 --- a/service/history/execution/mutable_state_task_generator.go +++ b/service/history/execution/mutable_state_task_generator.go @@ -195,7 +195,6 @@ func (r *mutableStateTaskGeneratorImpl) GenerateWorkflowCloseTasks( TargetDomainID: executionInfo.ParentDomainID, TargetWorkflowID: executionInfo.ParentWorkflowID, TargetRunID: executionInfo.ParentRunID, - InitiatedID: executionInfo.InitiatedID, Version: closeEvent.GetVersion(), } @@ -642,7 +641,6 @@ func (r *mutableStateTaskGeneratorImpl) GenerateCrossClusterRecordChildCompleted TargetDomainID: parentInfo.DomainUUID, TargetWorkflowID: parentInfo.GetExecution().GetWorkflowID(), TargetRunID: parentInfo.GetExecution().GetRunID(), - InitiatedID: parentInfo.GetInitiatedID(), Version: task.Version, }, }) @@ -884,7 +882,6 @@ func (r *mutableStateTaskGeneratorImpl) GenerateFromCrossClusterTask( TargetDomainID: task.TargetDomainID, TargetWorkflowID: task.TargetWorkflowID, TargetRunID: task.TargetRunID, - InitiatedID: task.ScheduleID, } if generateTransferTask { newTask = recordChildExecutionCompletedTask diff --git a/service/history/execution/mutable_state_task_generator_test.go b/service/history/execution/mutable_state_task_generator_test.go index f33be54a169..f2a94d68d7a 100644 --- a/service/history/execution/mutable_state_task_generator_test.go +++ b/service/history/execution/mutable_state_task_generator_test.go @@ -222,7 +222,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateWorkflowCloseTasks() { TargetDomainID: constants.TestParentDomainID, TargetWorkflowID: "parent workflowID", TargetRunID: "parent runID", - InitiatedID: 101, Version: version, }, &persistence.ApplyParentClosePolicyTask{ @@ -284,7 +283,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateWorkflowCloseTasks() { TargetDomainID: constants.TestRemoteTargetDomainID, TargetWorkflowID: "parent workflowID", TargetRunID: "parent runID", - InitiatedID: 101, Version: version, }, }, @@ -332,7 +330,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateWorkflowCloseTasks() { TargetDomainID: constants.TestRemoteTargetDomainID, TargetWorkflowID: "parent workflowID", TargetRunID: "parent runID", - InitiatedID: 101, Version: version, }, }, @@ -514,7 +511,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateCrossClusterRecordChildComp TargetDomainID: constants.TestParentDomainID, TargetWorkflowID: constants.TestWorkflowID, TargetRunID: constants.TestRunID, - InitiatedID: 123, Version: 101, }, } @@ -651,7 +647,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateFromCrossClusterTask() { TargetDomainID: constants.TestRemoteTargetDomainID, TargetWorkflowID: constants.TestWorkflowID, TargetRunID: constants.TestRunID, - InitiatedID: int64(123), }, }, }, @@ -670,7 +665,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateFromCrossClusterTask() { TargetDomainID: constants.TestRemoteTargetDomainID, TargetWorkflowID: constants.TestWorkflowID, TargetRunID: constants.TestRunID, - InitiatedID: int64(123), }, }, }, @@ -688,7 +682,6 @@ func (s *mutableStateTaskGeneratorSuite) TestGenerateFromCrossClusterTask() { TargetDomainID: constants.TestTargetDomainID, TargetWorkflowID: constants.TestWorkflowID, TargetRunID: constants.TestRunID, - InitiatedID: int64(123), }, }, }, diff --git a/service/history/task/cross_cluster_source_task_executor_test.go b/service/history/task/cross_cluster_source_task_executor_test.go index 3dda53ee4ec..0ed6885c3c0 100644 --- a/service/history/task/cross_cluster_source_task_executor_test.go +++ b/service/history/task/cross_cluster_source_task_executor_test.go @@ -213,7 +213,9 @@ func getWorkflowCloseTestCases() []struct { expectedTaskState: ctask.TaskStateAcked, willGenerateNewTask: false, }, - // UNEXPECTED ERROR + // UNEXPECTED ERROR for target, + // no error should be returned otherwise task will retry forever, + // task should still in pending state so it can be fetched again { targetError: types.CrossClusterTaskFailedCauseWorkflowAlreadyRunning.Ptr(), // for unexpected errors we return errContinueExecution which is converted to nil diff --git a/service/history/task/cross_cluster_target_task_executor.go b/service/history/task/cross_cluster_target_task_executor.go index 26f4495c2a2..9b97ea99052 100644 --- a/service/history/task/cross_cluster_target_task_executor.go +++ b/service/history/task/cross_cluster_target_task_executor.go @@ -41,7 +41,6 @@ var ( errUnknownTaskProcessingState = errors.New("unknown cross cluster task processing state") errMissingTaskRequestAttributes = errors.New("request attributes not specified") errDomainNotExists = errors.New("domain not exists") - errUnexpectedErrorFromTarget = errors.New("unexpected target error") ) type ( diff --git a/service/history/task/cross_cluster_task.go b/service/history/task/cross_cluster_task.go index 9cf71961bf6..ffc3fdebebf 100644 --- a/service/history/task/cross_cluster_task.go +++ b/service/history/task/cross_cluster_task.go @@ -544,28 +544,28 @@ func (t *crossClusterSourceTask) getRequestForApplyParentPolicy( return nil, t.processingState, err } for _, childInfo := range children { - targetDomainEntry, err := execution.GetChildExecutionDomainEntry(childInfo, t.shard.GetDomainCache(), domainEntry) + // we already filtered the children so that child domainID is in task.TargetDomainIDs + // don't check if child domain is active or not here, + // we need to send the request even if the child domain is not active in target cluster + targetDomainID, err := execution.GetChildExecutionDomainID(childInfo, t.shard.GetDomainCache(), domainEntry) if err != nil { return nil, t.processingState, err } - targetCluster := targetDomainEntry.GetReplicationConfig().ActiveClusterName - if targetCluster == t.targetCluster { - - attributes.Children = append( - attributes.Children, - &types.ApplyParentClosePolicyRequest{ - Child: &types.ApplyParentClosePolicyAttributes{ - ChildDomainID: targetDomainEntry.GetInfo().ID, - ChildWorkflowID: childInfo.StartedWorkflowID, - ChildRunID: childInfo.StartedRunID, - ParentClosePolicy: &childInfo.ParentClosePolicy, - }, - Status: &types.ApplyParentClosePolicyStatus{ - Completed: false, - }, + + attributes.Children = append( + attributes.Children, + &types.ApplyParentClosePolicyRequest{ + Child: &types.ApplyParentClosePolicyAttributes{ + ChildDomainID: targetDomainID, + ChildWorkflowID: childInfo.StartedWorkflowID, + ChildRunID: childInfo.StartedRunID, + ParentClosePolicy: &childInfo.ParentClosePolicy, }, - ) - } + Status: &types.ApplyParentClosePolicyStatus{ + Completed: false, + }, + }, + ) } return attributes, t.processingState, nil } @@ -786,6 +786,8 @@ func (t *crossClusterSourceTask) RecordResponse(response *types.CrossClusterTask case persistence.CrossClusterTaskTypeApplyParentClosePolicy: taskTypeMatch = response.GetTaskType() == types.CrossClusterTaskTypeApplyParentPolicy emptyResponse = response.ApplyParentClosePolicyAttributes == nil + default: + return fmt.Errorf("unknown task type: %v", t.GetTaskType()) } if !taskTypeMatch { @@ -793,7 +795,7 @@ func (t *crossClusterSourceTask) RecordResponse(response *types.CrossClusterTask } if emptyResponse && response.FailedCause == nil { - return errors.New("empty cross cluster task response") + return fmt.Errorf("empty cross cluster task response, task type: %v", t.GetTaskType()) } if response.FailedCause != nil {