From e308b69d70d9a3321b002a21dd6f656e41e68351 Mon Sep 17 00:00:00 2001 From: relee Date: Tue, 5 Nov 2024 20:16:18 +0800 Subject: [PATCH 1/6] fix #16768: add schedule time parameter to sub-workflow instance --- .../subworkflow/SubWorkflowControlClient.java | 10 +- .../subworkflow/SubWorkflowLogicTask.java | 5 +- .../trigger/SubWorkflowManualTrigger.java | 40 -------- .../trigger/SubWorkflowTrigger.java | 98 +++++++++++++++++++ .../trigger/SubWorkflowTriggerRequest.java | 71 ++++++++++++++ .../trigger/SubWorkflowTriggerResponse.java | 47 +++++++++ 6 files changed, 224 insertions(+), 47 deletions(-) delete mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowManualTrigger.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java index 69109f09dcdc..1452b6507344 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowControlClient.java @@ -30,11 +30,11 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksResponse; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; -import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverFailureTaskTrigger; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowInstanceRecoverSuspendTaskTrigger; import org.apache.dolphinscheduler.server.master.exception.MasterTaskExecuteException; -import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowManualTrigger; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowTrigger; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowTriggerRequest; import lombok.extern.slf4j.Slf4j; @@ -49,7 +49,7 @@ public class SubWorkflowControlClient { private WorkflowInstanceDao workflowInstanceDao; @Autowired - private SubWorkflowManualTrigger subWorkflowManualTrigger; + private SubWorkflowTrigger subWorkflowTrigger; @Autowired private WorkflowInstanceRecoverFailureTaskTrigger workflowInstanceRecoverFailureTaskTrigger; @@ -57,8 +57,8 @@ public class SubWorkflowControlClient { @Autowired private WorkflowInstanceRecoverSuspendTaskTrigger workflowInstanceRecoverSuspendTaskTrigger; - public Integer triggerSubWorkflow(final WorkflowManualTriggerRequest workflowManualTriggerRequest) { - return subWorkflowManualTrigger.triggerWorkflow(workflowManualTriggerRequest).getWorkflowInstanceId(); + public Integer triggerSubWorkflow(final SubWorkflowTriggerRequest subWorkflowTriggerRequest) { + return subWorkflowTrigger.triggerWorkflow(subWorkflowTriggerRequest).getWorkflowInstanceId(); } public WorkflowInstanceRecoverFailureTasksResponse triggerFromFailureTasks( diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java index d5f21bfc5398..2a9595f43bf2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java @@ -32,7 +32,6 @@ import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceRecoverSuspendTasksRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopRequest; import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowInstanceStopResponse; -import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.parameters.SubWorkflowParameters; import org.apache.dolphinscheduler.server.master.engine.workflow.runnable.IWorkflowExecutionRunnable; @@ -40,6 +39,7 @@ import org.apache.dolphinscheduler.server.master.runner.execute.AsyncTaskExecuteFunction; import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; import org.apache.dolphinscheduler.server.master.runner.task.BaseAsyncLogicTask; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger.SubWorkflowTriggerRequest; import lombok.extern.slf4j.Slf4j; @@ -183,7 +183,8 @@ private SubWorkflowLogicTaskRuntimeContext triggerNewSubWorkflow() { final ICommandParam commandParam = JSONUtils.parseObject(workflowInstance.getCommandParam(), ICommandParam.class); - final WorkflowManualTriggerRequest workflowManualTriggerRequest = WorkflowManualTriggerRequest.builder() + final SubWorkflowTriggerRequest workflowManualTriggerRequest = SubWorkflowTriggerRequest.builder() + .scheduleTIme(workflowInstance.getScheduleTime()) .userId(taskExecutionContext.getExecutorId()) .workflowDefinitionCode(subWorkflowDefinition.getCode()) .workflowDefinitionVersion(subWorkflowDefinition.getVersion()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowManualTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowManualTrigger.java deleted file mode 100644 index 5484de2128dc..000000000000 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowManualTrigger.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; - -import org.apache.dolphinscheduler.common.enums.Flag; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; -import org.apache.dolphinscheduler.extract.master.transportor.workflow.WorkflowManualTriggerRequest; -import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.WorkflowManualTrigger; - -import org.springframework.stereotype.Component; - -/** - * Manual trigger of the workflow, used to trigger the workflow and generate the workflow instance in the manual way. - */ -@Component -public class SubWorkflowManualTrigger extends WorkflowManualTrigger { - - @Override - protected WorkflowInstance constructWorkflowInstance(final WorkflowManualTriggerRequest workflowManualTriggerRequest) { - final WorkflowInstance workflowInstance = super.constructWorkflowInstance(workflowManualTriggerRequest); - workflowInstance.setIsSubWorkflow(Flag.YES); - return workflowInstance; - } - -} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java new file mode 100644 index 000000000000..20523b48566b --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; + +import org.apache.dolphinscheduler.common.enums.CommandType; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.WarningType; +import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.dao.entity.Command; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; +import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; +import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.AbstractWorkflowTrigger; + +import org.apache.commons.lang3.NotImplementedException; +import org.apache.commons.lang3.ObjectUtils; + +import java.util.Date; + +import org.springframework.stereotype.Component; + +/** + * Used to trigger the sub-workflow and generate the workflow instance. + */ +@Component +public class SubWorkflowTrigger + extends + AbstractWorkflowTrigger { + + @Override + protected WorkflowInstance constructWorkflowInstance(final SubWorkflowTriggerRequest subWorkflowTriggerRequest) { + final CommandType commandType = CommandType.START_PROCESS; + final Long workflowCode = subWorkflowTriggerRequest.getWorkflowDefinitionCode(); + final Integer workflowVersion = subWorkflowTriggerRequest.getWorkflowDefinitionVersion(); + final WorkflowDefinition workflowDefinition = getProcessDefinition(workflowCode, workflowVersion); + + final WorkflowInstance workflowInstance = new WorkflowInstance(); + workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode()); + workflowInstance.setWorkflowDefinitionVersion(workflowDefinition.getVersion()); + workflowInstance.setProjectCode(workflowDefinition.getProjectCode()); + workflowInstance.setCommandType(commandType); + workflowInstance.setStateWithDesc(WorkflowExecutionStatus.SUBMITTED_SUCCESS, commandType.name()); + workflowInstance.setRecovery(Flag.NO); + workflowInstance.setScheduleTime(subWorkflowTriggerRequest.getScheduleTIme()); + workflowInstance.setStartTime(new Date()); + workflowInstance.setRestartTime(workflowInstance.getStartTime()); + workflowInstance.setRunTimes(1); + workflowInstance.setName(String.join("-", workflowDefinition.getName(), DateUtils.getCurrentTimeStamp())); + workflowInstance.setTaskDependType(subWorkflowTriggerRequest.getTaskDependType()); + workflowInstance.setFailureStrategy(subWorkflowTriggerRequest.getFailureStrategy()); + workflowInstance.setWarningType( + ObjectUtils.defaultIfNull(subWorkflowTriggerRequest.getWarningType(), WarningType.NONE)); + workflowInstance.setWarningGroupId(subWorkflowTriggerRequest.getWarningGroupId()); + workflowInstance.setExecutorId(subWorkflowTriggerRequest.getUserId()); + workflowInstance.setExecutorName(getExecutorUser(subWorkflowTriggerRequest.getUserId()).getUserName()); + workflowInstance.setTenantCode(subWorkflowTriggerRequest.getTenantCode()); + workflowInstance.setIsSubWorkflow(Flag.YES); + workflowInstance.addHistoryCmd(commandType); + workflowInstance.setWorkflowInstancePriority(subWorkflowTriggerRequest.getWorkflowInstancePriority()); + workflowInstance.setWorkerGroup( + WorkerGroupUtils.getWorkerGroupOrDefault(subWorkflowTriggerRequest.getWorkerGroup())); + workflowInstance.setEnvironmentCode( + EnvironmentUtils.getEnvironmentCodeOrDefault(subWorkflowTriggerRequest.getEnvironmentCode())); + workflowInstance.setTimeout(workflowDefinition.getTimeout()); + workflowInstance.setDryRun(subWorkflowTriggerRequest.getDryRun().getCode()); + workflowInstance.setTestFlag(subWorkflowTriggerRequest.getTestFlag().getCode()); + return workflowInstance; + } + + @Override + protected Command constructTriggerCommand(final SubWorkflowTriggerRequest subWorkflowTriggerRequest, + final WorkflowInstance workflowInstance) { + throw new NotImplementedException(); + } + + @Override + protected SubWorkflowTriggerResponse onTriggerSuccess(WorkflowInstance workflowInstance) { + throw new NotImplementedException(); + } + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java new file mode 100644 index 000000000000..138efa40fa83 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; + +import org.apache.dolphinscheduler.plugin.task.api.model.Property; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SubWorkflowTriggerRequest { + + private Date scheduleTIme; + + private Integer userId; + + private Long workflowDefinitionCode; + + private Integer workflowDefinitionVersion; + + private List startNodes; + + @Builder.Default + private FailureStrategy failureStrategy = FailureStrategy.CONTINUE; + + @Builder.Default + private TaskDependType taskDependType = TaskDependType.TASK_POST; + + @Builder.Default + private WarningType warningType = WarningType.NONE; + + private Integer warningGroupId; + + @Builder.Default + private Priority workflowInstancePriority = Priority.MEDIUM; + + private String workerGroup; + + private String tenantCode; + + private Long environmentCode; + + @Builder.Default + private List startParamList = new ArrayList<>(); + + @Builder.Default + private Flag dryRun = Flag.NO; + + @Builder.Default + private Flag testFlag = Flag.NO; + +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java new file mode 100644 index 000000000000..f8a86a4408cc --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; + +import org.apache.commons.lang3.NotImplementedException; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class SubWorkflowTriggerResponse { + + private boolean success; + + private String message; + + private Integer workflowInstanceId; + + public static SubWorkflowTriggerResponse fail(String message) { + throw new NotImplementedException(); + } + + public static SubWorkflowTriggerResponse success(Integer workflowInstanceId) { + throw new NotImplementedException(); + } + +} From c7432f5a1baf646ae7ef1ada02bd6cd6f9f3377a Mon Sep 17 00:00:00 2001 From: relee Date: Wed, 6 Nov 2024 02:19:39 +0800 Subject: [PATCH 2/6] fix #16768: Add schedule time parameter to sub-workflow instance --- .../trigger/SubWorkflowTrigger.java | 19 ++++++++++++++++--- .../trigger/SubWorkflowTriggerRequest.java | 10 ++++++++++ .../trigger/SubWorkflowTriggerResponse.java | 12 ++++++++---- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java index 20523b48566b..00b592ca7a9d 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTrigger.java @@ -22,14 +22,15 @@ import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.Command; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; import org.apache.dolphinscheduler.dao.utils.EnvironmentUtils; import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils; +import org.apache.dolphinscheduler.extract.master.command.RunWorkflowCommandParam; import org.apache.dolphinscheduler.server.master.engine.workflow.trigger.AbstractWorkflowTrigger; -import org.apache.commons.lang3.NotImplementedException; import org.apache.commons.lang3.ObjectUtils; import java.util.Date; @@ -87,12 +88,24 @@ protected WorkflowInstance constructWorkflowInstance(final SubWorkflowTriggerReq @Override protected Command constructTriggerCommand(final SubWorkflowTriggerRequest subWorkflowTriggerRequest, final WorkflowInstance workflowInstance) { - throw new NotImplementedException(); + final RunWorkflowCommandParam runWorkflowCommandParam = RunWorkflowCommandParam.builder() + .commandParams(subWorkflowTriggerRequest.getStartParamList()) + .startNodes(subWorkflowTriggerRequest.getStartNodes()) + .timeZone(DateUtils.getTimezone()) + .build(); + return Command.builder() + .commandType(CommandType.START_PROCESS) + .workflowDefinitionCode(subWorkflowTriggerRequest.getWorkflowDefinitionCode()) + .workflowDefinitionVersion(subWorkflowTriggerRequest.getWorkflowDefinitionVersion()) + .workflowInstanceId(workflowInstance.getId()) + .workflowInstancePriority(workflowInstance.getWorkflowInstancePriority()) + .commandParam(JSONUtils.toJsonString(runWorkflowCommandParam)) + .build(); } @Override protected SubWorkflowTriggerResponse onTriggerSuccess(WorkflowInstance workflowInstance) { - throw new NotImplementedException(); + return SubWorkflowTriggerResponse.success(workflowInstance.getId()); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java index 138efa40fa83..056a643e4ed2 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerRequest.java @@ -17,12 +17,22 @@ package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; +import org.apache.dolphinscheduler.common.enums.FailureStrategy; +import org.apache.dolphinscheduler.common.enums.Flag; +import org.apache.dolphinscheduler.common.enums.Priority; +import org.apache.dolphinscheduler.common.enums.TaskDependType; +import org.apache.dolphinscheduler.common.enums.WarningType; import org.apache.dolphinscheduler.plugin.task.api.model.Property; import java.util.ArrayList; import java.util.Date; import java.util.List; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + @Data @Builder @NoArgsConstructor diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java index f8a86a4408cc..1935ddd75eb8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/trigger/SubWorkflowTriggerResponse.java @@ -17,8 +17,6 @@ package org.apache.dolphinscheduler.server.master.runner.task.subworkflow.trigger; -import org.apache.commons.lang3.NotImplementedException; - import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -37,11 +35,17 @@ public class SubWorkflowTriggerResponse { private Integer workflowInstanceId; public static SubWorkflowTriggerResponse fail(String message) { - throw new NotImplementedException(); + return SubWorkflowTriggerResponse.builder() + .success(false) + .message(message) + .build(); } public static SubWorkflowTriggerResponse success(Integer workflowInstanceId) { - throw new NotImplementedException(); + return SubWorkflowTriggerResponse.builder() + .success(true) + .workflowInstanceId(workflowInstanceId) + .build(); } } From b8f25031db3eacac035ac4e2a8b98f00b24388ca Mon Sep 17 00:00:00 2001 From: relee Date: Tue, 12 Nov 2024 16:18:51 +0800 Subject: [PATCH 3/6] fix dag runs offline task --- .../server/master/engine/graph/WorkflowExecutionGraph.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java index 0c2bf92d108f..79baeaac2c0c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.engine.graph; +import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; @@ -252,7 +253,8 @@ public boolean isTaskExecutionRunnableSkipped(final ITaskExecutionRunnable taskE @Override public boolean isTaskExecutionRunnableForbidden(final ITaskExecutionRunnable taskExecutionRunnable) { - return false; + return taskExecutionRunnable.getTaskDefinition().getFlag() == Flag.NO; +// return false; } /** From 4dc5a006fe61ada2d45b68ff457d251881dc1300 Mon Sep 17 00:00:00 2001 From: relee Date: Tue, 12 Nov 2024 20:08:13 +0800 Subject: [PATCH 4/6] fix sub-workflow takeover part 1. --- .../common/enums/WorkflowExecutionStatus.java | 4 + .../master/ILogicTaskInstanceOperator.java | 10 +-- .../transportor/LogicTaskTakeoverRequest.java | 35 ++++++++ .../LogicTaskTakeoverResponse.java | 42 ++++++++++ .../task/runnable/TaskExecutionRunnable.java | 83 ++++++++++++++----- ...TaskInstanceTakeoverOperationFunction.java | 81 ++++++++++++++++++ ...cTaskInstanceOperationFunctionManager.java | 7 ++ .../rpc/LogicTaskInstanceOperatorImpl.java | 13 +-- .../MasterTaskExecutorThreadPoolManager.java | 14 ++++ 9 files changed, 256 insertions(+), 33 deletions(-) create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java create mode 100644 dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java create mode 100644 dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java index caacc22debee..63bcb70d4391 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/WorkflowExecutionStatus.java @@ -102,6 +102,10 @@ public boolean isFinished() { return isSuccess() || isFailure() || isStop() || isPause(); } + public boolean canFailover() { + return !isFinished() && !(this == SERIAL_WAIT || this == WAIT_TO_RUN); + } + /** * status is success * diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java index f85200b7a2c6..448f49758312 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java @@ -19,12 +19,7 @@ import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.*; @RpcService public interface ILogicTaskInstanceOperator { @@ -38,4 +33,7 @@ public interface ILogicTaskInstanceOperator { @RpcMethod LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest); + @RpcMethod + LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest); + } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java new file mode 100644 index 000000000000..102641b00eec --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +import java.io.Serializable; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class LogicTaskTakeoverRequest implements Serializable { + + private static final long serialVersionUID = -1L; + + private TaskExecutionContext taskExecutionContext; +} diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java new file mode 100644 index 000000000000..62e6a052b2f8 --- /dev/null +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.extract.master.transportor; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class LogicTaskTakeoverResponse { + + private Integer taskInstanceId; + + private boolean success; + + private String message; + + public static LogicTaskTakeoverResponse success(Integer taskInstanceId) { + return new LogicTaskTakeoverResponse(taskInstanceId, true, "takeover success"); + } + + public static LogicTaskTakeoverResponse failed(Integer taskInstanceId, String message) { + return new LogicTaskTakeoverResponse(taskInstanceId, false, message); + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java index 1c364a7e224f..f0422d5ccce8 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java @@ -20,16 +20,18 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; -import org.apache.dolphinscheduler.dao.entity.TaskInstance; -import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; -import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.entity.*; +import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceRelationMapper; import org.apache.dolphinscheduler.extract.base.client.Clients; +import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.task.SubWorkflowLogicTaskChannelFactory; import org.apache.dolphinscheduler.plugin.task.api.utils.TaskTypeUtils; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.engine.WorkflowEventBus; @@ -157,29 +159,68 @@ private void initializeTaskExecutionContext() { .createTaskExecutionContext(request); } + private WorkflowInstance getValidSubWorkflowInstance() { + + WorkflowInstanceRelation workflowInstanceRelation = applicationContext.getBean(WorkflowInstanceRelationMapper.class) + .queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId()); + if (workflowInstanceRelation == null || workflowInstanceRelation.getWorkflowInstanceId() == 0) { + return null; + } + + WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class).queryDetailById(workflowInstanceRelation.getWorkflowInstanceId()); + + if (workflowInstance == null || !workflowInstance.getState().canFailover()){ + return null; + } + + return workflowInstance; + } + private boolean takeOverTaskFromExecutor() { checkState(isTaskInstanceInitialized(), "The task instance is null, can't take over from executor."); - if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) { - return false; - } if (StringUtils.isEmpty(taskInstance.getHost())) { log.debug("Task: {} host is empty, cannot take over the task from executor(This is normal case).", taskInstance.getName()); return false; } - try { - final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() - .taskInstanceId(taskInstance.getId()) - .workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress()) - .build(); - final TakeOverTaskResponse takeOverTaskResponse = Clients - .withService(ITaskInstanceOperator.class) - .withHost(taskInstance.getHost()) - .takeOverTask(takeOverTaskRequest); - return takeOverTaskResponse.isSuccess(); - } catch (Exception ex) { - log.warn("Take over task: {} failed", taskInstance.getName(), ex); - return false; + + if (TaskTypeUtils.isLogicTask(taskInstance.getTaskType())) { + if (!taskInstance.getTaskType().equals(SubWorkflowLogicTaskChannelFactory.NAME)) { + return false; + } + final WorkflowInstance subWorkflowInstance = getValidSubWorkflowInstance(); + if (subWorkflowInstance == null) { + return false; + } + try { + final LogicTaskTakeoverRequest takeOverTaskRequest = + new LogicTaskTakeoverRequest(taskExecutionContext); + + final LogicTaskTakeoverResponse takeOverTaskResponse = Clients + .withService(ILogicTaskInstanceOperator.class) + .withHost(taskInstance.getHost()) + .takeoverLogicTask(takeOverTaskRequest); + return takeOverTaskResponse.isSuccess(); + } catch (Exception ex) { + log.warn("Take over logic task: {} failed", taskInstance.getName(), ex); + return false; + } + } + else { + try { + final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() + .taskInstanceId(taskInstance.getId()) + .workflowHost(applicationContext.getBean(MasterConfig.class).getMasterAddress()) + .build(); + final TakeOverTaskResponse takeOverTaskResponse = Clients + .withService(ITaskInstanceOperator.class) + .withHost(taskInstance.getHost()) + .takeOverTask(takeOverTaskRequest); + return takeOverTaskResponse.isSuccess(); + } catch (Exception ex) { + log.warn("Take over task: {} failed", taskInstance.getName(), ex); + return false; + } } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java new file mode 100644 index 000000000000..62765e859795 --- /dev/null +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.rpc; + +import lombok.extern.slf4j.Slf4j; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; +import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder; +import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager; +import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class LogicITaskInstanceTakeoverOperationFunction + implements + ITaskInstanceOperationFunction { + + @Autowired + private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder; + + @Autowired + private MasterTaskExecutorThreadPoolManager masterTaskExecutorThreadPool; + + @Autowired + private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager; + + @Override + public LogicTaskTakeoverResponse operate(LogicTaskTakeoverRequest taskTakeoverRequest) { + log.info("Received dispatchLogicTask request: {}", taskTakeoverRequest); + TaskExecutionContext taskExecutionContext = taskTakeoverRequest.getTaskExecutionContext(); + try { + final int taskInstanceId = taskExecutionContext.getTaskInstanceId(); + final int workflowInstanceId = taskExecutionContext.getWorkflowInstanceId(); + final String taskInstanceName = taskExecutionContext.getTaskName(); + + taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); + + LogUtils.setWorkflowAndTaskInstanceIDMDC(workflowInstanceId, taskInstanceId); + LogUtils.setTaskInstanceLogFullPathMDC(taskExecutionContext.getLogPath()); + + MasterTaskExecutionContextHolder.putTaskExecutionContext(taskExecutionContext); + + final MasterTaskExecutor masterTaskExecutor = masterTaskExecutorFactoryBuilder + .createMasterTaskExecutorFactory(taskExecutionContext.getTaskType()) + .createMasterTaskExecutor(taskExecutionContext); + + if (masterTaskExecutorThreadPool.takeoverMasterTaskExecutor(masterTaskExecutor)) { + log.info("Takeover LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName); + return LogicTaskTakeoverResponse.success(taskInstanceId); + } else { + log.error("Takeover LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName); + return LogicTaskTakeoverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full"); + } + } finally { + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); + LogUtils.removeTaskInstanceLogFullPathMDC(); + } + } +} diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java index de4095831e9b..89ed27953a82 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java @@ -32,6 +32,9 @@ public class LogicTaskInstanceOperationFunctionManager { @Autowired private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction; + @Autowired + private LogicITaskInstanceTakeoverOperationFunction logicITaskInstanceTakeoverOperationFunction; + public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() { return logicITaskInstanceDispatchOperationFunction; } @@ -44,4 +47,8 @@ public LogicITaskInstancePauseOperationFunction getLogicTaskInstancePauseOperati return logicITaskInstancePauseOperationFunction; } + public LogicITaskInstanceTakeoverOperationFunction getLogicTaskInstanceTakeoverOperationFunction() { + return logicITaskInstanceTakeoverOperationFunction; + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java index 39446bd89d26..61917cba8b9a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java @@ -18,12 +18,7 @@ package org.apache.dolphinscheduler.server.master.rpc; import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.*; import lombok.extern.slf4j.Slf4j; @@ -55,4 +50,10 @@ public LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequ .operate(taskPauseRequest); } + @Override + public LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest) { + return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeoverOperationFunction() + .operate(taskTakeoverRequest); + } + } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java index 9c30d49f8fb2..e4b10cc4a310 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java @@ -51,6 +51,20 @@ public boolean submitMasterTaskExecutor(final MasterTaskExecutor masterTaskExecu throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor); } + + public boolean takeoverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { + MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor); +// if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { +// return masterSyncTaskExecutorThreadPool +// .submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor); +// } + if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) { + return masterAsyncTaskExecutorThreadPool + .submitMasterTaskExecutor((AsyncMasterTaskExecutor) masterTaskExecutor); + } + throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor); + } + public boolean removeMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { return masterSyncTaskExecutorThreadPool From 8e95c9c53fd3bdafbfff5a1eca2aef57e69701ac Mon Sep 17 00:00:00 2001 From: relee Date: Wed, 13 Nov 2024 00:34:45 +0800 Subject: [PATCH 5/6] Fix #16767 --- .../master/ILogicTaskInstanceOperator.java | 3 +-- ...est.java => LogicTaskTakeOverRequest.java} | 9 ++++--- ...se.java => LogicTaskTakeOverResponse.java} | 10 +++---- .../WorkflowFailoverCommandHandler.java | 2 ++ .../task/runnable/TaskExecutionRunnable.java | 26 +++++++++---------- ...askInstanceTakeOverOperationFunction.java} | 25 +++++++++--------- ...cTaskInstanceOperationFunctionManager.java | 6 ++--- .../rpc/LogicTaskInstanceOperatorImpl.java | 7 +++-- .../MasterTaskExecutorThreadPoolManager.java | 16 +++++++----- .../subworkflow/SubWorkflowLogicTask.java | 24 +++++++++++------ 10 files changed, 71 insertions(+), 57 deletions(-) rename dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/{LogicTaskTakeoverRequest.java => LogicTaskTakeOverRequest.java} (94%) rename dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/{LogicTaskTakeoverResponse.java => LogicTaskTakeOverResponse.java} (78%) rename dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/{LogicITaskInstanceTakeoverOperationFunction.java => LogicITaskInstanceTakeOverOperationFunction.java} (82%) diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java index 448f49758312..0d3f6300e102 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java @@ -19,7 +19,6 @@ import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; -import org.apache.dolphinscheduler.extract.master.transportor.*; @RpcService public interface ILogicTaskInstanceOperator { @@ -34,6 +33,6 @@ public interface ILogicTaskInstanceOperator { LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequest); @RpcMethod - LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest); + LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest); } diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java similarity index 94% rename from dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java rename to dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java index 102641b00eec..3381c48f7f4d 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverRequest.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverRequest.java @@ -17,17 +17,18 @@ package org.apache.dolphinscheduler.extract.master.transportor; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import java.io.Serializable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + @Data @NoArgsConstructor @AllArgsConstructor -public class LogicTaskTakeoverRequest implements Serializable { +public class LogicTaskTakeOverRequest implements Serializable { private static final long serialVersionUID = -1L; diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java similarity index 78% rename from dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java rename to dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java index 62e6a052b2f8..e800fb09e5b3 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeoverResponse.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/transportor/LogicTaskTakeOverResponse.java @@ -24,7 +24,7 @@ @Data @NoArgsConstructor @AllArgsConstructor -public class LogicTaskTakeoverResponse { +public class LogicTaskTakeOverResponse { private Integer taskInstanceId; @@ -32,11 +32,11 @@ public class LogicTaskTakeoverResponse { private String message; - public static LogicTaskTakeoverResponse success(Integer taskInstanceId) { - return new LogicTaskTakeoverResponse(taskInstanceId, true, "takeover success"); + public static LogicTaskTakeOverResponse success(Integer taskInstanceId) { + return new LogicTaskTakeOverResponse(taskInstanceId, true, "take over success"); } - public static LogicTaskTakeoverResponse failed(Integer taskInstanceId, String message) { - return new LogicTaskTakeoverResponse(taskInstanceId, false, message); + public static LogicTaskTakeOverResponse failed(Integer taskInstanceId, String message) { + return new LogicTaskTakeOverResponse(taskInstanceId, false, message); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java index 506fcdffb72a..a5999e293659 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/handler/WorkflowFailoverCommandHandler.java @@ -93,6 +93,8 @@ protected void assembleWorkflowInstance( throw new IllegalArgumentException( "The WorkflowFailoverCommandParam: " + command.getCommandParam() + " is invalid"); } + workflowInstance.setCommandType(command.getCommandType()); + workflowInstance.addHistoryCmd(command.getCommandType()); workflowInstance.setState(workflowFailoverCommandParam.getWorkflowExecutionStatus()); workflowInstanceDao.updateById(workflowInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java index f0422d5ccce8..345f33efcda7 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java @@ -20,13 +20,12 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; -import org.apache.dolphinscheduler.dao.entity.*; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceRelationMapper; import org.apache.dolphinscheduler.extract.base.client.Clients; import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; import org.apache.dolphinscheduler.extract.worker.ITaskInstanceOperator; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TakeOverTaskResponse; @@ -161,15 +160,17 @@ private void initializeTaskExecutionContext() { private WorkflowInstance getValidSubWorkflowInstance() { - WorkflowInstanceRelation workflowInstanceRelation = applicationContext.getBean(WorkflowInstanceRelationMapper.class) - .queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId()); + WorkflowInstanceRelation workflowInstanceRelation = + applicationContext.getBean(WorkflowInstanceRelationMapper.class) + .queryByParentId(taskInstance.getWorkflowInstanceId(), taskInstance.getId()); if (workflowInstanceRelation == null || workflowInstanceRelation.getWorkflowInstanceId() == 0) { return null; } - WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class).queryDetailById(workflowInstanceRelation.getWorkflowInstanceId()); + WorkflowInstance workflowInstance = applicationContext.getBean(WorkflowInstanceMapper.class) + .queryDetailById(workflowInstanceRelation.getWorkflowInstanceId()); - if (workflowInstance == null || !workflowInstance.getState().canFailover()){ + if (workflowInstance == null || !workflowInstance.getState().canFailover()) { return null; } @@ -193,20 +194,19 @@ private boolean takeOverTaskFromExecutor() { return false; } try { - final LogicTaskTakeoverRequest takeOverTaskRequest = - new LogicTaskTakeoverRequest(taskExecutionContext); + final LogicTaskTakeOverRequest takeOverTaskRequest = + new LogicTaskTakeOverRequest(taskExecutionContext); - final LogicTaskTakeoverResponse takeOverTaskResponse = Clients + final LogicTaskTakeOverResponse takeOverTaskResponse = Clients .withService(ILogicTaskInstanceOperator.class) .withHost(taskInstance.getHost()) - .takeoverLogicTask(takeOverTaskRequest); + .takeOverLogicTask(takeOverTaskRequest); return takeOverTaskResponse.isSuccess(); } catch (Exception ex) { log.warn("Take over logic task: {} failed", taskInstance.getName(), ex); return false; } - } - else { + } else { try { final TakeOverTaskRequest takeOverTaskRequest = TakeOverTaskRequest.builder() .taskInstanceId(taskInstance.getId()) diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java similarity index 82% rename from dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java rename to dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java index 62765e859795..35432d54ab24 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeoverOperationFunction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicITaskInstanceTakeOverOperationFunction.java @@ -17,25 +17,26 @@ package org.apache.dolphinscheduler.server.master.rpc; -import lombok.extern.slf4j.Slf4j; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverRequest; -import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeoverResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutionContextHolder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutor; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorFactoryBuilder; import org.apache.dolphinscheduler.server.master.runner.execute.MasterTaskExecutorThreadPoolManager; import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; + +import lombok.extern.slf4j.Slf4j; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Slf4j @Component -public class LogicITaskInstanceTakeoverOperationFunction +public class LogicITaskInstanceTakeOverOperationFunction implements - ITaskInstanceOperationFunction { + ITaskInstanceOperationFunction { @Autowired private MasterTaskExecutorFactoryBuilder masterTaskExecutorFactoryBuilder; @@ -47,7 +48,7 @@ public class LogicITaskInstanceTakeoverOperationFunction private LogicTaskInstanceExecutionEventSenderManager logicTaskInstanceExecutionEventSenderManager; @Override - public LogicTaskTakeoverResponse operate(LogicTaskTakeoverRequest taskTakeoverRequest) { + public LogicTaskTakeOverResponse operate(LogicTaskTakeOverRequest taskTakeoverRequest) { log.info("Received dispatchLogicTask request: {}", taskTakeoverRequest); TaskExecutionContext taskExecutionContext = taskTakeoverRequest.getTaskExecutionContext(); try { @@ -66,12 +67,12 @@ public LogicTaskTakeoverResponse operate(LogicTaskTakeoverRequest taskTakeoverRe .createMasterTaskExecutorFactory(taskExecutionContext.getTaskType()) .createMasterTaskExecutor(taskExecutionContext); - if (masterTaskExecutorThreadPool.takeoverMasterTaskExecutor(masterTaskExecutor)) { - log.info("Takeover LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName); - return LogicTaskTakeoverResponse.success(taskInstanceId); + if (masterTaskExecutorThreadPool.takeOverMasterTaskExecutor(masterTaskExecutor)) { + log.info("Take over LogicTask: {} to MasterTaskExecutorThreadPool success", taskInstanceName); + return LogicTaskTakeOverResponse.success(taskInstanceId); } else { - log.error("Takeover LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName); - return LogicTaskTakeoverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full"); + log.error("Take over LogicTask: {} to MasterTaskExecutorThreadPool failed", taskInstanceName); + return LogicTaskTakeOverResponse.failed(taskInstanceId, "MasterTaskExecutorThreadPool is full"); } } finally { LogUtils.removeWorkflowAndTaskInstanceIdMDC(); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java index 89ed27953a82..fe5e8a1704dc 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperationFunctionManager.java @@ -33,7 +33,7 @@ public class LogicTaskInstanceOperationFunctionManager { private LogicITaskInstancePauseOperationFunction logicITaskInstancePauseOperationFunction; @Autowired - private LogicITaskInstanceTakeoverOperationFunction logicITaskInstanceTakeoverOperationFunction; + private LogicITaskInstanceTakeOverOperationFunction logicITaskInstanceTakeOverOperationFunction; public LogicITaskInstanceDispatchOperationFunction getLogicTaskInstanceDispatchOperationFunction() { return logicITaskInstanceDispatchOperationFunction; @@ -47,8 +47,8 @@ public LogicITaskInstancePauseOperationFunction getLogicTaskInstancePauseOperati return logicITaskInstancePauseOperationFunction; } - public LogicITaskInstanceTakeoverOperationFunction getLogicTaskInstanceTakeoverOperationFunction() { - return logicITaskInstanceTakeoverOperationFunction; + public LogicITaskInstanceTakeOverOperationFunction getLogicTaskInstanceTakeOverOperationFunction() { + return logicITaskInstanceTakeOverOperationFunction; } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java index 61917cba8b9a..faa65d6223ce 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java @@ -18,7 +18,6 @@ package org.apache.dolphinscheduler.server.master.rpc; import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; -import org.apache.dolphinscheduler.extract.master.transportor.*; import lombok.extern.slf4j.Slf4j; @@ -51,9 +50,9 @@ public LogicTaskPauseResponse pauseLogicTask(LogicTaskPauseRequest taskPauseRequ } @Override - public LogicTaskTakeoverResponse takeoverLogicTask(LogicTaskTakeoverRequest taskTakeoverRequest) { - return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeoverOperationFunction() - .operate(taskTakeoverRequest); + public LogicTaskTakeOverResponse takeOverLogicTask(LogicTaskTakeOverRequest taskTakeOverRequest) { + return logicTaskInstanceOperationFunctionManager.getLogicTaskInstanceTakeOverOperationFunction() + .operate(taskTakeOverRequest); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java index e4b10cc4a310..5c7f04d319a4 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/MasterTaskExecutorThreadPoolManager.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.server.master.runner.execute; import org.apache.dolphinscheduler.server.master.runner.message.LogicTaskInstanceExecutionEventSenderManager; +import org.apache.dolphinscheduler.server.master.runner.task.subworkflow.SubWorkflowLogicTask; import lombok.extern.slf4j.Slf4j; @@ -51,13 +52,16 @@ public boolean submitMasterTaskExecutor(final MasterTaskExecutor masterTaskExecu throw new IllegalArgumentException("Unknown type of MasterTaskExecutor: " + masterTaskExecutor); } - - public boolean takeoverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { + public boolean takeOverMasterTaskExecutor(final MasterTaskExecutor masterTaskExecutor) { + if (!(masterTaskExecutor.getLogicTask() instanceof SubWorkflowLogicTask)) { + throw new IllegalArgumentException( + "Only SubWorkflowLogicTask can be take over: " + masterTaskExecutor.getLogicTask()); + } MasterTaskExecutorHolder.putMasterTaskExecuteRunnable(masterTaskExecutor); -// if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { -// return masterSyncTaskExecutorThreadPool -// .submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor); -// } + if (masterTaskExecutor instanceof SyncMasterTaskExecutor) { + return masterSyncTaskExecutorThreadPool + .submitMasterTaskExecutor((SyncMasterTaskExecutor) masterTaskExecutor); + } if (masterTaskExecutor instanceof AsyncMasterTaskExecutor) { return masterAsyncTaskExecutorThreadPool .submitMasterTaskExecutor((AsyncMasterTaskExecutor) masterTaskExecutor); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java index 2a9595f43bf2..6e923ccbcc2c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/subworkflow/SubWorkflowLogicTask.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.server.master.runner.task.subworkflow; +import org.apache.dolphinscheduler.common.enums.CommandType; import org.apache.dolphinscheduler.common.enums.Flag; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; @@ -71,14 +72,21 @@ public SubWorkflowLogicTask(final TaskExecutionContext taskExecutionContext, @Override public AsyncTaskExecuteFunction getAsyncTaskExecuteFunction() { - subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance(); - upsertSubWorkflowRelation(); - taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext)); - - applicationContext - .getBean(LogicTaskInstanceExecutionEventSenderManager.class) - .runningEventSender() - .sendMessage(taskExecutionContext); + // if the sub-workflow instance is already exists and in fail over process, + // there should take over the task without create a new sub-workflow instance + if (subWorkflowLogicTaskRuntimeContext == null + || workflowExecutionRunnable.getWorkflowInstance() + .getCommandType() != CommandType.RECOVER_TOLERANCE_FAULT_PROCESS) { + + subWorkflowLogicTaskRuntimeContext = initializeSubWorkflowInstance(); + upsertSubWorkflowRelation(); + taskExecutionContext.setAppIds(JSONUtils.toJsonString(subWorkflowLogicTaskRuntimeContext)); + + applicationContext + .getBean(LogicTaskInstanceExecutionEventSenderManager.class) + .runningEventSender() + .sendMessage(taskExecutionContext); + } return new SubWorkflowAsyncTaskExecuteFunction( subWorkflowLogicTaskRuntimeContext, From 40a3d13cce098e8f2de3d44b5f65ca3e24ea881f Mon Sep 17 00:00:00 2001 From: relee Date: Wed, 13 Nov 2024 01:19:37 +0800 Subject: [PATCH 6/6] Fix #16767 --- .../extract/master/ILogicTaskInstanceOperator.java | 8 ++++++++ .../engine/task/runnable/TaskExecutionRunnable.java | 6 ++++++ .../server/master/rpc/LogicTaskInstanceOperatorImpl.java | 8 ++++++++ 3 files changed, 22 insertions(+) diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java index 0d3f6300e102..79eec5e61ed1 100644 --- a/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java +++ b/dolphinscheduler-extract/dolphinscheduler-extract-master/src/main/java/org/apache/dolphinscheduler/extract/master/ILogicTaskInstanceOperator.java @@ -19,6 +19,14 @@ import org.apache.dolphinscheduler.extract.base.RpcMethod; import org.apache.dolphinscheduler.extract.base.RpcService; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; @RpcService public interface ILogicTaskInstanceOperator { diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java index 345f33efcda7..a05f58f1cd33 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java @@ -20,6 +20,12 @@ import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import org.apache.dolphinscheduler.dao.entity.Project; +import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; +import org.apache.dolphinscheduler.dao.entity.WorkflowInstanceRelation; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceMapper; import org.apache.dolphinscheduler.dao.mapper.WorkflowInstanceRelationMapper; import org.apache.dolphinscheduler.extract.base.client.Clients; diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java index faa65d6223ce..8ecf353040f6 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/LogicTaskInstanceOperatorImpl.java @@ -18,6 +18,14 @@ package org.apache.dolphinscheduler.server.master.rpc; import org.apache.dolphinscheduler.extract.master.ILogicTaskInstanceOperator; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskDispatchResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskKillResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskPauseResponse; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverRequest; +import org.apache.dolphinscheduler.extract.master.transportor.LogicTaskTakeOverResponse; import lombok.extern.slf4j.Slf4j;